import os

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Local Spark session used to build CSV "marts" from tables previously
# exported from PostgreSQL into data/postgresql/<table> folders.
spark = SparkSession.builder \
    .appName("create_marts_postgresql") \
    .master("local[*]") \
    .getOrCreate()


def read_csv_folder(path: str) -> DataFrame:
    """Read a folder of CSV files with a header row, inferring column types.

    Args:
        path: Directory containing one or more CSV part files.

    Returns:
        A DataFrame with columns named from the header and inferred types.
    """
    return spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(path)


# Source tables. Assumed schema (inferred from the joins below — confirm
# against the exporter): users(id, name), courses(id, title),
# lessons(id, course_id), enrollments(user_id, course_id),
# lesson_views(id, user_id, lesson_id, viewed_at).
users = read_csv_folder("data/postgresql/users")
courses = read_csv_folder("data/postgresql/courses")
lessons = read_csv_folder("data/postgresql/lessons")
enrollments = read_csv_folder("data/postgresql/enrollments")
lesson_views = read_csv_folder("data/postgresql/lesson_views")

# Mart 1: per-user activity. Inner joins mean only users with at least one
# enrollment AND at least one lesson view appear. The three joins fan rows
# out (views x lessons of enrolled courses), so every aggregate below uses
# countDistinct / min / max, which are insensitive to that duplication.
# engagement_rate = distinct lessons viewed / distinct lessons available in
# the user's enrolled courses, as a "NN.NN%" string (Spark's concat casts
# the rounded number to string implicitly).
user_activity_summary = users.alias("u") \
    .join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "inner") \
    .join(lesson_views.alias("lv"), F.col("u.id") == F.col("lv.user_id"), "inner") \
    .join(lessons.alias("l"), F.col("e.course_id") == F.col("l.course_id"), "inner") \
    .groupBy(F.col("u.id").alias("user_id"), F.col("u.name")) \
    .agg(
        F.countDistinct("e.course_id").alias("enrollment_count"),
        F.countDistinct("lv.lesson_id").alias("viewed_lessons_count"),
        F.max("lv.viewed_at").alias("last_view"),
        F.min("lv.viewed_at").alias("first_view"),
        F.concat(
            F.round(F.countDistinct("lv.lesson_id") * 100.0 / F.countDistinct("l.id"), 2),
            F.lit("%")
        ).alias("engagement_rate")
    )

# Helper aggregate: raw view-event count per course, computed on the narrow
# lessons x lesson_views join so it is NOT inflated by the enrollment join
# performed in the course mart below.
total_views_per_course = lessons.alias("l") \
    .join(lesson_views.alias("lv"), F.col("l.id") == F.col("lv.lesson_id"), "inner") \
    .groupBy("l.course_id") \
    .agg(F.count("lv.lesson_id").alias("total_views"))

# Mart 2: per-course summary. The enrollments join multiplies every view row
# by the number of enrolled users, so counts use countDistinct and the view
# total comes from the pre-aggregated total_views_per_course instead.
course_summary = courses.alias("c") \
    .join(enrollments.alias("e"), F.col("c.id") == F.col("e.course_id"), "inner") \
    .join(lessons.alias("l"), F.col("c.id") == F.col("l.course_id"), "inner") \
    .join(lesson_views.alias("lv"), F.col("l.id") == F.col("lv.lesson_id"), "inner") \
    .join(total_views_per_course.alias("tv"), F.col("tv.course_id") == F.col("l.course_id"), "inner") \
    .groupBy(
        F.col("c.id").alias("course_id"),
        F.col("c.title").alias("course_title"),
        F.col("tv.total_views")
    ) \
    .agg(
        F.countDistinct("e.user_id").alias("unique_users"),
        F.countDistinct("l.id").alias("lessons_count"),
        F.round(F.col("tv.total_views") * 1.0 / F.countDistinct("e.user_id"), 2).alias("avg_views_per_user"),
        F.min("lv.viewed_at").alias("first_viewed_at"),
        F.max("lv.viewed_at").alias("last_viewed_at")
    ).orderBy("course_id")

# Mart 3: single-row platform summary. Outer joins keep users without
# enrollments and lessons of courses nobody enrolled in, so the distinct
# totals cover the whole platform. lesson_views is joined on user_id only,
# so each view row is duplicated once per course/lesson row of that user.
platform_summary = users.alias("u") \
    .join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "left") \
    .join(courses.alias("c"), F.col("c.id") == F.col("e.course_id"), "left") \
    .join(lessons.alias("l"), F.col("l.course_id") == F.col("c.id"), "full") \
    .join(lesson_views.alias("lv"), F.col("lv.user_id") == F.col("u.id"), "left") \
    .agg(
        F.countDistinct("u.id").alias("total_users"),
        F.countDistinct("c.id").alias("total_courses"),
        F.countDistinct("l.id").alias("total_lessons"),
        F.countDistinct("lv.user_id").alias("users_with_views"),
        F.round(F.countDistinct("l.id") * 1.0 / F.countDistinct("c.id"), 2).alias("avg_lessons_per_course"),
        # FIX: the original used F.count("lv.id") here, which counts every
        # view once per duplicated join row (see fan-out note above) and
        # inflates the metric. countDistinct over the view id counts each
        # view event exactly once — assumes lesson_views.id is the per-view
        # primary key (TODO confirm against the exporter schema).
        F.round(F.countDistinct("lv.id") * 1.0 / F.countDistinct("lv.lesson_id"), 2).alias("avg_views_per_lesson")
    )

# Write each mart as headered CSV. Spark creates the output folders itself;
# makedirs just guarantees the parent path exists up front.
output_dir = "data/marts/postgresql"
os.makedirs(output_dir, exist_ok=True)

user_activity_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/user_activity_summary")
course_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/course_summary")
platform_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/platform_summary")

print(".............витрины postgresql созданы")