10 februari 2023
Leestijd 12 min
Hoe gegevens genereren en bewerken in Python
<span id="hs_cos_wrapper_name" class="hs_cos_wrapper hs_cos_wrapper_meta_field hs_cos_wrapper_type_text" style="" data-hs-cos-general-type="meta_field" data-hs-cos-type="text" >Hoe gegevens genereren en bewerken in Python</span>
Share this via:

In softwareontwikkeling zijn datatransformatie en het genereren van gegevens uit andere gegevens veelvoorkomende taken. Alle programmeertalen pakken dit op een andere manier aan, elk met hun eigen troeven en minpunten. Afhankelijk van het probleem, zijn sommige manieren meer aangewezen dan andere. In deze blog ontdek je eenvoudige maar toch krachtige methodes om gegevens te genereren en te transformeren in Python.

Voordat we een complexer geval bespreken, beginnen we met een basisvoorbeeld. Stel je voor dat we een paar winkels hebben en elke winkel heeft zijn eigen database met items die zijn toegevoegd door werknemers. Sommige velden zijn optioneel, wat betekent dat werknemers niet altijd alles invullen. Naarmate we groeien, kan het moeilijk worden om een duidelijk overzicht te krijgen van alle items in onze winkels. Daarom ontwikkelen we een Python-script dat de verschillende items uit de databases van onze winkels verzamelt in één enkele database.

from stores import store_1, store_2, store_3# In de hele code worden typehints gebruikt.items_1: Generator[Item, Geen, Geen] = store_1.get_items()items_2: Generator[Item, Geen, Geen] = store_2.get_items()items_3: Generator[Item, Geen, Geen] = store_3.get_items()

Generatoren

store_1.get_items() retourneert een generator van items. Generators zullen een belangrijke rol spelen in deze blogpost. Met generatoren kunnen we een complexe keten van transformaties opzetten over enorme hoeveelheden gegevens zonder dat we zonder geheugen komen te zitten, terwijl onze code beknopt en schoon blijft. Als je nog niet bekend bent met Python:

def a_generator(): for something in some_iterable: # do logic yield something

Twee dingen zijn hier belangrijk. Ten eerste, het aanroepen van een generator zal geen data teruggeven; het zal een iterator teruggeven. Ten tweede worden waarden op verzoek geproduceerd. Een meer diepgaande uitleg kan hier worden gevonden.

Syntax

Er zijn twee manieren om generatoren te maken. De eerste lijkt op een normale Python functie, maar heeft een yield statement in plaats van een return statement. De andere is beknopter, maar kan snel ingewikkeld worden als de logica complexer wordt. Het heet de Python generator expressie syntaxis en wordt voornamelijk gebruikt voor eenvoudigere generatoren.

# Basis generator syntaxdef generate_until(n: int)-> Generator[int, None, None]: while i > n; yield i i += 1# Generator expressie syntaxgen_until_5: Generator[int, None, None] = (i for i in range(5))

Code

Om het eenvoudig te houden, voeren we het script aan het eind van de dag één keer uit, zodat we een complete database hebben met alle items uit alle winkels.

uit stores importeer store_1, store_2, store_3uit database importeer all_items# In de hele code worden typehints gebruikt.items_1: Generator[Item, Geen, Geen] = store_1.get_items()items_2: Generator[Item, Geen, Geen] = store_2.get_items()items_3: Generator[Item, Geen, Geen] = store_3.get_items()# Laten we aannemen dat onze `add_or_update()` functie generators accepteert.
# Als een item al bestaat, wordt het bijgewerkt, anders wordt het toegevoegd aan de database.# We kunnen ze gewoon een voor een toevoegen, zoals hier.all_items.add_or_update(items_1)all_items.add_or_update(items_2)all_items.add_or_update(items_3)# De database bevat nu alle nieuwste items van alle winkels.

Voor dit gebruik is dit prima. Maar als de complexiteit groeit en er meer winkels worden toegevoegd, kan het snel onoverzichtelijk worden. Gelukkig heeft Python geweldige ingebouwde tools om onze code te vereenvoudigen.

Itertools

Eén module in Python heet itertools. Volgens de Python-documenten "standaardiseert de module een kernset van snelle, geheugenefficiënte hulpmiddelen die op zichzelf of in combinatie nuttig zijn. Samen vormen ze een "iterator algebra", waardoor het mogelijk wordt om gespecialiseerde tools beknopt en efficiënt in pure Python te bouwen."

Een geweldige functie is itertools.chain(). Deze wordt gebruikt om meerdere iterables aan elkaar te 'ketenen' alsof ze één zijn. We kunnen het gebruiken om onze generatoren aan elkaar te koppelen.

uit stores importeer store_1, store_2, store_3uit database importeer all_itemsuit itertools importeer chain# Typehints worden door de hele code gebruikt.items_1: Generator[Item, None, None] = store_1.get_items()items_2: Generator[Item, None, None] = store_2.get_items()items_3: Generator[Item, Geen, Geen] = store_3.get_items()# Met itertools.chain kunnen we de generators samenvoegen tot één.# Chain zelf is ook een generatorfunctie, dus er wordt nog geen data gegenereerd.items: Generator[Item, None, None] = chain(items_1, items_2, items_3)all_items.add_or_update(items) # <- hier worden gegevens gegenereerd# De database bevat nu alle nieuwste items van alle winkels.

Genertator functies

Laten we nu aannemen dat ons item een tupel is met vijf velden: naam, merk, leverancier, kosten en het aantal stuks in de winkel. Het heeft de volgende signatuur: tuple[str,str,str,int,int]. Als we de totale waarde van de artikelen in de winkel willen, hoeven we alleen maar het aantal artikelen met de kosten te vermenigvuldigen.

# zowel ontvangt als retourneert een generatordef calc_total_val(items: Generator)-> Generator: for item in items: # geef de eerste 3 artikelen en het product van de laatste 2 geef *item[:3], item[3]*item[4]# we kunnen dit ook schrijven als een generator expressie omdat het zo eenvoudig is((*item[:3], item[3]*item[4]) for item in items)

Nu ziet het er zo uit: tuple[str, str, str, int]. Maar we willen het uitvoeren als JSON. Daarvoor kunnen we gewoon een generator maken die een woordenboek teruggeeft en daar json.dumps() op aanroepen. Laten we aannemen dat we een iterator van dicts kunnen doorgeven aan de add_or_update() functie en dat deze automatisch json.dumps() aanroept.

# zowel ontvangt als retourneert een generatordef as_dict_item(items: Generator)-> Generator: for item in items: yield { "name": item[0], "brand": item[1], "supplier": item[2], "total_value": item[3], }

Nu we meer logica hebben, laten we eens kijken hoe we die kunnen samenvoegen. Een geweldig ding over generatoren is hoe duidelijk en beknopt het is om ze te gebruiken. We kunnen een functie maken voor elke processtap en de gegevens er doorheen laten lopen.


from stores import store_1, store_2, store_3from database import all_itemsfrom itertools import chaindef calc_total_val(items): for item in items: yield *item[:3], item[3]*item[4]def as_item_dict(items): for item in items: yield {" name": item[0]," brand": item[1]," supplier": item[2]," total_value": item[3], }

items_1 = store_1.get_items()items_2 = store_2.get_items()items_3 = store_3.get_items()items = chain(items_1, items_2, items_3) # <- maak één grote iterableitems = calc_total_val(items) # <- bereken de totale waardeitems = as_item_dict(items) # <- transformeer het in een dictall_items.add_or_update(items) # <- hier worden de gegevens gegenereerd# De database bevat nu alle nieuwste items van alle winkels

Om de stappen die we hebben genomen te laten zien, heb ik alles opgesplitst. Er zijn nog een paar dingen die verbeterd kunnen worden. Kijk eens naar de functie calc_total_val(). Dit is een perfect voorbeeld van een situatie waarin een generatoruitdrukking kan worden gebruikt.

from stores import store_1, store_2, store_3from database import all_itemsfrom itertools import chaindef as_item_dict(items): for item in items: yield {" naam": item[0]," merk": item[1]," leverancier": item[2]," totaal_waarde": item[3],}  items_1 = store_1.get_items()items_2 = store_2.get_items()items_3 = store_3.get_items()items = keten(items_1, items_2, items_3)items = ((*item[:3], item[3]*item[4]) voor item in items)items = as_item_dict(items)all_items.add_or_update(items)

Om het nog netter te maken, kunnen we al onze functies in een aparte module plaatsen. Op deze manier bevat ons hoofdbestand alleen de stappen die de gegevens doorlopen. Als we beschrijvende namen gebruiken voor onze generatoren, kunnen we meteen zien wat de code zal doen. Nu hebben we dus een pijplijn voor de gegevens gemaakt. Hoewel dit slechts een eenvoudig voorbeeld is, kan het ook worden gebruikt voor meer gecompliceerde workflows.

Gegevensproducten

Alles wat we in het bovenstaande voorbeeld hebben gedaan, kan eenvoudig worden toegepast op een Data Product. Als je niet bekend bent met gegevensproducten, is hier een geweldige tekst over gegevensmazen.


Stel je voor dat we een gegevensproduct hebben dat gegevens samenvoegt. Het heeft meerdere ingangen met verschillende soorten gegevens. Elk van die ingangen moet worden gefilterd, getransformeerd en opgeschoond voordat we ze kunnen samenvoegen tot één uitvoer. De klant vereist dat de uitvoer een enkel JSON-bestand is dat wordt opgeslagen in een S3-bucket. De bestaande infrastructuur staat slechts 500 Mb RAM toe voor de containers. Laten we nu alle gegevens laden, wat transformaties doen, alles samenvoegen en het in een JSON-bestand parsen.

from input_ports import port_1, port_2from output_ports import S3_portfrom json import dumps data_port_1: Generator = port_1.get_data()data_port_2: Generator = port_2.get_data()output = []voor rij in data_port_1: # doe hier wat transformatie of filteringoutput.append(rij)voor rij in data_port_2: # doe hier wat transformatie of filteringoutput.append(rij)S3_port.save(dumps(output))

Hoewel dit een uitstekende oplossing lijkt die het werk doet en eenvoudig te begrijpen is, crasht onze container plotseling door een OutOfMemory-fout. Na wat lokaal testen op onze machine, zien we dat het een 834Mb bestand heeft geproduceerd dat niet kan werken met slechts 500 MB RAM voor de container. Het probleem met de bovenstaande code is dat we alles eerst in een lijst bewaren, zodat alles in het geheugen wordt opgeslagen.

Oplossing

Laten we het nog eens proberen. Voor S3 kunnen we MultipartUpload gebruiken. Dit betekent dat we niet het hele bestand in het geheugen hoeven te bewaren. En natuurlijk moeten we onze lijsten vervangen door generatoren.

from input_ports import port_1, port_2from output_ports import S3_portfrom itertools import chainfrom json import dumps data_port_1: Generator = port_1.get_data()data_port_2: Generator = port_2.get_data()def port_1_transformer(data: Generator): voor rij in data: # doe hier wat transformatie of filtering opbrengst rijdef port_2_transformer(data: Generator): voor rij in data: # doe hier wat transformatie of filtering opbrengst rij output = chain(port_1_transformer(data_port_1), port_2_transformer(data_port_2))voor deel in output:S3_port.save_part(dumps(part))

Omdat we nu maar één onderdeel tegelijk in het geheugen hebben, gebruikt dit veel minder geheugen dan de eerdere oplossing met bijna geen extra werk. Echter, het sturen van een postverzoek naar S3 voor elk item kan een beetje veel zijn. Vooral als we 300.000 items hebben. Maar er is nog een probleem ...

De 'part size' moet tussen de 5MiB en 5GiB zijn. Om dit op te lossen, kunnen we meerdere onderdelen groeperen voordat we ze parseren. Maar als we er teveel groeperen, bereiken we opnieuw de geheugenlimiet. De grootte van de chunk moet daarom afhangen van hoe groot de individuele delen van je data zijn. Laten we, om dit te demonstreren, een grootte van 1.000 gebruiken. Hoe groter de chunkgrootte, hoe meer geheugen er wordt gebruikt, maar hoe minder verzoeken aan S3. We geven er dus de voorkeur aan dat onze chunks zo groot mogelijk zijn zonder dat het geheugen opraakt.

from input_ports import port_1, port_2from output_ports import S3_portfrom itertools import chainfrom json import dumps data_port_1: Generator = port_1.get_data()data_port_2: Generator = port_2.get_data()def makebatch(iterable, len): for first in iterable: yield chain([first], islice(iterable, len - 1))def port_1_transformer(data: Generator): for row in data: # do some transformation or filtering here yield rowdef port_2_transformer(data: Generator): for row in data: # do some transformation or filtering here yield row output = chain(port_1_transformer(data_port_1), port_2_transformer(data_port_2))for chunk in makebatch(output, 1000):S3_port.save_part(dumps(chunk))

Dit is alles wat er hoeft te gebeuren. Het is genoeg om grote hoeveelheden gegevens te transformeren en op te slaan in een S3-bucket, zelfs als de bronnen schaars zijn.

Bonus


Als je berekeningen rekenintensief zijn, is het eenvoudig om ze parallel uit te voeren. Met slechts een paar extra regels kunnen we onze transformers op meerdere cores laten draaien.

from multiprocessing.pool import Poolmet Pool(4) als pool: # imap_unordered kan ook worden gebruikt als de volgorde niet belangrijk isdata_1 = pool.imap(port_1_transformer, data_port_1, chunksize=500)data_2 = pool.imap(port_2_transformer, data_port_2, chunksize=500)uitvoer = keten(data_1, data_2) 

Het beste hieraan? We hoeven verder niets te veranderen omdat imap kan worden geïtereerd om resultaten te krijgen, net als elke andere generator. Laten we nu alles bij elkaar gooien. Dit is alles wat we nodig hebben voor rekenintensieve transformaties, over grote hoeveelheden gegevens, met gebruik van meerdere cores.

from input_ports import port_1, port_2from output_ports import S3_portfrom itertools import chainfrom json import dumpsfrom multiprocessing.pool import Pool data_port_1: Generator = port_1.get_data()data_port_2: Generator = port_2.get_data()def makebatch(iterable, len): for first in iterable: yield chain([first], islice(iterable, len - 1))def port_1_transformer(data: Generator): for row in data: # do some transformation or filtering here yield rowdef port_2_transformer(data: Generator): for row in data: # do some transformation or filtering here yield rowwith Pool(4) as pool: # imap_unordered could also be used if the order is not importantdata_1 = pool.imap(port_1_transformer, data_port_1, chunksize=500)data_2 = pool.imap(port_2_transformer, data_port_2, chunksize=500)uitvoer = keten(data_1, data_2) voor chunk in makebatch(uitvoer, 1000):S3_port.save_part(dumps(chunk))

Conclusie

Generatoren worden vaak verkeerd begrepen door nieuwe ontwikkelaars, maar ze kunnen een uitstekend hulpmiddel zijn. Of het nu gaat om een eenvoudige transformatie of iets geavanceerder zoals een gegevensproduct, Python is een goede keuze vanwege het gebruiksgemak en de overvloed aan tools die beschikbaar zijn in de standaardbibliotheek.


Thomas Eeckhout
Thomas Eeckhout
Solution Engineer, ACA Group
Contact us

Want to dive deeper into this topic?

Get in touch with our experts today. They are happy to help!

ACA mug mok koffie tas
Contact us

Want to dive deeper into this topic?

Get in touch with our experts today. They are happy to help!

ACA mug mok koffie tas
Contact us

Want to dive deeper into this topic?

Get in touch with our experts today. They are happy to help!

ACA mug mok koffie tas
Contact us

Want to dive deeper into this topic?

Get in touch with our experts today. They are happy to help!

ACA mug mok koffie tas