211 lines
5.8 KiB
Python
211 lines
5.8 KiB
Python
import os
|
|
from pathlib import Path
|
|
|
|
import pyspark.sql.functions as F
|
|
from pyspark.sql import DataFrame, SparkSession
|
|
|
|
|
|
def get_spark() -> SparkSession:
    """Return the SparkSession used by this job.

    The session is obtained through ``getOrCreate()`` on a builder whose
    application name is ``create_dm_pg``.
    """
    builder = SparkSession.builder.appName("create_dm_pg")
    return builder.getOrCreate()
|
|
|
|
|
|
def load_postgres_table(
    spark: SparkSession,
    table_name: str,
    schema: str = "anazarenko",
) -> DataFrame:
    """Read one PostgreSQL table into a DataFrame via the JDBC data source.

    Connection settings are read from the environment when present, so
    credentials can be supplied at deploy time instead of living in the
    source tree:

    * ``PG_JDBC_URL`` - JDBC connection URL
    * ``PG_USER`` - database login
    * ``PG_PASSWORD`` - database password

    Args:
        spark: Session used to create the JDBC reader.
        table_name: Table to read, without schema qualifier.
        schema: Schema that qualifies ``table_name`` in the ``dbtable`` option.

    Returns:
        The DataFrame produced by ``spark.read.format("jdbc")...load()``.
    """
    # NOTE(review): the fallbacks below keep the previous hard-coded values so
    # existing runs do not break. The password should be rotated and its
    # fallback removed once PG_PASSWORD is set in every environment.
    jdbc_url = os.environ.get(
        "PG_JDBC_URL",
        "jdbc:postgresql://postgres.de-infra.servehttp.com:9002/mydb",
    )
    user = os.environ.get("PG_USER", "anazarenko")
    password = os.environ.get("PG_PASSWORD", "rT7Wof6frDbqhMq0")

    return (
        spark.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", f"{schema}.{table_name}")
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )
|
|
|
|
|
|
def user_activity_summary(
    enrollments: DataFrame,
    lesson_views: DataFrame,
    users: DataFrame,
    lessons: DataFrame,
) -> DataFrame:
    """Build the per-user activity mart.

    Only users that have at least one row in ``enrollments`` are kept (inner
    join); view statistics and the number of available lessons are attached
    with left joins, so they may be missing for a user.

    Args:
        enrollments: Rows with ``user_id`` and ``course_id``.
        lesson_views: Rows with ``user_id``, ``lesson_id`` and ``viewed_at``.
        users: Rows with ``user_id`` and ``name``.
        lessons: Rows with ``course_id`` and ``lesson_id``.

    Returns:
        DataFrame with columns ``user_id``, ``name``, ``enrollment_count``,
        ``viewed_lessons_count``, ``first_view``, ``last_view`` and
        ``engagement_rate`` (rounded percentage with a ``%`` suffix).
    """
    # How many enrollment rows (with a course_id) each user has.
    per_user_enrollments = enrollments.groupBy("user_id").agg(
        F.count("course_id").alias("enrollment_count")
    )

    # Distinct lessons viewed and the viewing time window per user.
    per_user_views = lesson_views.groupBy("user_id").agg(
        F.count_distinct("lesson_id").alias("viewed_lessons_count"),
        F.min("viewed_at").alias("first_view"),
        F.max("viewed_at").alias("last_view"),
    )

    # Distinct lessons belonging to the courses each user is enrolled in.
    enrolled_lessons = enrollments.join(lessons, on="course_id", how="inner")
    per_user_available = enrolled_lessons.groupBy("user_id").agg(
        F.count_distinct("lesson_id").alias("possible_lessons")
    )

    joined = users.join(per_user_enrollments, on="user_id", how="inner")
    joined = joined.join(per_user_views, on="user_id", how="left")
    joined = joined.join(per_user_available, on="user_id", how="left")

    # Users without any view count as 0 viewed lessons for the rate only;
    # the viewed_lessons_count column itself is left as produced by the join.
    viewed = F.coalesce(F.col("viewed_lessons_count"), F.lit(0))
    rate = F.round(viewed * 100.0 / F.col("possible_lessons"), 2)

    with_rate = joined.withColumn(
        "engagement_rate", F.concat(rate, F.lit("%"))
    )

    return with_rate.select(
        "user_id",
        "name",
        "enrollment_count",
        "viewed_lessons_count",
        "first_view",
        "last_view",
        "engagement_rate",
    )
|
|
|
|
|
|
def course_summary(
    enrollments: DataFrame,
    lessons: DataFrame,
    lesson_views: DataFrame,
    courses: DataFrame,
) -> DataFrame:
    """Build the per-course mart.

    Courses are inner-joined with both the enrollment statistics and the
    lesson/view statistics, so a course must appear in both to be reported.

    Args:
        enrollments: Rows with ``course_id`` and ``user_id``.
        lessons: Rows with ``course_id`` and ``lesson_id``.
        lesson_views: Rows with ``lesson_id`` and ``viewed_at``.
        courses: Rows with ``course_id`` and ``title``.

    Returns:
        DataFrame with columns ``course_id``, ``title``, ``unique_users``,
        ``lessons_count``, ``total_views``, ``avg_views_per_user``,
        ``first_view`` and ``last_view``.
    """
    # Distinct enrolled users per course.
    learners_per_course = enrollments.groupBy("course_id").agg(
        F.count_distinct("user_id").alias("unique_users")
    )

    # Left join keeps lessons that were never viewed, so they still count
    # towards lessons_count while contributing nothing to total_views.
    lessons_with_views = lessons.alias("l").join(
        lesson_views.alias("lv"), on="lesson_id", how="left"
    )
    views_per_course = lessons_with_views.groupBy("course_id").agg(
        F.count_distinct("l.lesson_id").alias("lessons_count"),
        F.count("lv.lesson_id").alias("total_views"),
        F.min("lv.viewed_at").alias("first_view"),
        F.max("lv.viewed_at").alias("last_view"),
    )

    joined = courses.join(learners_per_course, on="course_id", how="inner")
    joined = joined.join(views_per_course, on="course_id", how="inner")

    views_per_user = (
        F.coalesce(F.col("total_views"), F.lit(0)) / F.col("unique_users")
    )
    with_average = joined.withColumn(
        "avg_views_per_user", F.round(views_per_user, 2)
    )

    return with_average.select(
        "course_id",
        "title",
        "unique_users",
        "lessons_count",
        "total_views",
        "avg_views_per_user",
        "first_view",
        "last_view",
    )
|
|
|
|
|
|
def platform_summary(
    users: DataFrame,
    courses: DataFrame,
    lessons: DataFrame,
    lesson_views: DataFrame,
) -> DataFrame:
    """Build the platform-wide mart by cross-joining global aggregates.

    Args:
        users: User rows; only their count is used.
        courses: Course rows; only their count is used.
        lessons: Rows with ``course_id`` and ``lesson_id``.
        lesson_views: Rows with ``user_id`` and ``lesson_id``.

    Returns:
        DataFrame with columns ``total_users``, ``total_courses``,
        ``total_lessons``, ``users_with_views``, ``avg_lessons_per_course``
        and ``avg_views_per_lesson``.
    """
    # Average number of distinct lessons over the courses present in lessons.
    lessons_by_course = lessons.groupBy("course_id").agg(
        F.count_distinct("lesson_id").alias("lessons_count")
    )
    # Average view count over the lessons present in lesson_views.
    views_by_lesson = lesson_views.groupBy("lesson_id").agg(
        F.count("*").alias("total_views")
    )

    # Each entry is a global aggregate; the order fixes the output columns.
    metrics = [
        users.agg(F.count("*").alias("total_users")),
        courses.agg(F.count("*").alias("total_courses")),
        lessons.agg(F.count("*").alias("total_lessons")),
        lesson_views.agg(
            F.count_distinct("user_id").alias("users_with_views")
        ),
        lessons_by_course.agg(
            F.round(F.avg("lessons_count"), 2).alias("avg_lessons_per_course")
        ),
        views_by_lesson.agg(
            F.round(F.avg("total_views"), 2).alias("avg_views_per_lesson")
        ),
    ]

    summary = metrics[0]
    for metric in metrics[1:]:
        summary = summary.crossJoin(metric)

    return summary
|
|
|
|
|
|
def save_result(df: DataFrame, name: str, output_dir: Path = Path("marts/pg")):
    """Write ``df`` as CSV with a header row to ``output_dir / name``.

    ``output_dir`` is created with ``os.makedirs`` if it does not exist, and
    the write uses ``overwrite`` mode.

    Args:
        df: DataFrame to persist.
        name: Name of the output location inside ``output_dir``.
        output_dir: Directory that holds the exported marts.
    """
    os.makedirs(output_dir, exist_ok=True)

    writer = df.write.mode("overwrite").option("header", "true")
    destination = str(output_dir / name)
    writer.csv(destination)
|
|
|
|
|
|
def main():
    """Load the source tables, build the three marts and save them as CSV."""
    spark = get_spark()

    # These tables have their ``id`` column renamed to the key name that the
    # mart builders join on.
    keyed_tables = (
        ("users", "user_id"),
        ("courses", "course_id"),
        ("lessons", "lesson_id"),
    )
    tables = {
        table: load_postgres_table(spark, table).withColumnRenamed("id", key)
        for table, key in keyed_tables
    }
    tables["enrollments"] = load_postgres_table(spark, "enrollments")
    tables["lesson_views"] = load_postgres_table(spark, "lesson_views")

    # All marts are built first, then saved in the same order.
    marts = [
        (
            "user_activity_summary",
            user_activity_summary(
                enrollments=tables["enrollments"],
                lessons=tables["lessons"],
                lesson_views=tables["lesson_views"],
                users=tables["users"],
            ),
        ),
        (
            "course_summary",
            course_summary(
                enrollments=tables["enrollments"],
                lessons=tables["lessons"],
                lesson_views=tables["lesson_views"],
                courses=tables["courses"],
            ),
        ),
        (
            "platform_summary",
            platform_summary(
                courses=tables["courses"],
                lessons=tables["lessons"],
                lesson_views=tables["lesson_views"],
                users=tables["users"],
            ),
        ),
    ]

    for mart_name, mart_df in marts:
        save_result(mart_df, mart_name)
|
|
|
|
|
|
# Run the job only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|