"""Build the "clickhouse" data marts from CSV folders with PySpark.

Reads the users / courses / lessons / enrollments / lesson_views CSV folders
under ``data/clickhouse``, derives three marts and writes each one as a CSV
folder (with header) under ``data/marts/clickhouse``:

* ``lesson_popularity_summary`` - per-lesson view counts and first/last view.
* ``inactive_users_summary``    - users with no lesson views at all, plus how
  many distinct courses they are enrolled in.
* ``course_completion_rate``    - per (user, enrolled course) share of the
  course's lessons the user has viewed.
"""
import os
from typing import List

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Root folder holding one CSV sub-folder per source table.
INPUT_DIR = "data/clickhouse"

spark = (
    SparkSession.builder
    .appName("create_marts_clickhouse")
    .master("local[*]")
    .getOrCreate()
)


def read_csv_folder(path: str) -> DataFrame:
    """Read every CSV file in *path* (header row, inferred column types)."""
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(path)
    )


def cast_int_columns(df: DataFrame, int_cols: List[str]) -> DataFrame:
    """Cast each column in *int_cols* that exists in *df* to ``int``.

    Columns missing from *df* are skipped silently. Casting the join keys
    explicitly keeps key types consistent across tables regardless of what
    ``inferSchema`` guessed for each file.
    """
    for col_name in int_cols:
        if col_name in df.columns:
            df = df.withColumn(col_name, F.col(col_name).cast("int"))
    return df


def build_lesson_popularity_summary(
    lesson_views: DataFrame, lessons: DataFrame, courses: DataFrame
) -> DataFrame:
    """Per-lesson view statistics, ordered by ``lesson_id``.

    Inner joins mean views of lessons (or lessons of courses) missing from
    the lookup tables are dropped. ``first_view`` / ``last_view`` are the
    min / max of ``viewed_at``. NOTE(review): that column's type comes from
    ``inferSchema``. If it stays a string, min/max are lexicographic.
    Confirm the timestamp format.
    """
    return (
        lesson_views.alias("lv")
        .join(lessons.alias("l"), F.col("lv.lesson_id") == F.col("l.id"))
        .join(courses.alias("c"), F.col("l.course_id") == F.col("c.id"))
        .groupBy(
            F.col("l.id").alias("lesson_id"),
            F.col("l.title").alias("lesson_title"),
            F.col("c.id").alias("course_id"),
            F.col("c.title").alias("course_title"),
        )
        .agg(
            F.count("lv.lesson_id").alias("total_views"),
            F.countDistinct("lv.user_id").alias("unique_users"),
            F.min("lv.viewed_at").alias("first_view"),
            F.max("lv.viewed_at").alias("last_view"),
        )
        .orderBy("lesson_id")
    )


def build_inactive_users_summary(
    users: DataFrame, enrollments: DataFrame, lesson_views: DataFrame
) -> DataFrame:
    """Users with no lesson views, with their distinct enrollment count.

    Users with no enrollments are kept with ``enrollments_count`` = 0.
    The result is ordered by ``user_id``.
    """
    # Anti-join keeps only users with no matching view row. This selects the
    # same users as "left join views, keep rows where lv.user_id is null".
    # It avoids multiplying every active user's enrollments by all of their
    # views before throwing those rows away.
    never_viewed = users.alias("u").join(
        lesson_views.alias("lv"),
        F.col("u.id") == F.col("lv.user_id"),
        "left_anti",
    )
    return (
        never_viewed.alias("u")
        .join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "left")
        .groupBy(
            F.col("u.id").alias("user_id"),
            F.col("u.name").alias("user_name"),
            F.col("u.email").alias("user_email"),
            F.col("u.age").alias("user_age"),
            F.col("u.registration_date").alias("user_registration_date"),
        )
        .agg(F.countDistinct("e.course_id").alias("enrollments_count"))
        .orderBy("user_id")
    )


def build_course_completion_rate(
    courses: DataFrame,
    enrollments: DataFrame,
    users: DataFrame,
    lessons: DataFrame,
    lesson_views: DataFrame,
) -> DataFrame:
    """Per (user, enrolled course): lessons in course, viewed, and their ratio.

    Courses without lessons drop out through the inner join with *lessons*,
    so the rate's denominator is never zero. The result is ordered by
    (``user_id``, ``course_id``).
    """
    lessons_viewed = F.countDistinct("lv.lesson_id")
    lessons_in_course = F.countDistinct("l.id")
    return (
        courses.alias("c")
        .join(enrollments.alias("e"), F.col("c.id") == F.col("e.course_id"))
        .join(users.alias("u"), F.col("e.user_id") == F.col("u.id"))
        .join(lessons.alias("l"), F.col("c.id") == F.col("l.course_id"))
        # Left join: unviewed lessons still count towards lessons_in_course.
        .join(
            lesson_views.alias("lv"),
            (F.col("u.id") == F.col("lv.user_id"))
            & (F.col("l.id") == F.col("lv.lesson_id")),
            "left",
        )
        .groupBy(
            F.col("u.id").alias("user_id"),
            F.col("u.name").alias("user_name"),
            F.col("c.id").alias("course_id"),
            F.col("c.title").alias("course_title"),
        )
        .agg(
            lessons_in_course.alias("lessons_in_course"),
            lessons_viewed.alias("lessons_viewed"),
            # Spark's "/" always performs floating-point division.
            F.round(lessons_viewed / lessons_in_course, 2).alias("completion_rate"),
        )
        .orderBy("user_id", "course_id")
    )


def write_csv_mart(df: DataFrame, path: str) -> None:
    """Write *df* as a header-bearing CSV folder at *path*, replacing it."""
    df.write.mode("overwrite").option("header", "true").csv(path)


try:
    users = read_csv_folder(f"{INPUT_DIR}/users")
    courses = read_csv_folder(f"{INPUT_DIR}/courses")
    lessons = read_csv_folder(f"{INPUT_DIR}/lessons")
    enrollments = read_csv_folder(f"{INPUT_DIR}/enrollments")
    lesson_views = read_csv_folder(f"{INPUT_DIR}/lesson_views")

    users = cast_int_columns(users, ["id"])
    courses = cast_int_columns(courses, ["id"])
    lessons = cast_int_columns(lessons, ["id", "course_id"])
    enrollments = cast_int_columns(enrollments, ["id", "user_id", "course_id"])
    lesson_views = cast_int_columns(lesson_views, ["id", "user_id", "lesson_id"])

    lesson_popularity_summary = build_lesson_popularity_summary(
        lesson_views, lessons, courses
    )
    inactive_users_summary = build_inactive_users_summary(
        users, enrollments, lesson_views
    )
    course_completion_rate = build_course_completion_rate(
        courses, enrollments, users, lessons, lesson_views
    )

    output_dir = "data/marts/clickhouse"
    os.makedirs(output_dir, exist_ok=True)
    write_csv_mart(lesson_popularity_summary, f"{output_dir}/lesson_popularity_summary")
    write_csv_mart(inactive_users_summary, f"{output_dir}/inactive_users_summary")
    write_csv_mart(course_completion_rate, f"{output_dir}/course_completion_rate")

    print(".............витрины clickhouse созданы")
finally:
    # Release the Spark session even if a read, transform or write fails.
    spark.stop()