Files
spark/pet-project/create_dm_pg.py
2026-06-10 00:14:21 +07:00

164 lines
4.6 KiB
Python

import os
from pathlib import Path
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession
# Module-wide Spark session shared by every reader and mart builder below.
spark = (
    SparkSession.builder
    .appName("create_dm_pg")
    .master("local[*]")
    .getOrCreate()
)
def read_csv(csv_filename: str, path: Path = Path("src/pg")) -> DataFrame:
    """Load one CSV file from ``path`` into a Spark DataFrame.

    Args:
        csv_filename: File name, resolved relative to ``path``.
        path: Directory that holds the file; defaults to ``src/pg``.

    Returns:
        The DataFrame read with the ``header`` and ``inferSchema`` options
        enabled.
    """
    source = str(path / csv_filename)
    return spark.read.options(header=True, inferSchema=True).csv(source)
# Tables whose generic "id" column is renamed to a table-specific key, so the
# later joins can be written against "user_id" / "course_id" / "lesson_id".
users, courses, lessons = (
    read_csv(csv_name).withColumnRenamed("id", key_name)
    for csv_name, key_name in (
        ("users.csv", "user_id"),
        ("courses.csv", "course_id"),
        ("lessons.csv", "lesson_id"),
    )
)

# Tables that are used as loaded, without renaming any column.
enrollments = read_csv("enrollments.csv")
lesson_views = read_csv("lesson_views.csv")
def user_activity_summary() -> DataFrame:
    """Build the per-user activity mart.

    Only users with at least one enrollment row are kept (inner join against
    the enrollment counts). View statistics and the number of lessons
    available through the user's enrollments are attached with left joins, so
    ``viewed_lessons_count``, ``first_view`` and ``last_view`` stay null for
    users without any view rows.

    Returns:
        DataFrame with columns ``user_id``, ``name``, ``enrollment_count``,
        ``viewed_lessons_count``, ``first_view``, ``last_view`` and
        ``engagement_rate`` (rounded percentage with a ``%`` suffix).
    """
    # Distinct lessons reachable through the courses each user is enrolled in.
    lessons_available = (
        enrollments
        .join(lessons, on="course_id", how="inner")
        .groupBy("user_id")
        .agg(F.count_distinct("lesson_id").alias("possible_lessons"))
    )
    views_per_user = lesson_views.groupBy("user_id").agg(
        F.count_distinct("lesson_id").alias("viewed_lessons_count"),
        F.min("viewed_at").alias("first_view"),
        F.max("viewed_at").alias("last_view"),
    )
    enrollments_per_user = enrollments.groupBy("user_id").agg(
        F.count("course_id").alias("enrollment_count")
    )

    joined = users.join(enrollments_per_user, on="user_id", how="inner")
    joined = joined.join(views_per_user, on="user_id", how="left")
    joined = joined.join(lessons_available, on="user_id", how="left")

    # A missing view count is treated as 0 for the rate only; the
    # viewed_lessons_count column itself is left as produced by the join.
    viewed_or_zero = F.coalesce(F.col("viewed_lessons_count"), F.lit(0))
    percentage = F.round(viewed_or_zero * 100.0 / F.col("possible_lessons"), 2)
    # NOTE(review): possible_lessons is null for users whose courses have no
    # lessons; the rate is presumably null in that case -- confirm.
    engagement = F.concat(percentage, F.lit("%"))

    output_columns = (
        "user_id",
        "name",
        "enrollment_count",
        "viewed_lessons_count",
        "first_view",
        "last_view",
        "engagement_rate",
    )
    return joined.withColumn("engagement_rate", engagement).select(*output_columns)
def course_summary() -> DataFrame:
    """Build the per-course mart.

    Both joins onto ``courses`` are inner joins, so a course appears only if
    it has at least one enrollment row and at least one lesson row.

    Returns:
        DataFrame with columns ``course_id``, ``title``, ``unique_users``,
        ``lessons_count``, ``total_views``, ``avg_views_per_user``,
        ``first_view`` and ``last_view``.
    """
    # Left join keeps lessons that were never viewed, so they still count
    # towards lessons_count while contributing nothing to total_views.
    lessons_with_views = lessons.alias("l").join(
        lesson_views.alias("lv"), on="lesson_id", how="left"
    )
    views_per_course = lessons_with_views.groupBy("course_id").agg(
        F.count_distinct("l.lesson_id").alias("lessons_count"),
        F.count("lv.lesson_id").alias("total_views"),
        F.min("lv.viewed_at").alias("first_view"),
        F.max("lv.viewed_at").alias("last_view"),
    )
    users_per_course = enrollments.groupBy("course_id").agg(
        F.count_distinct("user_id").alias("unique_users")
    )

    joined = courses.join(users_per_course, on="course_id", how="inner")
    joined = joined.join(views_per_course, on="course_id", how="inner")

    views_or_zero = F.coalesce(F.col("total_views"), F.lit(0))
    avg_views = F.round(views_or_zero / F.col("unique_users"), 2)

    output_columns = (
        "course_id",
        "title",
        "unique_users",
        "lessons_count",
        "total_views",
        "avg_views_per_user",
        "first_view",
        "last_view",
    )
    return joined.withColumn("avg_views_per_user", avg_views).select(*output_columns)
def platform_summary() -> DataFrame:
    """Build the platform-wide mart by combining global aggregates.

    Each metric is computed as its own aggregate DataFrame without grouping
    keys, and the pieces are then cross-joined in a fixed order.

    Returns:
        DataFrame with columns ``total_users``, ``total_courses``,
        ``total_lessons``, ``users_with_views``, ``avg_lessons_per_course``
        and ``avg_views_per_lesson``.
    """
    user_total = users.agg(F.count("*").alias("total_users"))
    course_total = courses.agg(F.count("*").alias("total_courses"))
    lesson_total = lessons.agg(F.count("*").alias("total_lessons"))
    viewers_total = lesson_views.agg(
        F.count_distinct("user_id").alias("users_with_views")
    )

    # Average over the courses that have at least one lesson row.
    lessons_avg = (
        lessons
        .groupBy("course_id")
        .agg(F.count_distinct("lesson_id").alias("lessons_count"))
        .agg(F.round(F.avg("lessons_count"), 2).alias("avg_lessons_per_course"))
    )
    # Average over the lessons that have at least one view row.
    views_avg = (
        lesson_views
        .groupBy("lesson_id")
        .agg(F.count("*").alias("total_views"))
        .agg(F.round(F.avg("total_views"), 2).alias("avg_views_per_lesson"))
    )

    # Cross-join order fixes the column order of the output.
    summary = user_total
    for metric in (course_total, lesson_total, viewers_total, lessons_avg, views_avg):
        summary = summary.crossJoin(metric)
    return summary
# Destination directory for all marts; created up front if it is missing.
output_dir = Path("marts/pg")
output_dir.mkdir(parents=True, exist_ok=True)

# Build and export each mart in turn as CSV with a header row, replacing any
# previous output under the same name.
for mart_name, build_mart in (
    ("user_activity_summary", user_activity_summary),
    ("course_summary", course_summary),
    ("platform_summary", platform_summary),
):
    (
        build_mart()
        .write.mode("overwrite")
        .option("header", "true")
        .csv(str(output_dir / mart_name))
    )