Добавить postgre-localhost-demo.py
This commit is contained in:
50
postgre-localhost-demo.py
Normal file
50
postgre-localhost-demo.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from airflow import DAG
|
||||
from airflow.providers.postgres.operators.postgres import PostgresOperator
|
||||
from airflow.operators.python import PythonOperator
|
||||
from airflow.utils.dates import days_ago
|
||||
import csv
|
||||
import os
|
||||
|
||||
# Параметры DAG
|
||||
default_args = {
|
||||
'owner': 'airflow',
|
||||
'start_date': days_ago(1),
|
||||
'depends_on_past': False,
|
||||
'retries': 1,
|
||||
}
|
||||
|
||||
dag = DAG(
|
||||
'extract_postgres_to_csv',
|
||||
default_args=default_args,
|
||||
schedule_interval='@daily',
|
||||
catchup=False,
|
||||
)
|
||||
|
||||
# Путь к временному файлу CSV
|
||||
output_file_path = '/opt/airflow/data/raw_data.csv'
|
||||
|
||||
# Функция для сохранения результата запроса в CSV
|
||||
def save_query_to_csv(**kwargs):
|
||||
from airflow.providers.postgres.hooks.postgres import PostgresHook
|
||||
pg_hook = PostgresHook(postgres_conn_id='pg-localhost-demo')
|
||||
|
||||
sql = "SELECT * FROM bookings.segments s
|
||||
join bookings.flights f on s.flight_id = f.flight_id
|
||||
limit 100"
|
||||
|
||||
records = pg_hook.get_records(sql)
|
||||
columns = [desc[0] for desc in pg_hook.get_conn().cursor().description]
|
||||
|
||||
os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
|
||||
with open(output_file_path, mode='w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(columns)
|
||||
writer.writerows(records)
|
||||
|
||||
extract_task = PythonOperator(
|
||||
task_id='extract_postgres_to_csv',
|
||||
python_callable=save_query_to_csv,
|
||||
dag=dag,
|
||||
)
|
||||
|
||||
extract_task
|
||||
Reference in New Issue
Block a user