
.png?auto=compress,webp&upscale=true&width=610&height=488&name=hubspot%20covers%20(7).png)
Flyway is een bibliotheek die bijvoorbeeld wordt gebruikt in Spring Boot om schema migratie functionaliteit te bieden. Maar... ondersteunt Flyway BigQuery? In deze blogpost werken we 3 proof-of-concepts uit om ondersteuning voor BigQuery toe te voegen aan Flyway en te integreren met Google Dataflow!
Ik ben Solution Engineer in het Data-team bij ACA Group. Ons cloudplatform van keuze is Google Cloud Platform (GCP). We gebruiken momenteel een subset van de services die beschikbaar zijn op GCP.
- Voor batch/streaming gegevensverwerking gebruiken we Apache Beam, of meer specifiek Google Dataflow, wat Google's beheerde versie van Apache Beam is.
- Onze database van keuze om de resultaten van onze gegevensverwerking naar te pushen is Google BigQuery.



Omdat ons bedrijf van oudsher Java gebruikt als de taal van onze keuze(Python heeft onlangs wat terrein gewonnen binnen ons bedrijf en is iets wat het Data-team gebruikt voor het schrijven van Google Cloud Functions), hebben we ervoor gekozen om onze Dataflow-pijplijnen in Java te schrijven (maar Apache Beam ondersteunt ook Python). Als bedrijf hebben we ook veel ervaring met het schrijven van bedrijfsapplicaties in Java en frameworks zoals Spring Boot. We zijn dus erg gewend om de evolutie van onze databaseschema's te automatiseren. Dit is een best practice die we graag willen behouden en willen toepassen op onze datapijplijnen in Dataflow.
Dus besloten we om op avontuur te gaan en te kijken of we iets konden vinden dat deze behoefte voor ons kon oplossen.

Onderzoek
De eerste aanpak was om eerst wat onderzoek te doen en te kijken wat Google te bieden had. Toen we aan het rondkijken waren naar informatie/tools/libraries/frameworks over schema evolutie & migratie voor BigQuery, vonden we een aantal opties waar we wat dieper op in zijn gegaan:
- BigQuery zelf heeft automatische schema detectie en heeft ondersteuning voor schema veranderingen, maar dit zijn meestal dingen die je handmatig moet doen met behulp van de API, de bq util of SQL. Hoewel dit werkt, is er niets dat het orkestreert. Alles zou handmatig of gescript moeten gebeuren.
- bq-schema: een tool die grotendeels doet wat we willen, maar die als nadeel heeft dat het in Python geschreven is en moeilijk te integreren zou zijn, tenzij we ook onze Apache Beam/Dataflow pipelines zouden overschakelen naar Python. Dat is een mogelijkheid die ik niet meteen afschrijf, maar die we moeten bewaren voor later als we geen andere goede oplossingen vinden.
- bigquery_migration: een tool die lijkt op de bovenstaande van Python, maar die minder van onze vereisten dekt en omdat het in Ruby is gemaakt, nog moeilijker te integreren zou zijn.
- BQconvert: alleen nuttig voor het migreren van een bestaand databaseschema naar BigQuery en dus helemaal niet geschikt voor wat wij willen bereiken.
- Dataflow BigQuery Schema Evolution & Automated Schema Evolution for BigQuery: meer een verzameling ideeën en inspiraties dan een daadwerkelijke oplossing voor onze behoeften.
- Schema-evolutie in streaming Dataflow jobs en BigQuery tabellen: fascinerende serie blogposts, maar ik kon er geen daadwerkelijke code-repository voor vinden.
Hoewel sommige van deze opties voldoen aan sommige of de meeste van onze vereisten, was er niet één die echt een ideale match was.
Een paar van deze opties werden ook genoemd in een Stackoverflow post die ook een verwijzing naar Flyway bevatte... en de term Flyway doet een belletje rinkelen!

Flyway is een bibliotheek die bijvoorbeeld wordt gebruikt in Spring Boot om functionaliteit voor schema-migratie te bieden en die op basis van eerdere ervaringen in theorie aan al onze eisen zou moeten voldoen. Blijft er nog één grote vraag over: heeft Flyway ondersteuning voor BigQuery?
Op het moment dat ik me begon te verdiepen in het hele Dataflow/BigQuery schema migratie vraagstuk, was er nog geen officiële Flyway BigQuery ondersteuning. Inmiddels is er niet-gecertificeerde bèta-ondersteuning toegevoegd. Via de eerder genoemde Stackoverflow post vond ik echter wel een issue in de Flyway GitHub repository over het toevoegen van BigQuery ondersteuning aan Flyway. In dat probleem vond ik een verwijzing naar een branch in een forked repository die een soort BigQuery ondersteuning aan Flyway zou moeten toevoegen.
We waren al bekend met Flyway door onze Spring Boot ervaring, en we hebben wat Flyway code gevonden die BigQuery ondersteuning aan Flyway zou kunnen toevoegen. Tijd om wat proof of concepts te doen, die hopelijk een hoop vragen zullen beantwoorden:
- Kunnen we de BigQuery Flyway branch laten werken?
- Als Flyway correct werkt met BigQuery
- Kunnen we alle noodzakelijke migraties uitvoeren, zoals het maken van een tabel, het toevoegen van gegevens aan een tabel, het wijzigen van een tabel schema, etc...?
- Kunnen we dit allemaal integreren in Dataflow?
- Als het in Dataflow wordt geïntegreerd, voert het dan alle testmigraties correct uit?

Proefconcept 1
De eerste proof of concept was om de code van de forked repo as-is te nemen, deze te klonen en te proberen een eenvoudige migratie te laten werken tegen een BigQuery tabel in een dataset van een GCP testproject.
Er zijn 3 manieren om Flyway uit te voeren/te gebruiken:
- Opdrachtregel
- Maven/Gradle plugin
- Java API
Omdat we ondersteuning voor Flyway willen integreren in onze Java-gebaseerde Dataflow pipelines en ook omdat onze Jenkins/Terraform-gebaseerde implementatie op dit moment niet goed geschikt is voor de command line of Maven/Gradle opties, hebben we eerst gekeken naar het gewoon aanroepen van de Flyway API. Dit werd gedaan door gewoon een eenvoudige Java klasse toe te voegen aan de gekloonde repository branch en een hoofdmethode toe te voegen. In deze hoofdmethode moesten we een paar dingen doen:
- Creëer een JDBC datasource die in staat is om verbinding te maken met BigQuery in een gegeven GCP project.
- Configureer Flyway om deze datasource te gebruiken.
- Voer het Flyway migrate commando uit en kijk of het onze SQL migratie vindt en uitvoert.
Dus het eerste wat we moeten instellen voor een databron is een BigQuery JDBC driver. Gelukkig wordt dit behandeld in de Google BigQuery documentatie. Op deze pagina staat een link naar een gratis download van de Google BigQuery Simba Data Connector, gemaakt door Magnitude. Als je de driver van deze pagina downloadt, krijg je een ZIP-bestand dat het eigenlijke JDBC driver JAR-bestand bevat, GoogleBigQueryJDBC42.jar, maar ook alle afhankelijkheden.
In mijn geval heb ik alleen deze driver JAR toegevoegd aan de Maven repository van ons bedrijf, omdat de meeste andere driver afhankelijkheden al beschikbaar zijn in publieke Maven repositories. Het is een hele klus om ze allemaal te controleren en de ontbrekende of degene met verschillende versies te uploaden.
Voor deze eerste POC was het voldoende om de volgende afhankelijkheden toe te voegen aan de pom.xml van het project dat we hebben gekloond (de versies zijn slechts indicatief voor toen ik het testte, maar kunnen worden vervangen door nieuwere versies):
- com.simba:bigquery-driver:1.2.4.1007
- com.google.cloud:google-cloud-bigquery:1.132.1
- com.google.cloud:google-cloud-bigquerystorage:1.21.1
- org.apache.avro:avro:1.8.2
Met deze afhankelijkheden op hun plaats kunnen we de onderstaande code laten werken als je de omgevingsvariabele GOOGLE_APPLICATION_CREDENTIALS instelt en deze naar een JSON-bestand met servicerekeningreferenties wijst (wat nodig is om de OAuthType=3 authentiseringsmodus te laten werken) en de plaatshouders <GCP project ID> en <a dataset ID> vervangt.
package org.flywaydb.core; import com.simba.googlebigquery.jdbc42.DataSource; import org.flywaydb.core.Flyway; public class BQTest { public static void main(String[] args) { DataSource dataSource = nieuwe DataSource(); dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3"); Flyway flyway = Flyway.configure() .createSchemas(false) .defaultSchema("<GCP project ID>.<a dataset ID>") .schemas("<GCP project ID>.<a dataset ID>") .dataSource(dataSource) .baselineOnMigrate(true) .load(); flyway.migrate(); } }.
Ik heb toen ook een SQL-migratiebestand toegevoegd aan mijn src/main/resources/db/migration directory en de code uitgevoerd en tot mijn verbazing probeerde Flyway te praten met mijn BigQuery. Er was echter een klein probleem met de gekloonde Flyway BigQuery code dat opgelost moest worden. Het INSERT statement, in de BigQueryDatabase#getInsertStatement method, dat Flyway gebruikt om migraties toe te voegen aan zijn flyway_schema_history tabel mislukte om 2 redenen:
- Type problemen met INT64 kolommen die opgelost konden worden met expliciete casting: CAST(? ALS INT64).
- Ik moest een expliciete handmatige standaard opgeven, CURRENT_TIMESTAMP(), voor tijdstempelkolommen.
Na het repareren van het INSERT statement, kon ik zien dat Flyway correct werkte met BigQuery en verifiëren dat het alle migratieacties kon uitvoeren die we hadden gedefinieerd. Het lukte me zelfs om gemengde SQL & Java migraties te laten werken (met behulp van de Java BigQuery API om dingen te doen die niet in SQL kunnen worden uitgedrukt). Er was slechts 1 verrassing: gegevens toevoegen aan een tabel kan niet in hetzelfde SQL-bestand waarin je de tabel aanmaakt. Dit soort acties kunnen niet worden gecombineerd in hetzelfde bestand.
De onderstaande uitvoer is vergelijkbaar met wat ik kreeg, maar is van een recentere poging met de huidige Flyway 8.x die BigQuery Beta ondersteunt:
Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.license.VersionPrinter info INFO: Flyway Community Edition 8.0.3 by Redgate 18 nov 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType info INFO: Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3 (Google BigQuery 2.0) Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType warn WAARSCHUWING: Google BigQuery 2.0 ondersteunt het instellen van het schema voor de huidige sessie niet. Standaard schema zal NIET worden gewijzigd in <GCP project ID>.<a dataset ID> ! 18 Nov 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO: Doe mee met de GCP BigQuery beta via https://rd.gt/3fut40f 18 nov 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO:
18 nov 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO: Ondervindt u prestatieproblemen tijdens het gebruik van GCP BigQuery? 18 nov 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO: Ontdek hoe Flyway Teams de prestaties verbetert met batching op https://rd.gt/3CWAuTb Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO:
18 nov 2021 11:43:48 AM org.flywaydb.database.bigquery.BigQueryDatabase info INFO: Google BigQuery databases hebben een limiet van 10 GB databasegrootte in Flyway Community Edition. U hebt 0 GB / 10 GB gebruikt Overweeg een upgrade naar Flyway Teams Edition voor onbeperkt gebruik: https://rd.gt/3CWAuTb Nov 18, 2021 11:43:51 AM org.flywaydb.core.internal.command.DbValidate info INFO: Succesvol 1 migratie gevalideerd (uitvoeringstijd 00:02.091s) 18 nov 2021 11:44:03 AM org.flywaydb.core.internal.schemahistory.JdbcTableSchemaHistory info INFO: Creating Schema History table `<GCP project ID>.<a dataset ID>`.`flyway_schema_history` with baseline ... Nov 18, 2021 11:44:09 AM org.flywaydb.core.internal.command.DbBaseline info INFO: Succesvol gebaseerd schema met versie: 1 18 nov 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info INFO: Huidige versie van schema `<GCP project ID>.<a dataset ID>`: 1 Nov 18, 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info INFO: Migratie van schema `<GCP project ID>.test_dataset` naar versie "1.0001 - Test migratie" [niet-transactioneel] 18 nov 2021 11:44:47 AM org.flywaydb.core.internal.command.DbMigrate info INFO: Succesvol 1 migratie toegepast op schema `<GCP project ID>.<a dataset ID>`, nu op versie v1.0001 (uitvoeringstijd 00:37.604s)
Bewijs van concept 2
Na de vorige POC hebben we nu een nieuw probleem om op te lossen: deze code werkend krijgen in een Google Dataflow-project. Geïnspireerd door Spring Boot, dat Flyway migraties uitvoert tijdens het opstarten van de applicatie, moest ik iets vinden in Beam/Dataflow dat vergelijkbaar is en ons toestaat om willekeurige code uit te voeren tijdens het opstarten.
Een eerste optie die ik ontdekte en onderzocht was een aangepaste DataflowRunnerHooks implementatie. Terwijl ik dit uitprobeerde, ontdekte ik al snel dat het moment waarop dit wordt getriggerd helemaal verkeerd is voor wat we willen bereiken, omdat het al wordt uitgevoerd tijdens het bouwen van de Dataflow code met behulp van het mvn compile exec:java commando. Omdat we een gemeenschappelijk Dataflow artefact bouwen dat wordt uitgerold naar alle omgevingen en wordt geïnjecteerd met runtime variabelen, bereikt het triggeren van onze aangepaste Flyway code op dit moment niet wat we willen.
Dus na wat meer rondgekeken te hebben vond ik de JvmInitializer interface. Dit zag er meteen veelbelovender uit en een snelle implementatie liet zien dat het inderdaad bruikbaar was, maar dat het wel een aantal eigenaardigheden heeft die we in meer detail zullen behandelen in de lessons learned sectie.
package be.planetsizebrain; import com.simba.googlebigquery.jdbc42.DataSource; import com.simba.googlebigquery.support.exceptions.ErrorException; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.sdk.harness.JvmInitializer; import org.apache.beam.sdk.options.PipelineOptions; import org.flywaydb.core.Flyway; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FlywayJvmInitializer implementeert JvmInitializer { private final Logger LOGGER = LoggerFactory.getLogger(FlywayJvmInitializer.class); @Override public void beforeProcessing(PipelineOptions options) { LOGGER.info("Flyway JVM initializer wordt uitgevoerd..."); try { DataflowWorkerHarnessOptions harnessOptions = options.as(DataflowWorkerHarnessOptions.class); executeFlyway(harnessOptions); } catch (Exception e) { LOGGER.error("Flyway migrations failed!", e); throw new RuntimeException("Unexpected problem during beforeProcessing phase of JVM initializer", e); } finally { LOGGER.info("Finished running flyway JVM initializer."); } private void executeFlyway(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException { Flyway flyway = initializeFlywayClient(harnessOptions); LOGGER.info("Flyway-migraties uitgevoerd..."); flyway.migrate(); LOGGER.info("Flyway-migraties voltooid"); } private Flyway initializeFlywayClient(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException {DataSource dataSource = createBigQueryDataSource(harnessOptions); return Flyway.configure() .createSchemas(false) .defaultSchema("FLYWAY") .schemas("FLYWAY") .dataSource(dataSource) .failOnMissingLocations(true) .locations("classpath:db/migration") .ignoreFutureMigrations(true) .load(); } private DataSource createBigQueryDataSource(DataflowWorkerHarnessOptions) throws ErrorException { DataSource dataSource = nieuwe DataSource(); dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=" + options.getProject() + ";OAuthType=3"); dataSource.setTimeout(180); dataSource.setLogLevel("INFO"); dataSource.setMaxResults(10000); return dataSource; } }.
Als je deze code toevoegt aan een Dataflow project, is er nog één ding nodig om het echt te laten werken. Het JvmInitializer systeem werkt via het Java Service Provider Interface mechanisme. Dit betekent dat we een bestand genaamd org.apache.beam.sdk.harness.JvmInitializer moeten maken in src/main/resources/META-INF/services dat de FQCN van onze JvmInitializer implementatie bevat.
Bij het uitvoeren van een Dataflow-pijplijn zien we de volgende logging (hier weer met de uitvoer van een recentere poging met de Flyway-versie die Beta-ondersteuning heeft voor BigQuery):
2021-11-08 11:26:50.593 CET "Loading pipeline options from /var/opt/google/dataflow/pipeline_options.json" 2021-11-08 11:26:50.624 CET "Worker harness starting with: {...}" 2021-11-08 11:26:53.565 CET "Uitvoeren flyway JVM initializer..." 2021-11-08 11:26:54.111 CET "Uitvoeren flyway migraties..." 2021-11-08 11:26:54.322 CET "[Simba][JDSI](20260) Kan geen toegang krijgen tot bestand om te gebruiken voor logging: CANNOT_CREATE_LOGGING_PATH. Overschakelen naar standaard logboekuitvoer." 2021-11-08 11:26:54.325 CET "08 nov 10:26:54.323 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: SDK Version: 10.1.20.1161" 2021-11-08 11:26:54.330 CET "08 Nov 10:26:54.330 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Driver Version: 01.02.19.1023" 2021-11-08 11:26:54.332 CET "08 Nov 10:26:54.332 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM-naam: Java HotSpot(TM) 64-Bit Server VM" 2021-11-08 11:26:54.332 CET "08 Nov 10:26:54.332 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Specification Version: 1.8" 2021-11-08 11:26:54.333 CET "08 Nov 10:26:54.333 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Implementation Version: 25.151-b12" 2021-11-08 11:26:54.335 CET "08 Nov 10:26:54.335 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Vendor: Oracle Corporation" 2021-11-08 11:26:54.335 CET "08 Nov 10:26:54.335 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Naam besturingssysteem: Linux" 2021-11-08 11:26:54.339 CET "Nov 08 10:26:54.339 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Version: 5.4.120+" 2021-11-08 11:26:54.340 CET "08 Nov 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Architecture: amd64" 2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Locale Name: en_US" 2021-11-08 11:26:54.340 CET "08 Nov 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Standaard Charset Codering: US-ASCII" 2021-11-08 11:26:54.358 CET "[Simba][JDSI](20260) Kan geen toegang krijgen tot bestand om te gebruiken voor logging: CANNOT_CREATE_LOGGING_PATH. Overschakelen naar standaard logging uitvoer." 2021-11-08 11:26:54.474 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Ongeldige verbindingseigenschap waarde voor MetaDataFetchThreadCount, waarde overschreven naar 32.) SQLState(01S02)" 2021-11-08 11:26:57.024 CET "08 Nov 10:26:57.023 WARN 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.doConnect: [Simba][BigQueryJDBCDriver](1000019) Ongeldige verbindingseigenschap waarde voor MetaDataFetchThreadCount, waarde overschreven naar 32." 2021-11-08 11:26:57.032 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Ongeldige verbindingseigenschap waarde voor MetaDataFetchThreadCount, waarde overschreven naar 32.) SQLState(01S02)" 2021-11-08 11:26:57.037 CET "Nov 08 10:26:57.037 1 com.simba.googlebigquery.jdbc.common.SConnection: Driver versie is: 01.02.19.1023" 2021-11-08 11:26:57.038 CET "08 Nov 10:26:57.038 1 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Datasource version is: 01.02.19.1023" 2021-11-08 11:26:57.144 CET "Flyway Community Edition 8.0.3 by Redgate" 2021-11-08 11:26:57.146 CET "Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=amfori-data-acc;OAuthType=3; (Google BigQuery 2.0)" 2021-11-08 11:26:57.159 CET "Google BigQuery 2.0 ondersteunt het instellen van het schema voor de huidige sessie niet. Standaard schema wordt NIET gewijzigd in FLYWAY !" 2021-11-08 11:26:57.160 CET "Doe mee met de GCP BigQuery beta via https://rd.gt/3fut40f" 2021-11-08 11:26:57.160 CET "Ondervindt u prestatieproblemen tijdens het gebruik van GCP BigQuery?" 2021-11-08 11:26:57.163 CET "Ontdek hoe Flyway Teams de prestaties verbetert met batching op https://rd.gt/3CWAuTb" 2021-11-08 11:27:55.640 CET "Google BigQuery databases hebben een limiet van 10 GB databasegrootte in Flyway Community Edition. U hebt 0 GB / 10 GB gebruikt Overweeg een upgrade naar Flyway Teams Edition voor onbeperkt gebruik: https://rd.gt/3CWAuTb" 2021-11-08 11:27:56.285 CET "Nov 08 10:27:56.285 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" 2021-11-08 11:27:57.240 CET "Nov 08 10:27:57.240 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" 2021-11-08 11:27:58.463 CET "Nov 08 10:27:58.463 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" 2021-11-08 11:27:58.612 CET "Successfully validated 68 migrations (execution time 00:02.154s)" 2021-11-08 11:28:08.063 CET "Creating Schema History table `FLYWAY`.`flyway_schema_history` ..." 2021-11-08 11:28:20.046 CET "Huidige versie van schema `FLYWAY`: << Leeg schema >>" 2021-11-08 11:28:20.049 CET "Migreren van schema `FLYWAY` naar versie "1.0001 - Test migration" [non-transactional]" 2021-11-08 11:28:21.100 CET "08 nov 10:28:21.100 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" ... 2021-11-08 11:28:25.724 CET "Nov 08 10:28:25.724 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" "Nov 08 10:28:45.547 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" ... "Successfully applied 68 migrations to schema `FLYWAY`, now at version v8.0022 (execution time 27:00.604s)"
Bewijs van concept 3
Toen ik begon met het schrijven van de eigenlijke blogpost, heb ik de Flyway Github repo nog eens bekeken en ik zag een interessante nieuwe module in hun Maven multi-module project: flyway-gcp-bigquery (en ook een voor GCP Spanner). Kijkend naar Maven Central lijkt het erop dat ze ergens in juli 2021 zijn begonnen met het uitbrengen van beta versies van de BigQuery ondersteuning.
Dus besloot ik om het uit te zoeken en te kijken of ik de gevorkte PR code uit mijn codebase kon verwijderen en vervangen door deze beta versie dependency:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> ... <properties> <bigquery-driver.version>1.2.19.1023</bigquery-driver.version> <flyway.version>8.0.3</flyway.version> </properties> .... <dependency> <groupId>org.flywaydb</groupId> <artifactId>flyway-core</artifactId> <version>${flyway.version}</version> </dependency> <dependency> <groupId>org.flywaydb</groupId> <artifactId>flyway-gcp-bigquery</artifactId> <version>${flyway.version}-beta</version> </dependency> ... </project>
Na het verwijderen van de code, het toevoegen van de bovenstaande afhankelijkheden (en het upgraden van Flyway van 7.x naar 8.x), opnieuw compileren en implementeren, kon ik nog steeds alle migraties succesvol uitvoeren tegen een lege BigQuery-omgeving.

Geleerde lessen
De Simba BigQuery driver
De driver zelf (voor zover ik kan zien de enige JDBC BigQuery driver die er is) doet wat het moet doen, maar als het op loggen aankomt is het een beetje een puinhoop. Dingen die ik heb moeten doen om de logging van de driver in Dataflow onder controle te krijgen zijn onder andere:
- Een aantal afhankelijkheden toevoegen die dingen proberen om te leiden naar SLF4J en deze gebruiken in mijn FlywayJvmInitializer constructor
- org.apache.logging.log4j:log4j-iostreams
- uk.org.lidalia:sysout-over-slf4j
- Debug het stuurprogramma om uit te vinden waarom ik nog steeds dingen op System.out kreeg
- De klasse com.simba.googlebigquery.dsi.core.impl.StreamHandler overschrijven om meer logging naar SLF4J te forceren
public FlywayJvmInitializer() { PrintWriter logWriter = IoBuilder.forLogger().setLevel(Level.INFO).buildPrintWriter(); DriverManager.setLogWriter(logWriter); SysOutOverSLF4J.sendSystemOutAndErrToSLF4J(); LogFactory.setLogCreator(new Slf4jLogCreator()); }.
pakket com.simba.googlebigquery.dsi.core.impl; import com.simba.googlebigquery.dsi.core.interfaces.ILogHandler; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.io.IoBuilder; import java.io.PrintWriter; public class StreamHandler implements ILogHandler { private final PrintWriter m_logWriter; public StreamHandler() { this.m_logWriter = IoBuilder.forLogger("StreamHandler").setLevel(Level.INFO).buildPrintWriter(); } public void writeLog(String var1) throws Exception { this.m_logWriter.println(var1); this.m_logWriter.flush(); } }
Geen lokale BigQuery is vervelend
Er is geen manier om lokaal iets op je ontwikkelmachine te draaien dat gebruikt kan worden om het gedrag van BigQuery te valideren. Dus geen BigQuery docker image of emulator betekent dat om Flyway migraties te testen tegen BigQuery je eigenlijk een apart Google project nodig hebt om tegen te testen of prefixed datasets moet gebruiken in een bestaand Google project.
Vanwege bepaalde beperkingen moesten we kiezen voor de prefixed dataset aanpak, maar het is ons gelukt om het vrij transparant te laten werken door gebruik te maken van Dataflow runtime ValueProviders, de Flyway placeholder functionaliteit en een aangepaste utility die het aanmaken/verwijderen van datasets eenvoudiger maakt.
BigQuery tijdreizen is je vriend
BigQuery heeft een zeer interessante functie genaamd Time Travel, die erg handig is wanneer Flyway migraties mislukken. Vooral voor de community editie van Flyway, die geen "undo"functionaliteit heeft, is Time Travel de makkelijkste manier om je database terug te zetten naar hoe het was voor de migratie.
Ik vraag me zelfs af of je op de een of andere manier "undo" functionaliteit zou kunnen bouwen met BigQuery's Time Travel en Flyway's "callbacks"(die beschikbaar zijn in de community versie)?
Time Travel is ook handig omdat BigQuery quota heeft voor veel dingen. Het handmatig terugdraaien van wijzigingen via SQL ALTER TABLE statements bijvoorbeeld zorgt ervoor dat je hier snel tegenaan loopt.
Dataflow worker schalen geeft rare resultaten
We hadden eerst elke Dataflow pijplijn de JvmInitializer laten gebruiken om het database schema up-to-date te houden, maar merkten dat soms rijen in de Flyway history tabel dubbel waren (of meer). Het blijkt dat elke Dataflow-werker die wordt gestart door een pijplijn door de JVM-initialisatie gaat. Soms worden deze dicht genoeg bij elkaar gestart zodat migraties meerdere keren worden uitgevoerd. Normaal gesproken probeert Flyway een soort vergrendeling te gebruiken om dit op te lossen, maar in de gekloonde code was dit mechanisme niet beschikbaar voor BigQuery. Het lijkt erop dat hiervoor een soort vergrendeling beschikbaar is in de 8.x Beta, maar ik heb nog niet kunnen testen of dit werkt.
Om dit probleem op te lossen, hebben we het uitvoeren van de JvmInitializer configureerbaar gemaakt en standaard uitgeschakeld voor alle pijplijnen en hebben we een specifieke dummy Flyway-pijplijn gemaakt waarvoor we het hebben ingeschakeld en die vóór alle andere batchpijplijnen wordt uitgevoerd.
Het Flyway BigQuery migratieproces is nogal traag
Worker initialisatie duurt ongeveer 2 minuten voordat de worker daadwerkelijk dingen begint te doen en we Flyway in actie zien komen. Daarna lijkt het erop dat elk migratiebestand minstens 30 seconden nodig heeft om uit te voeren (soms meer, afhankelijk van de migratie en de tabelinhoud). Uit de logging blijkt dat dit deels komt door de manier waarop de SQL wordt uitgevoerd: een BigQuery job waarvoor je moet luisteren naar de resultaten.
Gelukkig voeren we het vanwege het vorige probleem/oplossing slechts één keer per dag uit voor één dummy-pijplijn en niet voor de rest van onze pijplijnen. Dus de enige keer dat het echt traag is, is wanneer je de volledige set migraties test en uitvoert vanaf een lege omgeving.
Je moet ook je Flyway timeout instellen op een waarde die lang genoeg is om grotere tabelmanipulaties te laten slagen en geen timeout te veroorzaken. Wij werken momenteel met een waarde van 180 seconden.
SQL- en Java-migraties mengen is perfect mogelijk
Voor alle dingen die u wilt doen met BigQuery in de context van een migratie die niet worden ondersteund door de SQL-implementatie van BigQuery, kunt u terugvallen op de Java-migraties van Flyway. In een Java-migratie kunt u eenvoudig gebruik maken van de BigQuery Java API om alles te doen wat de API u toestaat te doen.
package org.flywaydb.core.internal; import com.google.auth.Credentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.*; import org.flywaydb.core.api.migration.BaseJavaMigration; import org.flywaydb.core.api.migration.Context; public class V1__Java_Based_Migration extends BaseJavaMigration { @Override public void migrate(Context context) throws Exception { Credentials credentials = ServiceAccountCredentials.getApplicationDefault(); BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder() .setProjectId("my-gcp-project-id") .setCredentials(credentials) .build(); BigQuery service = bigQueryOptions.getService(); TableId tableId = TableId.of("test", "test"); Table = service.getTable(tableId); TableId tableCopyId = TableId.of("test", "test_copy"); Job job = table.copy(tableCopyId); job.waitFor(); } @Override public Integer getChecksum() { return 1; }
Runtime configuratie in een JvmInitializer
Uiteindelijk hebben we een meer geavanceerde JvmInitializer gemaakt waarmee we Flyway-migraties/herstel/baselining aan/uit kunnen zetten, dynamische prefixing van datasets, enzovoort. Hiervoor moeten we natuurlijk Dataflow-pijplijnopties opgeven en omdat we ook een pijplijnartefact (JSON + JAR's) bouwen in een centrale emmer die wordt gebruikt om taken in meerdere omgevingen te starten, moeten deze opties runtime-opties zijn. Dit is waar we tegen een probleem aanliepen met het optiemechanisme van Dataflow, vooral als je het required/default mechanisme wilt gebruiken. Het blijkt dat dit mechanisme niet echt werkt zoals je zou verwachten en defaults lijken verloren te gaan als je geen waarde opgeeft voor een optie, maar deze probeert te openen in de JvmInitializer.
De oplossing hiervoor werd gevonden toen we de logs van de Dataflow worker bekeken. In deze logs konden we zien dat er een JSON werd gelogd die de meeste optie-informatie bevat die we nodig hebben. Deze JSON is beschikbaar onder de sdk_pipeline_options_file omgevingsvariabele op een worker. Door deze waarde te lezen en te parsen kunnen we een soort werkend custom options object krijgen. Samen met het gebruik van reflectie om naar de annotaties en hun inhoud te kijken, hebben we het goed genoeg laten werken voor onze doeleinden.

What others have also read


In deze technische blogpost wil ik het hebben over hoe je eenvoudige en flexibele ETL-gebaseerde anonimisering kunt opzetten. Waarom? Wel, ik had onlangs de gelegenheid om een klein proof of concept uit te voeren voor een klant. De klant wilde weten welke opties beschikbaar waren om interne gegevens te nemen, alle persoonlijk identificeerbare informatie (PII) te verwijderen of anonimiseren en deze op een eenvoudige manier en vorm beschikbaar te maken voor externe partijen. Na het verzamelen van verdere vereisten werd de context voor dit proof of concept als volgt gedefinieerd: Welke oplossing dan ook, het moet in staat zijn om gegevens te extraheren uit een on premise Oracle database . Het eindresultaat moet een set CSV-bestanden zijn in een Amazon S3-bucket . Tussen het ophalen van de Oracle-gegevens en het dumpen ervan in CSV-vorm op S3, moet er iets zijn dat PII-gegevens verwijdert/anonimiseert. Indien mogelijk moet de gekozen oplossing cloud native zijn. In deze 3-delige blogreeks leg ik uit hoe je eenvoudige en flexibele ETL-gebaseerde anonimisering opzet: Het onderzoek naar producten die gebruikt zouden kunnen worden om het probleem op te lossen. Controleer ook hoe geschikt ze zijn voor wat de proof of concept moet bereiken. Hoe het gekozen product gebruikt kan worden om een ETL pipeline te maken die aan de eisen voldoet. Daarnaast, hoe je een lokale Oracle database opzet in Docker die gebruikt kan worden als databron voor het data ingestion deel van het proof of concept (gewoon omdat dit zo'n PITA was om te doen). En of dit op een cloud native manier kan worden gedaan. Onderzoek Het onderzoeksdeel van het proof of concept bestaat uit 2 delen: Hoe haal je data uit een Oracle database, anonimiseer je het op de een of andere manier en sla je het op als een stel CSV bestanden in een S3 bucket aka het ETL gedeelte. Uitzoeken wat de beste manier is om de anonimisering uit te voeren. De gegevens extraheren, transformeren en opslaan Het probleem van de klant klonk meteen al opmerkelijk als iets dat je zou kunnen oplossen met een ETL-product: Extract Transform Load . Het onderzoeksgedeelte voor dit deel van het proof of concept zou zich dus concentreren op dit type product. Ik kreeg ook wat input van iemand in mijn team om eens te kijken naar singer.io , omdat dat iets was dat ze in het verleden met succes hadden gebruikt voor dit soort problemen. Als je naar de homepage van Singer kijkt, vallen een aantal dingen meteen op: Singer maakt gegevensextractie en -consolidatie mogelijk voor alle tools van je organisatie. De open-source standaard voor het schrijven van scripts die gegevens verplaatsen. Unix-geïnspireerd: Singer taps en targets zijn eenvoudige applicaties samengesteld met pipes. JSON-gebaseerd: Singer-toepassingen communiceren met JSON, waardoor ze eenvoudig te gebruiken en te implementeren zijn in elke programmeertaal. Singer is dus gewoon een specificatie, zij het geen officiële. Het is een eenvoudig, op JSON gebaseerd dataformaat en je kunt iets in dit formaat produceren (een tap in Singer terminologie) of het formaat consumeren (een target ). Je kunt deze taps en targets aan elkaar koppelen om gegevens van de ene locatie te halen en op een andere locatie op te slaan. Singer wordt standaard geleverd met een heleboel taps (meer dan 100) en targets (10). Deze taps en targets zijn geschreven in Python. Omdat het centrale punt van het systeem slechts een gegevensformaat is, is het vrij eenvoudig om er zelf een te schrijven of een bestaand formaat aan te passen. Bij het controleren van de taps zou de standaard Oracle-tap het Extract-gedeelte van ons proof of concept moeten dekken. Hetzelfde lijkt echter niet het geval te zijn voor het Load gedeelte als we kijken naar de standaard targets. Er is een CSV target , maar deze slaat de resultaten lokaal op, niet in een S3 bucket. Er is een optie om gewoon dit doel te gebruiken en de S3 upload zelf te doen nadat de ETL pijplijn is voltooid. Een andere optie zou zijn om het bestaande CSV target aan te passen en de bestandsopslag te veranderen naar S3. Even Googelen levert een door de gemeenschap gemaakt S3 CSV Singer doel op. Volgens de documentatie zou dit target precies moeten doen wat we willen. Oeps, Singer transformeert niet Met de Extract en Load delen gedekt, blijft alleen het Transform deel van de ETL pijplijn over om uit te zoeken... en dit is waar het een beetje vreemd wordt. Ook al is Singer geclassificeerd als een ETL tool, het lijkt geen ondersteuning te hebben voor het transformatie gedeelte? Toen ik hier verder naar keek, kwam ik deze onheilspellend getitelde post tegen: Why our ETL tool does not do transformations . Als ik dit lees, lijkt het erop dat ze hun JSON specificatie/gegevensformaat beschouwen als het transformatiegedeelte. Dus ze ondersteunen transformatie naar ruwe gegevens en het opslaan ervan, maar ondersteunen geen andere soorten transformaties. Dat deel mag je zelf doen nadat het ergens is opgeslagen door een Singer-doel. Het blijkt dus dat Singer meer lijkt op het EL deel van een ELT product dan op een "old school" ETL product . Op dit punt zou Singer in ieder geval voldoende moeten zijn om de gegevens uit een Oracle database te halen en in CSV-formaat in een S3 bucket te zetten. En omdat Singer vrij eenvoudig, open en uitbreidbaar is, laat ik het hier voorlopig bij. Laten we verder kijken naar de anonimiseringsopties die in deze Singer-context zouden kunnen passen. Gegevens anonimiseren Net als bij het ETL-gedeelte, kreeg ik ook voor dit gedeelte wat input die me wees op Microsoft Presidio . Op de homepage kunnen we het volgende lezen: Het biedt snelle identificatie- en anonimiseringsmodules voor privé-entiteiten in tekst en afbeeldingen , zoals creditcardnummers, namen en meer. Het faciliteert zowel volledig geautomatiseerde als semi-geautomatiseerde PII de-identificatiestromen op meerdere platforms. Aanpasbaarheid in PII-identificatie en -anonimisering. Er staan dus veel veelbelovende dingen in die me zouden kunnen helpen bij het oplossen van mijn anonimiseringsbehoeften. Bij nader onderzoek lijkt het erop dat ik dit product evalueer tijdens een grote transformatie (snap je? 😉 ) van V1 naar V2. V1 bevatte wat ETL-achtige dingen zoals het ophalen van gegevens uit bronnen (hoewel Oracle-ondersteuning in de roadmap nooit lijkt te zijn gerealiseerd ) en het opslaan van geanonimiseerde resultaten in een aantal vormen/locaties. V2 heeft deze aanpak echter volledig losgelaten en concentreert zich puur op het detecteren en vervangen van PII-gegevens. In de kern is Presidio V2 een op Python gebaseerd systeem dat bovenop een AI-model is gebouwd. Dit stelt het in staat om automatisch PII-gegevens te ontdekken in tekst en afbeeldingen en deze te vervangen volgens de regels die je definieert. Ik heb wat tests gedaan met behulp van hun online testtool en het werkt min of meer, maar voor onze specifieke context moet het zeker worden aangepast. Als we kijken naar de meegeleverde testgegevens, lijkt het erop dat het vooral eenvoudige en korte gegevens zijn, maar geen grote tekstblokken of afbeeldingen. Dit roept de vraag op: zelfs als we Presidio kunnen configureren om te doen wat we willen, slaan we misschien kleine spijkers met een grote hamer? Is Presidio te veel? Laten we hier nog eens over nadenken. Als we gemakkelijk kunnen weten en definiëren welke eenvoudige kolommen in welke tabellen moeten worden geanonimiseerd en wanneer gewoon nulling of hashing van de kolomwaarden voldoende is, dan hebben we het autodetectie deel van Presidio niet nodig. We hebben ook geen Presidio-ondersteuning nodig voor volledige tekst of afbeeldingen en we hebben ook geen fancy substitutie-ondersteuning nodig. Presidio zou een krachtige bibliotheek kunnen zijn om een automatische anonimiseringsstap te maken voor onze Singer-gebaseerde pijplijn. Het helpt ook dat Presidio gebaseerd is op Python. Maar mijn gevoel zegt dat ik misschien eerst moet proberen om een iets eenvoudigere oplossing te vinden. Ik begon te zoeken naar iets dat een eenvoudige PII-vervanging kan doen en dat werkt in een Singer tap/target context. Ik vond deze Github repository: pipelinewise-transform-field . In de documentatie staat "Transformatiecomponent tussen Singer taps en targets". Klinkt verdacht veel als het " T " deel dat Singer als een ETL miste! Verderop in de configuratiesectie lezen we zelfs: "Je moet definiëren welke kolommen door welke methode moeten worden getransformeerd en in welke conditie de transformatie moet worden toegepast." en de mogelijke transformatietypes zijn: SET-NULL : transformeert elke invoer naar NULL HASH : transformeert stringinvoer naar hash HASH-SKIP-FIRST-n : Transformeert stringinvoer naar hash waarbij de eerste n tekens worden overgeslagen, bijv. HASH-SKIP-FIRST-2 MASK-DATE : Vervangt de maand- en dagdelen van datumkolommen door 1 jan. MASK-NUMBER : Zet elke numerieke waarde om in nul. MASK-HIDDEN : verandert een willekeurige tekenreeks in 'verborgen'. Dit lijkt volledig te voldoen aan onze eenvoudige anonimiseringseisen! We kunnen zelfs zien hoe we het moeten gebruiken in de context van Singer: some-singer-tap | transform-field --config [config.json] | some-singer-target Standaard Conclusie We hebben nu alle stukjes van de puzzel voor het opzetten van eenvoudige en flexibele ETL-gebaseerde anonimisering. In de volgende blogpost laten we zien hoe ze in elkaar passen en of ze de resultaten opleveren die de klant zoekt.
Lees verder

We zijn als ACA Group officieel ISO 27001 compliant! Onze Information Security Manager Simon Vercruysse legt uit wat die certificatie precies inhoudt en wat de voordelen zijn voor jouw (toekomstige) project.
Lees verder

CloudBrew is altijd een hoogtepunt op onze kalender geweest, maar de editie van 2025 voelde anders. Misschien lag het aan de timing. Slechts een maand eerder, in november 2025, opende de Azure Belgium Central-regio eindelijk haar deuren. ACA opereert al altijd vanuit het hart van Europa, dus het live gaan van deze grote nationale mijlpaal net voor de conferentie zorgde voor een extra dosis enthousiasme.
Lees verderWant to dive deeper into this topic?
Get in touch with our experts today. They are happy to help!

Want to dive deeper into this topic?
Get in touch with our experts today. They are happy to help!

Want to dive deeper into this topic?
Get in touch with our experts today. They are happy to help!

Want to dive deeper into this topic?
Get in touch with our experts today. They are happy to help!


