"""Build the ``ch`` data marts from the core CSV extracts.

Reads the core dimension/fact CSV files from ``src/ch``, derives three marts
(lesson popularity, inactive users, per-user course completion rate) and
writes each one as a headered CSV directory under ``marts/ch``.
"""

import os
from pathlib import Path

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder.appName("create_dm_gp").master("local[*]").getOrCreate()
)


def read_csv(csv_filename: str, path: Path = Path("src/ch")) -> DataFrame:
    """Read a headered CSV file into a DataFrame, inferring column types.

    Args:
        csv_filename: File name, relative to ``path``.
        path: Directory containing the file (defaults to ``src/ch``).

    Returns:
        The loaded DataFrame.
    """
    return spark.read.csv(
        path=str(path / csv_filename),
        header=True,
        inferSchema=True,
    )


core_dim_user = read_csv("core_dim_user.csv")
core_dim_course = read_csv("core_dim_course.csv")
core_dim_lesson = read_csv("core_dim_lesson.csv")
core_fact_enrollments = read_csv("core_fact_enrollments.csv")
core_fact_lesson_views = read_csv("core_fact_lesson_views.csv")


def lesson_popularity_summary() -> DataFrame:
    """View statistics per lesson.

    One row per lesson whose course exists in ``core_dim_course``. Lessons
    without any views are kept (left join) with ``total_views = 0``,
    ``unique_users = 0`` and null ``first_view`` / ``last_view``.

    Columns: lesson_id, lesson_title, course_id, course_title, total_views,
    unique_users, first_view, last_view.
    """
    return (
        core_dim_lesson.alias("l")
        .join(
            core_dim_course.alias("c"),
            F.col("c.course_id") == F.col("l.course_id"),
            how="inner",
        )
        .join(
            core_fact_lesson_views.alias("lv"),
            F.col("l.lesson_id") == F.col("lv.lesson_id"),
            how="left",
        )
        .groupBy("l.lesson_id", "l.title", "c.title", "l.course_id")
        .agg(
            # Counting a right-side column ignores the null row produced by
            # the left join, so unviewed lessons get 0 rather than 1.
            F.count("lv.lesson_id").alias("total_views"),
            F.count_distinct("lv.user_id").alias("unique_users"),
            F.min("lv.viewed_at").alias("first_view"),
            F.max("lv.viewed_at").alias("last_view"),
        )
        .select(
            "lesson_id",
            F.col("l.title").alias("lesson_title"),
            "course_id",
            F.col("c.title").alias("course_title"),
            "total_views",
            "unique_users",
            "first_view",
            "last_view",
        )
    )


def inactive_users_summary() -> DataFrame:
    """Users that have never viewed a lesson.

    A user is inactive when their ``user_id`` does not occur in
    ``core_fact_lesson_views``. ``registered_courses_count`` is the number of
    distinct courses the user has in ``core_fact_enrollments`` (0 if none).

    Columns: user_id, name, email, age, registered_courses_count,
    registration_date.
    """
    active_users = core_fact_lesson_views.select("user_id").distinct()
    inactive_users = core_dim_user.join(active_users, on="user_id", how="left_anti")

    registered_courses_count = core_fact_enrollments.groupBy("user_id").agg(
        F.count_distinct("course_id").alias("registered_courses_count")
    )

    return inactive_users.join(
        registered_courses_count, on="user_id", how="left"
    ).select(
        "user_id",
        "name",
        "email",
        "age",
        F.coalesce("registered_courses_count", F.lit(0)).alias(
            "registered_courses_count"
        ),
        "registration_date",
    )


def course_completion_rate() -> DataFrame:
    """Share of each course's lessons that a user has viewed.

    One row per distinct (user_id, course_id) pair that occurs either in
    ``core_fact_enrollments`` or in the lesson views, restricted to users in
    ``core_dim_user`` and courses in ``core_dim_course``. Enrolled users with
    no views get ``lessons_viewed = 0`` and ``completion_rate = 0.0``.
    ``completion_rate`` is null when the course has no lessons.

    Columns: user_id, user_name, course_id, course_title, lessons_in_course,
    lessons_viewed, completion_rate (rounded to 2 decimals).
    """
    lessons = core_dim_lesson.select("lesson_id", "course_id")

    course_stats = lessons.groupBy("course_id").agg(
        F.count("lesson_id").alias("lessons_in_course")
    )

    # Attribute each view to its course through the lesson dimension, so the
    # numerator only counts lessons that the denominator counts as well.
    view_stats = (
        core_fact_lesson_views.alias("lv")
        .join(lessons.alias("l"), F.col("lv.lesson_id") == F.col("l.lesson_id"))
        .select(
            F.col("lv.user_id").alias("user_id"),
            F.col("l.course_id").alias("course_id"),
            F.col("lv.lesson_id").alias("lesson_id"),
        )
        .groupBy("user_id", "course_id")
        .agg(F.count_distinct("lesson_id").alias("lessons_viewed"))
    )

    # Drive the mart from (user, course) pairs rather than from views alone:
    # previously the left join to views was undone by an inner join on
    # course_id, which silently dropped enrolled users with zero progress.
    pairs = (
        core_fact_enrollments.select("user_id", "course_id")
        .unionByName(view_stats.select("user_id", "course_id"))
        .distinct()
    )

    users = core_dim_user.select("user_id", F.col("name").alias("user_name"))
    courses = core_dim_course.select(
        "course_id", F.col("title").alias("course_title")
    )

    lessons_in_course = F.coalesce(F.col("lessons_in_course"), F.lit(0))
    lessons_viewed = F.coalesce(F.col("lessons_viewed"), F.lit(0))

    return (
        pairs.join(users, on="user_id", how="inner")
        .join(courses, on="course_id", how="inner")
        .join(course_stats, on="course_id", how="left")
        .join(view_stats, on=["user_id", "course_id"], how="left")
        .select(
            "user_id",
            "user_name",
            "course_id",
            "course_title",
            lessons_in_course.alias("lessons_in_course"),
            lessons_viewed.alias("lessons_viewed"),
            # Guard the division: a course without lessons has no defined
            # rate (null), and dividing by 0 would fail under ANSI mode.
            F.when(
                lessons_in_course > 0,
                F.round(lessons_viewed / lessons_in_course, 2),
            ).alias("completion_rate"),
        )
    )


output_dir = Path("marts/ch")
os.makedirs(output_dir, exist_ok=True)


def write_mart(df: DataFrame, name: str) -> None:
    """Overwrite ``output_dir / name`` with ``df`` as headered CSV part files."""
    df.write.mode("overwrite").option("header", "true").csv(str(output_dir / name))


write_mart(lesson_popularity_summary(), "lesson_popularity_summary")
write_mart(inactive_users_summary(), "inactive_users_summary")
write_mart(course_completion_rate(), "course_completion_rate")