from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

import csv
import os

# DAG parameters
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'retries': 1,
}

dag = DAG(
    'extract_postgres_to_csv',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
)

# Path to the temporary CSV file
output_file_path = '/opt/airflow/data/raw_data.csv'

# Save the query result to a CSV file
def save_query_to_csv(**kwargs):
    # Import inside the callable so the DAG file parses quickly
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    pg_hook = PostgresHook(postgres_conn_id='pg-localhost-demo')
    sql = (
        "SELECT * FROM bookings.segments s "
        "JOIN bookings.flights f ON s.flight_id = f.flight_id "
        "LIMIT 100"
    )

    # Execute the query once and read the column names from the same
    # cursor. The original code called .description on a fresh cursor
    # that had never executed anything, so it was always None.
    conn = pg_hook.get_conn()
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            columns = [desc[0] for desc in cursor.description]
            records = cursor.fetchall()
    finally:
        conn.close()

    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
    with open(output_file_path, mode='w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        writer.writerows(records)

extract_task = PythonOperator(
    task_id='extract_postgres_to_csv',
    python_callable=save_query_to_csv,
    dag=dag,
)
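
# For larger extracts, a sketch of an alternative callable using
# PostgresHook.copy_expert, which streams the result to the file via
# Postgres COPY instead of materialising all rows in Python memory.
# This is not part of the original DAG: the function name
# save_query_to_csv_copy is hypothetical; the connection id, query, and
# output path are reused from above. To try it, swap it in as the
# python_callable of extract_task.
def save_query_to_csv_copy(**kwargs):
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    pg_hook = PostgresHook(postgres_conn_id='pg-localhost-demo')
    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
    # COPY (...) TO STDOUT WITH CSV HEADER writes the header row and all
    # records directly into output_file_path in a single round trip.
    pg_hook.copy_expert(
        "COPY (SELECT * FROM bookings.segments s "
        "JOIN bookings.flights f ON s.flight_id = f.flight_id "
        "LIMIT 100) TO STDOUT WITH CSV HEADER",
        output_file_path,
    )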