airflow-dags/postgre-localhost-demo.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import csv
import os

# Default DAG arguments
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'retries': 1,
}

dag = DAG(
    'extract_postgres_to_csv',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
)

# Path to the intermediate CSV file
output_file_path = '/opt/airflow/data/raw_data.csv'

# Run the query and save the result set to a CSV file
def save_query_to_csv(**kwargs):
    # Imported inside the task so the DAG file stays fast to parse
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    pg_hook = PostgresHook(postgres_conn_id='pg-localhost-demo')
    sql = ("SELECT * FROM bookings.segments s "
           "JOIN bookings.flights f ON s.flight_id = f.flight_id LIMIT 100")
    # Execute once and read both the column names and the rows from the
    # same cursor; a fresh, unexecuted cursor has no description to read.
    conn = pg_hook.get_conn()
    with conn.cursor() as cursor:
        cursor.execute(sql)
        columns = [desc[0] for desc in cursor.description]
        records = cursor.fetchall()
    conn.close()
    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
    with open(output_file_path, mode='w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        writer.writerows(records)

extract_task = PythonOperator(
    task_id='extract_postgres_to_csv',
    python_callable=save_query_to_csv,
    dag=dag,
)
# Single task, so there are no dependencies to declare.
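
# A quick way to smoke-test this locally (a sketch, not part of the
# pipeline): Airflow 2.5+ can run the whole DAG in-process via dag.test().
# This assumes the 'pg-localhost-demo' connection is defined in your local
# Airflow environment and that /opt/airflow/data is writable.
if __name__ == "__main__":
    dag.test()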