Nieuwpoort (by author)

Export Datastore data from multiple projects to BigQuery using Google Dataflow with advanced entity filtering.

Jakub Krajniak

This is a short continuation of my previous story, where I described how to incrementally export data from Datastore to BigQuery. Here, I discuss how to extend the previous solution to a situation where you have Datastores in multiple projects. The goal remains the same: we would like the data to be in BigQuery.

Overall, the problem can be expressed by the following diagram:

Sketch of architecture (by author)

The Dataflow process can run either in one of the source projects or in a separate project; I put the Dataflow process in a separate project. The results can be stored in a BigQuery dataset located either in the same project as the Dataflow process or in another project.

Let's start with the generalization. First, I've extended the configuration file with two new fields: SourceProjectIDs, which is nothing more than a list of source GCP projects, and Destination, which determines where the output BigQuery dataset resides.

    SourceProjectIDs:
      - project-a
      - project-b
      - project-c
    Destination:
      ProjectID: dataflow-streaming
      Dataset: datastore_dev
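As an illustration of how the two new fields might be consumed, here is a minimal Python sketch. It assumes the config file has already been parsed into a dictionary (for example by a YAML loader); the ExportConfig class and the parse_config helper are hypothetical names, not part of the original code.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class ExportConfig:
    """Hypothetical container for the two new config fields."""
    source_project_ids: List[str]
    destination_project_id: str
    destination_dataset: str


def parse_config(raw: Dict[str, Any]) -> ExportConfig:
    """Validate the parsed config dictionary and extract the new fields."""
    projects = list(raw.get("SourceProjectIDs") or [])
    if not projects:
        raise ValueError("SourceProjectIDs must list at least one project")
    destination = raw["Destination"]
    return ExportConfig(
        source_project_ids=projects,
        destination_project_id=destination["ProjectID"],
        destination_dataset=destination["Dataset"],
    )


# The dictionary below mirrors the example config shown above.
config = parse_config({
    "SourceProjectIDs": ["project-a", "project-b", "project-c"],
    "Destination": {"ProjectID": "dataflow-streaming", "Dataset": "datastore_dev"},
})
```

Failing early on an empty project list avoids starting a Dataflow job that would have nothing to read.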

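The extended Dataflow pipeline is defined as follows:

    import apache_beam as beam
    from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore

    rows = (
        p  # the beam.Pipeline object (variable name assumed; not shown in the original)
        | 'projects' >> beam.Create(project_ids)
        | 'get all kinds' >> beam.ParDo(GetAllKinds(prefix_of_kinds_to_ignore))
        | 'create queries' >> beam.ParDo(CreateQuery(entity_filtering))
        | 'read from datastore' >> beam.ParDo(ReadFromDatastore._QueryFn())
        | 'convert entities' >> beam.Map(entity_to_json)
    )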

The pipeline is extended by one additional step, projects, which produces a PCollection with the list of source projects (from the configuration file). A small change to the get all kinds step was needed: GetAllKinds was changed into a PTransform step that creates, for each project, a list of tuples (project_id, kind_name).

The process method of a DoFn accepts tuples like any other serializable object. Thus the next step, create queries, creates queries to retrieve the entities of kind_name that live in the Datastore of project_id.
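To make the fan-out concrete, the sketch below mimics what the get all kinds step does, in plain Python without Beam or Datastore. The list_kinds callable is a hypothetical stand-in for the Datastore lookup of kind names, and filtering by prefix is my assumption based on the prefix_of_kinds_to_ignore argument; in the real pipeline this logic would sit inside the step itself.

```python
from typing import Callable, Iterable, Iterator, List, Tuple


def get_all_kinds(
    project_id: str,
    list_kinds: Callable[[str], Iterable[str]],
    prefix_of_kinds_to_ignore: Iterable[str] = (),
) -> Iterator[Tuple[str, str]]:
    """Yield (project_id, kind_name) for every kind that is not ignored."""
    ignored = tuple(prefix_of_kinds_to_ignore)
    for kind_name in list_kinds(project_id):
        # str.startswith accepts a tuple of prefixes.
        if kind_name.startswith(ignored):
            continue
        yield project_id, kind_name


# Hypothetical kinds per project, standing in for the Datastore metadata query.
FAKE_KINDS = {
    "project-a": ["User", "Order", "__Stat_Kind__"],
    "project-b": ["User"],
}

pairs: List[Tuple[str, str]] = []
for project in ["project-a", "project-b"]:
    pairs.extend(get_all_kinds(project, lambda p: FAKE_KINDS[p], ["__"]))
```

Each source project contributes its own tuples, so the same kind name can appear once per project without ambiguity.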

    def process(self, project_kind_name, **kwargs):
        """
        :param **kwargs:
        :param project_kind_name: a tuple with project_id, kind_name
        :return: [Query]
        """
        project_id, kind_name = project_kind_name

The queries generated by this step already contain the project_id, so we no longer need to pass the project ID to later steps separately.
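The sketch below shows the idea of the create queries step in plain Python. QuerySpec is a hypothetical stand-in for the Datastore query object used by the pipeline, and the shape of entity_filtering (a mapping from kind name to filter tuples) is assumed; the point is only that the project ID travels inside each query.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

Filter = Tuple[str, str, object]  # (property, operator, value)


@dataclass(frozen=True)
class QuerySpec:
    """Hypothetical stand-in for a Datastore query bound to one project."""
    project: str
    kind: str
    filters: Tuple[Filter, ...] = ()


class CreateQuery:
    """Plain-Python analogue of the DoFn; Beam is intentionally not imported."""

    def __init__(self, entity_filtering: Dict[str, List[Filter]]):
        self.entity_filtering = entity_filtering

    def process(self, project_kind_name: Tuple[str, str], **kwargs) -> List[QuerySpec]:
        project_id, kind_name = project_kind_name
        filters = tuple(self.entity_filtering.get(kind_name, []))
        # The project ID is stored in the query, so later steps need nothing else.
        return [QuerySpec(project=project_id, kind=kind_name, filters=filters)]


step = CreateQuery({"Order": [("updated_at", ">=", "2020-01-01")]})
queries = step.process(("project-a", "Order"))
```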

The schema of the JSON objects stored in BigQuery includes an additional property, project, in the __key__ field. The name of each BigQuery output table is formed by prefixing kind_name with project_id.
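Here is a small sketch of both conventions, under stated assumptions: the underscore separator, the replacement of hyphens in the project ID, and the exact layout of the __key__ record are guesses for illustration. The article itself only states that project is added to __key__ and that kind_name is prefixed with project_id.

```python
import re
from typing import Any, Dict


def output_table_name(project_id: str, kind_name: str) -> str:
    """Prefix the kind name with the project ID (separator is an assumption)."""
    # Project IDs contain hyphens; replacing them keeps the name to letters,
    # digits and underscores, a conservative choice for BigQuery table names.
    safe_project = re.sub(r"[^A-Za-z0-9_]", "_", project_id)
    return f"{safe_project}_{kind_name}"


def with_project_in_key(row: Dict[str, Any], project_id: str) -> Dict[str, Any]:
    """Return a copy of the row whose __key__ record also carries the project."""
    key = dict(row.get("__key__", {}))
    key["project"] = project_id
    return {**row, "__key__": key}


table = output_table_name("project-a", "User")
row = with_project_in_key(
    {"__key__": {"kind": "User", "id": 42}, "name": "Ann"}, "project-a"
)
```

Keeping the project in both the table name and the key means rows stay traceable to their source even if tables are later merged.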

The crux of the presented solution lies in the permissions. Dataflow uses two service accounts (SAs): one used during job creation and the other used by the worker instances to access resources.

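We are interested in the SA used by the workers. By default, this service account is created automatically when the Compute Engine API is enabled in your project, and it has the standard name <project-number>-compute@developer.gserviceaccount.com, where <project-number> is the number of the project that runs Dataflow.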

Therefore, to allow the Dataflow pipeline to access Datastore in a different project, grant the account <project-number>-compute@developer.gserviceaccount.com the role roles/datastore.viewer in each source project.
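One way to apply this to every source project is the gcloud projects add-iam-policy-binding command. The Python sketch below only builds the command lines so they can be reviewed before running; the project number 123456789012 and the helper names are placeholders, and the service account address follows the default Compute Engine naming scheme.

```python
import shlex
from typing import List

DATASTORE_ROLE = "roles/datastore.viewer"


def worker_service_account(dataflow_project_number: str) -> str:
    """Default Compute Engine service account of the Dataflow project."""
    return f"{dataflow_project_number}-compute@developer.gserviceaccount.com"


def grant_commands(source_project_ids: List[str], dataflow_project_number: str) -> List[str]:
    """Build one gcloud command per source project (nothing is executed)."""
    member = f"serviceAccount:{worker_service_account(dataflow_project_number)}"
    return [
        shlex.join([
            "gcloud", "projects", "add-iam-policy-binding", project_id,
            f"--member={member}", f"--role={DATASTORE_ROLE}",
        ])
        for project_id in source_project_ids
    ]


# 123456789012 is a placeholder project number, not a real one.
commands = grant_commands(["project-a", "project-b"], "123456789012")
for command in commands:
    print(command)
```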

IAM rights in one source project (by author)

That's all; the pipeline works as expected:

Extended pipeline with the projects step (by author)

