Logo

Introduction to Apache Airflow

avatar
Wuttichai Kaewlomsap@wuttichaihung

Introduction to Apache Airflow

getting-started-with-apache-airflow

Apache Airflow เป็นเครื่องมือจัดการ Workflow ที่ได้รับความนิยมสำหรับงาน Data Engineering และ Data Pipeline มันช่วยให้เราสามารถสร้างและควบคุมขั้นตอนการทำงานต่างๆ ได้อย่างมีประสิทธิภาพ ในบทความนี้เราจะมาลองติดตั้งและรันตัวอย่างการใช้งาน Airflow บน Docker ด้วยกัน ตั้งค่าสภาพแวดล้อม กำหนดตัวแปรสภาพแวดล้อมที่จำเป็นสำหรับการติดตั้งใน .env ไฟล์

.env
PYTHON_VERSION=3.9
AIRFLOW_VERSION=2.6.3
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_FERNET_KEY=
AIRFLOW_WWW_USER_USERNAME=
AIRFLOW_WWW_USER_PASSWORD=
PIP_ADDITIONAL_REQUIREMENTS=airflow-code-editor==7.3.0 pandas requests apache-airflow-providers-mongo apache-airflow-providers-postgres

สร้าง Docker Compose File

สร้างไฟล์ docker-compose.yaml และใส่โค้ดดังนี้เพื่อกำหนดการตั้งค่าและสร้าง Container ให้กับ Airflow

docker-compose.yml
version: '3.9'

networks:
  airflow-network:
    driver: bridge

x-airflow-common:
  &airflow-common
  image: apache/airflow:${AIRFLOW_VERSION}-python${PYTHON_VERSION}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ${AIRFLOW_FERNET_KEY}
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    postgres:
      condition: service_healthy

Airflow ก็พร้อมสำหรับการใช้งานแล้ว มาดูขั้นตอนการรันและทดสอบการทำงานต่อไป!

การรัน Airflow แล้วลองสร้าง DAG (Directed Acyclic Graph)

การกำหนดขั้นตอนการทำงานต่างๆ ขึ้นมา ดังนี้:

  1. รัน Airflow

ใช้คำสั่งต่อไปนี้เพื่อรัน Airflow ด้วย Docker Compose

docker-compose up
  1. เข้าสู่หน้า Airflow UI

เปิดเบราว์เซอร์ แล้วไปที่ http://localhost:8080 คุณจะเห็นหน้า Airflow UI ที่สามารถควบคุมการทำงานของ DAG ต่างๆ ได้

สร้าง DAG ตัวอย่าง

  1. สร้างไฟล์ dags/first_dag.py แล้วใส่โค้ดดังนี้:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2023, 5, 21),
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=5)
}

with DAG('first_dag', default_args=default_args, schedule_interval='@once') as dag:
  task1 = BashOperator(
      task_id='task1',
      bash_command='echo "Hello World!"'
  )

  task1
  1. ทดสอบรัน DAG

หลังจากรอประมาณ 1-2 นาที ให้รีเฟรชหน้า Airflow UI คุณจะเห็น DAG first_dag แสดงขึ้นมา คลิกที่ DAG แล้วกด Trigger DAG เพื่อรันดู

หากทุกอย่างถูกต้อง คุณจะเห็นสถานะของ Task ว่ามีการพรินท์ "Hello World!" ออกมา

นี่เป็นเพียงตัวอย่างง่ายๆ ของการใช้งาน Airflow ในการสร้าง DAG และ Task ต่างๆ ในความเป็นจริง DAG จะมีความซับซ้อนมากกว่านี้ โดยจะประกอบด้วย Task หลายๆ อย่างที่ทำงานประสานกัน เช่น Task สำหรับดึงข้อมูล Task ทำการประมวลผล และ Task ในการนำข้อมูลไปจัดเก็บ เป็นต้น

avatar

Wuttichai Kaewlomsap

Sr. Data Engineer