Google Cloud Dataflow provides a serverless infrastructure for running batch and streaming data processing jobs. One of its core strengths is that the same pipeline can move from processing historical batch data to processing streaming datasets, while handling stream-specific concerns such as windowing. Dataflow is a major component for building an end-to-end ML production pipeline on GCP.

Beam Programming

Dataflow jobs are written with the Apache Beam SDK. Beam provides a small set of concepts that simplify building a transformation pipeline for distributed batch and streaming jobs.

  • A Pipeline: A Pipeline object wraps the entire operation and prescribes the transformation process by defining the input data source to the pipeline, how that data will be transformed and where the data will be written.
  • A PCollection: A PCollection represents a dataset in the pipeline, both the data read from a source and the output of each transform. A PCollection can be either bounded or unbounded. A bounded PCollection refers to batch or historical data, whereas an unbounded PCollection refers to streaming data.
  • A PTransform: A PTransform is a transformation step carried out on one or more PCollections in the pipeline. Core Beam transforms include:
    • ParDo: for generic parallel processing of each element.
    • GroupByKey: for grouping the values of a key/value collection by key.
    • CoGroupByKey: for a relational join of two or more key/value PCollections with the same key type.
    • Combine: for combining collections of elements or values in your data.
    • Flatten: for merging multiple PCollection objects into one.
    • Partition: for splitting a single PCollection into smaller collections.
  • I/O Transforms: These are PTransforms that read or write data to different external storage systems.
A Simple Linear Pipeline with Sequential Transforms.
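
To make the core transforms listed above more concrete, the following is a plain-Python sketch of what each one computes on small in-memory lists. It is not Beam code and it ignores distribution, windowing and ordering guarantees entirely. The helper names (par_do, group_by_key and so on) and the sample data are hypothetical and exist only for this illustration.

```python
from collections import defaultdict
from itertools import chain

# Hypothetical in-memory stand-ins for two small key/value PCollections.
prices = [("bitcoin", 10), ("ethereum", 3), ("bitcoin", 12)]
volumes = [("bitcoin", 500), ("ethereum", 70)]


def par_do(elements, fn):
    """ParDo: apply fn to every element; fn may emit zero or more outputs."""
    return [out for element in elements for out in fn(element)]


def group_by_key(pairs):
    """GroupByKey: collect all values that share a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def co_group_by_key(**tagged):
    """CoGroupByKey: join several keyed collections on their common key."""
    keys = {key for pairs in tagged.values() for key, _ in pairs}
    return {
        key: {tag: [v for k, v in pairs if k == key]
              for tag, pairs in tagged.items()}
        for key in keys
    }


def combine_per_key(pairs, combine_fn):
    """Combine: reduce the values of each key to a single value."""
    return {key: combine_fn(values)
            for key, values in group_by_key(pairs).items()}


def flatten(*collections):
    """Flatten: merge several collections into one."""
    return list(chain(*collections))


def partition(elements, n, partition_fn):
    """Partition: split one collection into n smaller ones."""
    parts = [[] for _ in range(n)]
    for element in elements:
        parts[partition_fn(element, n)].append(element)
    return parts


doubled = par_do(prices, lambda kv: [(kv[0], kv[1] * 2)])
grouped = group_by_key(prices)
joined = co_group_by_key(prices=prices, volumes=volumes)
maxima = combine_per_key(prices, max)
merged = flatten(prices, volumes)
cheap, expensive = partition(prices, 2, lambda kv, n: 0 if kv[1] < 10 else 1)
```

In a real Beam pipeline these operations are applied with the | operator to PCollections rather than called on lists, and the runner decides how the work is split across workers.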

Enable Dataflow API

(1). Go to API & Services Dashboard
(2). Click Enable API & services
(3). Search for Dataflow API

Search for Dataflow API.

(4). Enable Dataflow API

Enable Dataflow API.

Building a Simple Data Transformation Pipeline

In this example, a transformation pipeline is built to pre-process the crypto-markets.csv dataset. The pipeline removes the attributes that are not relevant for data modeling and keeps only the bitcoin records.

%%bash
# create bucket
gsutil mb gs://ieee-ompi-datasets
Creating gs://ieee-ompi-datasets/...
%%bash
# transfer data from Github to the bucket.
curl https://raw.githubusercontent.com/dvdbisong/IEEE-Carleton-and-OMPI-Machine-Learning-Workshop/master/data/crypto-markets/crypto-markets.csv | gsutil cp - gs://ieee-ompi-datasets/crypto-markets.csv
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0Copying from <STDIN>...
100 47.0M  100 47.0M    0     0  25.6M      0  0:00:01  0:00:01 --:--:-- 25.6M
/ [1 files][    0.0 B/    0.0 B]                                                
Operation completed over 1 objects.                                              
%%bash
# install the apache beam library and other important setup packages.
# restart the session after installing apache beam.
# note: the %%bash cell magic must be the first line of the notebook cell.
source activate py2env
pip install google-cloud-dataflow
pip uninstall -y google-cloud-dataflow
conda install -y pytz==2018.4
pip install 'apache-beam[gcp]'
# import relevant libraries
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
# transformation code
def run(project, source_bucket, target_bucket):
    import csv

    options = {
        'staging_location': 'gs://ieee-ompi-datasets/staging',
        'temp_location': 'gs://ieee-ompi-datasets/temp',
        'job_name': 'dataflow-crypto',
        'project': project,
        'max_num_workers': 24,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'runner': 'DataflowRunner'
      }
    options = beam.pipeline.PipelineOptions(flags=[], **options)
    
    crypto_dataset = 'gs://{}/crypto-markets.csv'.format(source_bucket)
    processed_ds = 'gs://{}/transformed-crypto-bitcoin'.format(target_bucket)

    pipeline = beam.Pipeline(options=options)

    # 0:slug, 3:date, 5:open, 6:high, 7:low, 8:close
    rows = (
        pipeline |
            'Read from bucket' >> ReadFromText(crypto_dataset) |
            'Tokenize as csv columns' >> beam.Map(lambda line: next(csv.reader([line]))) |
            'Select columns' >> beam.Map(lambda fields: (fields[0], fields[3], fields[5], fields[6], fields[7], fields[8])) |
            'Filter bitcoin rows' >> beam.Filter(lambda row: row[0] == 'bitcoin')
        )
        
    combined = (
        rows |
            # join the selected fields back into one csv line
            # (avoids tuple-unpacking lambdas, which are Python 2 only)
            'Format as csv lines' >> beam.Map(lambda row: ','.join(row)) |
            'Write to bucket' >> WriteToText(
                file_path_prefix=processed_ds,
                file_name_suffix=".csv", num_shards=2,
                shard_name_template="-SS-of-NN",
                header='slug,date,open,high,low,close')
        )

    pipeline.run()
# execute transformation
if __name__ == '__main__':
    print('Run pipeline on the cloud')
    run(project='oceanic-sky-230504', source_bucket='ieee-ompi-datasets', target_bucket='ieee-ompi-datasets')
Run pipeline on the cloud
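
Before submitting a job to Dataflow, the per-row logic of the pipeline above can be checked locally with plain Python. The sketch below mirrors the tokenize, select, filter and format steps on a few lines. The sample lines, including the header, are made up for illustration; only the column positions (0: slug, 3: date, 5: open, 6: high, 7: low, 8: close) come from the pipeline code. The function names are hypothetical.

```python
import csv

# Made-up sample lines; the header names are an assumption about the file.
sample_lines = [
    "slug,symbol,name,date,ranknow,open,high,low,close,volume",
    "bitcoin,BTC,Bitcoin,2013-04-28,1,135.3,135.98,132.1,134.21,0",
    "ethereum,ETH,Ethereum,2015-08-07,2,2.83,3.54,2.52,2.77,164329",
]


def tokenize(line):
    """Split one csv line into its fields, honouring quoted values."""
    return next(csv.reader([line]))


def select_columns(fields):
    """Keep slug, date, open, high, low and close."""
    return tuple(fields[i] for i in (0, 3, 5, 6, 7, 8))


def is_bitcoin(row):
    """Keep only rows whose slug is 'bitcoin'."""
    return row[0] == "bitcoin"


def to_csv_line(row):
    """Join the selected fields back into one csv line."""
    return ",".join(row)


rows = [select_columns(tokenize(line)) for line in sample_lines]
output_lines = [to_csv_line(row) for row in rows if is_bitcoin(row)]
print(output_lines)
```

Note that a header line in the input is dropped by the same filter, because its first field is not 'bitcoin'. This is why the pipeline writes its own header in WriteToText.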

Open Dataflow Dashboard

Open Dataflow.

Choose Dataflow Job

Choose Dataflow Job.

Dataflow Transformation Pipeline

Dataflow transformation pipeline.

Transformed Dataset in Bucket

Transformed dataset.
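
The names of the output files in the bucket follow from the file_path_prefix, shard_name_template, file_name_suffix and num_shards arguments passed to WriteToText. The sketch below reproduces that naming in plain Python, assuming Beam's documented convention that each run of S in the template is replaced by the zero-padded shard number and each run of N by the zero-padded shard count. The helper expand_shard_template is hypothetical and is not part of the Beam API.

```python
import re


def expand_shard_template(prefix, template, suffix, num_shards):
    """Return the file names expected for a sharded text output.

    Illustrative helper only: it mimics the S/N placeholder convention
    of the shard name template and does not call Beam.
    """
    names = []
    for shard_num in range(num_shards):
        # Replace each run of 'S' with the zero-padded shard number.
        name = re.sub(r"S+",
                      lambda m: str(shard_num).zfill(len(m.group())),
                      template)
        # Replace each run of 'N' with the zero-padded shard count.
        name = re.sub(r"N+",
                      lambda m: str(num_shards).zfill(len(m.group())),
                      name)
        names.append(prefix + name + suffix)
    return names


names = expand_shard_template(
    prefix="gs://ieee-ompi-datasets/transformed-crypto-bitcoin",
    template="-SS-of-NN",
    suffix=".csv",
    num_shards=2,
)
for name in names:
    print(name)
```

With the settings used in this pipeline, two files are expected, one per shard, each starting with the header line.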