"""Build PostgreSQL-sourced data marts with Spark and write them out as CSV."""

import os
from pathlib import Path
from typing import Optional

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

# Connection defaults. Each can be overridden via the environment
# (PG_JDBC_URL / PG_USER) or via keyword arguments to load_postgres_table.
# The password is intentionally NOT defaulted: credentials must not live in
# source control. NOTE(review): the password that used to be hard-coded in
# this file should be treated as leaked and rotated.
DEFAULT_JDBC_URL = "jdbc:postgresql://postgres.de-infra.servehttp.com:9002/mydb"
DEFAULT_PG_USER = "anazarenko"


def get_spark() -> SparkSession:
    """Return the SparkSession for this job, creating it if none is active."""
    return SparkSession.builder.appName("create_dm_pg").getOrCreate()


def load_postgres_table(
    spark: SparkSession,
    table_name: str,
    schema: str = "anazarenko",
    *,
    url: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> DataFrame:
    """Load ``schema.table_name`` from PostgreSQL over JDBC as a DataFrame.

    Connection settings that are not passed explicitly are read from the
    environment: ``PG_JDBC_URL`` (falls back to DEFAULT_JDBC_URL),
    ``PG_USER`` (falls back to DEFAULT_PG_USER) and ``PG_PASSWORD``
    (required, no fallback).

    Raises:
        RuntimeError: if no password is supplied and PG_PASSWORD is unset.
    """
    if url is None:
        url = os.environ.get("PG_JDBC_URL", DEFAULT_JDBC_URL)
    if user is None:
        user = os.environ.get("PG_USER", DEFAULT_PG_USER)
    if password is None:
        password = os.environ.get("PG_PASSWORD")
    if password is None:
        raise RuntimeError(
            "PostgreSQL password not provided: set the PG_PASSWORD environment "
            "variable or pass password=..."
        )

    return (
        spark.read
        .format("jdbc")
        .option("url", url)
        .option("dbtable", f"{schema}.{table_name}")
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )


def user_activity_summary(
    enrollments: DataFrame,
    lesson_views: DataFrame,
    users: DataFrame,
    lessons: DataFrame,
) -> DataFrame:
    """Summarise per-user enrollment and lesson-viewing activity.

    Only users with at least one enrollment are kept (inner join).

    Output columns: user_id, name, enrollment_count, viewed_lessons_count,
    first_view, last_view, engagement_rate. ``engagement_rate`` is a string
    such as ``"50.0%"``: distinct lessons viewed * 100 / distinct lessons
    available in the user's enrolled courses, rounded to 2 decimals.
    """
    # Number of enrollment rows per user.
    enrollment_stats = enrollments.groupBy("user_id").agg(
        F.count("course_id").alias("enrollment_count")
    )

    view_stats = lesson_views.groupBy("user_id").agg(
        F.count_distinct("lesson_id").alias("viewed_lessons_count"),
        F.min("viewed_at").alias("first_view"),
        F.max("viewed_at").alias("last_view"),
    )

    # Distinct lessons reachable through the user's enrolled courses.
    possible_lessons = (
        enrollments
        .join(lessons, on="course_id", how="inner")
        .groupBy("user_id")
        .agg(F.count_distinct("lesson_id").alias("possible_lessons"))
    )

    # NOTE(review): users with no views get NULL viewed_lessons_count /
    # first_view / last_view (left join); only the rate treats them as 0.
    result = (
        users
        .join(enrollment_stats, on="user_id", how="inner")
        .join(view_stats, on="user_id", how="left")
        .join(possible_lessons, on="user_id", how="left")
    )

    # NULL when the user's courses have no lessons (possible_lessons is NULL).
    result = result.withColumn(
        "engagement_rate",
        F.concat(
            F.round(
                F.coalesce(F.col("viewed_lessons_count"), F.lit(0))
                * 100.0
                / F.col("possible_lessons"),
                2,
            ),
            F.lit("%"),
        ),
    )

    return result.select(
        "user_id",
        "name",
        "enrollment_count",
        "viewed_lessons_count",
        "first_view",
        "last_view",
        "engagement_rate",
    )


def course_summary(
    enrollments: DataFrame,
    lessons: DataFrame,
    lesson_views: DataFrame,
    courses: DataFrame,
) -> DataFrame:
    """Summarise enrollments and lesson views per course.

    Only courses that have at least one enrollment and at least one lesson are
    kept (both joins are inner).

    Output columns: course_id, title, unique_users, lessons_count,
    total_views, avg_views_per_user, first_view, last_view.
    """
    enrollment_stats = enrollments.groupBy("course_id").agg(
        F.count_distinct("user_id").alias("unique_users")
    )

    # Explicit join condition (instead of a USING join on "lesson_id") so that
    # "lv.lesson_id" unambiguously refers to the view side, and is NULL for
    # lessons with no views. count() therefore counts real views only.
    view_stats = (
        lessons.alias("l")
        .join(
            lesson_views.alias("lv"),
            on=F.col("l.lesson_id") == F.col("lv.lesson_id"),
            how="left",
        )
        .groupBy("l.course_id")
        .agg(
            F.count_distinct("l.lesson_id").alias("lessons_count"),
            F.count("lv.lesson_id").alias("total_views"),
            F.min("lv.viewed_at").alias("first_view"),
            F.max("lv.viewed_at").alias("last_view"),
        )
    )

    result = courses.join(enrollment_stats, on="course_id", how="inner").join(
        view_stats, on="course_id", how="inner"
    )

    return result.withColumn(
        "avg_views_per_user",
        F.round(
            F.coalesce(F.col("total_views"), F.lit(0)) / F.col("unique_users"),
            2,
        ),
    ).select(
        "course_id",
        "title",
        "unique_users",
        "lessons_count",
        "total_views",
        "avg_views_per_user",
        "first_view",
        "last_view",
    )


def platform_summary(
    users: DataFrame,
    courses: DataFrame,
    lessons: DataFrame,
    lesson_views: DataFrame,
) -> DataFrame:
    """Return a single-row DataFrame of platform-wide totals and averages.

    Columns: total_users, total_courses, total_lessons, users_with_views,
    avg_lessons_per_course, avg_views_per_lesson.
    """
    total_users = users.agg(F.count("*").alias("total_users"))
    total_courses = courses.agg(F.count("*").alias("total_courses"))
    total_lessons = lessons.agg(F.count("*").alias("total_lessons"))
    users_with_views = lesson_views.agg(
        F.count_distinct("user_id").alias("users_with_views")
    )

    # NOTE(review): averages only over courses that have >= 1 lesson.
    lessons_per_course = lessons.groupBy("course_id").agg(
        F.count_distinct("lesson_id").alias("lessons_count")
    )
    avg_lessons_per_course = lessons_per_course.agg(
        F.round(F.avg("lessons_count"), 2).alias("avg_lessons_per_course")
    )

    # NOTE(review): averages only over lessons that have >= 1 view; lessons
    # with zero views are excluded, which inflates the figure. Confirm this is
    # the intended definition.
    views_per_lesson = lesson_views.groupBy("lesson_id").agg(
        F.count("*").alias("total_views")
    )
    avg_views_per_lesson = views_per_lesson.agg(
        F.round(F.avg("total_views"), 2).alias("avg_views_per_lesson")
    )

    # Every input is a one-row aggregate, so cross joins yield exactly one row.
    return (
        total_users
        .crossJoin(total_courses)
        .crossJoin(total_lessons)
        .crossJoin(users_with_views)
        .crossJoin(avg_lessons_per_course)
        .crossJoin(avg_views_per_lesson)
    )


def save_result(df: DataFrame, name: str, output_dir: Path = Path("marts/pg")) -> None:
    """Write ``df`` as CSV with a header row to ``output_dir / name``.

    Any existing output at that path is overwritten. ``output_dir`` may be a
    ``Path`` or a ``str``.
    """
    output_dir = Path(output_dir)
    # Creates the parent directory on the local filesystem only.
    output_dir.mkdir(parents=True, exist_ok=True)
    df.write.mode("overwrite").option("header", "true").csv(str(output_dir / name))


def main() -> None:
    """Load the source tables, build all three marts and save them as CSV."""
    spark = get_spark()
    try:
        users = load_postgres_table(spark, "users").withColumnRenamed("id", "user_id")
        courses = load_postgres_table(spark, "courses").withColumnRenamed("id", "course_id")
        lessons = load_postgres_table(spark, "lessons").withColumnRenamed("id", "lesson_id")
        enrollments = load_postgres_table(spark, "enrollments")
        lesson_views = load_postgres_table(spark, "lesson_views")

        df_user_activity_summary = user_activity_summary(
            enrollments=enrollments,
            lessons=lessons,
            lesson_views=lesson_views,
            users=users,
        )
        df_course_summary = course_summary(
            enrollments=enrollments,
            lessons=lessons,
            lesson_views=lesson_views,
            courses=courses,
        )
        df_platform_summary = platform_summary(
            courses=courses,
            lessons=lessons,
            lesson_views=lesson_views,
            users=users,
        )

        save_result(df_user_activity_summary, "user_activity_summary")
        save_result(df_course_summary, "course_summary")
        save_result(df_platform_summary, "platform_summary")
    finally:
        # Release the session's resources even if loading or writing fails.
        spark.stop()


if __name__ == "__main__":
    main()