Files
spark/pet-project/create_dm_pg.py
2026-06-12 16:30:32 +07:00

211 lines
5.8 KiB
Python

import os
from pathlib import Path
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession
def get_spark() -> SparkSession:
return SparkSession.builder.appName("create_dm_pg").getOrCreate()
def load_postgres_table(
spark: SparkSession,
table_name: str,
schema: str = "anazarenko",
) -> DataFrame:
return (
spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://postgres.de-infra.servehttp.com:9002/mydb")
.option("dbtable", f"{schema}.{table_name}")
.option("user", "anazarenko")
.option("password", "rT7Wof6frDbqhMq0")
.option("driver", "org.postgresql.Driver")
.load()
)
def user_activity_summary(
enrollments: DataFrame,
lesson_views: DataFrame,
users: DataFrame,
lessons: DataFrame,
) -> DataFrame:
enrollment_stats = enrollments.groupBy("user_id").agg(
F.count("course_id").alias("enrollment_count")
)
view_stats = lesson_views.groupBy("user_id").agg(
F.count_distinct("lesson_id").alias("viewed_lessons_count"),
F.min("viewed_at").alias("first_view"),
F.max("viewed_at").alias("last_view"),
)
possible_lessons = (
enrollments
.join(lessons, on="course_id", how="inner")
.groupBy("user_id")
.agg(F.count_distinct("lesson_id").alias("possible_lessons"))
)
result = (
users
.join(enrollment_stats, on="user_id", how="inner")
.join(view_stats, on="user_id", how="left")
.join(possible_lessons, on="user_id", how="left")
)
result = result.withColumn(
"engagement_rate",
F.concat(
F.round(
F.coalesce(F.col("viewed_lessons_count"), F.lit(0))
* 100.0
/ F.col("possible_lessons"),
2,
),
F.lit("%"),
),
)
result = result.select(
"user_id",
"name",
"enrollment_count",
"viewed_lessons_count",
"first_view",
"last_view",
"engagement_rate",
)
return result
def course_summary(
enrollments: DataFrame,
lessons: DataFrame,
lesson_views: DataFrame,
courses: DataFrame,
) -> DataFrame:
enrollment_stats = enrollments.groupBy("course_id").agg(
F.count_distinct("user_id").alias("unique_users")
)
view_stats = (
lessons
.alias("l")
.join(lesson_views.alias("lv"), on="lesson_id", how="left")
.groupBy("course_id")
.agg(
F.count_distinct("l.lesson_id").alias("lessons_count"),
F.count("lv.lesson_id").alias("total_views"),
F.min("lv.viewed_at").alias("first_view"),
F.max("lv.viewed_at").alias("last_view"),
)
)
result = courses.join(enrollment_stats, on="course_id", how="inner").join(
view_stats, on="course_id", how="inner"
)
result = result.withColumn(
"avg_views_per_user",
F.round(
F.coalesce(F.col("total_views"), F.lit(0)) / F.col("unique_users"),
2,
),
).select(
"course_id",
"title",
"unique_users",
"lessons_count",
"total_views",
"avg_views_per_user",
"first_view",
"last_view",
)
return result
def platform_summary(
users: DataFrame,
courses: DataFrame,
lessons: DataFrame,
lesson_views: DataFrame,
) -> DataFrame:
total_users = users.agg(F.count("*").alias("total_users"))
total_courses = courses.agg(F.count("*").alias("total_courses"))
total_lessons = lessons.agg(F.count("*").alias("total_lessons"))
users_with_views = lesson_views.agg(
F.count_distinct("user_id").alias("users_with_views")
)
lessons_per_course = lessons.groupBy("course_id").agg(
F.count_distinct("lesson_id").alias("lessons_count")
)
avg_lessons_per_course = lessons_per_course.agg(
F.round(F.avg("lessons_count"), 2).alias("avg_lessons_per_course")
)
views_per_lesson = lesson_views.groupBy("lesson_id").agg(
F.count("*").alias("total_views")
)
avg_views_per_lesson = views_per_lesson.agg(
F.round(F.avg("total_views"), 2).alias("avg_views_per_lesson")
)
result = (
total_users
.crossJoin(total_courses)
.crossJoin(total_lessons)
.crossJoin(users_with_views)
.crossJoin(avg_lessons_per_course)
.crossJoin(avg_views_per_lesson)
)
return result
def save_result(df: DataFrame, name: str, output_dir: Path = Path("marts/pg")):
os.makedirs(output_dir, exist_ok=True)
df.write.mode("overwrite").option("header", "true").csv(str(output_dir / name))
def main():
spark = get_spark()
users = load_postgres_table(spark, "users").withColumnRenamed("id", "user_id")
courses = load_postgres_table(spark, "courses").withColumnRenamed("id", "course_id")
lessons = load_postgres_table(spark, "lessons").withColumnRenamed("id", "lesson_id")
enrollments = load_postgres_table(spark, "enrollments")
lesson_views = load_postgres_table(spark, "lesson_views")
df_user_activity_summary = user_activity_summary(
enrollments=enrollments,
lessons=lessons,
lesson_views=lesson_views,
users=users,
)
df_course_summary = course_summary(
enrollments=enrollments,
lessons=lessons,
lesson_views=lesson_views,
courses=courses,
)
df_platform_summary = platform_summary(
courses=courses,
lessons=lessons,
lesson_views=lesson_views,
users=users,
)
save_result(df_user_activity_summary, "user_activity_summary")
save_result(df_course_summary, "course_summary")
save_result(df_platform_summary, "platform_summary")
if __name__ == "__main__":
main()