Written by Jan Eerdekens
Reading time 17 min
30 APR 2021

In the first installment of this series on ETL based anonymization, I covered the research part of the problem I was trying to solve in a proof of concept. I settled on Singer as the software solution to build an ETL pipeline. In this second part I'll focus on how to set up Singer, the chosen tap and targets, and how to chain them together. As a small side quest, I'll also cover how to run an Oracle database locally as the database that the Singer Oracle Tap can connect to, and the issues I had with it.

Set up Singer

The first thing we need to do to start creating the ETL pipeline is to install Singer. Singer is basically just a collection of taps and targets: simple Python programs that produce and consume data streams in the Singer JSON spec format and can be piped into each other. So the only installation requirement is a working Python 3 installation on your system. To see if you already have Python 3 installed correctly, run the command below:

```shell
python --version
```

If this works and outputs a 3.x version, you're already done. If it doesn't work or outputs a 2.x version, you'll first need to install Python 3; instructions for your operating system are easy to find online.

Set up Oracle

We will be using an Oracle database that we run as a Docker container. I went through many trials to get this set up, which you can read about at the end of this post. For the time being, we'll start from the assumption that an Oracle database is running on localhost:1521 with an SID called OraDoc. In this database, there's a schema called ETL and a user called etl.
In this schema we'll have a very simple test table called TEST with the columns FIRST_NAME, LAST_NAME, PERSON_ID and PHONE, and a couple of rows of actual test data.

Set up the "Tap"

With a working Python 3 installation, we can now continue with the first step of our ETL pipeline: the ingestion of Oracle database data by the Singer Oracle Tap. Even though taps and targets are just Python code, the Oracle Tap uses the cx_Oracle library as its database driver. This library needs a native Oracle database client, called Oracle Instant Client, to be able to connect to the database. To install the client, follow these installation instructions. Once this client is installed, we can continue with setting up the actual Oracle Tap. Installing it entails nothing more than creating a Python virtual environment, activating it and then installing the tap dependency via pip:

```shell
python3 -m venv ~/.virtualenvs/tap-oracle
source ~/.virtualenvs/tap-oracle/bin/activate
pip3 install tap-oracle
```

Once the tap is installed and the virtual environment has been activated, the tap-oracle command will be available:

```
(tap-oracle) developer@laptop .virtualenvs % tap-oracle
usage: tap-oracle [-h] -c CONFIG [-s STATE] [-p PROPERTIES] [--catalog CATALOG] [-d]
tap-oracle: error: the following arguments are required: -c/--config
```

From the message we get when just trying to run the tap, we can see that it needs some configuration before it can actually work. This configuration is nothing more than a simple JSON file that contains the database connection information and some other parameters that define how the database will be synced.
To keep things simple we will create this file, called config.json, in the bin directory of our tap-oracle virtual environment with the content below:

```json
{
  "host": "localhost",
  "port": 1521,
  "user": "etl",
  "password": "admin123",
  "sid": "OraDoc",
  "filter_schemas": "ETL",
  "default_replication_method": "FULL_TABLE"
}
```

The first 5 configuration parameters are mandatory and pretty self-explanatory. The last two deserve a short explanation:

- filter_schemas: an optional parameter containing a comma separated list of schema names, which allows you to replicate the table data of specific schemas only, instead of all schemas.
- default_replication_method: defines how the selected schemas will be replicated. There are 3 possible values: LOG_BASED, FULL_TABLE and INCREMENTAL. For this POC we'll be using FULL_TABLE, as we want to keep things simple and will only be working with a small test database. For the LOG_BASED mode you'd also need to configure your Oracle database accordingly, which is outside the scope of this POC and its Docker based Oracle database.

With this configuration in place, we're ready to run the tap in discovery mode. In this mode, the tap uses the config file to connect to the database and query the selected schemas for the available tables and their structure. We need to save this data, called the catalog, to another JSON file. To create this file, catalog.json, run the command below from the bin directory of the tap-oracle virtual environment:

```
(tap-oracle) developer@laptop bin % tap-oracle -c config.json -d > catalog.json
INFO starting discovery
INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=OraDoc)))
INFO fetching row counts
INFO fetching tables: SELECT owner, table_name FROM all_tables WHERE owner != 'SYS' AND owner IN (:0) ['ETL']
INFO fetching views
INFO fetching column info
INFO fetching pk constraints
```

If you run the tap with the config and the catalog, you'll see that nothing is synced:

```
(tap-oracle) developer@laptop bin % tap-oracle -c config.json --catalog catalog.json
INFO Selected streams: []
INFO No currently_syncing found
```

To get the tap to actually sync something, edit the catalog file slightly by selecting the streams you want to sync (note the "selected": true in the stream-level metadata entry):

```json
{
  "streams": [
    {
      "tap_stream_id": "ETL-TEST",
      "table_name": "TEST",
      "schema": {
        "properties": {
          "FIRST_NAME": { "type": ["null", "string"] },
          ...
        },
        "type": "object"
      },
      "stream": "TEST",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "table-key-properties": ["PERSON_ID"],
            "schema-name": "ETL",
            "database-name": "ORADOC",
            "is-view": false,
            "row-count": 0,
            "selected": true
          }
        },
        {
          "breadcrumb": ["properties", "FIRST_NAME"],
          "metadata": { "sql-datatype": "VARCHAR2", "inclusion": "available", "selected-by-default": true }
        },
        {
          "breadcrumb": ["properties", "LAST_NAME"],
          "metadata": { "sql-datatype": "VARCHAR2", "inclusion": "available", "selected-by-default": true }
        },
        {
          "breadcrumb": ["properties", "PERSON_ID"],
          "metadata": { "sql-datatype": "NUMBER", "inclusion": "automatic", "selected-by-default": true }
        },
        {
          "breadcrumb": ["properties", "PHONE"],
          "metadata": { "sql-datatype": "VARCHAR2", "inclusion": "available", "selected-by-default": true }
        }
      ]
    }
  ]
}
```

If you now run the same command again, you'll see the tap sync the content of the selected stream:

```
(tap-oracle) developer@laptop bin % tap-oracle -c config.json --catalog catalog.json
INFO Selected streams: ['ETL-TEST']
INFO No currently_syncing found
INFO Beginning sync of stream (ETL-TEST) with sync method (full)
INFO Stream ETL-TEST is using full_table replication
{"type": "SCHEMA", "stream": "TEST", "schema": {"properties": {"FIRST_NAME": {"type": ["null", "string"]}, "LAST_NAME": {"type": ["null", "string"]}, "PERSON_ID": {"format": "singer.decimal", "type": ["string"]}, "PHONE": {"type": ["null", "string"]}}, "type": "object"}, "key_properties": ["PERSON_ID"]}
INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=OraDoc)))
{"type": "STATE", "value": {"bookmarks": {"ETL-TEST": {"last_replication_method": "FULL_TABLE", "version": 1618344226757}}, "currently_syncing": "ETL-TEST"}}
{"type": "ACTIVATE_VERSION", "stream": "TEST", "version": 1618344226757}
INFO select SELECT "FIRST_NAME", "LAST_NAME", "PERSON_ID", "PHONE", ORA_ROWSCN FROM ETL.TEST ORDER BY ORA_ROWSCN ASC
{"type": "RECORD", "stream": "TEST", "record": {"FIRST_NAME": "John", "LAST_NAME": "Doe", "PERSON_ID": "1", "PHONE": "0499010203"}, "version": 1618344226757, "time_extracted": "2021-04-13T20:03:46.757794Z"}
{"type": "RECORD", "stream": "TEST", "record": {"FIRST_NAME": "Jane", "LAST_NAME": "Doe", "PERSON_ID": "2", "PHONE": "0499040506"}, "version": 1618344226757, "time_extracted": "2021-04-13T20:03:46.757794Z"}
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 2, "tags": {}}
{"type": "ACTIVATE_VERSION", "stream": "TEST", "version": 1618344226757}
{"type": "STATE", "value": {"bookmarks": {"ETL-TEST": {"last_replication_method": "FULL_TABLE", "version": 1618344226757, "ORA_ROWSCN": null}}, "currently_syncing": null}}
```

That was already a lot of work, but luckily the hardest part is done!
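Hand-editing a big catalog.json quickly gets tedious. As a convenience, a few lines of Python can flip the selected flag for you. This helper is my own sketch, not part of tap-oracle, and it assumes the catalog layout shown above, where the stream-level metadata entry is the one with an empty breadcrumb:

```python
import json

def select_streams(catalog, stream_ids):
    """Mark the given tap_stream_ids as selected in a Singer catalog dict."""
    for stream in catalog["streams"]:
        if stream["tap_stream_id"] in stream_ids:
            for entry in stream["metadata"]:
                # The stream-level entry has an empty breadcrumb;
                # column-level entries have ["properties", "<column>"].
                if entry["breadcrumb"] == []:
                    entry["metadata"]["selected"] = True
    return catalog

# Usage sketch, run next to catalog.json:
#   catalog = json.load(open("catalog.json"))
#   with open("catalog.json", "w") as f:
#       json.dump(select_streams(catalog, {"ETL-TEST"}), f, indent=2)
```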
Set up the "Target"

Now that we have a tap set up to retrieve database data in the Singer spec format, we're going to skip the transformation step for the time being and first set up a target to pipe it to. This way we can quickly verify the Extract and Load parts of the ETL pipeline and already see a readable CSV result instead of Singer spec JSON. We'll be using the Pipelinewise S3 CSV Target. The setup of this target is very similar to the tap: again, create a virtual environment for it, activate it and install the target dependency using pip:

```shell
python3 -m venv ~/.virtualenvs/target-s3-csv
source ~/.virtualenvs/target-s3-csv/bin/activate
pip3 install pipelinewise-target-s3-csv
```

This target also needs a small configuration file, again called config.json, that contains the name of the bucket we want to send the CSV files to and the credentials needed to access it:

```json
{
  "s3_bucket": "anonymized-data-bucket",
  "aws_access_key_id": "your_own_aws_access_key_id_value",
  "aws_secret_access_key": "your_own_aws_secret_access_key_value"
}
```

This simple configuration is all we need to make the target work. It enables us to pipe the output of the Oracle Tap into this target and get, depending on the selected streams, one or more (for now non-anonymized) CSV files in our S3 bucket. If we run the command below from the bin directory of the activated tap-oracle virtual environment, a full table sync will occur:

```
(tap-oracle) developer@laptop bin % tap-oracle --config config.json --catalog catalog.json | ~/.virtualenvs/target-s3-csv/bin/target-s3-csv --config ~/.virtualenvs/target-s3-csv/bin/config.json
INFO Selected streams: ['ETL-TEST']
INFO No currently_syncing found
INFO Beginning sync of stream (ETL-TEST) with sync method (full)
INFO Stream ETL-TEST is using full_table replication
INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=OraDoc)))
time=2021-04-13 22:23:29 name=target_s3_csv level=INFO message=Attempting to create AWS session
INFO select SELECT "FIRST_NAME", "LAST_NAME", "PERSON_ID", "PHONE", ORA_ROWSCN FROM ETL.TEST ORDER BY ORA_ROWSCN ASC
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 2, "tags": {}}
time=2021-04-13 22:23:29 name=target_s3_csv level=INFO message=Uploading /var/folders/5k/86hrfqsd60b8pnh_81fqfy8h0000gn/T/TEST-20210413T222329.csv to bucket anonymized-data-bucket at TEST-20210413T222329.csv
{"bookmarks": {"ETL-TEST": {"last_replication_method": "FULL_TABLE", "version": 1618345409645, "ORA_ROWSCN": null}}, "currently_syncing": null}
```

You can check the result in the S3 web interface or with the AWS command line client. To list the CSV files in the bucket and retrieve one, execute the commands below:

```shell
aws s3 ls s3://anonymized-data-bucket/ --profile your_aws_profile_name
aws s3 cp s3://anonymized-data-bucket/<a filename output by the previous command>.csv test.csv --profile your_aws_profile_name
```

Or you can do a cat-like thing:

```
aws s3 cp s3://anonymized-data-bucket/TEST-20210413T223846.csv --profile your_aws_profile_name - | cat
FIRST_NAME,LAST_NAME,PERSON_ID,PHONE
John,Doe,1,0499010203
Jane,Doe,2,0499040506
```

Set up the "Transformation" / anonymization

We're now able to retrieve data from our Oracle database, select the streams we want from it and store them as CSV files in S3.
We're only missing one small but very important step: anonymizing the data in transit between the tap and the target. We'll be using Pipelinewise Transform Field to achieve the transformation / anonymization. This piece of software is essentially the T that Singer, as an EL tool, is missing to make it an actual ETL tool. Setting up the transformation part follows the by now familiar theme: set up another virtual environment, activate it, install a dependency via pip and create a configuration file.

```shell
python3 -m venv ~/.virtualenvs/transform-field
source ~/.virtualenvs/transform-field/bin/activate
pip install pipelinewise-transform-field
```

Create a config.json file in the bin directory of this virtual environment. This file contains a simple list of transformations, each defining the type of transformation to apply to a specific field in a specific stream. The configuration in the example below HASHes the FIRST_NAME field in the TEST stream:

```json
{
  "transformations": [
    {
      "field_id": "FIRST_NAME",
      "tap_stream_name": "TEST",
      "type": "HASH"
    }
  ]
}
```

The transformations available in the Pipelinewise Transform Field step are:

- SET-NULL: transforms any input to NULL
- HASH: transforms string input to a hash
- HASH-SKIP-FIRST-n: transforms string input to a hash, skipping the first n characters, e.g. HASH-SKIP-FIRST-2
- MASK-DATE: replaces the month and day parts of date columns so they're always the 1st of Jan
- MASK-NUMBER: transforms any numeric value to zero
- MASK-HIDDEN: transforms any string to 'hidden'

While these transformations are sufficient for the anonymization needs of our POC case, they can easily be altered and you can even add custom transformations. To do this, edit the transform.py file found in the lib/python3.9/site-packages/transform_field directory of the current virtual environment.
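The 64-character values that HASH produces look like SHA-256 hex digests. Assuming that's the scheme (a guess based on the output length, not something I verified against the transform-field source), the two hashing transformations boil down to something like this sketch:

```python
import hashlib

def hash_value(value):
    """HASH-style transformation: deterministic and irreversible, so the same
    input always maps to the same digest (joins on anonymized data still work)."""
    if value is None:
        return None
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

def hash_skip_first(value, n):
    """HASH-SKIP-FIRST-n style: keep the first n characters, hash the rest."""
    if value is None:
        return None
    return value[:n] + hashlib.sha256(value[n:].encode("utf-8")).hexdigest()
```

Determinism is what makes the hashed FIRST_NAME column still usable as a pseudonymous identifier across exports.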
Putting it all together

With these 3 virtual environments in place, we can pipe them all together to produce the end result we're looking for. When we run the command below, we should see the ETL pipeline sync the selected schema, anonymize specific fields and store the end result as a CSV file in our S3 bucket:

```
(tap-oracle) developer@laptop bin % tap-oracle --config config.json --catalog catalog.json | ~/.virtualenvs/transform-field/bin/transform-field --config ~/.virtualenvs/transform-field/bin/config.json | ~/.virtualenvs/target-s3-csv/bin/target-s3-csv --config ~/.virtualenvs/target-s3-csv/bin/config.json
INFO Selected streams: ['ETL-TEST']
INFO No currently_syncing found
INFO Beginning sync of stream (ETL-TEST) with sync method (full)
INFO Stream ETL-TEST is using full_table replication
INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=OraDoc)))
time=2021-04-13 22:38:46 name=target_s3_csv level=INFO message=Attempting to create AWS session
INFO select SELECT "FIRST_NAME", "LAST_NAME", "PERSON_ID", "PHONE", ORA_ROWSCN FROM ETL.TEST ORDER BY ORA_ROWSCN ASC
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 2, "tags": {}}
time=2021-04-13 22:38:46 name=transform_field level=INFO message=Exiting normally
time=2021-04-13 22:38:46 name=target_s3_csv level=INFO message=Uploading /var/folders/5k/86hrfqsd60b8pnh_81fqfy8h0000gn/T/TEST-20210413T223846.csv to bucket cjm-sandbox-anonymized-data at TEST-20210413T223846.csv
{"bookmarks": {"ETL-TEST": {"last_replication_method": "FULL_TABLE", "version": 1618346326464, "ORA_ROWSCN": null}}, "currently_syncing": null}
```

When we now check the CSV in our bucket:

```
aws s3 cp s3://anonymized-data-bucket/TEST-20210413T224646.csv --profile your_aws_profile_name - | cat
FIRST_NAME,LAST_NAME,PERSON_ID,PHONE
fd53ef835b15485572a6e82cf470dcb41fd218ae5751ab7531c956a2a6bcd3c7,Doe,1,0499010203
500501150a08713128839ca1465bfdc8a426268d6d0a576a16a80c13929f3faa,Doe,2,0499040506
```

Conclusion

In this blog post you've seen that we can easily use Singer to achieve the goals of the POC described in the previous blog post. Because it only requires simple prerequisites like Python and an Oracle client, it can be installed and used on most systems and in most environments. It also uses simple JSON based configuration files that can easily be tailored to a lot of use cases. The source of the taps and targets is available as well, and is usually easy to understand and adapt where needed. In case you can't find a tap or target that suits you, writing one shouldn't be too hard. In the last installment of this series, I'll show a way to simplify the whole system a bit more and package it in a way that's better suited for cloud native usage/deployment.

Side quest: Oracle in Docker

Because I didn't have access to an existing Oracle database, I needed to run one myself. You'd think that would be easy and quick, but it turned out to be a bit more complicated and quite frustrating, especially compared to running a PostgreSQL or MySQL/MariaDB container using Docker. Oracle XE was made available a long time ago. It's a free version of an Oracle database with some limitations, but none of those were problematic for my use case. Oracle XE can be installed natively on Windows and Linux, but because I'm working on a Mac and run my other databases via Docker, I wanted to run it using a Docker image. To do this, I needed to build an Oracle Docker image myself. There is a GitHub repository that contains a lot of Dockerfiles and scripts provided by Oracle, including a section for Oracle databases: Oracle Database container images.
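A quick way to verify that no raw first names slipped through is to scan the downloaded CSV and check that every FIRST_NAME looks like a hex digest rather than a name. This little checker is my own addition, not part of the pipeline:

```python
import csv
import io
import string

HEX_DIGITS = set(string.hexdigits.lower())

def looks_hashed(value):
    """True if the value looks like a 64-character lowercase hex digest."""
    return len(value) == 64 and set(value) <= HEX_DIGITS

def check_anonymized(csv_text, column):
    """Return the rows whose given column does NOT look hashed."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [row for row in reader if not looks_hashed(row[column])]

# Usage sketch: after downloading the bucket file to test.csv,
#   check_anonymized(open("test.csv").read(), "FIRST_NAME")
# should return an empty list if the HASH transformation was applied.
```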
You can check out this repository, go to the OracleDatabase/SingleInstance/dockerfiles directory and run the buildContainerImage.sh script to build an actual Oracle database container image. I wanted to set up Oracle 12 to match the customer's installation, but for that version I needed to download some database installation files first. It looks like those aren't publicly available anymore (you can get them via an Oracle contract), so I settled for Oracle 18.4.0 XE:

```shell
./buildContainerImage.sh -v 18.4.0 -x
# ... takes about 5 minutes ...
docker run -p 1521:1521 -p 5500:5500 -e ORACLE_PWD=admin123 oracle/database:18.4.0-xe
# ... takes 30+ minutes ...
```

On my MacBook Pro, starting this Docker container takes more than 30 minutes. On top of that, it usually stops working after a couple of hours, needing a restart that again takes that much time. I tried a number of things, like attaching volumes to store the data that gets generated during the first start, but that didn't seem to help. It also uses a lot of resources on my machine. Even after getting the container running, connecting to the database in it wasn't really straightforward because of the pluggable database setup. And even when connected, creating a user and database specific to our POC and connecting with those was again a lot of trouble: service name vs. SID, and no service name support in the Oracle Python library the tap uses. So in the end, after some more Googling, I found a Docker repository from Oracle that contains an old Oracle 12 container. You need a free Oracle account to be able to do the docker login. There is no volume support, so you'll lose your data in a lot of cases, but it's easy to set up, starts consistently in a couple of minutes and you can connect to it via an SID. Adding a user and database wasn't too difficult either, so I ended up using this instead of the newer GitHub repository.
```shell
docker login container-registry.oracle.com
docker pull container-registry.oracle.com/database/standard:12.1.0.2
docker run -d --env-file ./env -p 1521:1521 -p 5500:5500 -it --name dockerDB --shm-size="4g" container-registry.oracle.com/database/standard:12.1.0.2
```
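When debugging connection problems with either container, it helped to reproduce the SID-style DSN string that the tap logs (the INFO dsn lines earlier in this post). Here's a small sketch; the cx_Oracle call in the comment assumes you have the driver and Instant Client installed:

```python
def oracle_sid_dsn(host, port, sid):
    """Build the SID-style DSN string seen in the tap's 'INFO dsn' log lines."""
    return (
        f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={host})(PORT={port}))"
        f"(CONNECT_DATA=(SID={sid})))"
    )

# With cx_Oracle installed, a connectivity smoke test against the
# container would look like:
#   import cx_Oracle
#   conn = cx_Oracle.connect("etl", "admin123",
#                            oracle_sid_dsn("localhost", 1521, "OraDoc"))
#   print(conn.version)
```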

Reading time 7 min
8 APR 2021

In this technical blog post, I want to talk about how to set up simple and flexible ETL based anonymization. Why? Well, I recently had the opportunity to do a small proof of concept for a customer. The customer wanted to know what options were available that would enable them to take internal data, remove or anonymize any personally identifiable information (PII) and make it available in a simple way and form to external parties. After further requirements gathering, the context for this proof of concept was defined as:

- Whatever the solution, it needs to be able to extract data from an on premise Oracle database.
- The end result should be a set of CSV files in an Amazon S3 bucket.
- In between ingesting the Oracle data and dumping it in CSV form on S3, there should be something that removes/anonymizes PII data.
- If possible, the chosen solution should be cloud native.

In this 3 part blog series I'll explain how to set up simple and flexible ETL based anonymization, covering the following subjects:

- The research into products that might be used to solve the problem, and how suitable they are for what the proof of concept needs to achieve.
- How the chosen product can be used to create an ETL pipeline that fits the requirements. Additionally, how to set up a local Oracle database in Docker that can be used as a data source for the data ingestion part of the proof of concept (just because this was such a PITA to do).
- Whether this can be done in a cloud native way.

Research

The research part of the proof of concept consists of 2 parts:

- How to extract data from an Oracle database, anonymize it somehow and store it as a bunch of CSV files in an S3 bucket, aka the ETL part.
- Figuring out the best way to accomplish the anonymization.

Extracting, transforming and storing the data

Straight off the bat, the customer's problem sounded remarkably like something you might solve with an ETL product: Extract, Transform, Load.
So the research for this part of the proof of concept concentrated on this type of product. I also got some input from someone in my team to have a look at singer.io, as that was something they had used successfully in the past for this kind of problem. When looking at the Singer homepage, a number of things immediately catch your eye:

- Singer powers data extraction and consolidation for all of the tools of your organization.
- The open-source standard for writing scripts that move data.
- Unix-inspired: Singer taps and targets are simple applications composed with pipes.
- JSON-based: Singer applications communicate with JSON, making them easy to work with and implement in any programming language.

So when getting down to the basics, Singer is just a specification, albeit not an official one. It's a simple JSON based data format, and you can either produce something in this format (a tap in Singer terminology) or consume the format (a target). You can chain these taps and targets together to extract data from one location and store it in another. Out of the box, Singer already comes with a bunch of taps (100+) and targets (10), all written in Python. Because the central point of the system is just a data format, it's pretty easy to write one yourself or adapt an existing one. When checking out the taps, the default Oracle tap should cover the Extract part of our proof of concept. The same doesn't seem to be the case for the Load part, though, when looking at the default targets. There is a CSV target, but it stores its results locally, not in an S3 bucket. There is the option of just using this target and doing the S3 upload ourselves after the ETL pipeline has finished. Another option would be to adapt the existing CSV target and change the file storage to S3. Some quick Googling, however, turns up a community made S3 CSV Singer target. According to its documentation, this target should do exactly what we want.
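To make the tap/target model concrete, here's a toy sketch of both sides (my own illustration in the spirit of the spec, far simpler than real taps and targets): the tap emits SCHEMA, RECORD and STATE messages as JSON lines, and the target consumes them.

```python
import json
from collections import defaultdict

def toy_tap(rows):
    """Yield Singer-style messages (as JSON strings) for a fixed TEST stream."""
    yield json.dumps({"type": "SCHEMA", "stream": "TEST",
                      "schema": {"type": "object"}, "key_properties": ["PERSON_ID"]})
    for row in rows:
        yield json.dumps({"type": "RECORD", "stream": "TEST", "record": row})
    yield json.dumps({"type": "STATE", "value": {"bookmarks": {}}})

def toy_target(lines):
    """Collect RECORD payloads per stream. A real target would also validate
    records against SCHEMA messages and persist STATE for resumable syncs."""
    rows = defaultdict(list)
    for line in lines:
        message = json.loads(line)
        if message["type"] == "RECORD":
            rows[message["stream"]].append(message["record"])
    return dict(rows)

# Chaining them mimics `some-tap | some-target` on the command line:
#   toy_target(toy_tap([{"PERSON_ID": 1, "FIRST_NAME": "John"}]))
```

Because the contract is just newline-delimited JSON on stdout/stdin, any two processes that speak it can be combined, which is what makes the ecosystem so easy to extend.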
Whoops, Singer doesn't transform

With the Extract and Load parts covered, this leaves just the Transform part of the ETL pipeline to figure out… and this is where it gets a bit weird. Even though Singer is classified as an ETL tool, it doesn't seem to have support for the transformation part. Looking further into this, I came across this ominously titled post: Why our ETL tool doesn't do transformations. Reading it, they seem to consider their JSON specification/data format as the transformation part. So they support transformation to raw data and storing it, but don't support other kinds of transformations; that part is up to you after the data has been stored somewhere by a Singer target. So it turns out that Singer is more like the EL part of an ELT product than an "old school" ETL product. At this point, Singer should at least be sufficient to extract the data from an Oracle database and put it in an S3 bucket in CSV format. And because Singer is pretty simple, open and extendable, I'm going to leave it at that for now. Let's continue by looking into the anonymization options that might fit in this Singer context.

Data anonymization

Similarly to the ETL part, I also received some input for this part, pointing me to Microsoft Presidio. On its homepage we can read the following:

- It provides fast identification and anonymization modules for private entities in text and images such as credit card numbers, names and more.
- It facilitates both fully automated and semi-automated PII de-identification flows on multiple platforms.
- Customizability in PII identification and anonymization.

There's a lot of promising stuff in there that could help solve my anonymization needs. Upon further investigation, it looks like I'm evaluating this product during a major transformation (get it? 😉) from V1 to V2.
V1 incorporated some ETL-like functionality, such as retrieving data from sources (even though Oracle support on the roadmap never seems to have materialized) and storing anonymized results in a number of forms/locations. V2, however, has completely dropped this approach to concentrate purely on the detection and replacement of PII data. At its core, Presidio V2 is a Python based system built on top of an AI model. This enables it to automatically discover PII data in text and images and to replace it according to the rules you define. I did some testing using their online testing tool and it kind of works, but for our specific context it definitely needs tweaking. Also, the provided test data is mostly simple and short: no large text blobs or images. This begs the question: even if we're able to configure Presidio to do what we want it to do, might we be hitting small nails with a big hammer? Is Presidio too much? So let's rethink this. If we can easily know and define which simple columns in which tables need to be anonymized, and when just nulling or hashing the column values is sufficient, we don't need Presidio's auto detection, its full text or image support, or its fancy substitution support. Presidio could still be a powerful library for building an automatic anonymization transformation step for our Singer based pipeline, and it helps that it is Python based. However, my gut feeling said I should first try to find a slightly simpler solution. I started searching for something that can do a simple PII replace and that works in a Singer tap/target context, and found this GitHub repository: pipelinewise-transform-field. Its documentation reads "Transformation component between Singer taps and targets". That sounds suspiciously like the "T" part that Singer as an ETL tool was missing!
Further down, in the configuration section, we even read: "You need to define which columns have to be transformed by which method and in which condition the transformation needs to be applied." The possible transformation types are:

- SET-NULL: transforms any input to NULL
- HASH: transforms string input to a hash
- HASH-SKIP-FIRST-n: transforms string input to a hash, skipping the first n characters, e.g. HASH-SKIP-FIRST-2
- MASK-DATE: replaces the month and day parts of date columns so they're always the 1st of Jan
- MASK-NUMBER: transforms any numeric value to zero
- MASK-HIDDEN: transforms any string to 'hidden'

This seems to cover our simple anonymization requirements completely! We can even see how to use it in the context of Singer:

```
some-singer-tap | transform-field --config [config.json] | some-singer-target
```

Conclusion

We now have all the pieces of the puzzle for setting up simple and flexible ETL based anonymization. In the next blog post, we'll show how they fit together and whether they produce the results the customer is looking for.

Reading time 17 min
12 JAN 2021

Flyway is a library that is, for example, used in Spring Boot to provide schema migration functionality. But... does Flyway support BigQuery? In this blog post, we work out 3 proof-of-concepts to add BigQuery support to Flyway and integrate it with Google Dataflow! I'm a Solution Engineer in the Data team at ACA Group. Our cloud platform of choice is Google Cloud Platform (GCP), and we're currently using a subset of the services available on it. For batch/streaming data processing, we're using Apache Beam, or more specifically Google Dataflow, Google's managed version of Apache Beam. The database of choice to push our data processing results to is Google BigQuery. Because our company has historically used Java as its language of choice (Python has recently gained some traction and is something the Data team uses for writing Google Cloud Functions), we chose to write our Dataflow pipelines in Java (Apache Beam also supports Python). As a company, we also have a lot of experience writing enterprise applications in Java and frameworks like Spring Boot, so we're very used to automating our database schema evolution. This is a best practice that we like to keep and would like to apply to our data pipelines in Dataflow. So we decided to go on a little adventure and see if we could find something out there that could solve this need for us.

Research

The initial approach was to do some research and see what Google can unearth. When looking around for information/tools/libraries/frameworks on schema evolution and migration for BigQuery, we did find some options that we gave a deeper look. BigQuery itself has automatic schema detection and support for schema changes, but these are mostly things you have to do manually using the API, the bq util or SQL. While this works, there's nothing orchestrating it; everything would need to be done manually or scripted.
- bq-schema: a tool that does mostly what we want, but with the disadvantage that it's written in Python and would be difficult to integrate unless we also switched our Apache Beam/Dataflow pipelines to Python. That's a possibility I'm not immediately writing off, but one to save for later if no other good solutions are found.
- bigquery_migration: a tool similar to the Python one above, but it covers fewer of our requirements and, because it's made in Ruby, would be even more difficult to integrate.
- BQconvert: only helpful for migrating an existing database schema to BigQuery, and so not at all suited for what we want to achieve.
- Dataflow BigQuery Schema Evolution / Automated Schema Evolution for BigQuery: more of a set of ideas and inspirations than an actual solution for our needs.
- Schema evolution in streaming Dataflow jobs and BigQuery tables: a fascinating blog post series, but I couldn't find any actual code repository for it.

While some of these options cover some or most of our requirements, none was an ideal match. A couple of them were also mentioned in a Stack Overflow post that contained a reference to Flyway… and the term Flyway rings a bell! Flyway is a library that is, for example, used in Spring Boot to provide schema migration functionality, and based on previous experience it should in theory cover all our requirements. That leaves only one big question: does Flyway have BigQuery support? At the time I started looking into the whole Dataflow/BigQuery schema migration question, there was no official Flyway BigQuery support (in the meantime, non-certified beta support has been added). Via the aforementioned Stack Overflow post, I did however find an issue in the Flyway GitHub repository about adding BigQuery support to Flyway. In that issue, I found a mention of a branch in a forked repository that should add some sort of BigQuery support to Flyway.
We were already familiar with Flyway from our Spring Boot experience, and we've found some Flyway code that might actually add BigQuery support to it. Time to do some proof of concepts, which will hopefully answer a bunch of questions:

- Can we get the BigQuery Flyway branch to work?
- If Flyway does work correctly with BigQuery, can we do all the necessary migrations, like creating a table, adding data to a table, modifying a table schema, etc.?
- Can we integrate it all into Dataflow?
- If it integrates into Dataflow, does it actually run all the test migrations correctly?

Proof of concept 1

The first proof of concept was to take the code from the forked repo as-is, clone it and try to get a simple migration to work against a BigQuery table in a dataset of a GCP test project. There are 3 ways to run/use Flyway:

- Command line
- Maven/Gradle plugin
- Java API

Because we want to integrate Flyway support into our Java based Dataflow pipelines, and also because our Jenkins/Terraform based deploy currently isn't well suited for the command line or Maven/Gradle options, we first looked at just calling the Flyway API. This was done by adding a simple Java class with a main method to the cloned repository branch. In this main method we needed to do a couple of things:

- Create a JDBC data source that is able to connect to BigQuery in a given GCP project.
- Configure Flyway to use this data source.
- Run the Flyway migrate command and see if it finds and executes our SQL migration.

So the first thing we need to set up for a data source is a BigQuery JDBC driver. Luckily, the Google BigQuery documentation covers this. On that page is a link to a free download of the Google BigQuery Simba Data Connector made by Magnitude. Downloading the driver from this page will get you a ZIP file that contains the actual JDBC driver JAR file, GoogleBigQueryJDBC42.jar, but also all its dependencies.
In my case, I only added this driver JAR to our company's Maven repository, because most of the other driver dependencies are already available in public Maven repositories (it's quite the chore to check them all and upload the missing ones or the ones with differing versions). For this first POC it was enough to add the following dependencies to the pom.xml of the project we cloned (the versions are only indicative of when I tested it and can be replaced with newer ones):

- com.simba:bigquery-driver:1.2.4.1007
- com.google.cloud:google-cloud-bigquery:1.132.1
- com.google.cloud:google-cloud-bigquerystorage:1.21.1
- org.apache.avro:avro:1.8.2

With these dependencies in place, we can get the code below to work, provided you set the GOOGLE_APPLICATION_CREDENTIALS environment variable to point at a service account credentials JSON file (which is needed to make the OAuthType=3 authentication mode work) and replace the GCP project ID and dataset ID placeholders.

```java
package org.flywaydb.core;

import com.simba.googlebigquery.jdbc42.DataSource;

public class BQTest {

    public static void main(String[] args) {
        DataSource dataSource = new DataSource();
        dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3");

        Flyway flyway = Flyway.configure()
                .createSchemas(false)
                .defaultSchema("<GCP project ID>.<dataset ID>")
                .schemas("<GCP project ID>.<dataset ID>")
                .dataSource(dataSource)
                .baselineOnMigrate(true)
                .load();

        flyway.migrate();
    }
}
```

I then also added an SQL migration file to my src/main/resources/db/migration directory and executed the code, and to my surprise Flyway was actually talking to my BigQuery. There was however one small issue with the cloned Flyway BigQuery code that needed to be fixed.
The INSERT statement in the BigQueryDatabase#getInsertStatement method, which Flyway uses to add migrations to its flyway_schema_history table, failed for 2 reasons:

- Type issues with INT64 columns, which could be solved with explicit casting: CAST(? AS INT64).
- Having to provide an explicit manual default, CURRENT_TIMESTAMP(), for timestamp columns.

After fixing the INSERT statement, I was able to see Flyway work correctly with BigQuery and verify that it could do all the migration actions that we defined. I even managed to get mixed SQL/Java migrations to work (using the Java BigQuery API to do things that can't be expressed in SQL). There was only 1 surprise: adding data to a table can't be done in the same SQL file that creates the table. Those kinds of actions can't be mixed in the same file. The output below is similar to what I got, but is from a more recent attempt with the current Flyway 8.x that has BigQuery beta support:

```
Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.license.VersionPrinter info
INFO: Flyway Community Edition 8.0.3 by Redgate
Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType info
INFO: Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3 (Google BigQuery 2.0)
Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType warn
WARNING: Google BigQuery 2.0 does not support setting the schema for the current session. Default schema will NOT be changed to <GCP project ID>.<dataset ID>!
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Join the GCP BigQuery beta via https://rd.gt/3fut40f
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO:
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Experiencing performance issues while using GCP BigQuery?
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO: Find out how Flyway Teams improves performance with batching at https://rd.gt/3CWAuTb
Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info
INFO:
Nov 18, 2021 11:43:48 AM org.flywaydb.database.bigquery.BigQueryDatabase info
INFO: Google BigQuery databases have a 10 GB database size limit in Flyway Community Edition. You have used 0 GB / 10 GB
Consider upgrading to Flyway Teams Edition for unlimited usage: https://rd.gt/3CWAuTb
Nov 18, 2021 11:43:51 AM org.flywaydb.core.internal.command.DbValidate info
INFO: Successfully validated 1 migration (execution time 00:02.091s)
Nov 18, 2021 11:44:03 AM org.flywaydb.core.internal.schemahistory.JdbcTableSchemaHistory info
INFO: Creating Schema History table `<GCP project ID>.<dataset ID>`.`flyway_schema_history` with baseline ...
Nov 18, 2021 11:44:09 AM org.flywaydb.core.internal.command.DbBaseline info
INFO: Successfully baselined schema with version: 1
Nov 18, 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Current version of schema `<GCP project ID>.<dataset ID>`: 1
Nov 18, 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Migrating schema `<GCP project ID>.test_dataset` to version "1.0001 - Test migration" [non-transactional]
Nov 18, 2021 11:44:47 AM org.flywaydb.core.internal.command.DbMigrate info
INFO: Successfully applied 1 migration to schema `<GCP project ID>.<dataset ID>`, now at version v1.0001 (execution time 00:37.604s)
```

Proof of concept 2

The previous POC leaves us with a new problem to solve: getting this code to work inside a Google Dataflow project. Taking inspiration from Spring Boot, which runs Flyway migrations during application startup, I had to find something in Beam/Dataflow that is similar and allows us to run arbitrary code during startup.
A first option that I discovered and investigated was a custom DataflowRunnerHooks implementation. While trying this out, I quickly discovered that the moment it is triggered is completely wrong for what we want to achieve: it already executes while building the Dataflow code using the mvn compile exec:java command. Because we're building a common Dataflow artifact that is deployed to all environments and gets injected with runtime variables, triggering our custom Flyway code at that time doesn't achieve what we want. So after looking around some more, I found the JvmInitializer interface. This immediately looked more promising, and a quick implementation showed that it was indeed usable, but that it does have a number of quirks/gotchas that we'll cover in more detail in the lessons learned section.

```java
package be.planetsizebrain;

import com.simba.googlebigquery.jdbc42.DataSource;
import com.simba.googlebigquery.support.exceptions.ErrorException;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlywayJvmInitializer implements JvmInitializer {

    private final Logger LOGGER = LoggerFactory.getLogger(FlywayJvmInitializer.class);

    @Override
    public void beforeProcessing(PipelineOptions options) {
        LOGGER.info("Running flyway JVM initializer...");
        try {
            DataflowWorkerHarnessOptions harnessOptions = options.as(DataflowWorkerHarnessOptions.class);
            executeFlyway(harnessOptions);
        } catch (Exception e) {
            LOGGER.error("Flyway migrations failed!", e);
            throw new RuntimeException("Unexpected problem during beforeProcessing phase of JVM initializer", e);
        } finally {
            LOGGER.info("Finished running flyway JVM initializer.");
        }
    }

    private void executeFlyway(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException {
        Flyway flyway = initializeFlywayClient(harnessOptions);
        LOGGER.info("Running flyway migrations...");
        flyway.migrate();
        LOGGER.info("Finished flyway migrations");
    }

    private Flyway initializeFlywayClient(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException {
        DataSource dataSource = createBigQueryDataSource(harnessOptions);
        return Flyway.configure()
                .createSchemas(false)
                .defaultSchema("FLYWAY")
                .schemas("FLYWAY")
                .dataSource(dataSource)
                .failOnMissingLocations(true)
                .locations("classpath:db/migration")
                .ignoreFutureMigrations(true)
                .load();
    }

    private DataSource createBigQueryDataSource(DataflowWorkerHarnessOptions options) throws ErrorException {
        DataSource dataSource = new DataSource();
        dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=" + options.getProject() + ";OAuthType=3");
        dataSource.setTimeout(180);
        dataSource.setLogLevel("INFO");
        dataSource.setMaxResults(10000);
        return dataSource;
    }
}
```

When adding this code to a Dataflow project, there is one more thing needed to actually make it work. The JvmInitializer system works via the Java Service Provider Interface mechanism. This means we need to create a file called org.apache.beam.sdk.harness.JvmInitializer in src/main/resources/META-INF/services that contains the FQCN of our JvmInitializer implementation. When running a Dataflow pipeline, we can see the following logging (here again with the output of a more recent attempt with the Flyway version that has beta support for BigQuery):

```
2021-11-08 11:26:50.593 CET "Loading pipeline options from /var/opt/google/dataflow/pipeline_options.json"
2021-11-08 11:26:50.624 CET "Worker harness starting with: {...}"
2021-11-08 11:26:53.565 CET "Running flyway JVM initializer..."
2021-11-08 11:26:54.111 CET "Running flyway migrations..."
2021-11-08 11:26:54.322 CET "[Simba][JDSI](20260) Cannot access file to use for logging: CANNOT_CREATE_LOGGING_PATH. Switching to default logging output."
2021-11-08 11:26:54.325 CET "Nov 08 10:26:54.323 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: SDK Version: 10.1.20.1161"
2021-11-08 11:26:54.330 CET "Nov 08 10:26:54.330 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Driver Version: 01.02.19.1023"
2021-11-08 11:26:54.332 CET "Nov 08 10:26:54.332 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Name: Java HotSpot(TM) 64-Bit Server VM"
2021-11-08 11:26:54.332 CET "Nov 08 10:26:54.332 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Specification Version: 1.8"
2021-11-08 11:26:54.333 CET "Nov 08 10:26:54.333 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Implementation Version: 25.151-b12"
2021-11-08 11:26:54.335 CET "Nov 08 10:26:54.335 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Vendor: Oracle Corporation"
2021-11-08 11:26:54.335 CET "Nov 08 10:26:54.335 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Name: Linux"
2021-11-08 11:26:54.339 CET "Nov 08 10:26:54.339 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Version: 5.4.120+"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Architecture: amd64"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Locale Name: en_US"
2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Default Charset Encoding: US-ASCII"
2021-11-08 11:26:54.358 CET "[Simba][JDSI](20260) Cannot access file to use for logging: CANNOT_CREATE_LOGGING_PATH. Switching to default logging output."
2021-11-08 11:26:54.474 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32.) SQLState(01S02)"
2021-11-08 11:26:57.024 CET "Nov 08 10:26:57.023 WARN 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.doConnect: [Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32."
2021-11-08 11:26:57.032 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Invalid connection property value for MetaDataFetchThreadCount, value overridden to 32.) SQLState(01S02)"
2021-11-08 11:26:57.037 CET "Nov 08 10:26:57.037 1 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Driver version is: 01.02.19.1023"
2021-11-08 11:26:57.038 CET "Nov 08 10:26:57.038 1 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Datasource version is: 01.02.19.1023"
2021-11-08 11:26:57.144 CET "Flyway Community Edition 8.0.3 by Redgate"
2021-11-08 11:26:57.146 CET "Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=amfori-data-acc;OAuthType=3; (Google BigQuery 2.0)"
2021-11-08 11:26:57.159 CET "Google BigQuery 2.0 does not support setting the schema for the current session. Default schema will NOT be changed to FLYWAY !"
2021-11-08 11:26:57.160 CET "Join the GCP BigQuery beta via https://rd.gt/3fut40f"
2021-11-08 11:26:57.160 CET "Experiencing performance issues while using GCP BigQuery?"
2021-11-08 11:26:57.163 CET "Find out how Flyway Teams improves performance with batching at https://rd.gt/3CWAuTb"
2021-11-08 11:27:55.640 CET "Google BigQuery databases have a 10 GB database size limit in Flyway Community Edition. You have used 0 GB / 10 GB Consider upgrading to Flyway Teams Edition for unlimited usage: https://rd.gt/3CWAuTb"
2021-11-08 11:27:56.285 CET "Nov 08 10:27:56.285 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:57.240 CET "Nov 08 10:27:57.240 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:58.463 CET "Nov 08 10:27:58.463 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
2021-11-08 11:27:58.612 CET "Successfully validated 68 migrations (execution time 00:02.154s)"
2021-11-08 11:28:08.063 CET "Creating Schema History table `FLYWAY`.`flyway_schema_history` ..."
2021-11-08 11:28:20.046 CET "Current version of schema `FLYWAY`: Empty Schema"
2021-11-08 11:28:20.049 CET "Migrating schema `FLYWAY` to version "1.0001 - Test migration" [non-transactional]"
2021-11-08 11:28:21.100 CET "Nov 08 10:28:21.100 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
...
2021-11-08 11:28:25.724 CET "Nov 08 10:28:25.724 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
"Nov 08 10:28:45.547 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API"
...
"Successfully applied 68 migrations to schema `FLYWAY`, now at version v8.0022 (execution time 27:00.604s)"
```

Proof of concept 3

When starting to write this blog post, I checked out the Flyway GitHub repo again and spotted an interesting new module in their Maven multi-module project: flyway-gcp-bigquery (and also one for GCP Spanner).
Looking at Maven Central, it looks like they started to release beta versions of the BigQuery support somewhere in July 2021. So I decided to check it out and see if I could remove the forked PR code from my codebase and replace it with this beta version dependency:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  ...
  <properties>
    <bigquery-driver.version>1.2.19.1023</bigquery-driver.version>
    <flyway.version>8.0.3</flyway.version>
  </properties>
  ...
  <dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-core</artifactId>
    <version>${flyway.version}</version>
  </dependency>
  <dependency>
    <groupId>org.flywaydb</groupId>
    <artifactId>flyway-gcp-bigquery</artifactId>
    <version>${flyway.version}-beta</version>
  </dependency>
  ...
</project>
```

After removing the code, adding the dependencies above (while also upgrading Flyway from 7.x to 8.x), recompiling and deploying, I was still able to run all the migrations successfully against an empty BigQuery environment.

Lessons learned

The Simba BigQuery driver

The driver itself (as far as I can tell, the only JDBC BigQuery driver there is) does what it is supposed to do, but when it comes to logging it is a bit of a hot mess.
Things I had to do to get the driver's logging in Dataflow under some sort of control include:

- Adding a bunch of dependencies that try to redirect output to SLF4J, and using those in my FlywayJvmInitializer constructor:
  - org.apache.logging.log4j:log4j-iostreams
  - uk.org.lidalia:sysout-over-slf4j
- Debugging the driver to find out why I still got output on System.out.
- Overwriting the com.simba.googlebigquery.dsi.core.impl.StreamHandler class to force more logging to SLF4J.

```java
// In FlywayJvmInitializer:
public FlywayJvmInitializer() {
    PrintWriter logWriter = IoBuilder.forLogger().setLevel(Level.INFO).buildPrintWriter();
    DriverManager.setLogWriter(logWriter);
    SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
    LogFactory.setLogCreator(new Slf4jLogCreator());
}
```

```java
package com.simba.googlebigquery.dsi.core.impl;

import com.simba.googlebigquery.dsi.core.interfaces.ILogHandler;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.io.IoBuilder;

import java.io.PrintWriter;

public class StreamHandler implements ILogHandler {

    private final PrintWriter m_logWriter;

    public StreamHandler() {
        this.m_logWriter = IoBuilder.forLogger("StreamHandler").setLevel(Level.INFO).buildPrintWriter();
    }

    public void writeLog(String var1) throws Exception {
        this.m_logWriter.println(var1);
        this.m_logWriter.flush();
    }
}
```

No local BigQuery is annoying

There is no way to run something locally on your development machine that can be used to validate BigQuery behaviour. With no BigQuery Docker image or emulator, testing Flyway migrations against BigQuery means you either need a separate Google project to test against, or have to use prefixed datasets in an existing Google project. Due to certain limitations, we had to go for the prefixed dataset approach, but we managed to get it to work pretty transparently by using Dataflow runtime ValueProviders, the Flyway placeholder functionality and a custom utility that makes the dataset creation/deletion process easier.
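To give an idea of how the prefixed dataset approach can combine with Flyway placeholders, here is a minimal sketch. The helper, the prefix value and the `${dataset}` placeholder name are my own illustration, not our actual utility; migrations would then reference `${dataset}.my_table` instead of a hard-coded dataset name:

```java
import java.util.Map;

public class DatasetPrefixer {

    // Hypothetical helper: derive an environment/test-prefixed dataset name and
    // wrap it in the placeholder map that Flyway substitutes into SQL migrations.
    public static Map<String, String> placeholders(String prefix, String dataset) {
        String prefixed = (prefix == null || prefix.isEmpty()) ? dataset : prefix + "_" + dataset;
        return Map.of("dataset", prefixed);
    }

    public static void main(String[] args) {
        // The resulting map would be passed to Flyway.configure().placeholders(...),
        // so a migration containing CREATE TABLE `${dataset}.test` ends up targeting pr42_sales.test.
        System.out.println(placeholders("pr42", "sales"));
    }
}
```

In a real pipeline the prefix would come from a Dataflow runtime ValueProvider rather than a hard-coded string.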
BigQuery Time Travel is your friend

BigQuery has a very interesting feature called Time Travel, which comes in very handy when Flyway migrations fail. Especially for the community edition of Flyway, which doesn't have "undo" functionality, Time Travel is the easiest way to restore your database to how it was before the migration. I'm even wondering if you could somehow build "undo" functionality using BigQuery's Time Travel and Flyway's "callbacks" (the ones that are available in the community version)? Time Travel also comes in handy because BigQuery has quotas on a lot of things. Manually reverting changes via SQL ALTER TABLE statements, for example, quickly makes you run into these.

Dataflow worker scaling gives weird results

We first had every Dataflow pipeline using the JvmInitializer to keep the database schema up to date, but noticed that sometimes rows in the Flyway history table were duplicated (or more). As it turns out, every Dataflow worker that gets started by a pipeline goes through JVM initialization. Sometimes these workers are started close enough to each other that migrations get run multiple times. Normally, Flyway uses some sort of locking to prevent this, but in the cloned code this mechanism wasn't available for BigQuery. It seems some sort of locking for this is available in the 8.x beta, but I haven't been able to test whether it works yet. To solve this issue, we made running the JvmInitializer configurable and turned it off by default for all pipelines, and created a specific dummy Flyway pipeline for which we turned it on and which runs before all other batch pipelines.

The Flyway BigQuery migration process is kinda slow

Worker initialization takes about 2 minutes before the worker actually starts doing stuff and we see Flyway kicking into action. Afterwards, it also seems that every migration file takes at least 30 seconds to run (sometimes more, depending on the migration and table contents).
From the logging, it looks like this is partially due to how the SQL is being run: each statement becomes a BigQuery job whose results you need to listen for. Luckily, due to the previous issue/solution, we're only running it once a day for one dummy pipeline and not for the rest of our pipelines. So the only time it is actually slow is when you're testing and running the full set of migrations starting from an empty environment. You will also need to set your Flyway timeout to a value that is long enough for bigger table manipulations to succeed without causing a timeout. We're currently working with a value of 180 seconds.

Mixing SQL and Java migrations is perfectly possible

For all the things you want to do with BigQuery in the context of a migration that aren't supported by BigQuery's SQL implementation, you can fall back on Flyway's Java migrations. In a Java migration, you can easily use the BigQuery Java API to do everything that the API allows you to do.

```java
package org.flywaydb.core.internal;

import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.*;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;

public class V1__Java_Based_Migration extends BaseJavaMigration {

    @Override
    public void migrate(Context context) throws Exception {
        Credentials credentials = ServiceAccountCredentials.getApplicationDefault();
        BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
                .setProjectId("my-gcp-project-id")
                .setCredentials(credentials)
                .build();
        BigQuery service = bigQueryOptions.getService();

        TableId tableId = TableId.of("test", "test");
        Table table = service.getTable(tableId);
        TableId tableCopyId = TableId.of("test", "test_copy");
        Job job = table.copy(tableCopyId);
        job.waitFor();
    }

    @Override
    public Integer getChecksum() {
        return 1;
    }
}
```

Runtime configuration in a JvmInitializer

In the end, we created a more advanced JvmInitializer that allows us to turn Flyway migrations/repair/baselining on and off, dynamically prefix datasets, and so on. For this, we of course need to provide Dataflow pipeline options, and because we're also building a pipeline artifact (JSON + JARs) in a central bucket that is used to start jobs in multiple environments, these need to be runtime options. This is where we ran into an issue with Dataflow's options mechanism, especially if you want to use the required/default mechanism. As it turns out, this mechanism doesn't really work like you'd expect: defaults seem to get lost when you don't provide a value for an option but then try to access it in the JvmInitializer. The solution was found by looking at the Dataflow worker logs. In these logs, we could see a JSON being logged that contains most of the option info we need. This JSON is available via the sdk_pipeline_options_file environment variable on a worker. Reading this value and parsing it allows us to more or less get a working custom options object. Together with using reflection to look at the option annotations and their contents, we got it to work well enough for our purposes.
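As an illustration of that last step, here is a minimal sketch of reading such an options JSON and picking a value out of it. The JSON layout and option names are hypothetical, and a real implementation would use a proper JSON library like Jackson instead of a regex:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PipelineOptionsPeek {

    // Naive extraction of a string-valued option from the options JSON;
    // good enough to show the idea, not production-grade parsing.
    public static String extractOption(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) throws Exception {
        // On a Dataflow worker, the environment variable points at the options file;
        // the fallback JSON here is a made-up example for local runs.
        String path = System.getenv("sdk_pipeline_options_file");
        String json = path != null
                ? Files.readString(Path.of(path))
                : "{\"options\":{\"project\":\"my-gcp-project\",\"flywayEnabled\":\"true\"}}";
        System.out.println(extractOption(json, "project"));
    }
}
```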