Files
spark/pet-project/create_dm_ch.py
2026-06-10 00:14:21 +07:00

133 lines
3.9 KiB
Python

import os
from pathlib import Path
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
# Shared local Spark session used by every reader and mart builder below.
# NOTE(review): the app name says "gp" while this script reads from "src/ch"
# and writes to "marts/ch" -- possibly a copy-paste leftover; confirm before
# renaming, since the name is visible in the Spark UI and logs.
spark = (
    SparkSession.builder
    .appName("create_dm_gp")
    .master("local[*]")
    .getOrCreate()
)
def read_csv(csv_filename: str, path: Path = Path("src/ch")) -> DataFrame:
    """Load a CSV file from *path* into a Spark DataFrame.

    Args:
        csv_filename: File name (with extension) located inside *path*.
        path: Directory that holds the source CSV files.

    Returns:
        A DataFrame read with the first line treated as the header and
        column types inferred by Spark.
    """
    source_file = str(path / csv_filename)
    return spark.read.csv(path=source_file, header=True, inferSchema=True)
# Source tables of the core layer, loaded once at import time and shared by
# the mart builders below. The order of names matches the order of files.
(
    core_dim_user,
    core_dim_course,
    core_dim_lesson,
    core_fact_enrollments,
    core_fact_lesson_views,
) = map(
    read_csv,
    (
        "core_dim_user.csv",
        "core_dim_course.csv",
        "core_dim_lesson.csv",
        "core_fact_enrollments.csv",
        "core_fact_lesson_views.csv",
    ),
)
def lesson_popularity_summary() -> DataFrame:
    """Build the lesson popularity mart.

    Lessons are inner-joined to their course and left-joined to lesson
    views, then aggregated per (lesson id, lesson title, course title,
    course id).

    Returns:
        A DataFrame with the columns ``lesson_id``, ``lesson_title``,
        ``course_id``, ``course_title``, ``total_views``, ``unique_users``,
        ``first_view`` and ``last_view``.
    """
    lessons = core_dim_lesson.alias("l")
    courses = core_dim_course.alias("c")
    views = core_fact_lesson_views.alias("lv")

    lessons_with_course = lessons.join(
        courses,
        F.col("c.course_id") == F.col("l.course_id"),
        how="inner",
    )
    # Left join: lessons without any matching view rows stay in the result.
    lessons_with_views = lessons_with_course.join(
        views,
        F.col("l.lesson_id") == F.col("lv.lesson_id"),
        how="left",
    )

    per_lesson = lessons_with_views.groupBy(
        "l.lesson_id", "l.title", "c.title", "l.course_id"
    ).agg(
        # Counting a column from the "lv" side skips the nulls produced by
        # the left join.
        F.count("lv.lesson_id").alias("total_views"),
        F.count_distinct("lv.user_id").alias("unique_users"),
        F.min("lv.viewed_at").alias("first_view"),
        F.max("lv.viewed_at").alias("last_view"),
    )

    # Both title columns share a name, so they are picked by table alias and
    # renamed to something unambiguous.
    return per_lesson.select(
        "lesson_id",
        F.col("l.title").alias("lesson_title"),
        "course_id",
        F.col("c.title").alias("course_title"),
        "total_views",
        "unique_users",
        "first_view",
        "last_view",
    )
def inactive_users_summary() -> DataFrame:
    """Build the mart of users that have no lesson views.

    A user counts as inactive when their ``user_id`` does not occur in
    ``core_fact_lesson_views``. Each such user is enriched with the number
    of distinct courses found for them in ``core_fact_enrollments``.

    Returns:
        A DataFrame with the columns ``user_id``, ``name``, ``email``,
        ``age``, ``registered_courses_count`` and ``registration_date``.
    """
    viewer_ids = core_fact_lesson_views.select("user_id").distinct()
    # Anti join keeps only the users that have no row in viewer_ids.
    users_without_views = core_dim_user.join(
        viewer_ids, on="user_id", how="left_anti"
    )

    courses_per_user = core_fact_enrollments.groupBy("user_id").agg(
        F.count_distinct("course_id").alias("registered_courses_count")
    )

    enriched = users_without_views.join(
        courses_per_user, on="user_id", how="left"
    )

    # Users without enrollments get no match in the left join; report 0
    # for them instead of null.
    courses_count = F.coalesce("registered_courses_count", F.lit(0)).alias(
        "registered_courses_count"
    )
    return enriched.select(
        "user_id",
        "name",
        "email",
        "age",
        courses_count,
        "registration_date",
    )
def course_completion_rate() -> DataFrame:
    """Build the per-user, per-course completion mart.

    The completion rate is the number of distinct lessons a user viewed in
    a course divided by the number of lessons in that course, rounded to
    two decimals.

    NOTE(review): users are left-joined to the view statistics, but the
    following inner join on ``course_id`` appears to drop every user that
    had no views (their ``course_id`` is null after the left join). If so,
    the left join and the ``coalesce(..., 0)`` have no visible effect --
    confirm whether zero-progress rows were meant to be kept.

    Returns:
        A DataFrame with the columns ``user_id``, ``user_name``,
        ``course_id``, ``course_title``, ``lessons_in_course``,
        ``lessons_viewed`` and ``completion_rate``.
    """
    lessons_per_course = core_dim_lesson.groupBy("course_id").agg(
        F.count("lesson_id").alias("lessons_in_course")
    )
    viewed_per_user_course = core_fact_lesson_views.groupBy(
        "user_id", "course_id"
    ).agg(F.count_distinct("lesson_id").alias("lessons_viewed"))

    users_with_views = core_dim_user.alias("u").join(
        viewed_per_user_course.alias("vs"),
        F.col("u.user_id") == F.col("vs.user_id"),
        how="left",
    )
    with_course = users_with_views.join(
        core_dim_course.alias("c"), on="course_id", how="inner"
    )
    with_lesson_totals = with_course.join(
        lessons_per_course.alias("cs"), on="course_id", how="left"
    )

    viewed_or_zero = F.coalesce("lessons_viewed", F.lit(0))
    completion_rate = F.round(
        viewed_or_zero / F.col("lessons_in_course"),
        2,
    ).alias("completion_rate")

    return with_lesson_totals.select(
        "u.user_id",
        F.col("u.name").alias("user_name"),
        "c.course_id",
        F.col("c.title").alias("course_title"),
        "lessons_in_course",
        "lessons_viewed",
        completion_rate,
    )
# Materialize every mart as a headered CSV directory under marts/ch,
# replacing the output of any previous run.
output_dir = Path("marts/ch")
os.makedirs(output_dir, exist_ok=True)

for _mart_name, _build_mart in (
    ("lesson_popularity_summary", lesson_popularity_summary),
    ("inactive_users_summary", inactive_users_summary),
    ("course_completion_rate", course_completion_rate),
):
    (
        _build_mart()
        .write.mode("overwrite")
        .option("header", "true")
        .csv(str(output_dir / _mart_name))
    )