Watching a Google Bucket with Elixir: Google Cloud Storage + PubSub + Elixir Broadway
It always makes me smile when, a month later, I look back at a moment when I did not know how to tackle a problem and we now have a solid solution for it. Since we are a startup, our scarcest resource is time: no time for detailed investigation, no time for huge build-outs, no time for mistakes!
At Compass Beauty we do skin measurements with medical devices that run proprietary software, mostly on Windows-based operating systems. So here we are: the devices are set up, and Machine Learning is under active development. How do we get the data (mostly binary files) off the devices and into the Machine Learning pipeline? What will the trigger be, and how do we check upload completeness when a single measurement produces multiple files?
The collective mind of our dev team came up with a solution that will blow your mind: Google Cloud Storage + Google PubSub + Elixir Broadway. Simple and elegant. But the devil is in the details. Let me take you one step deeper into how it works.
Devices offload their data to a local server in a private network, and the local server constantly syncs its data to Google Cloud Storage. Cloud Storage, in turn, has a notification config that publishes a message to a certain topic in Google PubSub. The topic has a subscription that allows Elixir Broadway to ingest messages and catch up if, for any reason, a message was missed. Once Elixir has a message, it feels cozy, like grandma's home. From here I can do whatever it takes to assign uploaded files to a client session, check for upload completeness, track measurement progress, and report to the UI via the Phoenix framework.
In this article, I will focus on Google Cloud setup and the bare minimum setup of Elixir Broadway.
Google Project Setup
Without further ado, let me quickly set up a Google Cloud project that will serve as a guinea pig.
I will use the gcloud command-line tool to set everything up in Google Cloud.
gcloud projects create gcs-pubsub-1
gcloud config set project gcs-pubsub-1
gcloud alpha billing projects link gcs-pubsub-1 \
--billing-account XXXXXX-XXXXXX-XXXXXX
gcloud services enable pubsub
Here I've created a new project, "gcs-pubsub-1", configured the gcloud util to use the newly created project, linked a billing account, and enabled the PubSub service.
To find your billing account ID, use the gcloud alpha billing accounts list command.
The next step is to create a PubSub topic, storage-topic, and a subscription to that topic, api-subscription. The topic is a named resource to which messages are sent by publishers (in my case, messages to the topic will be sent by Cloud Storage). The subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application (Elixir Broadway).
gcloud pubsub topics create storage-topic
gcloud pubsub subscriptions create api-subscription \
--topic storage-topic \
--expiration-period never \
--ack-deadline 10
Now I will create a bucket named "gcs-pubsub-1-bucket" and a notification config on the bucket, establishing a flow of event notifications from Cloud Storage to the Cloud Pub/Sub topic. Note that this time I'm using the gsutil command-line tool.
gsutil mb gs://gcs-pubsub-1-bucket
gsutil notification create -t storage-topic -f json gs://gcs-pubsub-1-bucket
It's time to create a service account that will be used to access the PubSub subscription, set up an IAM policy binding, and create and download the account key in JSON format. I will grant the newly created account the role of PubSub subscriber (examine the available roles). The service account name will be backend-api:
gcloud iam service-accounts create backend-api
gcloud projects add-iam-policy-binding gcs-pubsub-1 \
--member serviceAccount:backend-api@gcs-pubsub-1.iam.gserviceaccount.com \
--role roles/pubsub.subscriber
gcloud iam service-accounts keys create ./creds.json \
--iam-account=backend-api@gcs-pubsub-1.iam.gserviceaccount.com
The JSON account key creds.json will be used to access the topic subscription from Elixir Broadway.
Enough of gcloud, time for Elixir!
Elixir Broadway
Now it's time to set up the Elixir pipeline to ingest messages from the PubSub subscription I created earlier. I will create a demo project named "google_storage" with a supervision tree (note the --sup option) and copy the account key creds.json file to the priv/ folder within the project.
mix new google_storage --sup
mkdir google_storage/priv
mv creds.json google_storage/priv/
cd google_storage/
There is a BroadwayCloudPubSub library, which includes a PubSub producer. I will add the broadway_cloud_pub_sub and goth project dependencies, as described in its README, then configure goth to use the credentials from the priv/creds.json file and set the :producer_module configuration parameter (a sketch of these changes follows the commands below). Finally, I install the dependencies with mix deps.get:
mkdir config
touch config/config.exs
# Update config/config.exs
mix deps.get
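For reference, a minimal sketch of those dependency and configuration changes could look like this; the version requirements, the subscription path, and keeping the producer tuple under the :google_storage application config are assumptions to adapt to your setup:

# mix.exs (deps/0)
defp deps do
  [
    {:broadway_cloud_pub_sub, "~> 0.6"},
    {:goth, "~> 1.0"},
    # assumed here for decoding the JSON notification payload
    {:jason, "~> 1.2"}
  ]
end

# config/config.exs
import Config

# Goth reads the service account key copied into priv/
config :goth, json: File.read!("priv/creds.json")

# The pipeline will read its producer module from application config
config :google_storage,
  producer_module:
    {BroadwayCloudPubSub.Producer,
     subscription: "projects/gcs-pubsub-1/subscriptions/api-subscription"}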
The pipeline itself is quite simple: I will write a GoogleStorage.Pipeline module which implements the Broadway behaviour together with the pipeline configuration.
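A minimal version of that module could look like the following sketch, assuming the :producer_module application config from earlier and Jason for decoding the payload:

defmodule GoogleStorage.Pipeline do
  use Broadway

  require Logger

  alias Broadway.Message

  def start_link(_opts) do
    # Producer module and its options come from application config,
    # e.g. {BroadwayCloudPubSub.Producer, subscription: "projects/gcs-pubsub-1/subscriptions/api-subscription"}
    producer_module = Application.fetch_env!(:google_storage, :producer_module)

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: producer_module,
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    # The Cloud Storage notification arrives as a JSON payload; decode and log it
    payload = Jason.decode!(data)
    Logger.info("Received storage notification: #{inspect(payload)}")

    message
  end
end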
As you can see, the handle_message callback function decodes the PubSub message payload and logs it. This callback will run in each of the 10 processor processes that Broadway starts, according to the configuration (default: [concurrency: 10]).
On success, the acknowledger notifies Google PubSub that the message has been handled successfully, and the message is removed from the subscription queue.
On failure, the PubSub message will be ignored: Google PubSub will not receive an acknowledgment within its ack-deadline of 10 seconds and will reattempt delivery on the next pull.
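If you prefer to mark a message as failed explicitly (for example, when the payload cannot be decoded), Broadway.Message.failed/2 does exactly that: the failed message will not be acknowledged, and PubSub will redeliver it after the ack deadline. A hypothetical variant of the callback:

@impl true
def handle_message(_processor, %Message{data: data} = message, _context) do
  case Jason.decode(data) do
    {:ok, payload} ->
      Logger.info("Received storage notification: #{inspect(payload)}")
      message

    {:error, reason} ->
      # Not acknowledged; PubSub will redeliver after the ack-deadline expires
      Message.failed(message, reason)
  end
end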
The very last part is to add GoogleStorage.Pipeline to the application's supervision tree in lib/google_storage/application.ex.
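A minimal sketch of that change, assuming the application module generated by mix new --sup:

defmodule GoogleStorage.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Start the Broadway pipeline under the application supervisor
      GoogleStorage.Pipeline
    ]

    opts = [strategy: :one_for_one, name: GoogleStorage.Supervisor]
    Supervisor.start_link(children, opts)
  end
end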
Demo time! Start an iex session in one terminal with iex -S mix and upload a file to the bucket in another:
gsutil cp ./README.md gs://gcs-pubsub-1-bucket/
You should see storage notification logs in the iex session.
The list of all possible storage notification event types can be found here: https://cloud.google.com/storage/docs/pubsub-notifications#events
This is it. Now I have an entry point for Cloud Storage events.