ACA Group Blog | Inzichten over Softwareontwikkeling, UX/UI, Data & Innovatie

Simpele en flexibele op ETL gebaseerde anonimisatie instellen – deel 2

Geschreven door Jan Eerdekens | 30-apr-2021 15:30:00

In de eerste reeks van deze serieover op ETL gebaseerd anonimisatie is het onderzoeksgedeelte van het probleem dat ik probeerde op te lossen, behandeld in een proof of concept. Ik besloot Singerte gebruiken als softwareoplossing om een ETL-pijplijn te ontwikkelen. In het tweede gedeelte komen de configuratie van Singer aan bod, de geselecteerde taps en targets en de verbinding daartussen.

Als een kleine nevenzoektocht zal ik ook behandelen hoe je een Oracle database lokaal kunt draaien als de database waarmee de Singer Oracle Tap verbinding kan maken en de problemen die ik daarmee had.

Singer instellen

Het eerste wat we moeten doen om te beginnen met het maken van de ETL pijplijn is Singer installeren. Singer is eigenlijk gewoon een verzameling taps en targets. Dit is niets meer dan eenvoudige Python code die datastromen produceert in het Singer JSON spec formaat die in elkaar gepiped kunnen worden. De enigeinstallatievereiste isdus een werkendePython 3 installatieop je systeem. Voer het onderstaande commando uit om te zien of Python 3 al correct is geïnstalleerd:

python--versie
Python

Als dit werkt en een 3.x versie oplevert, dan ben je al klaar. Als dit niet werkt of een 2.x versie oplevert, dan moet je eerst Python 3 installeren. Instructies over hoe dit te doen voor jouw besturingssysteem kun je gemakkelijk vinden door te Googlen.

Oracle instellen

We gebruiken een Oracle database die we draaien als een Docker container. Ik heb heel wat tests moeten doorstaan om dit voor elkaar te krijgen, waarover jeaan het einde van deze post kunt lezen.

Voorlopig gaan we ervan uit dat er een Oracle database draait oplocalhost:1521 meteen SID genaamdOraDoc. In deze database is er een schema genaamdETL eneen gebruiker genaamdetl. In dit schema hebben we een heel eenvoudige testtabel genaamdTEST metde structuur die je hieronder ziet en een paar rijen met daadwerkelijke testgegevens:

VOORNAAM,ACHTERNAAM,PERSOON_ID,TELEFOON
Python

De "Tap" instellen

Met een werkende Python 3 installatie kunnen we nu verder met de eerste stap van onze ETL pijplijn: de invoer van Oracle database gegevens door de SingerOracle Tap.

Hoewel de taps en targets gewoon Python code zijn, gebruikt de Oracle Tap decx_Oracle bibliotheekals database driver. Deze bibliotheek heeft een native Oracle database client nodig, genaamdOracle Instant Client, om verbinding te kunnen maken met de database. Volg dezeinstallatie-instructies om de client te installeren.

Als deze client geïnstalleerd is, kunnen we verder gaan met het opzetten van de eigenlijke Oracle Tap. Het installeren ervan houdt niets meer in dan het aanmaken van eenPython Virtuele Omgeving, deze activeren en vervolgens de tap-afhankelijkheid installeren viapip. Voor de Oracle Tap kan dat met deze commando's:

python3 -mvenv ~/.virtualenvs/tap-oraclesource ~/.virtualenvs/tap-oracle/bin/activatepip3 install  tap-oracle
Python

Zodra de tap is geïnstalleerd en de virtuele omgeving is geactiveerd, is het commandotap-oracle beschikbaar:

(tap-oracle) developer@laptop .virtualenvs %  tap-oraclegebruik:  tap-oracle [-h] -cCONFIG [-sSTATE] [-pPROPERTIES] [--catalogCATALOG] [-d]  tap-oracle: error: de volgende argumenten zijn vereist: -c/--config
Python

Uit het bericht dat we krijgen als we de tap gewoon proberen te draaien, kunnen we zien dat de tap wat configuratie nodig heeft voordat hij echt kan werken. Deze configuratie is niets meer dan een eenvoudig JSON-bestand dat de informatie bevat over de databaseverbinding en enkele andere parameters om te definiëren hoe de database zal worden gesynchroniseerd. Om het eenvoudig te houden, maken we dit bestand,config.json genaamd, aan in debin directoryvan onzetap-oracle virtueleomgeving met de onderstaande inhoud:

{" host": "localhost"," port": 1521," user": "etl"," password": "admin123"," sid": "OraDoc"," filter_schemas": "ETL"," default_replication_method": "FULL_TABLE"}
Python

De eerste 5 configuratieparameters zijn verplicht en spreken voor zich, behalve misschien de laatste twee:

  • filter_schemas: een optionele parameter die een door komma's gescheiden lijst met schemanamen bevat waarmee je alleen de tabelgegevens van specifieke schema's kunt repliceren in plaats van alle schema's.
  • default_replication_method: dit definieert hoe de geselecteerde schema's zullen worden gerepliceerd. Er zijn 3 mogelijke waarden:LOG_BASED,FULL_TABLE&INCREMENTAL. Voor deze POC gebruiken weFULL_TABLE omdat we de dingen eenvoudig willen houden en alleen met een kleine testdatabase zullen werken. Voor deLOG_BASED modus moet je er ookvoor zorgen dat je Oracle database dienovereenkomstig is geconfigureerd, wat buiten het bereik valt van deze POC enzijn Docker-gebaseerde Oracle database.

Met deze configuratie zijn we klaar om de tap indiscovery mode te draaien. In deze modus gebruikt de tap het configuratiebestand om verbinding te maken met de database en de geselecteerde schema's te bevragen over de beschikbare tabellen en hun structuur. We moeten deze gegevens, genaamd de catalogus, opslaan in een ander JSON-bestand. Om dit bestand,catalog.json, aan te maken, voer je het onderstaande commando uit vanuit debin directory van detap-oracle virtuele omgeving:

(tap-oracle) developer@laptop bin %  tap-oracle -cconfig.json -d > catalog.json INFO start discovery INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=OraDoc))INFO haalt rijtellingen op INFO haalt tabellen op:SELECT owner, table_name FROM all_tables WHERE owner != 'SYS' AND owner IN (:0) ['ETL']INFO ophalen views INFO ophalen kolom info INFO ophalen pk constraints
Python

Als je de tap uitvoert met de configuratie en de catalogus, zul je zien dat er niets wordt gesynchroniseerd:

(tap-oracle) developer@laptop bin %  tap-oracle -cconfig.json --catalogcatalog.json INFO Geselecteerde streams: []INFO Geen huidige_synchronisatie gevonden
Python

Om de tap daadwerkelijk iets te laten synchroniseren, bewerk je het catalogusbestand een beetje door destreamste selecteren die je wilt synchroniseren:

{" streams": [ "tap_stream_id": "ETL-TEST"," table_name": "TEST"," schema": { "properties": { "VOORNAAM": { "type": [ "null", "string" ] }, "type": "object"}," stream": "TEST"," metadata": [ { "breadcrumb": []," metagegevens": { "table-key-properties": [ "PERSON_ID" ], "schema-naam": "ETL"," database-naam": "ORADOC", "is-view": false, "row-count": 0, "selected": true }
       } }, { "breadcrumb": [ "properties", "FIRST_NAME" ], "metadata": { "sql-datatype": "VARCHAR2"," inclusie": "available", "selected-by-default": true }}, { "breadcrumb": [ "properties", "LAST_NAME" ], "metadata": { "sql-datatype": "VARCHAR2"," inclusie": "available", "selected-by-default": true }}, { "breadcrumb": [ "properties", "PERSON_ID" ], "metadata": { "sql-datatype": "NUMBER"," inclusion": "automatic", "selected-by-default": true }}, { "breadcrumb": [ "properties", "PHONE" ], "metadata": { "sql-datatype": "VARCHAR2"," inclusion": "available", "selected-by-default": true }}
Python

Als je nu hetzelfde commando opnieuw uitvoert, zie je de tap sync inhoud van de geselecteerde stream:

(tap-oracle) developer@laptop bin %  tap-oracle -cconfig.json --catalogcatalog.json INFO Geselecteerde streams: ['ETL-TEST']INFO Geen huidige_synchronisatie gevonden INFO Beginnen met synchronisatie van stream(ETL-TEST) met synchronisatiemethode(full)INFO Stream  ETL-TEST gebruikt full_table replicatie{"type": "SCHEMA", "stream": " TEST", "schema": {"properties": {"FIRST_NAME": {"type": ["null", "string"]}, "LAST_NAME": {"type": ["null", "string"]}, "PERSON_ID": {"format": "singer.decimal", "type": ["string"]}, "PHONE": {"type": ["null", "string"]}}, "type": "object"}, "key_properties": ["PERSON_ID"]}INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=OraDoc)){"type": "STATE", "value": {"bladwijzers": {"ETL-TEST": {"last_replication_method": "FULL_TABLE", "version": 1618344226757}}, "currently_syncing": "ETL-TEST"}}{"type": "ACTIVATE_VERSION", "stream": "TEST", "versie":  1618344226757}INFO select SELECT "FIRST_NAME" , "LAST_NAME" , "PERSON_ID" , "PHONE" , ORA_ROWSCN FROM ETL.TEST ORDER BY ORA_ROWSCN ASC{"type": "RECORD", "stream": " TEST", "record": {"FIRST_NAME": "John", "LAST_NAME": "Doe", "PERSON_ID": "1", "PHONE": "0499010203"}, "version": 1618344226757, "time_extracted": "2021-04-13T20:03:46.757794Z"}
{"type": "RECORD", "stream": "TEST", "record": {"FIRST_NAME": "Jane", "LAST_NAME": "Doe", "PERSON_ID": "1", "PHONE": "0499040506"}, "version": 1618344226757, "time_extracted": "2021-04-13T20:03:46.757794Z"}, INFO METRIC: {"type": "teller", "metric": "record_count", "value": 2, "tags": {}}{"type": "ACTIVATE_VERSION", "stream": " TEST", "version":  1618344226757}{"type": "STATE", "value": {"bladwijzers": {"ETL-TEST": {"last_replication_method": "FULL_TABLE", "version": 1618344226757, "ORA_ROWSCN":  null}}, "currently_syncing":  null}}
Python

Dat was al een hoop werk, maar gelukkig is het moeilijkste deel achter de rug!

Het "doel" instellen

Nu we een tap hebben ingesteld om databasegegevens op te halen in het Singer spec-formaat, slaan we de transformatiestap voorlopig over en stellen we eerst een doel in waarnaar we het kunnen leiden. Op deze manier kunnen we snel deExtract enLoad delenvan de ETL pipeline controleren en al een leesbaarCSV resultaatzien in plaats van Singer spec JSON bestanden.

We gebruiken de PipelinewiseS3 CSV Target. De opzet van dit doel lijkt erg op hoe de tap eerder was opgezet. Nogmaals, je moet er een virtuele omgeving voor maken, deze activeren en de target-afhankelijkheid installeren met pip:

python3 -mvenv ~/.virtualenvs/target-s3-csvsource ~/.virtualenvs/target-s3-csv/bin/activatepip3 install  pipelinewise-target-s3-csv
Python

Deze target heeft ook een klein configuratiebestand nodig in de vorm van eenconfig.json bestand dat de naam van de emmer bevat waar we de CSV-bestanden naartoe willen sturen en de referenties die nodig zijn om toegang te krijgen tot de emmer:

{" s3_bucket": "anonymized-data-bucket"," aws_access_key_id": "your_own_aws_access_key_id_value", "aws_secret_access_key": "your_own_aws_secret_access_key_value"}
Python

Deze eenvoudige configuratie is alles wat we nodig hebben om het doel te laten werken. Het stelt ons in staat om het resultaat van de Oracle Tap naar dit doel te leiden en, afhankelijk van de geselecteerde streams, een of meer momenteel niet-geanonimiseerde CSV-bestanden in onze S3-bucket te krijgen. Als we het onderstaande commando uitvoeren vanuit debin directory van de geactiveerdetap-oracle virtuele omgeving, zal er een volledige tabelsynchronisatie plaatsvinden:

(tap-oracle) developer@laptop bin %  tap-oracle--configconfig.json --catalogcatalog.json | ~/.virtualenvs/target-s3-csv/bin/target-s3-csv--config~/.virtualenvs/target-s3-csv/bin/config.json INFO Geselecteerde streams: ['ETL-TEST']INFO Geen huidige_synchronisatie gevonden INFO Begint synchronisatie van stream(ETL-TEST) met synchronisatiemethode(full)INFO Stream  ETL-TEST gebruikt full_table replicatie INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=OraDoc))time=2021-04-13 22:23:29  name=target_s3_csv level=INFO message=Attemptingto create AWS session INFO select SELECT "FIRST_NAME" , "LAST_NAME" , "PERSON_ID" , "PHONE" , ORA_ROWSCN FROM ETL.TEST ORDER BY ORA_ROWSCN ASC INFO METRIC: {"type": "teller", "metric": "record_count", "value": 2, "tags": {}time=2021-04-13 22:23:29  name=target_s3_csv level=INFO message=Uploading/var/folders/5k/86hrfqsd60b8pnh_81fqfy8h0000gn/TEST-20210413T222329.csv to bucket  anonymized-data-bucketat  TEST-20210413T222329.csv{"bookmarks": {"ETL-TEST": {"laatste_replicatie_methode": "FULL_TABLE", "version": 1618345409645, "ORA_ROWSCN":  null}}, "currently_syncing":  null}
Python

U kunt het resultaat controleren in de S3-webinterface of met de AWS-opdrachtregelclient. Voer de onderstaande commando's uit om te zien of de emmer CSV-bestanden bevat en om ze op te halen:

aws s3 ls s3://anonymized-data-bucket/ --profileyour_aws_profile_name aws s3 cp s3://anonymized-data-bucket/<een bestandsnaam uitgevoerd door het vorige  commando>.csv test.csv --profileyour_aws_profile_name
Python

Of je kunt een katachtigding doen:

aws s3 cp s3://anonymized-data-bucket/TEST-20210413T223846.csv --profileyour_aws_profile_name - | cat FIRST_NAME,LAST_NAME,PERSON_ID,PHONE John,Doe,1,0499010203Jane,Doe,2,0499040506
Python

De "transformatie" / anonimisering instellen

Nu we in staat zijn om gegevens op te halen uit onze Oracle database, selecteren we de stromen die we willen en slaan we ze op als CSV-bestanden in S3. We missen nog één kleine, maar zeer belangrijke stap:het anonimiseren van degegevens tijdens het transport tussen de tap en het doel.

We gebruiken PipelinewiseTransform Field omde transformatie / anonimisering te bereiken. Dit stukje software vertegenwoordigt in feite deT dieSinger als EL tool mist om er een echte ETL tool van te maken.

Om het transformatiedeel op te zetten, blijven we in een bekend thema. Zet gewoon een andere virtuele omgeving op, activeer die, installeer een afhankelijkheid via pip en maak een configuratiebestand.

python3 -mvenv ~/.virtualenvs/transform-fieldsource ~/.virtualenvs/transform-field/bin/activatepip install  pipelinewise-transform-field
Python

Maak eenconfig.json bestandaan in debin directory van deze virtuele omgeving. Dit bestand bevat een eenvoudige lijst met transformaties. Zo'n transformatie definieert eenvoudig het type transformatie dat moet worden toegepast op een specifiek veld in een specifieke stroom. De configuratie in het onderstaande voorbeeld heeft als resultaat dathet veldFIRST_NAME in deTEST streamwordt gehasht:

{" transformaties": [ "field_id": "FIRST_NAME"," tap_stream_name": "TEST"," type": "HASH" }}
Python

De lijst met beschikbare transformaties in de Pipelinewise Transform Field stap zijn:

  • SET-NULL: transformeert elke invoer naar NULL
  • HASH: transformeert tekenreeksinvoer naar hash
  • HASH-SKIP-FIRST-n: transformeert tekenreeksinvoer naar hash waarbij de eerste n tekens worden overgeslagen, bijvoorbeeld HASH-SKIP-FIRST-2
  • MASK-DATE: vervangt de maand- en dagdelen van datumkolommen zodat ze altijd 1st of Jan zijn
  • MASK-NUMBER: transformeert elke numerieke waarde naar nul
  • MASK-HIDDEN: transformeert elke tekenreeks naar 'verborgen'.

Hoewel deze transformaties voldoende zijn voor de anonimiseringsbehoeften van ons POC-geval, kunnen ze gemakkelijk worden gewijzigd. Je kunt zelfs aangepaste transformaties toevoegen. Bewerk hiervoor het bestandtransform.py in de maplib/python3.9/site-packages/transform_field van de huidige virtuele omgeving.

Alles samenvoegen

Met deze 3 virtuele omgevingen op hun plaats kunnen we ze allemaal samenvoegen om het eindresultaat te produceren waar we naar op zoek zijn. Als we het onderstaande commando uitvoeren, zouden we moeten zien dat de ETL pipeline het geselecteerde schema synchroniseert, specifieke delen anonimiseert en het eindresultaat als een CSV-bestand opslaat in onze S3 bucket:

(tap-oracle) developer@laptop bin %  tap-oracle--configconfig.json --catalogcatalog.json | ~/.virtualenvs/transform-field/bin/transform-field--config~/.virtualenvs/transform-field/bin/config.json | ~/.virtualenvs/target-s3-csv/bin/target-s3-csv--config~/.virtualenvs/target-s3-csv/bin/config.json INFO Geselecteerde streams: ['ETL-TEST']INFO Geen huidige_synchronisatie gevonden INFO Beginnen met synchronisatie van stream(ETL-TEST) met synchronisatiemethode(full)INFO Stream  ETL-TEST gebruikt full_table replicatie INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SID=OraDoc))time=2021-04-13 22:38:46  name=target_s3_csv level=INFO message=Attemptingto create AWS session INFO select SELECT "FIRST_NAME" , "LAST_NAME" , "PERSON_ID" , "PHONE" , ORA_ROWSCN FROM ETL.TEST ORDER BY ORA_ROWSCN ASC INFO METRIC: {"type": "teller", "metric": "record_count", "value": 2, "tags": {}time=2021-04-13 22:38:46  name=transform_field level=INFO message=Exitingnormally time=2021-04-13 22:38:46  name=target_s3_csv level=INFO message=Uploading/var/folders/5k/86hrfqsd60b8pnh_81fqfy8h0000gn/T/TEST-20210413T223846.csv naar emmer  cjm-sandbox-anonymized-dataat  TEST-20210413T223846.csv{"bladwijzers": {"ETL-TEST": {"laatste_replicatie_methode": "FULL_TABLE", "versie": 1618346326464, "ORA_ROWSCN":  null}}, "currently_syncing":  null}
Python

Als we nu de CSV in onze emmer controleren:

aws s3 cp s3://anonymized-data-bucket/TEST-20210413T224646.csv --profileyour_aws_profile_name - | cat FIRST_NAME,LAST_NAME,PERSON_ID,PHONE fd53ef835b15485572a6e82cf470dcb41fd218ae5751ab7531c956a2a6bcd3c7,Doe,1,0499010203
500501150a08713128839ca1465bfdc8a426268d6d0a576a16a80c13929f3faa,Doe,2,0499040506
Python

Conclusie

In deze blogpost heb je gezien dat we Singer eenvoudig kunnen gebruiken om de doelen van de POC beschrevenin de vorige blogpost te bereiken. Omdat er alleen wat eenvoudige dingen zoals Python en een Oracle client voor nodig zijn, kan het op de meeste systemen en in de meeste omgevingen geïnstalleerd en gebruikt worden. Het maakt ook gebruik van enkele eenvoudige JSON-gebaseerde configuratiebestanden die gemakkelijk kunnen worden aangepast aan een heleboel gebruikssituaties. De broncode van de taps en targets is ook beschikbaar en is meestal eenvoudig te begrijpen en aan te passen waar nodig. Als je geen tap of target kunt vinden die bij je past,moet het niet al te moeilijk zijn om er zelf een te schrijven.

In de laatste aflevering van deze serie zal ik een manier laten zien om het hele systeem wat meer te vereenvoudigen en het te verpakken op een manier die meer geschikt is voor cloud native gebruik/deployment.

Zij-zoektocht: Oracle in Docker

Omdat ik geen toegang had tot een bestaande Oracle database, moest ik er zelf een draaien. Je zou denken dat dat makkelijk en snel zou zijn, maar het bleek iets ingewikkelder en behoorlijk frustrerend. Vooral vergeleken met het draaien van een PostgreSQL of MySQL/MariaDB container met Docker.

Oracle XE werdlang geleden beschikbaar gemaakt. Het is een gratis versie van een Oracle database metenkelebeperkingen, maar geen daarvan was problematisch voor mijn gebruik. Oracle XE kan worden geïnstalleerd op Windows en Linux, maar omdat ik op een Mac werk en mijn andere databases via Docker draai, wilde ik het draaien viaeen Docker image.

Om dit te doen, moest ik zelf een Oracle docker image bouwen. Er is een Githubrepository die veel Dockerfiles en scripts vanOraclebevat. In deze repository is er ook een sectie voor Oracle databases:Oracle Database container images. Je kunt deze repository bekijken, naar deOracleDatabase/SingleInstance/dockerfiles directorygaan en hetbuildContainerImage.sh scriptuitvoeren om een echte Oracle Docker database container te bouwen.

Ik wilde een Oracle 12 opzetten die overeenkomt met de installatie van de klant, maar daarvoor moest ik eerst wat database installatiebestanden downloaden. Het lijkt er echter op dat die niet meer publiekelijk beschikbaar zijn (je kunt ze krijgen via een Oracle contract). Dus nam ik genoegen met Oracle 18.4.0 XE:

./buildContainerImage.sh -v 18.4.0 -x...duurt ongeveer 5 minuten...docker run -p 1521:1521 -p 5500:5500 -e ORACLE_PWD=admin123 oracle/database:18.4.0-xe... duurt  30+ minuten...
Python

Op mijn MacBook Pro duurt het opstarten van deze Docker-container meer dan 30 minuten. Bovendien stopt het meestal na een paar uur met werken, waardoor een herstart nodig is die weer zoveel tijd in beslag neemt. Ik heb een aantal dingen geprobeerd, zoals volumes toevoegen om alle gegevens op te slaan die tijdens de eerste start worden gegenereerd, maar dat leek niet te helpen. Het gebruikt ook veel bronnen op mijn machine.

Zelfs nadat de container draaide, was het verbinden met de database ook niet echt eenvoudig vanwege het pluggable database gedoe. Zelfs toen we verbonden waren, was het maken van een gebruiker en database specifiek voor onze POC en daarmee verbinden weer een hoop gedoe: service naam vs. SID en geen ondersteuning voor service naam in de Oracle Python bibliotheek die gebruikt wordt door de kraan.

Dus uiteindelijk, na wat meer Googlen, vond ik eenDocker repository van Oracle die een oude Oracle 12 docker container bevat. Je hebt er een gratis Oracle account voor nodig om dedocker login te kunnen doen. Er is geen volume ondersteuning dus je zult in veel gevallen je data verliezen, maar het is eenvoudig op te zetten, start consistent in een paar minuten en je kunt er verbinding mee maken via een SID. Ookhet toevoegen van een gebruiker en database was niet al te moeilijk en daarom heb ik dit uiteindelijk gebruikt in plaats van het nieuwere Github repository spul.

docker login  container-registry.oracle.com docker pull  container-registry.oracle.com/database/standard:12.1.0.2docker run -d--env-file ./env -p 1521:1521 -p 5500:5500 -it--namedockerDB --shm-size="4g"  container-registry.oracle.com/database/standard:12.1.0.2
Python