Watch Google Bucket with Elixir: Google Cloud Storage + PubSub + Elixir Broadway

Pavel Tsiukhtsiayeu
5 min readJan 7, 2021
Minsk, Belarus. August 2019

It always makes me smile when a month later I look back at a moment when I did not know how to tackle a problem, and now we have a solid solution for it. Since we are a startup, our most sensitive resource is time: no time for detailed investigation, no time for huge build-outs, no time for mistakes!

At Compass Beauty we do skin measurements with medical devices that run proprietary software mostly on a Windows-based operating system. So here we are: devices are set up, Machine Learning is actively developed. How do we get the data (mostly binary files) off the devices to Machine Learning pipeline? What the trigger will be and how do we check upload completeness in case a single measurement produces multiple files?

The collective mind of our dev team came up with a solution that will blow your mind: Google Cloud Storage + Google PubSub + Elixir Broadway. Simple and elegant. But the devil is in details. Let me take you one step down how that works.

Devices offload their data to a local server in a private network, local server constantly syncing it’s data to the Google Cloud Storage. Cloud Storage in turn has a notification that publishes a message to a certain topic in Google PubSub. The topic has a subscription that allows Elixir Broadway to ingest messages and catch up if for any reason some message was missed. Now that Elixir has a message, it feels cozy, like grandma’s home. From here I can do anything it takes to assign uploaded files to a client session, check for upload completeness, check measurements progress, and report to UI via Phoenix framework.

In this article, I will focus on Google Cloud setup and the bare minimum setup of Elixir Broadway.

Google Project Setup

Without much further talking, let me quickly set up a Google Cloud Project that will serve as a guinea pig.

I will use the gcloud command-line tool to set everything up in Google Cloud.

gcloud projects create gcs-pubsub-1gcloud config set project gcs-pubsub-1gcloud alpha billing projects link gcs-pubsub-1 \
--billing-account XXXXXX-XXXXXX-XXXXXX
gcloud services enable pubsub

Here I’ve created a new project “gcs-pubsub-1”, configured gcloud util to use a newly created project, linked billing account, and enabled PubSub service.

To find out your billing account id use gcloud alpha billing accounts list command.

The next step is to create a PubSub topic storage-topic and a subscription to that topic api-subscription. The topic is a named resource to which messages are sent by publishers (in my case, messages to the topic will be sent by Cloud Storage). The subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application (Elixir Broadway).

gcloud pubsub topics create storage-topicgcloud pubsub subscriptions create api-subscription \
--topic storage-topic \
--expiration-period never \
--ack-deadline 10
PubSub set up

Now I will create a bucket named “gcs-pubsub-1-bucket” and a notification config on a bucket, establishing a flow of event notifications from Cloud Storage to a Cloud Pub/Sub topic.

Note, I’ve used gsutil the command-line tool now.

gsutil mb gs://gcs-pubsub-1-bucketgsutil notification create -t storage-topic -f json gs://gcs-pubsub-1-bucket
Create a bucket and notification config

It’s time to create a service account, that will be used to access the PubSub subscription, set up IAM policy binding, and create and download the account key in JSON format. I will grant the newly created account the role of the PubSub subscriber (examine the available roles). The service account name will be backend-api :

gcloud iam service-accounts create backend-apigcloud projects add-iam-policy-binding gcs-pubsub-1 \
--member \
--role roles/pubsub.subscriber
gcloud iam service-accounts keys create ./creds.json \
The service account and a privacy policy
Create and download account key

The account key creds.json in JSON format will be used to access a topic subscription from Elixir Broadway.

Enough of gcloud, time for Elixir!

Elixir Broadway

Now it’s time to set up the Elixir pipeline to ingest messages from a PubSub subscription I’ve created earlier. I will create a demo project named “google_storage” with a supervision tree (note the --sup option) and copy the account key creds.json file to a priv/ folder within the project.

mix new google_storage --supmkdir google_storage/privmv creds.json google_storage/priv/cd google_storage/
Google storage project created

There is a BroadwayCloudPubSub library, which includes PubSub producer. I will add broadway_cloud_pub_sub and goth project dependencies, as described in README. Then configure the goth to use credentials from priv/creds.json the file and set up :producer_module configuration parameter. And finally, install dependencies with mix deps.get

mkdir configtouch config/config.exs# Update config/config.exsmix deps.get

The pipeline itself is quite simple: I will write GoogleStorage.Pipeline module which implements Broadway behaviour together with pipeline configuration.

As you can see, handle_message callback function decodes PubSub message payload and logs a message. This callback function will be run in each of 10 processor processes that Broadway will start, accordingly to configuration — default: [concurrency: 10]

On success, the acknowledger will acknowledge Google PubSub that message has been successfully received, and it will be removed from the Google PubSub subscription queue.

On failure, the PubSub message will be ignored, Google PubSub will not receive a message acknowledgment within it’s ack-deadline of 10 seconds, and it will reattempt delivery on the next pull.

The very last part is to add GoogleStorage.Pipeline to the application’s supervision tree in lib/google_storage/application.ex

Demo time! Start iex session in one terminal with iex -S mix and upload file to a bucket in another gsutil cp ./ gs://gcs-pubsub-1-bucket/ You should see storage notification logs in iex session.

PubSub messages coming in

The list of all possible storage notification event types can be found here:

This is it. Now I have an entry point for Cloud Storage events.