diff --git a/pet-project/create_dm_pg.py b/pet-project/create_dm_pg.py index 4222b7c..27ebd3e 100644 --- a/pet-project/create_dm_pg.py +++ b/pet-project/create_dm_pg.py @@ -4,25 +4,34 @@ from pathlib import Path import pyspark.sql.functions as F from pyspark.sql import DataFrame, SparkSession -spark = SparkSession.builder.appName("create_dm_pg").master("local[*]").getOrCreate() + +def get_spark() -> SparkSession: + return SparkSession.builder.appName("create_dm_pg").getOrCreate() -def read_csv(csv_filename: str, path: Path = Path("src/pg")) -> DataFrame: - return spark.read.csv( - path=str(path / csv_filename), - header=True, - inferSchema=True, +def load_postgres_table( + spark: SparkSession, + table_name: str, + schema: str = "anazarenko", +) -> DataFrame: + return ( + spark.read + .format("jdbc") + .option("url", "jdbc:postgresql://postgres.de-infra.servehttp.com:9002/mydb") + .option("dbtable", f"{schema}.{table_name}") + .option("user", "anazarenko") + .option("password", "rT7Wof6frDbqhMq0") + .option("driver", "org.postgresql.Driver") + .load() ) -users = read_csv("users.csv").withColumnRenamed("id", "user_id") -courses = read_csv("courses.csv").withColumnRenamed("id", "course_id") -lessons = read_csv("lessons.csv").withColumnRenamed("id", "lesson_id") -enrollments = read_csv("enrollments.csv") -lesson_views = read_csv("lesson_views.csv") - - -def user_activity_summary() -> DataFrame: +def user_activity_summary( + enrollments: DataFrame, + lesson_views: DataFrame, + users: DataFrame, + lessons: DataFrame, +) -> DataFrame: enrollment_stats = enrollments.groupBy("user_id").agg( F.count("course_id").alias("enrollment_count") ) @@ -73,7 +82,12 @@ def user_activity_summary() -> DataFrame: return result -def course_summary() -> DataFrame: +def course_summary( + enrollments: DataFrame, + lessons: DataFrame, + lesson_views: DataFrame, + courses: DataFrame, +) -> DataFrame: enrollment_stats = enrollments.groupBy("course_id").agg( F.count_distinct("user_id").alias("unique_users") ) @@ -115,7 +129,12 @@ def course_summary() -> DataFrame: return result -def platform_summary() -> DataFrame: +def platform_summary( + users: DataFrame, + courses: DataFrame, + lessons: DataFrame, + lesson_views: DataFrame, +) -> DataFrame: total_users = users.agg(F.count("*").alias("total_users")) total_courses = courses.agg(F.count("*").alias("total_courses")) total_lessons = lessons.agg(F.count("*").alias("total_lessons")) @@ -149,15 +168,43 @@ def platform_summary() -> DataFrame: return result -output_dir = Path("marts/pg") -os.makedirs(output_dir, exist_ok=True) +def save_result(df: DataFrame, name: str, output_dir: Path = Path("marts/pg")): + os.makedirs(output_dir, exist_ok=True) + df.write.mode("overwrite").option("header", "true").csv(str(output_dir / name)) -user_activity_summary().write.mode("overwrite").option("header", "true").csv( - str(output_dir / "user_activity_summary") -) -course_summary().write.mode("overwrite").option("header", "true").csv( - str(output_dir / "course_summary") -) -platform_summary().write.mode("overwrite").option("header", "true").csv( - str(output_dir / "platform_summary") -) + +def main(): + spark = get_spark() + + users = load_postgres_table(spark, "users").withColumnRenamed("id", "user_id") + courses = load_postgres_table(spark, "courses").withColumnRenamed("id", "course_id") + lessons = load_postgres_table(spark, "lessons").withColumnRenamed("id", "lesson_id") + enrollments = load_postgres_table(spark, "enrollments") + lesson_views = load_postgres_table(spark, "lesson_views") + + df_user_activity_summary = user_activity_summary( + enrollments=enrollments, + lessons=lessons, + lesson_views=lesson_views, + users=users, + ) + df_course_summary = course_summary( + enrollments=enrollments, + lessons=lessons, + lesson_views=lesson_views, + courses=courses, + ) + df_platform_summary = platform_summary( + courses=courses, + lessons=lessons, + lesson_views=lesson_views, + users=users, + ) + + save_result(df_user_activity_summary, "user_activity_summary") + save_result(df_course_summary, "course_summary") + save_result(df_platform_summary, "platform_summary") + + +if __name__ == "__main__": + main() diff --git a/pet-project/dag_spark_create_dm_pg.py b/pet-project/dag_spark_create_dm_pg.py new file mode 100644 index 0000000..9e550c2 --- /dev/null +++ b/pet-project/dag_spark_create_dm_pg.py @@ -0,0 +1,41 @@ +from airflow import DAG +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from datetime import datetime, timedelta + + +default_args = { + "owner": "anazarenko", + "depends_on_past": False, + "start_date": datetime(2025, 1, 1), + "retries": 1, + "retry_delay": timedelta(minutes=5), +} + +with DAG( + dag_id="spark_create_dm_pg", + start_date=datetime(2026, 1, 1), + schedule=None, + catchup=False, + default_args=default_args, + tags=["spark", "postgres"], +) as dag: + + spark_task = SparkSubmitOperator( + task_id="run_spark_job", + application="/opt/airflow/dag/anazarenko/spark/create_dm_pg.py", + conn_id="spark_default", + name="airflow-spark", + + application_args=[ + "--input", "s3a://anazarenko-bucket/input/", + "--output", "s3a://anazarenko-bucket/output/", + ], + conf={ + "spark.eventlog.enabled": "true", + "spark.eventlog.dir": "file:/opt/spark-events", + "spark.eventlog.compress": "true" + }, + #packages="org.postgresql:postgresql:42.7.3", + verbose=True, + ) + \ No newline at end of file