Lernen Sie die Grundlagen, um Ihre Datenpipelines mit Apache Airflow in die Produktion zu bringen. Installieren und konfigurieren Sie Airflow und schreiben Sie dann mit diesem interaktiven Tutorial Ihren ersten DAG.
Dieser Artikel ist ein wertvoller Beitrag unserer Community und wurde von DataCamp hinsichtlich Klarheit und Genauigkeit bearbeitet.
Möchten Sie Ihr Fachwissen mit uns teilen? Wir würden uns freuen, von Ihnen zu hören! Senden Sie uns Ihre Artikel oder Ideen gerne über unser Community-Beitragsformular .
Was ist Apache Airflow?
Apache Airflow oder Airflow ist ein Open-Source-Tool und Framework zum Ausführen Ihrer Datenpipelines in der Produktion. Als branchenführendes Datenworkflow-Management-Tool nutzt Apache Airflow Python, damit Datenexperten ihre Datenpipelines als Code definieren können. Airflow bietet die Möglichkeit, die Pipeline-Ausführung zu planen und die Leistung zu beobachten, und wird so zu einem zentralen Hub für alle Ihre Datenworkflows. Egal, ob Sie Trainingsdaten für ein Modell vorbereiten oder Daten in einem Datensee speichern, Airflow fügt die Funktionalität hinzu, um Ihre Datenpipelines produktionsbereit zu machen.
Airflow wurde 2014 von Maxime Beauchemin bei Airbnb entwickelt und trat später im März 2016 dem Inkubator-Programm der Apache Software Foundation bei, bevor es 2019 als Top-Level-Projekt angekündigt wurde. Laut der Airflow- Umfrage von 2022 wird Airflow jeden Monat millionenfach heruntergeladen und Tausende von großen und kleinen Unternehmen verlassen sich auf das Tool.
Hauptmerkmale des Luftstroms
Das Framework und die Architektur von Airflow weisen mehrere Schlüsselfunktionen auf, die es einzigartig machen. Lassen Sie uns zunächst etwas tiefer auf die wichtigsten Funktionen des Airflow-Frameworks eingehen.
Funktionen des Airflow-Frameworks
Die einfachste Einheit des Airflow-Frameworks sind Aufgaben. Aufgaben können als Operationen oder, für die meisten Datenteams, als Operationen in einer Datenpipeline betrachtet werden.
Ein herkömmlicher ETL-Workflow umfasst drei Aufgaben: Extrahieren, Transformieren und Laden von Daten. Abhängigkeiten definieren die Beziehungen zwischen den Aufgaben. Um auf unser ETL-Beispiel zurückzukommen: Die Aufgabe „Laden“ hängt von der Aufgabe „Transformieren“ ab, die wiederum von der Aufgabe „Extrahieren“ abhängt. Die Kombination aus Aufgaben und Abhängigkeiten erstellt DAGs oder gerichtete azyklische Graphen. DAGs stellen Datenpipelines in Airflow dar und sind etwas kompliziert zu definieren. Schauen wir uns stattdessen ein Diagramm einer grundlegenden ETL-Pipeline an:
Der obige DAG hat drei Aufgaben mit zwei Abhängigkeiten. Er wird als DAG bezeichnet, da es keine Schleifen (oder Zyklen) zwischen den Aufgaben gibt. Hier zeigen die Pfeile die gerichtete Natur des Prozesses; zuerst extractwird die Aufgabe ausgeführt, gefolgt von den Aufgaben transformund load. Mit DAGs ist es einfach, einen eindeutigen Anfang und ein eindeutiges Ende des Prozesses zu erkennen, selbst wenn die Logik komplex ist, wie beim unten gezeigten DAG:
In diesem DAG ist die Logik etwas verrückter. Es gibt eine Verzweigung basierend auf einer Bedingung und einige Aufgaben werden parallel ausgeführt. Der Graph ist jedoch gerichtet und es gibt keine zyklischen Abhängigkeiten zwischen Aufgaben. Sehen wir uns nun einen Prozess an, der kein DAG ist:
In diesem Diagramm gibt es eine eindeutige Schleife zwischen den transformund validateAufgaben. In einigen Fällen kann dieser DAG für immer ausgeführt werden, wenn es keine Möglichkeit gibt, aus dieser Schleife auszubrechen.
Beim Erstellen von Datenpipelines (auch außerhalb von Airflow) empfiehlt es sich, keine Workflows zu erstellen, die nicht als DAGs dargestellt werden können, da dabei wichtige Funktionen wie Determinismus oder Idempotenz verloren gehen könnten.
Merkmale der Airflow-Architektur
Um DAGs zu planen, Aufgaben auszuführen und Einblick in die Ausführungsdetails der Datenpipeline zu gewähren, nutzt Airflow eine Python-basierte Architektur, die aus den folgenden Komponenten besteht:
- Planer
- Testamentsvollstrecker
- Metadatendatenbank
- Webserver (Benutzeroberfläche)
Unabhängig davon, ob Airflow lokal oder in einer Produktionsumgebung ausgeführt wird, muss jede dieser Komponenten betriebsbereit sein, damit Airflow ordnungsgemäß funktioniert.
Der Scheduler ist (Sie haben es wahrscheinlich erraten) für die Planung von DAGs verantwortlich. Um einen DAG zu planen, müssen beim Schreiben des DAG als Python-Code ein Startdatum und ein Planungsintervall für den DAG angegeben werden.
Sobald ein DAG geplant ist, müssen Aufgaben innerhalb dieser DAGs ausgeführt werden. Hier kommt der Executor ins Spiel. Der Executor führt die Logik innerhalb jeder Aufgabe nicht aus. Er weist die Aufgabe lediglich den Ressourcen zu, die für die Ausführung konfiguriert sind. Die Metadatendatenbank speichert Informationen zu DAG-Läufen, z. B. ob der DAG und die zugehörigen Aufgaben erfolgreich ausgeführt wurden oder nicht.
Die Metadatendatenbank speichert auch Informationen wie benutzerdefinierte Variablen und Verbindungen, die beim Aufbau produktionsreifer Datenpipelines hilfreich sind. Schließlich stellt der Webserver die Benutzeroberfläche mit Airflow bereit.
Diese Benutzeroberfläche (UI) bietet Datenteams ein zentrales Tool zur Verwaltung ihrer Pipeline-Ausführung. In der Airflow-UI können Datenteams den Status ihrer DAGs anzeigen, DAGs manuell erneut ausführen, Variablen und Verbindungen speichern und vieles mehr. Die Airflow-UI bietet zentrale Transparenz in Datenaufnahme- und -bereitstellungsprozesse und hilft Datenteams, auf dem Laufenden zu bleiben und sich der Leistung ihrer Datenpipeline bewusst zu sein.
Installieren von Apache Airflow
Es gibt mehrere Möglichkeiten, Apache Airflow zu installieren. Wir behandeln zwei der gängigsten.
Installieren von Airflow mitpip
Voraussetzungen:
- python3Eingerichtet
Um Airflow mit pipder Paketmethode von Python zu installieren, können Sie den folgenden Befehl ausführen:
Sobald die Installation des Pakets abgeschlossen ist, müssen Sie alle Komponenten eines Airflow-Projekts erstellen, z. B. Ihr Airflow-Stammverzeichnis einrichten, eine airflow.cfgDatei erstellen, die Metadatendatenbank hochfahren und vieles mehr. Dies kann viel Arbeit bedeuten und erfordert einiges an Erfahrung mit Airflow. Glücklicherweise gibt es mit der Astro CLI einen viel einfacheren Weg.
Installieren von Airflow mit der Astro CLI
Voraussetzungen:
- python3Eingerichtet
- Docker installiert
Astronomer, ein verwalteter Airflow-Anbieter, bietet eine Reihe kostenloser Tools, die die Arbeit mit Airflow erleichtern. Eines dieser Tools ist die Astro CLI.
Mit der Astro CLI können Sie alles, was Sie zum Ausführen von Airflow benötigen, ganz einfach erstellen und verwalten. Zunächst müssen Sie die CLI installieren. Um dies auf Ihrem Computer zu tun, sehen Sie sich diesen Link zur Astronomer-Dokumentation an und befolgen Sie die Schritte für Ihr Betriebssystem.
Sobald die Astro CLI installiert ist, ist zum Konfigurieren eines gesamten Airflow-Projekts nur ein Befehl erforderlich:
Dadurch werden alle für ein Airflow-Projekt benötigten Ressourcen in Ihrem aktuellen Arbeitsverzeichnis konfiguriert. Ihr aktuelles Arbeitsverzeichnis sieht dann ungefähr so aus:
Sobald das Projekt erstellt wurde, führen Sie zum Starten des Projekts aus astro dev start. Nach etwa einer Minute können Sie die Airflow-Benutzeroberfläche in Ihrem Browser unter der Adresse öffnen https://localhost:8080/. Jetzt sind Sie bereit, Ihren ersten DAG zu schreiben!
Schreiben Ihres ersten Airflow-DAG
Wir haben die Grundlagen und erweiterten Funktionen des Frameworks und der Architektur von Airflow behandelt. Nachdem Airflow nun installiert wurde, können Sie Ihren ersten DAG schreiben. Erstellen Sie zunächst eine Datei namens sample_dag.pyim dags/Verzeichnis des gerade erstellten Airflow-Projekts. Öffnen Sie die Datei mit Ihrem bevorzugten Texteditor oder Ihrer bevorzugten IDE sample_dag.py. Lassen Sie uns zunächst den DAG instanziieren.
Oben verwenden wir die DAGFunktion aus dem airflowModul, um in Verbindung mit dem withKontextmanager einen DAG zu definieren.
start_dateEs werden ein scheduleIntervall und ein Wert für catchupbereitgestellt. Dieser DAG wird jeden Tag um 9:00 Uhr UTC ausgeführt. Da catchupauf True gesetzt ist, wird dieser DAG jeden Tag zwischen dem Tag seiner ersten Auslösung und dem 1. Januar 2024 ausgeführt und max_active_runs=1stellt sicher, dass immer nur ein DAG ausgeführt werden kann.
Fügen wir nun ein paar Aufgaben hinzu! Zuerst erstellen wir eine Aufgabe, um das Extrahieren von Daten aus einer API zu simulieren. Sehen Sie sich den folgenden Code an:
Als Nächstes möchten wir eine Aufgabe erstellen, um die von der extract_dataAufgabe zurückgegebenen Daten zu transformieren. Dies kann mit dem folgenden Code erfolgen. Hier verwenden wir eine Airflow-Funktion namens XComs, um Daten aus der vorherigen Aufgabe abzurufen.
Da render_templat_as_native_objauf gesetzt ist True, werden diese Werte als Python-Objekte und nicht als Zeichenfolgen freigegeben. Die Rohdaten der Aufgabe werden dann als Schlüsselwortargument extract_dataan übergeben . Diese Daten werden dann transformiert und zurückgegeben, wo sie von der Aufgabe auf ähnliche Weise verwendet werden.transform_data_callableload_data
Abschließend werden Abhängigkeiten zwischen den Aufgaben festgelegt. Der Code hier legt Abhängigkeiten zwischen den Aufgaben extract_data, transform_data, und load_datafest, um einen grundlegenden ETL-DAG zu erstellen.
So wird das Endprodukt aussehen!
Sobald Sie Ihre Pipeline als Python-Code definiert haben, können Sie Ihren DAG über die Airflow-Benutzeroberfläche aktivieren. Klicken Sie auf den weather_etlDAG und schalten Sie den Schalter oben links ein. Beobachten Sie, wie Ihre Aufgaben und der DAG-Lauf erfolgreich abgeschlossen werden.
Herzlichen Glückwunsch, Sie haben Ihren ersten Airflow DAG geschrieben und ausgeführt!
Zusätzlich zur Verwendung herkömmlicher Operatoren hat Airflow die TaskFlow-API eingeführt, die das Definieren von DAGs und Aufgaben mithilfe von Dekoratoren und nativem Python-Code erleichtert.
Anstatt XComs explizit zum Teilen von Daten zwischen Aufgaben zu verwenden, abstrahiert die TaskFlow-API diese Logik und verwendet stattdessen XComs im Hintergrund. Der folgende Code zeigt genau dieselbe Logik und Funktionalität wie oben, diesmal implementiert mit der TaskFlow-API, die für Datenanalysten und Wissenschaftler, die zum Erstellen skriptbasierter ETL-Logik verwendet werden, intuitiver ist.
Bewährte Verfahren für den Luftstrom
Das Erstellen von Airflow-DAGs kann schwierig sein. Beim Erstellen von Datenpipelines und Workflows sollten Sie einige bewährte Methoden beachten, nicht nur mit Airflow, sondern auch mit anderen Tools.
Modularität
Mithilfe von Aufgaben hilft Airflow dabei, die Modularität leichter visualisieren zu können. Versuchen Sie nicht, zu viel in einer einzigen Aufgabe zu erledigen. Zwar kann eine ganze ETL-Pipeline in einer einzigen Aufgabe erstellt werden, dies würde jedoch die Fehlerbehebung erschweren. Auch die Visualisierung der Leistung eines DAG würde dadurch erschwert.
Beim Erstellen einer Aufgabe ist es wichtig, sicherzustellen, dass die Aufgabe nur eine Sache tut, ähnlich wie Funktionen in Python.
Schauen Sie sich das folgende Beispiel an. Beide DAGs machen dasselbe und schlagen an derselben Stelle im Code fehl. Beim DAG auf der linken Seite ist jedoch klar, dass die loadLogik den Fehler verursacht, während dies beim DAG auf der rechten Seite nicht ganz klar ist.
Determinismus
Ein deterministischer Prozess ist ein Prozess, der bei gleicher Eingabe das gleiche Ergebnis liefert. Wenn ein DAG für ein bestimmtes Intervall ausgeführt wird, sollte er jedes Mal die gleichen Ergebnisse erzeugen. Obwohl es sich um ein komplexeres Merkmal von Datenpipelines handelt, ist Determinismus wichtig, um konsistente Ergebnisse sicherzustellen.
Nutzen Sie mit Airflow die Jinja-Vorlagenfunktion, um vorlagenbasierte Felder an Airflow-Operatoren zu übergeben, anstatt die datetime.now()Funktion zum Erstellen zeitlicher Daten zu verwenden decision tree.
Idempotenz
Was passiert, wenn Sie einen DAG zweimal für das gleiche Intervall ausführen? Oder 10 Mal? Werden Sie am Ende doppelte Daten auf Ihrem Zielspeichermedium haben? Idempotenz stellt sicher, dass selbst wenn eine Datenpipeline mehrmals ausgeführt wird, es so ist, als wäre die Pipeline nur einmal ausgeführt worden.
Um Datenpipelines deterministisch zu machen, sollten Sie über die Einbindung der folgenden Logik in Ihre DAGs nachdenken:
- Dateien überschreiben, wenn DAGs erneut ausgeführt werden, anstatt beim Ausführen im gleichen Intervall eine neue Datei mit einem anderen Namen zu erstellen
- Verwenden Sie zum Übertragen von Daten in Datenbanken und Data Warehouses ein Lösch-/Schreibmuster , anstatt INSERTsie zu verwenden, da dies zu Duplikaten führen kann.
Orchestrierung ist nicht Transformation
Airflow ist nicht dafür gedacht, riesige Datenmengen zu verarbeiten. Wenn Sie Transformationen mit mehr als ein paar Gigabyte Daten durchführen möchten, ist Airflow immer noch das richtige Tool für diese Aufgabe. Allerdings sollte Airflow ein anderes Tool wie dbt oder Databricks aufrufen, um die Transformation auszuführen.
Normalerweise werden Aufgaben lokal auf Ihrem Computer oder mit Arbeitsknoten in der Produktion ausgeführt. In beiden Fällen stehen für die erforderliche Rechenarbeit nur wenige Gigabyte Speicher zur Verfügung.
Konzentrieren Sie sich auf die Verwendung von Airflow für sehr einfache Datentransformationen und als Orchestrierungstool beim Bearbeiten größerer Datenmengen.
Apache Airflow in der Industrie
Dank der Fähigkeit von Airflow, Datenpipelines als Code zu definieren, und der großen Vielfalt an Konnektoren und Operatoren verlassen sich Unternehmen auf der ganzen Welt beim Betrieb ihrer Datenplattformen auf Airflow.
In der Industrie kann ein Datenteam mit einer Vielzahl von Tools arbeiten, von SFTP-Sites über Cloud-Dateispeichersysteme bis hin zu einem Data Lakehouse. Um eine Datenplattform aufzubauen, ist die Integration dieser unterschiedlichen Systeme von größter Bedeutung.
Dank einer lebendigen Open-Source-Community gibt es Tausende vorgefertigter Konnektoren, die Ihnen bei der Integration Ihrer Datentools helfen. Sie möchten eine Datei von S3 in Snowflake ablegen? Zum Glück S3ToSnowflakeOperatorist das mit ganz einfach! Wie wäre es mit Datenqualitätsprüfungen mit Great Expectations? Auch das gibt es bereits.
Wenn Sie nicht das richtige vorgefertigte Werkzeug für die Aufgabe finden, ist das kein Problem. Airflow ist erweiterbar, sodass Sie problemlos Ihre eigenen benutzerdefinierten Werkzeuge erstellen können, die Ihren Anforderungen entsprechen.
Wenn Sie Airflow in der Produktion ausführen, sollten Sie auch über die Tools nachdenken, die Sie zur Verwaltung der Infrastruktur verwenden. Hierfür gibt es eine Reihe von Möglichkeiten, mit Premium-Angeboten wie Astronomer, Cloud-nativen Optionen wie MWAA oder sogar einer selbst entwickelten Lösung.
Normalerweise ist dabei ein Kompromiss zwischen Kosten und Infrastrukturverwaltung erforderlich. Teurere Lösungen bedeuten möglicherweise weniger Verwaltungsaufwand, während die Ausführung aller Elemente auf einer einzigen EC2-Instanz zwar kostengünstig, aber schwierig zu warten sein kann.
Fazit
Apache Airflow ist ein branchenführendes Tool zum Ausführen von Datenpipelines in der Produktion. Airflow bietet Funktionen wie Planung, Erweiterbarkeit und Beobachtbarkeit und ermöglicht es Datenanalysten, Wissenschaftlern und Ingenieuren, Datenpipelines als Code zu definieren. So können sich Datenexperten auf die Erzielung geschäftlicher Auswirkungen konzentrieren.
Der Einstieg in Airflow ist einfach, insbesondere mit der Astro CLI. Traditionelle Operatoren und die TaskFlow-API erleichtern das Schreiben Ihrer ersten DAGs. Achten Sie beim Erstellen von Datenpipelines mit Airflow darauf, dass Modularität, Determinismus und Idempotenz bei Ihren Designentscheidungen an erster Stelle stehen. Diese bewährten Methoden helfen Ihnen, spätere Probleme zu vermeiden, insbesondere wenn bei Ihren DAGs ein Fehler auftritt.
Bei Airflow gibt es eine Menge zu lernen. Probieren Sie Airflow für Ihr nächstes Datenanalyse- oder Datenwissenschaftsprojekt aus. Experimentieren Sie mit vorgefertigten Operatoren oder erstellen Sie Ihre eigenen. Versuchen Sie, Daten zwischen Aufgaben mit herkömmlichen Operatoren und der TaskFlow-API zu teilen. Scheuen Sie sich nicht, die Grenzen auszutesten. Wenn Sie bereit sind, loszulegen, sehen Sie sich den Kurs „Einführung in Airflow in Python“ von DataCamp an , der die Grundlagen von Airflow behandelt und zeigt, wie komplexe Datentechnik-Pipelines in der Produktion implementiert werden.
Sie können auch mit unserem Einführungskurs in Datenpipelines beginnen , der Ihnen dabei hilft, Ihre Fähigkeiten zum Aufbau effektiver, leistungsfähiger und zuverlässiger Datenpipelines zu verfeinern.
Wenn Sie mehr wissen möchten, sehen Sie sich die folgenden Ressourcen an. Viel Glück und viel Spaß beim Programmieren!