133 lines
3.9 KiB
Python
133 lines
3.9 KiB
Python
import os
|
|
from pathlib import Path
|
|
|
|
from pyspark.sql import DataFrame, SparkSession
|
|
from pyspark.sql import functions as F
|
|
|
|
# Single local Spark session (all local cores) shared by the readers and
# mart builders defined in this script.
spark = (
    SparkSession.builder
    .appName("create_dm_gp")
    .master("local[*]")
    .getOrCreate()
)
|
|
|
|
|
|
def read_csv(csv_filename: str, path: Path = Path("src/ch")) -> DataFrame:
    """Load one CSV file into a Spark DataFrame.

    The first row is treated as the header and column types are inferred
    by Spark.

    Args:
        csv_filename: File name (with extension) inside ``path``.
        path: Directory that contains the file; defaults to ``src/ch``.

    Returns:
        The DataFrame read from ``path / csv_filename``.
    """
    source = str(path / csv_filename)
    reader = spark.read
    return reader.csv(source, header=True, inferSchema=True)
|
|
|
|
|
|
# Source tables, loaded once at import time in the order listed below.
(
    core_dim_user,
    core_dim_course,
    core_dim_lesson,
    core_fact_enrollments,
    core_fact_lesson_views,
) = map(
    read_csv,
    (
        "core_dim_user.csv",
        "core_dim_course.csv",
        "core_dim_lesson.csv",
        "core_fact_enrollments.csv",
        "core_fact_lesson_views.csv",
    ),
)
|
|
|
|
|
|
def lesson_popularity_summary() -> DataFrame:
    """Build per-lesson view statistics.

    Lessons are inner-joined to their course and left-joined to lesson
    views, so a lesson without any views is still present in the result
    (its counts are computed over NULL view columns).

    Returns:
        DataFrame with columns ``lesson_id``, ``lesson_title``,
        ``course_id``, ``course_title``, ``total_views``, ``unique_users``,
        ``first_view`` and ``last_view``.
    """
    lessons = core_dim_lesson.alias("l")
    courses = core_dim_course.alias("c")
    views = core_fact_lesson_views.alias("lv")

    lessons_with_course = lessons.join(
        courses,
        F.col("c.course_id") == F.col("l.course_id"),
        how="inner",
    )
    # Left join keeps lessons that nobody has viewed yet.
    lessons_with_views = lessons_with_course.join(
        views,
        F.col("l.lesson_id") == F.col("lv.lesson_id"),
        how="left",
    )

    per_lesson = lessons_with_views.groupBy(
        "l.lesson_id", "l.title", "c.title", "l.course_id"
    ).agg(
        F.count("lv.lesson_id").alias("total_views"),
        F.count_distinct("lv.user_id").alias("unique_users"),
        F.min("lv.viewed_at").alias("first_view"),
        F.max("lv.viewed_at").alias("last_view"),
    )

    # Both grouping keys are called "title"; the table aliases tell them
    # apart when renaming.
    return per_lesson.select(
        "lesson_id",
        F.col("l.title").alias("lesson_title"),
        "course_id",
        F.col("c.title").alias("course_title"),
        "total_views",
        "unique_users",
        "first_view",
        "last_view",
    )
|
|
|
|
|
|
def inactive_users_summary() -> DataFrame:
    """List users that have no rows in the lesson-views fact table.

    Each such user is annotated with the number of distinct courses found
    for them in the enrollments table (0 when there are none).

    Returns:
        DataFrame with columns ``user_id``, ``name``, ``email``, ``age``,
        ``registered_courses_count`` and ``registration_date``.
    """
    viewers = core_fact_lesson_views.select("user_id").distinct()
    # Anti join: keep only users whose user_id never appears among viewers.
    never_viewed = core_dim_user.join(viewers, on="user_id", how="left_anti")

    enrollment_counts = core_fact_enrollments.groupBy("user_id").agg(
        F.count_distinct("course_id").alias("registered_courses_count")
    )

    # Users without enrollments get NULL from the left join; report 0 instead.
    courses_or_zero = F.coalesce("registered_courses_count", F.lit(0)).alias(
        "registered_courses_count"
    )

    with_counts = never_viewed.join(enrollment_counts, on="user_id", how="left")
    return with_counts.select(
        "user_id",
        "name",
        "email",
        "age",
        courses_or_zero,
        "registration_date",
    )
|
|
|
|
|
|
def course_completion_rate() -> DataFrame:
    """Compute, per user and course, the share of lessons the user viewed.

    ``lessons_in_course`` is the number of lesson rows per course;
    ``lessons_viewed`` is the number of distinct lessons per
    ``(user_id, course_id)`` in the lesson-views table. The rate is
    ``lessons_viewed / lessons_in_course`` rounded to two decimals.

    Returns:
        DataFrame with columns ``user_id``, ``user_name``, ``course_id``,
        ``course_title``, ``lessons_in_course``, ``lessons_viewed`` and
        ``completion_rate``.
    """
    lessons_per_course = core_dim_lesson.groupBy("course_id").agg(
        F.count("lesson_id").alias("lessons_in_course")
    )
    viewed_per_user_course = core_fact_lesson_views.groupBy(
        "user_id", "course_id"
    ).agg(F.count_distinct("lesson_id").alias("lessons_viewed"))

    users_with_views = core_dim_user.alias("u").join(
        viewed_per_user_course.alias("vs"),
        F.col("u.user_id") == F.col("vs.user_id"),
        how="left",
    )
    # NOTE(review): this inner join is keyed on the course_id coming from the
    # view stats, so users without any views (NULL course_id after the left
    # join above) presumably drop out here, which would make the left join
    # and the coalesce below no-ops - confirm the intended row set.
    with_course = users_with_views.join(
        core_dim_course.alias("c"), on="course_id", how="inner"
    )
    with_totals = with_course.join(
        lessons_per_course.alias("cs"), on="course_id", how="left"
    )

    viewed_or_zero = F.coalesce("lessons_viewed", F.lit(0))
    completion_rate = F.round(
        viewed_or_zero / F.col("lessons_in_course"),
        2,
    ).alias("completion_rate")

    return with_totals.select(
        "u.user_id",
        F.col("u.name").alias("user_name"),
        "c.course_id",
        F.col("c.title").alias("course_title"),
        "lessons_in_course",
        "lessons_viewed",
        completion_rate,
    )
|
|
|
|
|
|
# Destination directory for the generated marts; created if missing.
output_dir = Path("marts/ch")
output_dir.mkdir(parents=True, exist_ok=True)

# Build each mart and write it as headered CSV, replacing any previous
# output. Order matches the original script.
for mart_name, build_mart in (
    ("lesson_popularity_summary", lesson_popularity_summary),
    ("inactive_users_summary", inactive_users_summary),
    ("course_completion_rate", course_completion_rate),
):
    build_mart().write.mode("overwrite").option("header", "true").csv(
        str(output_dir / mart_name)
    )
|