12 januari 2021
Leestijd 17 min
Hoe voeg je BigQuery ondersteuning en Dataflow integratie toe aan Flyway?
<span id="hs_cos_wrapper_name" class="hs_cos_wrapper hs_cos_wrapper_meta_field hs_cos_wrapper_type_text" style="" data-hs-cos-general-type="meta_field" data-hs-cos-type="text" >Hoe voeg je BigQuery ondersteuning en Dataflow integratie toe aan Flyway?</span>
Share this via:

Flyway is een bibliotheek die bijvoorbeeld wordt gebruikt in Spring Boot om schema migratie functionaliteit te bieden. Maar... ondersteunt Flyway BigQuery? In deze blogpost werken we 3 proof-of-concepts uit om ondersteuning voor BigQuery toe te voegen aan Flyway en te integreren met Google Dataflow!

Ik ben Solution Engineer in het Data-team bij ACA Group. Ons cloudplatform van keuze is Google Cloud Platform (GCP). We gebruiken momenteel een subset van de services die beschikbaar zijn op GCP.

  • Voor batch/streaming gegevensverwerking gebruiken we Apache Beam, of meer specifiek Google Dataflow, wat Google's beheerde versie van Apache Beam is.
  • Onze database van keuze om de resultaten van onze gegevensverwerking naar te pushen is Google BigQuery.

beam logogoogle bigquery

Omdat ons bedrijf van oudsher Java gebruikt als de taal van onze keuze(Python heeft onlangs wat terrein gewonnen binnen ons bedrijf en is iets wat het Data-team gebruikt voor het schrijven van Google Cloud Functions), hebben we ervoor gekozen om onze Dataflow-pijplijnen in Java te schrijven (maar Apache Beam ondersteunt ook Python). Als bedrijf hebben we ook veel ervaring met het schrijven van bedrijfsapplicaties in Java en frameworks zoals Spring Boot. We zijn dus erg gewend om de evolutie van onze databaseschema's te automatiseren. Dit is een best practice die we graag willen behouden en willen toepassen op onze datapijplijnen in Dataflow.

Dus besloten we om op avontuur te gaan en te kijken of we iets konden vinden dat deze behoefte voor ons kon oplossen.

the hobbit meme

Onderzoek

De eerste aanpak was om eerst wat onderzoek te doen en te kijken wat Google te bieden had. Toen we aan het rondkijken waren naar informatie/tools/libraries/frameworks over schema evolutie & migratie voor BigQuery, vonden we een aantal opties waar we wat dieper op in zijn gegaan:

  • BigQuery zelf heeft automatische schema detectie en heeft ondersteuning voor schema veranderingen, maar dit zijn meestal dingen die je handmatig moet doen met behulp van de API, de bq util of SQL. Hoewel dit werkt, is er niets dat het orkestreert. Alles zou handmatig of gescript moeten gebeuren.
  • bq-schema: een tool die grotendeels doet wat we willen, maar die als nadeel heeft dat het in Python geschreven is en moeilijk te integreren zou zijn, tenzij we ook onze Apache Beam/Dataflow pipelines zouden overschakelen naar Python. Dat is een mogelijkheid die ik niet meteen afschrijf, maar die we moeten bewaren voor later als we geen andere goede oplossingen vinden.
  • bigquery_migration: een tool die lijkt op de bovenstaande van Python, maar die minder van onze vereisten dekt en omdat het in Ruby is gemaakt, nog moeilijker te integreren zou zijn.
  • BQconvert: alleen nuttig voor het migreren van een bestaand databaseschema naar BigQuery en dus helemaal niet geschikt voor wat wij willen bereiken.
  • Dataflow BigQuery Schema Evolution & Automated Schema Evolution for BigQuery: meer een verzameling ideeën en inspiraties dan een daadwerkelijke oplossing voor onze behoeften.
  • Schema-evolutie in streaming Dataflow jobs en BigQuery tabellen: fascinerende serie blogposts, maar ik kon er geen daadwerkelijke code-repository voor vinden.

Hoewel sommige van deze opties voldoen aan sommige of de meeste van onze vereisten, was er niet één die echt een ideale match was.

Een paar van deze opties werden ook genoemd in een Stackoverflow post die ook een verwijzing naar Flyway bevatte... en de term Flyway doet een belletje rinkelen!

Flyway is een bibliotheek die bijvoorbeeld wordt gebruikt in Spring Boot om functionaliteit voor schema-migratie te bieden en die op basis van eerdere ervaringen in theorie aan al onze eisen zou moeten voldoen. Blijft er nog één grote vraag over: heeft Flyway ondersteuning voor BigQuery?

Op het moment dat ik me begon te verdiepen in het hele Dataflow/BigQuery schema migratie vraagstuk, was er nog geen officiële Flyway BigQuery ondersteuning. Inmiddels is er niet-gecertificeerde bèta-ondersteuning toegevoegd. Via de eerder genoemde Stackoverflow post vond ik echter wel een issue in de Flyway GitHub repository over het toevoegen van BigQuery ondersteuning aan Flyway. In dat probleem vond ik een verwijzing naar een branch in een forked repository die een soort BigQuery ondersteuning aan Flyway zou moeten toevoegen.

We waren al bekend met Flyway door onze Spring Boot ervaring, en we hebben wat Flyway code gevonden die BigQuery ondersteuning aan Flyway zou kunnen toevoegen. Tijd om wat proof of concepts te doen, die hopelijk een hoop vragen zullen beantwoorden:

  • Kunnen we de BigQuery Flyway branch laten werken?
  • Als Flyway correct werkt met BigQuery
    • Kunnen we alle noodzakelijke migraties uitvoeren, zoals het maken van een tabel, het toevoegen van gegevens aan een tabel, het wijzigen van een tabel schema, etc...?
    • Kunnen we dit allemaal integreren in Dataflow?
  • Als het in Dataflow wordt geïntegreerd, voert het dan alle testmigraties correct uit?

Proefconcept 1

De eerste proof of concept was om de code van de forked repo as-is te nemen, deze te klonen en te proberen een eenvoudige migratie te laten werken tegen een BigQuery tabel in een dataset van een GCP testproject.

Er zijn 3 manieren om Flyway uit te voeren/te gebruiken:

  • Opdrachtregel
  • Maven/Gradle plugin
  • Java API

Omdat we ondersteuning voor Flyway willen integreren in onze Java-gebaseerde Dataflow pipelines en ook omdat onze Jenkins/Terraform-gebaseerde implementatie op dit moment niet goed geschikt is voor de command line of Maven/Gradle opties, hebben we eerst gekeken naar het gewoon aanroepen van de Flyway API. Dit werd gedaan door gewoon een eenvoudige Java klasse toe te voegen aan de gekloonde repository branch en een hoofdmethode toe te voegen. In deze hoofdmethode moesten we een paar dingen doen:

  • Creëer een JDBC datasource die in staat is om verbinding te maken met BigQuery in een gegeven GCP project.
  • Configureer Flyway om deze datasource te gebruiken.
  • Voer het Flyway migrate commando uit en kijk of het onze SQL migratie vindt en uitvoert.

Dus het eerste wat we moeten instellen voor een databron is een BigQuery JDBC driver. Gelukkig wordt dit behandeld in de Google BigQuery documentatie. Op deze pagina staat een link naar een gratis download van de Google BigQuery Simba Data Connector, gemaakt door Magnitude. Als je de driver van deze pagina downloadt, krijg je een ZIP-bestand dat het eigenlijke JDBC driver JAR-bestand bevat, GoogleBigQueryJDBC42.jar, maar ook alle afhankelijkheden.

In mijn geval heb ik alleen deze driver JAR toegevoegd aan de Maven repository van ons bedrijf, omdat de meeste andere driver afhankelijkheden al beschikbaar zijn in publieke Maven repositories. Het is een hele klus om ze allemaal te controleren en de ontbrekende of degene met verschillende versies te uploaden.

Voor deze eerste POC was het voldoende om de volgende afhankelijkheden toe te voegen aan de pom.xml van het project dat we hebben gekloond (de versies zijn slechts indicatief voor toen ik het testte, maar kunnen worden vervangen door nieuwere versies):

  • com.simba:bigquery-driver:1.2.4.1007
  • com.google.cloud:google-cloud-bigquery:1.132.1
  • com.google.cloud:google-cloud-bigquerystorage:1.21.1
  • org.apache.avro:avro:1.8.2

Met deze afhankelijkheden op hun plaats kunnen we de onderstaande code laten werken als je de omgevingsvariabele GOOGLE_APPLICATION_CREDENTIALS instelt en deze naar een JSON-bestand met servicerekeningreferenties wijst (wat nodig is om de OAuthType=3 authentiseringsmodus te laten werken) en de plaatshouders <GCP project ID> en <a dataset ID> vervangt.

package org.flywaydb.core; import com.simba.googlebigquery.jdbc42.DataSource; import org.flywaydb.core.Flyway; public class BQTest { public static void main(String[] args) { DataSource dataSource = nieuwe DataSource(); dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3"); Flyway flyway = Flyway.configure() .createSchemas(false) .defaultSchema("<GCP project ID>.<a dataset ID>") .schemas("<GCP project ID>.<a dataset ID>") .dataSource(dataSource) .baselineOnMigrate(true) .load(); flyway.migrate(); } }.
Standaard

Ik heb toen ook een SQL-migratiebestand toegevoegd aan mijn src/main/resources/db/migration directory en de code uitgevoerd en tot mijn verbazing probeerde Flyway te praten met mijn BigQuery. Er was echter een klein probleem met de gekloonde Flyway BigQuery code dat opgelost moest worden. Het INSERT statement, in de BigQueryDatabase#getInsertStatement method, dat Flyway gebruikt om migraties toe te voegen aan zijn flyway_schema_history tabel mislukte om 2 redenen:

  • Type problemen met INT64 kolommen die opgelost konden worden met expliciete casting: CAST(? ALS INT64).
  • Ik moest een expliciete handmatige standaard opgeven, CURRENT_TIMESTAMP(), voor tijdstempelkolommen.

Na het repareren van het INSERT statement, kon ik zien dat Flyway correct werkte met BigQuery en verifiëren dat het alle migratieacties kon uitvoeren die we hadden gedefinieerd. Het lukte me zelfs om gemengde SQL & Java migraties te laten werken (met behulp van de Java BigQuery API om dingen te doen die niet in SQL kunnen worden uitgedrukt). Er was slechts 1 verrassing: gegevens toevoegen aan een tabel kan niet in hetzelfde SQL-bestand waarin je de tabel aanmaakt. Dit soort acties kunnen niet worden gecombineerd in hetzelfde bestand.

De onderstaande uitvoer is vergelijkbaar met wat ik kreeg, maar is van een recentere poging met de huidige Flyway 8.x die BigQuery Beta ondersteunt:

Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.license.VersionPrinter info INFO: Flyway Community Edition 8.0.3 by Redgate 18 nov 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType info INFO: Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=<GCP project ID>;OAuthType=3 (Google BigQuery 2.0) Nov 18, 2021 11:43:30 AM org.flywaydb.core.internal.database.base.BaseDatabaseType warn WAARSCHUWING: Google BigQuery 2.0 ondersteunt het instellen van het schema voor de huidige sessie niet. Standaard schema zal NIET worden gewijzigd in <GCP project ID>.<a dataset ID> ! 18 Nov 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO: Doe mee met de GCP BigQuery beta via https://rd.gt/3fut40f 18 nov 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO: 
18 nov 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO: Ondervindt u prestatieproblemen tijdens het gebruik van GCP BigQuery? 18 nov 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO: Ontdek hoe Flyway Teams de prestaties verbetert met batching op https://rd.gt/3CWAuTb Nov 18, 2021 11:43:30 AM org.flywaydb.database.bigquery.BigQueryDatabaseType info INFO: 
18 nov 2021 11:43:48 AM org.flywaydb.database.bigquery.BigQueryDatabase info INFO: Google BigQuery databases hebben een limiet van 10 GB databasegrootte in Flyway Community Edition. U hebt 0 GB / 10 GB gebruikt Overweeg een upgrade naar Flyway Teams Edition voor onbeperkt gebruik: https://rd.gt/3CWAuTb Nov 18, 2021 11:43:51 AM org.flywaydb.core.internal.command.DbValidate info INFO: Succesvol 1 migratie gevalideerd (uitvoeringstijd 00:02.091s) 18 nov 2021 11:44:03 AM org.flywaydb.core.internal.schemahistory.JdbcTableSchemaHistory info INFO: Creating Schema History table `<GCP project ID>.<a dataset ID>`.`flyway_schema_history` with baseline ... Nov 18, 2021 11:44:09 AM org.flywaydb.core.internal.command.DbBaseline info INFO: Succesvol gebaseerd schema met versie: 1 18 nov 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info INFO: Huidige versie van schema `<GCP project ID>.<a dataset ID>`: 1 Nov 18, 2021 11:44:19 AM org.flywaydb.core.internal.command.DbMigrate info INFO: Migratie van schema `<GCP project ID>.test_dataset` naar versie "1.0001 - Test migratie" [niet-transactioneel] 18 nov 2021 11:44:47 AM org.flywaydb.core.internal.command.DbMigrate info INFO: Succesvol 1 migratie toegepast op schema `<GCP project ID>.<a dataset ID>`, nu op versie v1.0001 (uitvoeringstijd 00:37.604s)
Standaard

Bewijs van concept 2

Na de vorige POC hebben we nu een nieuw probleem om op te lossen: deze code werkend krijgen in een Google Dataflow-project. Geïnspireerd door Spring Boot, dat Flyway migraties uitvoert tijdens het opstarten van de applicatie, moest ik iets vinden in Beam/Dataflow dat vergelijkbaar is en ons toestaat om willekeurige code uit te voeren tijdens het opstarten.

Een eerste optie die ik ontdekte en onderzocht was een aangepaste DataflowRunnerHooks implementatie. Terwijl ik dit uitprobeerde, ontdekte ik al snel dat het moment waarop dit wordt getriggerd helemaal verkeerd is voor wat we willen bereiken, omdat het al wordt uitgevoerd tijdens het bouwen van de Dataflow code met behulp van het mvn compile exec:java commando. Omdat we een gemeenschappelijk Dataflow artefact bouwen dat wordt uitgerold naar alle omgevingen en wordt geïnjecteerd met runtime variabelen, bereikt het triggeren van onze aangepaste Flyway code op dit moment niet wat we willen.

Dus na wat meer rondgekeken te hebben vond ik de JvmInitializer interface. Dit zag er meteen veelbelovender uit en een snelle implementatie liet zien dat het inderdaad bruikbaar was, maar dat het wel een aantal eigenaardigheden heeft die we in meer detail zullen behandelen in de lessons learned sectie.

package be.planetsizebrain; import com.simba.googlebigquery.jdbc42.DataSource; import com.simba.googlebigquery.support.exceptions.ErrorException; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.sdk.harness.JvmInitializer; import org.apache.beam.sdk.options.PipelineOptions; import org.flywaydb.core.Flyway; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FlywayJvmInitializer implementeert JvmInitializer { private final Logger LOGGER = LoggerFactory.getLogger(FlywayJvmInitializer.class); @Override public void beforeProcessing(PipelineOptions options) { LOGGER.info("Flyway JVM initializer wordt uitgevoerd..."); try { DataflowWorkerHarnessOptions harnessOptions = options.as(DataflowWorkerHarnessOptions.class); executeFlyway(harnessOptions); } catch (Exception e) { LOGGER.error("Flyway migrations failed!", e); throw new RuntimeException("Unexpected problem during beforeProcessing phase of JVM initializer", e); } finally { LOGGER.info("Finished running flyway JVM initializer."); } private void executeFlyway(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException { Flyway flyway = initializeFlywayClient(harnessOptions); LOGGER.info("Flyway-migraties uitgevoerd..."); flyway.migrate(); LOGGER.info("Flyway-migraties voltooid"); } private Flyway initializeFlywayClient(DataflowWorkerHarnessOptions harnessOptions) throws ErrorException {DataSource dataSource = createBigQueryDataSource(harnessOptions); return Flyway.configure() .createSchemas(false) .defaultSchema("FLYWAY") .schemas("FLYWAY") .dataSource(dataSource) .failOnMissingLocations(true) .locations("classpath:db/migration") .ignoreFutureMigrations(true) .load(); } private DataSource createBigQueryDataSource(DataflowWorkerHarnessOptions) throws ErrorException { DataSource dataSource = nieuwe DataSource(); dataSource.setURL("jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=" + options.getProject() + ";OAuthType=3"); dataSource.setTimeout(180); dataSource.setLogLevel("INFO"); dataSource.setMaxResults(10000); return dataSource; } }.
Standaard

Als je deze code toevoegt aan een Dataflow project, is er nog één ding nodig om het echt te laten werken. Het JvmInitializer systeem werkt via het Java Service Provider Interface mechanisme. Dit betekent dat we een bestand genaamd org.apache.beam.sdk.harness.JvmInitializer moeten maken in src/main/resources/META-INF/services dat de FQCN van onze JvmInitializer implementatie bevat.

Bij het uitvoeren van een Dataflow-pijplijn zien we de volgende logging (hier weer met de uitvoer van een recentere poging met de Flyway-versie die Beta-ondersteuning heeft voor BigQuery):

2021-11-08 11:26:50.593 CET "Loading pipeline options from /var/opt/google/dataflow/pipeline_options.json" 2021-11-08 11:26:50.624 CET "Worker harness starting with: {...}" 2021-11-08 11:26:53.565 CET "Uitvoeren flyway JVM initializer..." 2021-11-08 11:26:54.111 CET "Uitvoeren flyway migraties..." 2021-11-08 11:26:54.322 CET "[Simba][JDSI](20260) Kan geen toegang krijgen tot bestand om te gebruiken voor logging: CANNOT_CREATE_LOGGING_PATH. Overschakelen naar standaard logboekuitvoer." 2021-11-08 11:26:54.325 CET "08 nov 10:26:54.323 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: SDK Version: 10.1.20.1161" 2021-11-08 11:26:54.330 CET "08 Nov 10:26:54.330 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Driver Version: 01.02.19.1023" 2021-11-08 11:26:54.332 CET "08 Nov 10:26:54.332 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM-naam: Java HotSpot(TM) 64-Bit Server VM" 2021-11-08 11:26:54.332 CET "08 Nov 10:26:54.332 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Specification Version: 1.8" 2021-11-08 11:26:54.333 CET "08 Nov 10:26:54.333 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Implementation Version: 25.151-b12" 2021-11-08 11:26:54.335 CET "08 Nov 10:26:54.335 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: JVM Vendor: Oracle Corporation" 2021-11-08 11:26:54.335 CET "08 Nov 10:26:54.335 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Naam besturingssysteem: Linux" 2021-11-08 11:26:54.339 CET "Nov 08 10:26:54.339 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Version: 5.4.120+" 2021-11-08 11:26:54.340 CET "08 Nov 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Operating System Architecture: amd64" 2021-11-08 11:26:54.340 CET "Nov 08 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Locale Name: en_US" 2021-11-08 11:26:54.340 CET "08 Nov 10:26:54.340 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.getConnection: Standaard Charset Codering: US-ASCII" 2021-11-08 11:26:54.358 CET "[Simba][JDSI](20260) Kan geen toegang krijgen tot bestand om te gebruiken voor logging: CANNOT_CREATE_LOGGING_PATH. Overschakelen naar standaard logging uitvoer." 2021-11-08 11:26:54.474 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Ongeldige verbindingseigenschap waarde voor MetaDataFetchThreadCount, waarde overschreven naar 32.) SQLState(01S02)" 2021-11-08 11:26:57.024 CET "08 Nov 10:26:57.023 WARN 1 com.simba.googlebigquery.jdbc.common.BaseConnectionFactory.doConnect: [Simba][BigQueryJDBCDriver](1000019) Ongeldige verbindingseigenschap waarde voor MetaDataFetchThreadCount, waarde overschreven naar 32." 2021-11-08 11:26:57.032 CET "SQLWarning: reason([Simba][BigQueryJDBCDriver](1000019) Ongeldige verbindingseigenschap waarde voor MetaDataFetchThreadCount, waarde overschreven naar 32.) SQLState(01S02)" 2021-11-08 11:26:57.037 CET "Nov 08 10:26:57.037 1 com.simba.googlebigquery.jdbc.common.SConnection: Driver versie is: 01.02.19.1023" 2021-11-08 11:26:57.038 CET "08 Nov 10:26:57.038 1 com.simba.googlebigquery.jdbc.common.SConnection.SConnection: Datasource version is: 01.02.19.1023" 2021-11-08 11:26:57.144 CET "Flyway Community Edition 8.0.3 by Redgate" 2021-11-08 11:26:57.146 CET "Database: jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;ProjectId=amfori-data-acc;OAuthType=3; (Google BigQuery 2.0)" 2021-11-08 11:26:57.159 CET "Google BigQuery 2.0 ondersteunt het instellen van het schema voor de huidige sessie niet. Standaard schema wordt NIET gewijzigd in FLYWAY !" 2021-11-08 11:26:57.160 CET "Doe mee met de GCP BigQuery beta via https://rd.gt/3fut40f" 2021-11-08 11:26:57.160 CET "Ondervindt u prestatieproblemen tijdens het gebruik van GCP BigQuery?" 2021-11-08 11:26:57.163 CET "Ontdek hoe Flyway Teams de prestaties verbetert met batching op https://rd.gt/3CWAuTb" 2021-11-08 11:27:55.640 CET "Google BigQuery databases hebben een limiet van 10 GB databasegrootte in Flyway Community Edition. U hebt 0 GB / 10 GB gebruikt Overweeg een upgrade naar Flyway Teams Edition voor onbeperkt gebruik: https://rd.gt/3CWAuTb" 2021-11-08 11:27:56.285 CET "Nov 08 10:27:56.285 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" 2021-11-08 11:27:57.240 CET "Nov 08 10:27:57.240 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" 2021-11-08 11:27:58.463 CET "Nov 08 10:27:58.463 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" 2021-11-08 11:27:58.612 CET "Successfully validated 68 migrations (execution time 00:02.154s)" 2021-11-08 11:28:08.063 CET "Creating Schema History table `FLYWAY`.`flyway_schema_history` ..." 2021-11-08 11:28:20.046 CET "Huidige versie van schema `FLYWAY`: << Leeg schema >>" 2021-11-08 11:28:20.049 CET "Migreren van schema `FLYWAY` naar versie "1.0001 - Test migration" [non-transactional]" 2021-11-08 11:28:21.100 CET "08 nov 10:28:21.100 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" ... 2021-11-08 11:28:25.724 CET "Nov 08 10:28:25.724 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" "Nov 08 10:28:45.547 1 com.simba.googlebigquery.googlebigquery.dataengine.BQBufferManager.processTheFirstPage: Retrieving data using the standard API" ... "Successfully applied 68 migrations to schema `FLYWAY`, now at version v8.0022 (execution time 27:00.604s)"
Standaard

Bewijs van concept 3

Toen ik begon met het schrijven van de eigenlijke blogpost, heb ik de Flyway Github repo nog eens bekeken en ik zag een interessante nieuwe module in hun Maven multi-module project: flyway-gcp-bigquery (en ook een voor GCP Spanner). Kijkend naar Maven Central lijkt het erop dat ze ergens in juli 2021 zijn begonnen met het uitbrengen van beta versies van de BigQuery ondersteuning.

Dus besloot ik om het uit te zoeken en te kijken of ik de gevorkte PR code uit mijn codebase kon verwijderen en vervangen door deze beta versie dependency:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> ... <properties> <bigquery-driver.version>1.2.19.1023</bigquery-driver.version> <flyway.version>8.0.3</flyway.version> </properties> .... <dependency> <groupId>org.flywaydb</groupId> <artifactId>flyway-core</artifactId> <version>${flyway.version}</version> </dependency> <dependency> <groupId>org.flywaydb</groupId> <artifactId>flyway-gcp-bigquery</artifactId> <version>${flyway.version}-beta</version> </dependency> ... </project>
Standaard

Na het verwijderen van de code, het toevoegen van de bovenstaande afhankelijkheden (en het upgraden van Flyway van 7.x naar 8.x), opnieuw compileren en implementeren, kon ik nog steeds alle migraties succesvol uitvoeren tegen een lege BigQuery-omgeving.

Geleerde lessen

De Simba BigQuery driver

De driver zelf (voor zover ik kan zien de enige JDBC BigQuery driver die er is) doet wat het moet doen, maar als het op loggen aankomt is het een beetje een puinhoop. Dingen die ik heb moeten doen om de logging van de driver in Dataflow onder controle te krijgen zijn onder andere:

  • Een aantal afhankelijkheden toevoegen die dingen proberen om te leiden naar SLF4J en deze gebruiken in mijn FlywayJvmInitializer constructor
    • org.apache.logging.log4j:log4j-iostreams
    • uk.org.lidalia:sysout-over-slf4j
  • Debug het stuurprogramma om uit te vinden waarom ik nog steeds dingen op System.out kreeg
  • De klasse com.simba.googlebigquery.dsi.core.impl.StreamHandler overschrijven om meer logging naar SLF4J te forceren
public FlywayJvmInitializer() { PrintWriter logWriter = IoBuilder.forLogger().setLevel(Level.INFO).buildPrintWriter(); DriverManager.setLogWriter(logWriter); SysOutOverSLF4J.sendSystemOutAndErrToSLF4J(); LogFactory.setLogCreator(new Slf4jLogCreator()); }.
Standaard
pakket com.simba.googlebigquery.dsi.core.impl; import com.simba.googlebigquery.dsi.core.interfaces.ILogHandler; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.io.IoBuilder; import java.io.PrintWriter; public class StreamHandler implements ILogHandler { private final PrintWriter m_logWriter; public StreamHandler() { this.m_logWriter = IoBuilder.forLogger("StreamHandler").setLevel(Level.INFO).buildPrintWriter(); } public void writeLog(String var1) throws Exception { this.m_logWriter.println(var1); this.m_logWriter.flush(); } }
Standaard

Geen lokale BigQuery is vervelend

Er is geen manier om lokaal iets op je ontwikkelmachine te draaien dat gebruikt kan worden om het gedrag van BigQuery te valideren. Dus geen BigQuery docker image of emulator betekent dat om Flyway migraties te testen tegen BigQuery je eigenlijk een apart Google project nodig hebt om tegen te testen of prefixed datasets moet gebruiken in een bestaand Google project.

Vanwege bepaalde beperkingen moesten we kiezen voor de prefixed dataset aanpak, maar het is ons gelukt om het vrij transparant te laten werken door gebruik te maken van Dataflow runtime ValueProviders, de Flyway placeholder functionaliteit en een aangepaste utility die het aanmaken/verwijderen van datasets eenvoudiger maakt.

BigQuery tijdreizen is je vriend

BigQuery heeft een zeer interessante functie genaamd Time Travel, die erg handig is wanneer Flyway migraties mislukken. Vooral voor de community editie van Flyway, die geen "undo"functionaliteit heeft, is Time Travel de makkelijkste manier om je database terug te zetten naar hoe het was voor de migratie.

Ik vraag me zelfs af of je op de een of andere manier "undo" functionaliteit zou kunnen bouwen met BigQuery's Time Travel en Flyway's "callbacks"(die beschikbaar zijn in de community versie)?

Time Travel is ook handig omdat BigQuery quota heeft voor veel dingen. Het handmatig terugdraaien van wijzigingen via SQL ALTER TABLE statements bijvoorbeeld zorgt ervoor dat je hier snel tegenaan loopt.

Dataflow worker schalen geeft rare resultaten

We hadden eerst elke Dataflow pijplijn de JvmInitializer laten gebruiken om het database schema up-to-date te houden, maar merkten dat soms rijen in de Flyway history tabel dubbel waren (of meer). Het blijkt dat elke Dataflow-werker die wordt gestart door een pijplijn door de JVM-initialisatie gaat. Soms worden deze dicht genoeg bij elkaar gestart zodat migraties meerdere keren worden uitgevoerd. Normaal gesproken probeert Flyway een soort vergrendeling te gebruiken om dit op te lossen, maar in de gekloonde code was dit mechanisme niet beschikbaar voor BigQuery. Het lijkt erop dat hiervoor een soort vergrendeling beschikbaar is in de 8.x Beta, maar ik heb nog niet kunnen testen of dit werkt.

Om dit probleem op te lossen, hebben we het uitvoeren van de JvmInitializer configureerbaar gemaakt en standaard uitgeschakeld voor alle pijplijnen en hebben we een specifieke dummy Flyway-pijplijn gemaakt waarvoor we het hebben ingeschakeld en die vóór alle andere batchpijplijnen wordt uitgevoerd.

Het Flyway BigQuery migratieproces is nogal traag

Worker initialisatie duurt ongeveer 2 minuten voordat de worker daadwerkelijk dingen begint te doen en we Flyway in actie zien komen. Daarna lijkt het erop dat elk migratiebestand minstens 30 seconden nodig heeft om uit te voeren (soms meer, afhankelijk van de migratie en de tabelinhoud). Uit de logging blijkt dat dit deels komt door de manier waarop de SQL wordt uitgevoerd: een BigQuery job waarvoor je moet luisteren naar de resultaten.

Gelukkig voeren we het vanwege het vorige probleem/oplossing slechts één keer per dag uit voor één dummy-pijplijn en niet voor de rest van onze pijplijnen. Dus de enige keer dat het echt traag is, is wanneer je de volledige set migraties test en uitvoert vanaf een lege omgeving.

Je moet ook je Flyway timeout instellen op een waarde die lang genoeg is om grotere tabelmanipulaties te laten slagen en geen timeout te veroorzaken. Wij werken momenteel met een waarde van 180 seconden.

SQL- en Java-migraties mengen is perfect mogelijk

Voor alle dingen die u wilt doen met BigQuery in de context van een migratie die niet worden ondersteund door de SQL-implementatie van BigQuery, kunt u terugvallen op de Java-migraties van Flyway. In een Java-migratie kunt u eenvoudig gebruik maken van de BigQuery Java API om alles te doen wat de API u toestaat te doen.

package org.flywaydb.core.internal; import com.google.auth.Credentials; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.*; import org.flywaydb.core.api.migration.BaseJavaMigration; import org.flywaydb.core.api.migration.Context; public class V1__Java_Based_Migration extends BaseJavaMigration { @Override public void migrate(Context context) throws Exception { Credentials credentials = ServiceAccountCredentials.getApplicationDefault(); BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder() .setProjectId("my-gcp-project-id") .setCredentials(credentials) .build(); BigQuery service = bigQueryOptions.getService(); TableId tableId = TableId.of("test", "test"); Table = service.getTable(tableId); TableId tableCopyId = TableId.of("test", "test_copy"); Job job = table.copy(tableCopyId); job.waitFor(); } @Override public Integer getChecksum() { return 1; }
Standaard

Runtime configuratie in een JvmInitializer

Uiteindelijk hebben we een meer geavanceerde JvmInitializer gemaakt waarmee we Flyway-migraties/herstel/baselining aan/uit kunnen zetten, dynamische prefixing van datasets, enzovoort. Hiervoor moeten we natuurlijk Dataflow-pijplijnopties opgeven en omdat we ook een pijplijnartefact (JSON + JAR's) bouwen in een centrale emmer die wordt gebruikt om taken in meerdere omgevingen te starten, moeten deze opties runtime-opties zijn. Dit is waar we tegen een probleem aanliepen met het optiemechanisme van Dataflow, vooral als je het required/default mechanisme wilt gebruiken. Het blijkt dat dit mechanisme niet echt werkt zoals je zou verwachten en defaults lijken verloren te gaan als je geen waarde opgeeft voor een optie, maar deze probeert te openen in de JvmInitializer.

De oplossing hiervoor werd gevonden toen we de logs van de Dataflow worker bekeken. In deze logs konden we zien dat er een JSON werd gelogd die de meeste optie-informatie bevat die we nodig hebben. Deze JSON is beschikbaar onder de sdk_pipeline_options_file omgevingsvariabele op een worker. Door deze waarde te lezen en te parsen kunnen we een soort werkend custom options object krijgen. Samen met het gebruik van reflectie om naar de annotaties en hun inhoud te kijken, hebben we het goed genoeg laten werken voor onze doeleinden.


Jan Eerdekens
Jan Eerdekens
Solution Engineer, ACA Group
Contact us

Want to dive deeper into this topic?

Get in touch with our experts today. They are happy to help!

ACA mug mok koffie tas
Contact us

Want to dive deeper into this topic?

Get in touch with our experts today. They are happy to help!

ACA mug mok koffie tas
Contact us

Want to dive deeper into this topic?

Get in touch with our experts today. They are happy to help!

ACA mug mok koffie tas
Contact us

Want to dive deeper into this topic?

Get in touch with our experts today. They are happy to help!

ACA mug mok koffie tas