# Airflow DAG: extract joined booking data from Postgres and save it to a CSV file.
import csv
import os
from contextlib import closing

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
|
|
# Параметры DAG
|
|
default_args = {
|
|
'owner': 'airflow',
|
|
'start_date': days_ago(1),
|
|
'depends_on_past': False,
|
|
'retries': 1,
|
|
}
|
|
|
|
dag = DAG(
|
|
'extract_postgres_to_csv',
|
|
default_args=default_args,
|
|
schedule_interval='@daily',
|
|
catchup=False,
|
|
)
|
|
|
|
# Путь к временному файлу CSV
|
|
output_file_path = '/opt/airflow/data/raw_data.csv'
|
|
|
|
# Функция для сохранения результата запроса в CSV
|
|
def save_query_to_csv(**kwargs):
|
|
from airflow.providers.postgres.hooks.postgres import PostgresHook
|
|
pg_hook = PostgresHook(postgres_conn_id='pg-localhost-demo')
|
|
|
|
sql = "SELECT * FROM bookings.segments s join bookings.flights f on s.flight_id = f.flight_id limit 100"
|
|
|
|
records = pg_hook.get_records(sql)
|
|
columns = [desc[0] for desc in pg_hook.get_conn().cursor().description]
|
|
|
|
os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
|
|
with open(output_file_path, mode='w', newline='') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(columns)
|
|
writer.writerows(records)
|
|
|
|
extract_task = PythonOperator(
|
|
task_id='extract_postgres_to_csv',
|
|
python_callable=save_query_to_csv,
|
|
dag=dag,
|
|
)
|
|
|
|
extract_task
|