Watch Google Bucket with Elixir: Google Cloud Storage + PubSub + Elixir Broadway
It always makes me smile when a month later I look back at a moment when I did not know how to tackle a problem, and now we have a solid solution for it. Since we are a startup, our most sensitive resource is time: no time for detailed investigation, no time for huge build-outs, no time for mistakes!
At Compass Beauty we do skin measurements with medical devices that run proprietary software, mostly on a Windows-based operating system. So here we are: devices are set up, Machine Learning is actively developed. How do we get the data (mostly binary files) off the devices to the Machine Learning pipeline? What will the trigger be, and how do we check upload completeness when a single measurement produces multiple files?
The collective mind of our dev team came up with a solution that will blow your mind: Google Cloud Storage + Google PubSub + Elixir Broadway. Simple and elegant. But the devil is in the details. Let me take you one level deeper into how that works.
Devices offload their data to a local server in a private network, and the local server constantly syncs its data to Google Cloud Storage. Cloud Storage in turn has a notification that publishes a message to a certain topic in Google PubSub. The topic has a subscription that allows Elixir Broadway to ingest messages and catch up if for any reason some message was missed. Now that Elixir has a message, it feels cozy, like grandma’s home. From here I can do anything it takes to assign uploaded files to a client session, check for upload completeness, check measurement progress, and report to the UI via the Phoenix framework.
In this article, I will focus on Google Cloud setup and the bare minimum setup of Elixir Broadway.
Google Project Setup
Without further ado, let me quickly set up a Google Cloud Project that will serve as a guinea pig.
I will use the gcloud command-line tool to set everything up in Google Cloud.
gcloud projects create gcs-pubsub-1
gcloud config set project gcs-pubsub-1
gcloud alpha billing projects link gcs-pubsub-1 \
  --billing-account XXXXXX-XXXXXX-XXXXXX
gcloud services enable pubsub
Here I’ve created a new project “gcs-pubsub-1”, configured the gcloud tool to use the newly created project, linked a billing account, and enabled the PubSub service.
To find out your billing account ID, use the gcloud alpha billing accounts list command.
The next step is to create a PubSub topic, storage-topic, and a subscription to that topic, api-subscription. The topic is a named resource to which messages are sent by publishers (in my case, messages to the topic will be sent by Cloud Storage). The subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application (Elixir Broadway).
gcloud pubsub topics create storage-topic
gcloud pubsub subscriptions create api-subscription \
  --topic storage-topic \
  --expiration-period never
Now I will create a bucket named “gcs-pubsub-1-bucket” and a notification config on that bucket, establishing a flow of event notifications from Cloud Storage to a Cloud Pub/Sub topic.
Note that I’m using the gsutil command-line tool now.
gsutil mb gs://gcs-pubsub-1-bucket
gsutil notification create -t storage-topic -f json gs://gcs-pubsub-1-bucket
It’s time to create a service account that will be used to access the PubSub subscription, set up an IAM policy binding, and create and download the account key in JSON format. I will grant the newly created account the PubSub subscriber role (examine the available roles). The service account name will be backend-api@gcs-pubsub-1.iam.gserviceaccount.com.
gcloud iam service-accounts create backend-api
gcloud projects add-iam-policy-binding gcs-pubsub-1 \
  --member serviceAccount:backend-api@gcs-pubsub-1.iam.gserviceaccount.com \
  --role roles/pubsub.subscriber
gcloud iam service-accounts keys create ./creds.json \
  --iam-account backend-api@gcs-pubsub-1.iam.gserviceaccount.com
The account key creds.json, in JSON format, will be used to access the topic subscription from Elixir Broadway.
gcloud, time for Elixir!
Now it’s time to set up the Elixir pipeline to ingest messages from the PubSub subscription I created earlier. I will create a demo project named “google_storage” with a supervision tree (note the --sup option) and copy the account key file creds.json to a priv/ folder within the project.
mix new google_storage --sup
mkdir google_storage/priv
mv creds.json google_storage/priv/
cd google_storage/
There is a BroadwayCloudPubSub library, which includes a PubSub producer. I will add it and goth as project dependencies, as described in the README. Then I will configure goth to use the credentials from the priv/creds.json file and set up the :producer_module configuration parameter. Finally, I will install the dependencies with mix deps.get.
mkdir config
touch config/config.exs
# Update config/config.exs
mix deps.get
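For reference, here is a minimal sketch of what config/config.exs could contain. It assumes a goth release from that era that reads its credentials from application config (newer goth and broadway_cloud_pub_sub releases wire credentials differently, so check the README of the version you install). It also assumes that :producer_module lives under the :google_storage application and holds a {module, options} tuple. That placement is my guess at the layout, not something the library requires.

```elixir
# config/config.exs
import Config

# Assumption: an older goth release that reads the service account key
# from application config. The path is relative to the project root.
config :goth, json: File.read!("priv/creds.json")

# Assumption: the pipeline reads this key at startup. The subscription
# path is built from the project and subscription names created above.
config :google_storage,
  producer_module:
    {BroadwayCloudPubSub.Producer,
     subscription: "projects/gcs-pubsub-1/subscriptions/api-subscription"}
```

Keeping the producer in config makes it easy to swap in a different producer for tests.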
The pipeline itself is quite simple: I will write a GoogleStorage.Pipeline module, which implements the Broadway behaviour together with the pipeline configuration.
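A minimal sketch of such a module could look like the following. It assumes the producer tuple is stored under the :producer_module key of the :google_storage application config, and that the Jason library is available for JSON decoding (add {:jason, "~> 1.0"} to the dependencies if it is not already pulled in). Both are assumptions for illustration.

```elixir
defmodule GoogleStorage.Pipeline do
  use Broadway
  require Logger

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # Assumption: a {module, options} tuple kept in application config.
        module: Application.fetch_env!(:google_storage, :producer_module)
      ],
      processors: [
        default: [concurrency: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    # With `-f json`, the payload is the JSON description of the object.
    case Jason.decode(data) do
      {:ok, payload} ->
        Logger.info("Storage notification: #{inspect(payload)}")
        message

      {:error, reason} ->
        # Marking the message as failed keeps it from being acknowledged.
        Message.failed(message, reason)
    end
  end
end
```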
The handle_message callback function decodes the PubSub message payload and logs a message. This callback function will run in each of the 10 processor processes that Broadway starts, according to the default: [concurrency: 10] configuration.
On success, the acknowledger will confirm to Google PubSub that the message has been successfully received, and it will be removed from the Google PubSub subscription queue.
On failure, the PubSub message will not be acknowledged. Google PubSub will not receive an acknowledgment within its ack-deadline of 10 seconds, and it will reattempt delivery on the next pull.
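The very last part is to add GoogleStorage.Pipeline to the application’s supervision tree in lib/google_storage/application.ex, the application module generated by mix new with the --sup option.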
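As a sketch, the generated application module only needs the pipeline added to its list of children. The module and supervisor names below follow what mix new google_storage --sup generates.

```elixir
defmodule GoogleStorage.Application do
  @moduledoc false
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # `use Broadway` defines child_spec/1, so the module name is enough.
      GoogleStorage.Pipeline
    ]

    opts = [strategy: :one_for_one, name: GoogleStorage.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```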
Demo time! Start an iex session in one terminal with iex -S mix and upload a file to the bucket from another:
gsutil cp ./README.md gs://gcs-pubsub-1-bucket/
You should see storage notification logs in the iex session.
The list of all possible storage notification event types can be found here: https://cloud.google.com/storage/docs/pubsub-notifications#events
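Each notification carries its event type in the message attributes, so a pipeline can decide early which events it cares about. Below is a small sketch of such a dispatch. GoogleStorage.Event and its return values are hypothetical names for illustration, while the attribute keys (eventType, objectId) and event type values come from the Cloud Storage documentation linked above. I assume the attributes reach the callback as a map with string keys. Where exactly they sit in the Broadway message metadata depends on the producer version.

```elixir
defmodule GoogleStorage.Event do
  @moduledoc "Hypothetical helper: maps notification attributes to an action."

  # A new object (or a new generation of an existing one) was written.
  def classify(%{"eventType" => "OBJECT_FINALIZE", "objectId" => object_id}),
    do: {:uploaded, object_id}

  # An object was permanently deleted.
  def classify(%{"eventType" => "OBJECT_DELETE", "objectId" => object_id}),
    do: {:deleted, object_id}

  # OBJECT_METADATA_UPDATE, OBJECT_ARCHIVE, or anything unexpected.
  def classify(_attributes), do: :ignored
end
```

For an upload watcher, only the :uploaded branch would usually trigger further work such as a completeness check.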
This is it. Now I have an entry point for Cloud Storage events.