From 390d67866b434cb91ade4a26c24d08cd2bff5c86 Mon Sep 17 00:00:00 2001
From: airflow
Date: Wed, 22 Oct 2025 06:38:35 +0400
Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?=
 =?UTF-8?q?=D1=8C=20postgre-localhost-demo.py?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 postgre-localhost-demo.py | 61 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 61 insertions(+)
 create mode 100644 postgre-localhost-demo.py

diff --git a/postgre-localhost-demo.py b/postgre-localhost-demo.py
new file mode 100644
index 0000000..fee6e97
--- /dev/null
+++ b/postgre-localhost-demo.py
@@ -0,0 +1,61 @@
+from airflow import DAG
+from airflow.providers.postgres.operators.postgres import PostgresOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils.dates import days_ago
+import csv
+import os
+
+# DAG parameters
+default_args = {
+    'owner': 'airflow',
+    'start_date': days_ago(1),
+    'depends_on_past': False,
+    'retries': 1,
+}
+
+dag = DAG(
+    'extract_postgres_to_csv',
+    default_args=default_args,
+    schedule_interval='@daily',
+    catchup=False,
+)
+
+# Path to the temporary CSV file
+output_file_path = '/opt/airflow/data/raw_data.csv'
+
+# Function that saves the query result to CSV
+def save_query_to_csv(**kwargs):
+    """Run the bookings query and write its result to output_file_path as CSV.
+
+    The first CSV row holds the column names reported by the cursor; the
+    remaining rows are the fetched records. Extra keyword arguments are
+    accepted and ignored.
+    """
+    from contextlib import closing
+    from airflow.providers.postgres.hooks.postgres import PostgresHook
+    pg_hook = PostgresHook(postgres_conn_id='pg-localhost-demo')
+
+    sql = """SELECT * FROM bookings.segments s
+join bookings.flights f on s.flight_id = f.flight_id
+limit 100"""
+
+    # A cursor only exposes `description` after it has executed a query, so
+    # run the query and read both the column names and the rows from it.
+    with closing(pg_hook.get_conn()) as conn, closing(conn.cursor()) as cursor:
+        cursor.execute(sql)
+        columns = [desc[0] for desc in cursor.description]
+        records = cursor.fetchall()
+
+    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
+    with open(output_file_path, mode='w', newline='', encoding='utf-8') as f:
+        writer = csv.writer(f)
+        writer.writerow(columns)
+        writer.writerows(records)
+
+extract_task = PythonOperator(
+    task_id='extract_postgres_to_csv',
+    python_callable=save_query_to_csv,
+    dag=dag,
+)
+
+extract_task