30 APR. 2021
In de eerste reeks van deze serie over op ETL gebaseerd anonimisatie is het onderzoeksgedeelte van het probleem dat ik probeerde op te lossen, behandeld in een proof of concept. Ik besloot Sin g er te gebruiken als softwareoplossing om een ETL-pijplijn te ontwikkelen. In het tweede gedeelte komen de configuratie van Singer aan bod, de geselecteerde taps en targets en de verbinding daartussen. Als een kleine nevenzoektocht zal ik ook behandelen hoe je een Oracle database lokaal kunt draaien als de database waarmee de Singer Oracle Tap verbinding kan maken en de problemen die ik daarmee had. Singer instellen Het eerste wat we moeten doen om te beginnen met het maken van de ETL pijplijn is Singer installeren. Singer is eigenlijk gewoon een verzameling taps en targets. Dit is niets meer dan eenvoudige Python code die datastromen produceert in het Singer JSON spec formaat die in elkaar gepiped kunnen worden. De enige installatievereiste is dus een werkende Python 3 installatie op je systeem. Voer het onderstaande commando uit om te zien of Python 3 al correct is geïnstalleerd: python --versie Python Als dit werkt en een 3.x versie oplevert, dan ben je al klaar. Als dit niet werkt of een 2.x versie oplevert, dan moet je eerst Python 3 installeren. Instructies over hoe dit te doen voor jouw besturingssysteem kun je gemakkelijk vinden door te Googlen. Oracle instellen We gebruiken een Oracle database die we draaien als een Docker container. Ik heb heel wat tests moeten doorstaan om dit voor elkaar te krijgen, waarover je aan het einde van deze post kunt lezen. Voorlopig gaan we ervan uit dat er een Oracle database draait op localhost:1521 met een SID genaamd OraDoc . In deze database is er een schema genaamd ETL en een gebruiker genaamd etl . In dit schema hebben we een heel eenvoudige testtabel genaamd TEST met de structuur die je hieronder ziet en een paar rijen met daadwerkelijke testgegevens: VOORNAAM , ACHTERNAAM , PERSOON_ID , TELEFOON Python De "Tap" instellen Met een werkende Python 3 installatie kunnen we nu verder met de eerste stap van onze ETL pijplijn: de invoer van Oracle database gegevens door de Singer Oracle Tap . Hoewel de taps en targets gewoon Python code zijn, gebruikt de Oracle Tap de cx_Oracle bibliotheek als database driver. Deze bibliotheek heeft een native Oracle database client nodig, genaamd Oracle Instant Client , om verbinding te kunnen maken met de database. Volg deze installatie-instructies om de client te installeren. Als deze client geïnstalleerd is, kunnen we verder gaan met het opzetten van de eigenlijke Oracle Tap. Het installeren ervan houdt niets meer in dan het aanmaken van een Python Virtuele Omgeving , deze activeren en vervolgens de tap-afhankelijkheid installeren via pip . Voor de Oracle Tap kan dat met deze commando's: python3 -m venv ~/ . virtualenvs/tap-oracle source ~/ . virtualenvs/tap-oracle/bin/activate pip3 install tap-oracle Python Zodra de tap is geïnstalleerd en de virtuele omgeving is geactiveerd, is het commando tap-oracle beschikbaar: ( tap-oracle ) developer@laptop . virtualenvs % tap-oracle gebruik : tap-oracle [ -h ] -c CONFIG [ -s STATE ] [ -p PROPERTIES ] [ --catalog CATALOG ] [ -d ] tap-oracle : error : de volgende argumenten zijn vereist : -c/--config Python Uit het bericht dat we krijgen als we de tap gewoon proberen te draaien, kunnen we zien dat de tap wat configuratie nodig heeft voordat hij echt kan werken. Deze configuratie is niets meer dan een eenvoudig JSON-bestand dat de informatie bevat over de databaseverbinding en enkele andere parameters om te definiëren hoe de database zal worden gesynchroniseerd. Om het eenvoudig te houden, maken we dit bestand, config.json genaamd, aan in de bin directory van onze tap-oracle virtuele omgeving met de onderstaande inhoud: { " host" : "localhost" , " port" : 1521 , " user" : "etl" , " password" : "admin123" , " sid" : "OraDoc" , " filter_schemas" : "ETL" , " default_replication_method" : "FULL_TABLE" } Python De eerste 5 configuratieparameters zijn verplicht en spreken voor zich, behalve misschien de laatste twee: filter_schemas : een optionele parameter die een door komma's gescheiden lijst met schemanamen bevat waarmee je alleen de tabelgegevens van specifieke schema's kunt repliceren in plaats van alle schema's. default_replication_method : dit definieert hoe de geselecteerde schema's zullen worden gerepliceerd. Er zijn 3 mogelijke waarden: LOG_BASED , FULL_TABLE INCREMENTAL . Voor deze POC gebruiken we FULL_TABLE omdat we de dingen eenvoudig willen houden en alleen met een kleine testdatabase zullen werken. Voor de LOG_BASED modus moet je er ook voor zorgen dat je Oracle database dienovereenkomstig is geconfigureerd , wat buiten het bereik valt van deze POC en zijn Docker-gebaseerde Oracle database . Met deze configuratie zijn we klaar om de tap in discovery mode te draaien. In deze modus gebruikt de tap het configuratiebestand om verbinding te maken met de database en de geselecteerde schema's te bevragen over de beschikbare tabellen en hun structuur. We moeten deze gegevens, genaamd de catalogus, opslaan in een ander JSON-bestand. Om dit bestand, catalog.json , aan te maken, voer je het onderstaande commando uit vanuit de bin directory van de tap-oracle virtuele omgeving: ( tap-oracle ) developer@laptop bin % tap-oracle -c config . json -d catalog . json INFO start discovery INFO dsn : ( DESCRIPTION= ( ADDRESS= ( PROTOCOL=TCP ) ( HOST=localhost ) ( PORT=1521 ) ) ( CONNECT_DATA= ( SID=OraDoc ) ) INFO haalt rijtellingen op INFO haalt tabellen op : SELECT owner , table_name FROM all_tables WHERE owner != 'SYS' AND owner IN ( : 0 ) [ 'ETL' ] INFO ophalen views INFO ophalen kolom info INFO ophalen pk constraints Python Als je de tap uitvoert met de configuratie en de catalogus, zul je zien dat er niets wordt gesynchroniseerd: ( tap-oracle ) developer@laptop bin % tap-oracle -c config . json --catalog catalog . json INFO Geselecteerde streams : [ ] INFO Geen huidige_synchronisatie gevonden Python Om de tap daadwerkelijk iets te laten synchroniseren, bewerk je het catalogusbestand een beetje door de streams te selecteren die je wilt synchroniseren: { " streams" : [ "tap_stream_id" : "ETL-TEST" , " table_name" : "TEST" , " schema" : { "properties" : { "VOORNAAM" : { "type" : [ "null" , "string" ] } , "type" : "object" } , " stream" : "TEST" , " metadata" : [ { "breadcrumb" : [ ] , " metagegevens" : { "table-key-properties" : [ "PERSON_ID" ] , "schema-naam" : "ETL" , " database-naam" : "ORADOC" , "is-view" : false , "row-count" : 0 , "selected" : true } } } , { "breadcrumb" : [ "properties" , "FIRST_NAME" ] , "metadata" : { "sql-datatype" : "VARCHAR2" , " inclusie" : "available" , "selected-by-default" : true } }, { "breadcrumb" : [ "properties" , "LAST_NAME" ] , "metadata" : { "sql-datatype" : "VARCHAR2" , " inclusie" : "available" , "selected-by-default" : true } }, { "breadcrumb" : [ "properties" , "PERSON_ID" ] , "metadata" : { "sql-datatype" : "NUMBER" , " inclusion" : "automatic" , "selected-by-default" : true } }, { "breadcrumb" : [ "properties" , "PHONE" ] , "metadata" : { "sql-datatype" : "VARCHAR2" , " inclusion" : "available" , "selected-by-default" : true } } Python Als je nu hetzelfde commando opnieuw uitvoert, zie je de tap sync inhoud van de geselecteerde stream: ( tap-oracle ) developer@laptop bin % tap-oracle -c config . json --catalog catalog . json INFO Geselecteerde streams : [ 'ETL-TEST' ] INFO Geen huidige_synchronisatie gevonden INFO Beginnen met synchronisatie van stream ( ETL-TEST ) met synchronisatiemethode ( full ) INFO Stream ETL-TEST gebruikt full_table replicatie { "type" : "SCHEMA" , "stream" : " TEST" , "schema" : { "properties" : { "FIRST_NAME" : { "type" : [ "null" , "string" ] } , "LAST_NAME" : { "type" : [ "null" , "string" ] } , "PERSON_ID" : { "format" : "singer.decimal" , "type" : [ "string" ] } , "PHONE" : { "type" : [ "null" , "string" ] }} , "type" : "object" } , "key_properties" : [ "PERSON_ID" ] } INFO dsn : ( DESCRIPTION= ( ADDRESS= ( PROTOCOL=TCP ) ( HOST=localhost ) ( PORT=1521 ) ) ( CONNECT_DATA= ( SID=OraDoc ) ) { "type" : "STATE" , "value" : { "bladwijzers" : { "ETL-TEST" : { "last_replication_method" : "FULL_TABLE" , "version" : 1618344226757}} , "currently_syncing" : "ETL-TEST" }} { "type" : "ACTIVATE_VERSION" , "stream" : "TEST" , "versie" : 1618344226757} INFO select SELECT "FIRST_NAME" , "LAST_NAME" , "PERSON_ID" , "PHONE" , ORA_ROWSCN FROM ETL . TEST ORDER BY ORA_ROWSCN ASC { "type" : "RECORD" , "stream" : " TEST" , "record" : { "FIRST_NAME" : "John" , "LAST_NAME" : "Doe" , "PERSON_ID" : "1" , "PHONE" : "0499010203" } , "version" : 1618344226757, "time_extracted" : "2021-04-13T20:03:46.757794Z" } { "type" : "RECORD" , "stream" : "TEST" , "record" : { "FIRST_NAME" : "Jane" , "LAST_NAME" : "Doe" , "PERSON_ID" : "1" , "PHONE" : "0499040506" } , "version" : 1618344226757, "time_extracted" : "2021-04-13T20:03:46.757794Z" } , INFO METRIC : { "type" : "teller" , "metric" : "record_count" , "value" : 2 , "tags" : {}} { "type" : "ACTIVATE_VERSION" , "stream" : " TEST" , "version" : 1618344226757} { "type" : "STATE" , "value" : { "bladwijzers" : { "ETL-TEST" : { "last_replication_method" : "FULL_TABLE" , "version" : 1618344226757, "ORA_ROWSCN" : null}} , "currently_syncing" : null}} Python Dat was al een hoop werk, maar gelukkig is het moeilijkste deel achter de rug! Het "doel" instellen Nu we een tap hebben ingesteld om databasegegevens op te halen in het Singer spec-formaat, slaan we de transformatiestap voorlopig over en stellen we eerst een doel in waarnaar we het kunnen leiden. Op deze manier kunnen we snel de Extract en Load delen van de ETL pipeline controleren en al een leesbaar CSV resultaat zien in plaats van Singer spec JSON bestanden. We gebruiken de Pipelinewise S3 CSV Target . De opzet van dit doel lijkt erg op hoe de tap eerder was opgezet. Nogmaals, je moet er een virtuele omgeving voor maken, deze activeren en de target-afhankelijkheid installeren met pip: python3 -m venv ~/ . virtualenvs/target-s3-csv source ~/ . virtualenvs/target-s3-csv/bin/activate pip3 install pipelinewise-target-s3-csv Python Deze target heeft ook een klein configuratiebestand nodig in de vorm van een config.json bestand dat de naam van de emmer bevat waar we de CSV-bestanden naartoe willen sturen en de referenties die nodig zijn om toegang te krijgen tot de emmer: { " s3_bucket" : "anonymized-data-bucket" , " aws_access_key_id" : "your_own_aws_access_key_id_value" , "aws_secret_access_key" : "your_own_aws_secret_access_key_value" } Python Deze eenvoudige configuratie is alles wat we nodig hebben om het doel te laten werken. Het stelt ons in staat om het resultaat van de Oracle Tap naar dit doel te leiden en, afhankelijk van de geselecteerde streams, een of meer momenteel niet-geanonimiseerde CSV-bestanden in onze S3-bucket te krijgen. Als we het onderstaande commando uitvoeren vanuit de bin directory van de geactiveerde tap-oracle virtuele omgeving, zal er een volledige tabelsynchronisatie plaatsvinden: ( tap-oracle ) developer@laptop bin % tap-oracle --config config . json --catalog catalog . json | ~/ . virtualenvs/target-s3-csv/bin/target-s3-csv --config ~/ . virtualenvs/target-s3-csv/bin/config . json INFO Geselecteerde streams : [ 'ETL-TEST' ] INFO Geen huidige_synchronisatie gevonden INFO Begint synchronisatie van stream ( ETL-TEST ) met synchronisatiemethode ( full ) INFO Stream ETL-TEST gebruikt full_table replicatie INFO dsn : ( DESCRIPTION= ( ADDRESS= ( PROTOCOL=TCP ) ( HOST=localhost ) ( PORT=1521 ) ) ( CONNECT_DATA= ( SID=OraDoc ) ) time=2021-04-13 22 : 23 : 29 name=target_s3_csv level=INFO message=Attempting to create AWS session INFO select SELECT "FIRST_NAME" , "LAST_NAME" , "PERSON_ID" , "PHONE" , ORA_ROWSCN FROM ETL . TEST ORDER BY ORA_ROWSCN ASC INFO METRIC : { "type" : "teller" , "metric" : "record_count" , "value" : 2 , "tags" : {} time=2021-04-13 22 : 23 : 29 name=target_s3_csv level=INFO message=Uploading /var/folders/5k/86hrfqsd60b8pnh_81fqfy8h0000gn/TEST-20210413T222329 . csv to bucket anonymized-data-bucket at TEST-20210413T222329 . csv { "bookmarks" : { "ETL-TEST" : { "laatste_replicatie_methode" : "FULL_TABLE" , "version" : 1618345409645, "ORA_ROWSCN" : null}} , "currently_syncing" : null} Python U kunt het resultaat controleren in de S3-webinterface of met de AWS-opdrachtregelclient. Voer de onderstaande commando's uit om te zien of de emmer CSV-bestanden bevat en om ze op te halen: aws s3 ls s3 : //anonymized-data-bucket/ --profile your_aws_profile_name aws s3 cp s3 : //anonymized-data-bucket/ lt ; een bestandsnaam uitgevoerd door het vorige commando gt ; . csv test . csv --profile your_aws_profile_name Python Of je kunt een katachtig ding doen: aws s3 cp s3 : //anonymized-data-bucket/TEST-20210413T223846 . csv --profile your_aws_profile_name - | cat FIRST_NAME , LAST_NAME , PERSON_ID , PHONE John , Doe , 1 ,0499010203 Jane , Doe , 2 ,0499040506 Python De "transformatie" / anonimisering instellen Nu we in staat zijn om gegevens op te halen uit onze Oracle database, selecteren we de stromen die we willen en slaan we ze op als CSV-bestanden in S3. We missen nog één kleine, maar zeer belangrijke stap: het anonimiseren van de gegevens tijdens het transport tussen de tap en het doel. We gebruiken Pipelinewise Transform Field om de transformatie / anonimisering te bereiken. Dit stukje software vertegenwoordigt in feite de T die Singer als EL tool mist om er een echte ETL tool van te maken. Om het transformatiedeel op te zetten, blijven we in een bekend thema. Zet gewoon een andere virtuele omgeving op, activeer die, installeer een afhankelijkheid via pip en maak een configuratiebestand. python3 -m venv ~/ . virtualenvs/transform-field source ~/ . virtualenvs/transform-field/bin/activate pip install pipelinewise-transform-field Python Maak een config.json bestand aan in de bin directory van deze virtuele omgeving. Dit bestand bevat een eenvoudige lijst met transformaties. Zo'n transformatie definieert eenvoudig het type transformatie dat moet worden toegepast op een specifiek veld in een specifieke stroom. De configuratie in het onderstaande voorbeeld heeft als resultaat dat het veld FIRST_NAME in de TEST stream wordt gehasht : { " transformaties" : [ "field_id" : "FIRST_NAME" , " tap_stream_name" : "TEST" , " type" : "HASH" } } Python De lijst met beschikbare transformaties in de Pipelinewise Transform Field stap zijn: SET-NULL : transformeert elke invoer naar NULL HASH : transformeert tekenreeksinvoer naar hash HASH-SKIP-FIRST-n : transformeert tekenreeksinvoer naar hash waarbij de eerste n tekens worden overgeslagen, bijvoorbeeld HASH-SKIP-FIRST-2 MASK-DATE : vervangt de maand- en dagdelen van datumkolommen zodat ze altijd 1st of Jan zijn MASK-NUMBER : transformeert elke numerieke waarde naar nul MASK-HIDDEN : transformeert elke tekenreeks naar 'verborgen'. Hoewel deze transformaties voldoende zijn voor de anonimiseringsbehoeften van ons POC-geval, kunnen ze gemakkelijk worden gewijzigd. Je kunt zelfs aangepaste transformaties toevoegen. Bewerk hiervoor het bestand transform.py in de map lib/python3.9/site-packages/transform_field van de huidige virtuele omgeving. Alles samenvoegen Met deze 3 virtuele omgevingen op hun plaats kunnen we ze allemaal samenvoegen om het eindresultaat te produceren waar we naar op zoek zijn. Als we het onderstaande commando uitvoeren, zouden we moeten zien dat de ETL pipeline het geselecteerde schema synchroniseert, specifieke delen anonimiseert en het eindresultaat als een CSV-bestand opslaat in onze S3 bucket: ( tap-oracle ) developer@laptop bin % tap-oracle --config config . json --catalog catalog . json | ~/ . virtualenvs/transform-field/bin/transform-field --config ~/ . virtualenvs/transform-field/bin/config . json | ~/ . virtualenvs/target-s3-csv/bin/target-s3-csv --config ~/ . virtualenvs/target-s3-csv/bin/config . json INFO Geselecteerde streams : [ 'ETL-TEST' ] INFO Geen huidige_synchronisatie gevonden INFO Beginnen met synchronisatie van stream ( ETL-TEST ) met synchronisatiemethode ( full ) INFO Stream ETL-TEST gebruikt full_table replicatie INFO dsn : ( DESCRIPTION= ( ADDRESS= ( PROTOCOL=TCP ) ( HOST=localhost ) ( PORT=1521 ) ) ( CONNECT_DATA= ( SID=OraDoc ) ) time=2021-04-13 22 : 38 : 46 name=target_s3_csv level=INFO message=Attempting to create AWS session INFO select SELECT "FIRST_NAME" , "LAST_NAME" , "PERSON_ID" , "PHONE" , ORA_ROWSCN FROM ETL . TEST ORDER BY ORA_ROWSCN ASC INFO METRIC : { "type" : "teller" , "metric" : "record_count" , "value" : 2 , "tags" : {} time=2021-04-13 22 : 38 : 46 name=transform_field level=INFO message=Exiting normally time=2021-04-13 22 : 38 : 46 name=target_s3_csv level=INFO message=Uploading /var/folders/5k/86hrfqsd60b8pnh_81fqfy8h0000gn/T/TEST-20210413T223846 . csv naar emmer cjm-sandbox-anonymized-data at TEST-20210413T223846 . csv { "bladwijzers" : { "ETL-TEST" : { "laatste_replicatie_methode" : "FULL_TABLE" , "versie" : 1618346326464, "ORA_ROWSCN" : null}} , "currently_syncing" : null} Python Als we nu de CSV in onze emmer controleren: aws s3 cp s3 : //anonymized-data-bucket/TEST-20210413T224646 . csv --profile your_aws_profile_name - | cat FIRST_NAME , LAST_NAME , PERSON_ID , PHONE fd53ef835b15485572a6e82cf470dcb41fd218ae5751ab7531c956a2a6bcd3c7 , Doe , 1 ,0499010203 500501150a08713128839ca1465bfdc8a426268d6d0a576a16a80c13929f3faa , Doe , 2 ,0499040506 Python Conclusie In deze blogpost heb je gezien dat we Singer eenvoudig kunnen gebruiken om de doelen van de POC beschreven in de vorige blogpost te bereiken. Omdat er alleen wat eenvoudige dingen zoals Python en een Oracle client voor nodig zijn, kan het op de meeste systemen en in de meeste omgevingen geïnstalleerd en gebruikt worden. Het maakt ook gebruik van enkele eenvoudige JSON-gebaseerde configuratiebestanden die gemakkelijk kunnen worden aangepast aan een heleboel gebruikssituaties. De broncode van de taps en targets is ook beschikbaar en is meestal eenvoudig te begrijpen en aan te passen waar nodig. Als je geen tap of target kunt vinden die bij je past, moet het niet al te moeilijk zijn om er zelf een te schrijven . In de laatste aflevering van deze serie zal ik een manier laten zien om het hele systeem wat meer te vereenvoudigen en het te verpakken op een manier die meer geschikt is voor cloud native gebruik/deployment. Zij-zoektocht: Oracle in Docker Omdat ik geen toegang had tot een bestaande Oracle database, moest ik er zelf een draaien. Je zou denken dat dat makkelijk en snel zou zijn, maar het bleek iets ingewikkelder en behoorlijk frustrerend. Vooral vergeleken met het draaien van een PostgreSQL of MySQL/MariaDB container met Docker. Oracle XE werd lang geleden beschikbaar gemaakt. Het is een gratis versie van een Oracle database met enkele beperkingen , maar geen daarvan was problematisch voor mijn gebruik. Oracle XE kan worden geïnstalleerd op Windows en Linux, maar omdat ik op een Mac werk en mijn andere databases via Docker draai, wilde ik het draaien via een Docker image . Om dit te doen, moest ik zelf een Oracle docker image bouwen. Er is een Github repository die veel Dockerfiles en scripts van Oracle bevat . In deze repository is er ook een sectie voor Oracle databases: Oracle Database container images . Je kunt deze repository bekijken, naar de OracleDatabase/SingleInstance/dockerfiles directory gaan en het buildContainerImage.sh script uitvoeren om een echte Oracle Docker database container te bouwen. Ik wilde een Oracle 12 opzetten die overeenkomt met de installatie van de klant, maar daarvoor moest ik eerst wat database installatiebestanden downloaden. Het lijkt er echter op dat die niet meer publiekelijk beschikbaar zijn (je kunt ze krijgen via een Oracle contract). Dus nam ik genoegen met Oracle 18.4.0 XE: . /buildContainerImage . sh -v 18.4 .0 -x . . . duurt ongeveer 5 minuten . . . docker run -p 1521 : 1521 -p 5500 : 5500 -e ORACLE_PWD=admin123 oracle/database : 18.4 . 0-xe . .. duurt 30+ minuten . .. Python Op mijn MacBook Pro duurt het opstarten van deze Docker-container meer dan 30 minuten. Bovendien stopt het meestal na een paar uur met werken, waardoor een herstart nodig is die weer zoveel tijd in beslag neemt. Ik heb een aantal dingen geprobeerd, zoals volumes toevoegen om alle gegevens op te slaan die tijdens de eerste start worden gegenereerd, maar dat leek niet te helpen. Het gebruikt ook veel bronnen op mijn machine. Zelfs nadat de container draaide, was het verbinden met de database ook niet echt eenvoudig vanwege het pluggable database gedoe. Zelfs toen we verbonden waren, was het maken van een gebruiker en database specifiek voor onze POC en daarmee verbinden weer een hoop gedoe: service naam vs. SID en geen ondersteuning voor service naam in de Oracle Python bibliotheek die gebruikt wordt door de kraan. Dus uiteindelijk, na wat meer Googlen, vond ik een Docker repository van Oracle die een oude Oracle 12 docker container bevat . Je hebt er een gratis Oracle account voor nodig om de docker login te kunnen doen . Er is geen volume ondersteuning dus je zult in veel gevallen je data verliezen, maar het is eenvoudig op te zetten, start consistent in een paar minuten en je kunt er verbinding mee maken via een SID. Ook het toevoegen van een gebruiker en database was niet al te moeilijk en daarom heb ik dit uiteindelijk gebruikt in plaats van het nieuwere Github repository spul. docker login container-registry . oracle . com docker pull container-registry . oracle . com/database/standard : 12.1 .0 .2 docker run -d --env-file . /env -p 1521 : 1521 -p 5500 : 5500 -it --name dockerDB --shm-size= "4g" container-registry . oracle . com/database/standard : 12.1 .0 .2 Python
Lees verder