Обновить clickhouse_dag_new.py
This commit is contained in:
@@ -1,16 +1,26 @@
|
|||||||
from airflow import DAG
|
from airflow import DAG
|
||||||
from airflow.providers.postgres.operators.postgres import PostgresOperator
|
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator
|
||||||
from datetime import datetime
|
from airflow.utils.dates import days_ago
|
||||||
|
|
||||||
|
# Аргументы DAG по умолчанию
|
||||||
|
default_args = {
|
||||||
|
'owner': 'airflow',
|
||||||
|
'depends_on_past': False,
|
||||||
|
'start_date': days_ago(1),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Создаем DAG
|
||||||
with DAG(
|
with DAG(
|
||||||
dag_id="clickhouse_list_tables_1",
|
dag_id='test_clickhouse_connection',
|
||||||
start_date=datetime(2025, 1, 1),
|
default_args=default_args,
|
||||||
schedule_interval=None,
|
schedule_interval=None, # Запуск только вручную
|
||||||
catchup=False,
|
tags=['test'],
|
||||||
) as dag:
|
) as dag:
|
||||||
|
|
||||||
list_tables = PostgresOperator(
|
# Задача с использованием ClickHouseOperator
|
||||||
task_id="list_tables",
|
test_query = ClickHouseOperator(
|
||||||
postgres_conn_id="clickhouse_db",
|
task_id='run_simple_query',
|
||||||
sql="SELECT 1;"
|
# Простой запрос для проверки соединения
|
||||||
|
sql='SELECT 1 AS test_value',
|
||||||
|
# clickhouse_conn_id='clickhouse_default' можно не указывать, т.к. это значение по умолчанию
|
||||||
)
|
)
|
||||||
Reference in New Issue
Block a user