TensorFlow Extended, ML Metadata and Apache Beam on the Cloud

Original article was published on Deep Learning on Medium


A practical and self-contained example using GCP Dataflow

The end-to-end example that TensorFlow Extended (TFX) provides, generated by running tfx template copy taxi $target-dir, produces 17 files scattered across 5 directories. If you are looking for a smaller, simpler, self-contained example that actually runs on the cloud rather than locally, this is it. The setup of the cloud services is also covered here.

What’s going to be covered

We are going to generate statistics and a schema for the Chicago taxi trips CSV dataset, which you can find under the data directory after running the tfx template copy taxi command.

Generated artifacts such as the data statistics or the schema are going to be viewed from a Jupyter notebook, either by connecting to the ML Metadata store or by simply downloading the artifacts from plain file/binary storage.

Full code sample at the bottom of the article

Services Used

The whole pipeline can also run on your local machine, on other cloud providers, or on your own Spark clusters. The example scales to bigger datasets without code changes. If you wish to understand how this happens transparently, read this article.

Execution Process

  1. If running locally, code will not be serialised or sent to the cloud. Otherwise, Beam sends everything to a staging location (typically bucket storage). Check out cloudpickle to get some intuition on how serialisation is done.
  2. Your cloud runner of choice (ours is Dataflow) checks that all the referenced resources exist and are accessible (for example, the pipeline output location, temporary file storage, etc.).
  3. Compute instances are started and your pipeline is executed in a distributed fashion, showing up in the job inspector both while it is running and after it has finished.
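To get a feel for step 1, here is a minimal sketch of what "serialising code by value" means, using only the Python standard library. It is not how cloudpickle or Beam are actually implemented; it only illustrates the idea that the function body itself (its bytecode) travels to the worker, whereas the standard pickle module stores plain functions by reference to their module and name. The helper names serialise_by_value and deserialise_by_value are made up for this illustration, and the sketch ignores closures, defaults and globals, which a real library has to handle.

```python
import builtins
import marshal
import types


def serialise_by_value(fn):
    """Turn the function's compiled bytecode into bytes that can be shipped."""
    return marshal.dumps(fn.__code__)


def deserialise_by_value(payload, name="rebuilt"):
    """Rebuild a callable from shipped bytecode, as a remote worker might."""
    code = marshal.loads(payload)
    # The rebuilt function gets a fresh, almost empty global namespace.
    return types.FunctionType(code, {"__builtins__": builtins}, name)


def add_one(x):
    return x + 1


payload = serialise_by_value(add_one)    # bytes, could be uploaded to a bucket
rebuilt = deserialise_by_value(payload)  # a new function object with the same body
print(type(payload).__name__, rebuilt(41))
```

The rebuilt function is a different object from add_one but behaves the same, which is roughly what a worker needs in order to run code that was never installed on it.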

It’s good naming practice to use /temp or /tmp for temporary files and /staging or /binaries for the staging directory.
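As a rough sketch of how these locations are typically passed to Beam, the helper below assembles a list of pipeline flags for Dataflow. The flag names (--runner, --project, --region, --temp_location, --staging_location) are standard Beam pipeline options, but the helper dataflow_pipeline_args and the project, region and bucket values are placeholders invented for this example, not values from the article.

```python
def dataflow_pipeline_args(project, region, bucket):
    """Build Beam pipeline flags that point Dataflow at one storage bucket."""
    root = f"gs://{bucket}"
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        # Scratch space used while the job runs.
        f"--temp_location={root}/tmp",
        # Where the serialised pipeline and its dependencies are uploaded.
        f"--staging_location={root}/staging",
    ]


# Placeholder values; replace them with your own project, region and bucket.
args = dataflow_pipeline_args("my-gcp-project", "us-central1", "my-bucket")
for flag in args:
    print(flag)
```

A list like this can be handed to Beam's PipelineOptions, or to a TFX pipeline through its beam_pipeline_args parameter, assuming your TFX version exposes it.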

The TFX Pipeline

TensorFlow Extended provides its own component wrappers around plain old Beam components. They are a bit more federated in form: artifacts are only produced and consumed. This means that components do not stream the whole dataset to each other every time; they just pass around resource locator strings. Your dataset is streamed during analysis and preprocessing for speed, and then saved in small chunks as TFRecords for maximum performance, taking full advantage of the fast storage technology of Storage Buckets.

This is why, when you declare custom components, you declare strongly typed input and output channels (artifact types and names), which get mapped to multiple tagged inputs and outputs on the Beam side. You return these in a Dict. Feel free to look into the source of the default TFX components for more insight.

This is why you need to do things like:

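from tfx.components import CsvExampleGen, StatisticsGen

example_gen = CsvExampleGen(...)

# Wire the typed 'examples' output channel into the next component.
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])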
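The wiring above can be pictured with a toy model in plain Python. This is not TFX code: the Artifact class and the two functions are simplified stand-ins written for this illustration, and the bucket paths are placeholders. It only shows the two ideas from the text: components hand each other typed references (a type name plus a URI) instead of the data itself, and each component returns its outputs as a dict keyed by output name.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Artifact:
    type_name: str  # what kind of thing this is, e.g. "Examples"
    uri: str        # where the data lives; the data itself is never passed


def example_gen(input_uri: str, output_root: str) -> Dict[str, Artifact]:
    # A real component would read the CSV at input_uri and write records here.
    return {"examples": Artifact("Examples", f"{output_root}/examples")}


def statistics_gen(examples: Artifact, output_root: str) -> Dict[str, Artifact]:
    # Typed channel: refuse anything that is not an "Examples" artifact.
    if examples.type_name != "Examples":
        raise TypeError(f"expected Examples, got {examples.type_name}")
    return {"statistics": Artifact("ExampleStatistics", f"{output_root}/statistics")}


root = "gs://my-bucket/pipeline_root"  # placeholder location
example_outputs = example_gen("gs://my-bucket/data", root)
stats_outputs = statistics_gen(example_outputs["examples"], root)
print(stats_outputs["statistics"].uri)
```

Only short strings move between the two steps, which is why wiring components through outputs['examples'] stays cheap no matter how large the dataset is.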