카테고리 없음

[Airflow] DAG 파일 Task 여러개 작성 및 CLI실행. web실행

parkpark4 2024. 2. 2. 15:10

> DAG 파일 작성

# DAG 인스턴스화에 사용하는 라이브러리
from airflow import DAG
from datetime import datetime, timedelta
# Operator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.decorators import task
from airflow.models import Variable


# Operator에 매개변수 전달
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

# DAG 인스턴스화
dag = DAG(
    'helloWorld',
    default_args=default_args,
    description='echo "hello, World!"',
    schedule_interval=timedelta(minutes=5),
)

# task1 - bash_command
my_task1 = BashOperator(
    task_id='echo_world',
    bash_command='echo "Hello, world!"',
    dag=dag
)

# Function 
def my_function():
    print("Executing Python function")

# task2 - python_callable
my_task2 = PythonOperator(
    task_id='python_operator',
    python_callable=my_function,
    dag=dag,
)

 

> DAG 실행 

  • CLI로 DAG실행
$ airflow dags list # DAG 리스트 조회
$ airflow tasks list <dag_id> # DAG 안의 Task 조회
$airflow tasks test <dag_id> <task_id> <execution_date> # DAG중 특정 Task 실행
$ airflow dags test <dag_id> <execution_date> # DAG 전체 실행
$ airflow dags backfill <dag_id> -s <start_date> -e <end_date> # DAG Backfill 실행

 

  • WEB에서 DAG 실행
    • Toggle스위치 ON

ON

 

  • DAG의 Graph를 클릭하면 태스크를 확인할 수 있음
  • 또한 코드와 로그도 확인할 수 있음

Graph

 

 

 

 

 

 

< airflow docs >

airflow refresh dags interval

 

Configuration Reference — Airflow Documentation

 

airflow.apache.org