Introduction to Apache Airflow
Introduction to Apache Airflow
Apache Airflow เป็นเครื่องมือจัดการ Workflow ที่ได้รับความนิยมสำหรับงาน Data Engineering และ Data Pipeline มันช่วยให้เราสามารถสร้างและควบคุมขั้นตอนการทำงานต่างๆ ได้อย่างมีประสิทธิภาพ ในบทความนี้เราจะมาลองติดตั้งและรันตัวอย่างการใช้งาน Airflow บน Docker ด้วยกัน ตั้งค่าสภาพแวดล้อม กำหนดตัวแปรสภาพแวดล้อมที่จำเป็นสำหรับการติดตั้งใน .env
ไฟล์
PYTHON_VERSION=3.9
AIRFLOW_VERSION=2.6.3
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_FERNET_KEY=
AIRFLOW_WWW_USER_USERNAME=
AIRFLOW_WWW_USER_PASSWORD=
PIP_ADDITIONAL_REQUIREMENTS=airflow-code-editor==7.3.0 pandas requests apache-airflow-providers-mongo apache-airflow-providers-postgres
สร้าง Docker Compose File
สร้างไฟล์ docker-compose.yaml
และใส่โค้ดดังนี้เพื่อกำหนดการตั้งค่าและสร้าง Container ให้กับ Airflow
version: '3.9'
networks:
airflow-network:
driver: bridge
x-airflow-common:
&airflow-common
image: apache/airflow:${AIRFLOW_VERSION}-python${PYTHON_VERSION}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY}
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on:
postgres:
condition: service_healthy
Airflow ก็พร้อมสำหรับการใช้งานแล้ว มาดูขั้นตอนการรันและทดสอบการทำงานต่อไป!
การรัน Airflow แล้วลองสร้าง DAG (Directed Acyclic Graph)
การกำหนดขั้นตอนการทำงานต่างๆ ขึ้นมา ดังนี้:
- รัน Airflow
ใช้คำสั่งต่อไปนี้เพื่อรัน Airflow ด้วย Docker Compose
docker-compose up
- เข้าสู่หน้า Airflow UI
เปิดเบราว์เซอร์ แล้วไปที่ http://localhost:8080
คุณจะเห็นหน้า Airflow UI ที่สามารถควบคุมการทำงานของ DAG ต่างๆ ได้
สร้าง DAG ตัวอย่าง
- สร้างไฟล์
dags/first_dag.py
แล้วใส่โค้ดดังนี้:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 5, 21),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG('first_dag', default_args=default_args, schedule_interval='@once') as dag:
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello World!"'
)
task1
- ทดสอบรัน DAG
หลังจากรอประมาณ 1-2 นาที ให้รีเฟรชหน้า Airflow UI คุณจะเห็น DAG first_dag
แสดงขึ้นมา คลิกที่ DAG แล้วกด Trigger DAG
เพื่อรันดู
หากทุกอย่างถูกต้อง คุณจะเห็นสถานะของ Task ว่ามีการพรินท์ "Hello World!" ออกมา
นี่เป็นเพียงตัวอย่างง่ายๆ ของการใช้งาน Airflow ในการสร้าง DAG และ Task ต่างๆ ในความเป็นจริง DAG จะมีความซับซ้อนมากกว่านี้ โดยจะประกอบด้วย Task หลายๆ อย่างที่ทำงานประสานกัน เช่น Task สำหรับดึงข้อมูล Task ทำการประมวลผล และ Task ในการนำข้อมูลไปจัดเก็บ เป็นต้น
Wuttichai Kaewlomsap
Sr. Data Engineer