"""Build three Greenplum analytics marts from exported CSV data using Spark.

Reads dimension/fact CSV folders from ``data/greenplum/`` and writes the
resulting marts as header-ed CSV folders under ``data/marts/greenplum``.
"""
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

# Local Spark session used for all mart construction below.
spark = SparkSession.builder \
    .appName("create_marts_greenplum") \
    .master("local[*]") \
    .getOrCreate()


def read_csv_folder(path: str) -> DataFrame:
    """Read a folder of CSV part-files into a DataFrame.

    Assumes a header row and lets Spark infer the column types.
    """
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(path)
    )


# Dimension and fact tables previously exported from Greenplum.
dim_user = read_csv_folder("data/greenplum/dim_user")
dim_course = read_csv_folder("data/greenplum/dim_course")
dim_lesson = read_csv_folder("data/greenplum/dim_lesson")
fact_lesson_views = read_csv_folder("data/greenplum/fact_lesson_views")
fact_enrollments = read_csv_folder("data/greenplum/fact_enrollments")

# Mart 1: per-lesson popularity — total views, distinct viewers,
# and the first/last time the lesson was viewed.
lesson_popularity_summary = (
    fact_lesson_views.alias("flv")
    .join(dim_lesson.alias("dl"), F.col("flv.lesson_id") == F.col("dl.lesson_id"))
    .join(dim_course.alias("dc"), F.col("flv.course_id") == F.col("dc.course_id"))
    .groupBy(
        F.col("dl.lesson_id").alias("lesson_id"),
        F.col("dl.title").alias("lesson_title"),
        F.col("dc.course_id").alias("course_id"),
        F.col("dc.title").alias("course_title"),
    )
    .agg(
        F.count("flv.lesson_id").alias("total_views"),
        F.countDistinct("flv.user_id").alias("unique_users"),
        F.min("flv.viewed_at").alias("first_view"),
        F.max("flv.viewed_at").alias("last_view"),
    )
    .orderBy("lesson_id")
)

# Mart 2: users with no lesson views at all, plus the number of distinct
# courses they enrolled in (anti-join expressed as left join + IS NULL).
inactive_users_summary = (
    dim_user.alias("du")
    .join(fact_lesson_views.alias("flv"), F.col("du.user_id") == F.col("flv.user_id"), "left")
    .join(fact_enrollments.alias("fe"), F.col("du.user_id") == F.col("fe.user_id"), "left")
    .filter(F.col("flv.user_id").isNull())  # keep only users with zero views
    .groupBy(
        F.col("du.user_id").alias("user_id"),
        F.col("du.name").alias("user_name"),
        F.col("du.email"),
        F.col("du.age"),
        F.col("du.registration_date"),
    )
    .agg(F.countDistinct("fe.course_id").alias("enrollments_count"))
    .orderBy("user_id")
)

# Mart 3: per (user, course) — how many of the course's lessons the user
# has viewed, as an absolute count and as a completion ratio.
course_completion_rate = (
    fact_enrollments.alias("fe")
    .join(dim_user.alias("du"), F.col("fe.user_id") == F.col("du.user_id"))
    .join(dim_course.alias("dc"), F.col("fe.course_id") == F.col("dc.course_id"))
    .join(dim_lesson.alias("dl"), F.col("dc.course_id") == F.col("dl.course_id"))
    .join(
        fact_lesson_views.alias("flv"),
        (F.col("du.user_id") == F.col("flv.user_id"))
        & (F.col("dl.lesson_id") == F.col("flv.lesson_id")),
        "left",
    )
    .groupBy(
        F.col("du.user_id").alias("user_id"),
        F.col("du.name").alias("user_name"),
        F.col("dc.course_id").alias("course_id"),
        F.col("dc.title").alias("course_title"),
    )
    .agg(
        F.countDistinct("dl.lesson_id").alias("lessons_in_course"),
        F.countDistinct("flv.lesson_id").alias("lessons_viewed"),
        # Guard against division by zero for a course with no lessons.
        F.when(F.countDistinct("dl.lesson_id") == 0, F.lit(0.0))
        .otherwise(
            F.countDistinct("flv.lesson_id") * 1.0 / F.countDistinct("dl.lesson_id")
        )
        .alias("completion_rate"),
    )
    .orderBy("user_id", "course_id")
)

output_dir = "data/marts/greenplum"
os.makedirs(output_dir, exist_ok=True)

# Write each mart as a header-ed CSV folder, replacing any previous run.
lesson_popularity_summary.write.mode("overwrite").option("header", "true").csv(
    f"{output_dir}/lesson_popularity_summary"
)
inactive_users_summary.write.mode("overwrite").option("header", "true").csv(
    f"{output_dir}/inactive_users_summary"
)
course_completion_rate.write.mode("overwrite").option("header", "true").csv(
    f"{output_dir}/course_completion_rate"
)

print(".............витрины greenplum созданы")

# Fix: the original never released the Spark session, leaving the local
# JVM/driver resources alive after the marts were written.
spark.stop()