Files
pyspark/create_marts_postgresql.py
2026-02-24 07:09:37 +03:00

78 lines
3.8 KiB
Python

import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
# Single local Spark session used by every step of this script; "local[*]"
# runs the driver and executors in-process on all available local cores.
spark = (
    SparkSession.builder
    .appName("create_marts_postgresql")
    .master("local[*]")
    .getOrCreate()
)
def read_csv_folder(path: str) -> DataFrame:
    """Read the CSV data at *path* (a file or a folder of part files) into a DataFrame.

    Uses the module-level ``spark`` session. The first line of each file is
    treated as the header, and ``inferSchema`` asks Spark to derive column
    types from the data, which costs an additional pass over the input.
    """
    csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
    return csv_reader.csv(path)
# Source tables: one CSV folder per table under data/postgresql/, loaded in
# the order users, courses, lessons, enrollments, lesson_views.
users, courses, lessons, enrollments, lesson_views = (
    read_csv_folder(f"data/postgresql/{table_name}")
    for table_name in ("users", "courses", "lessons", "enrollments", "lesson_views")
)
# Per-user activity mart. A user appears only if they have at least one
# enrollment in a course that has lessons AND at least one lesson view
# (the same row set the original all-inner-join query produced).
#
# Enrollment-side and view-side statistics are aggregated separately and then
# joined on the user id. The previous single join of
# users x enrollments x lesson_views x lessons produced
# (enrolled lessons x views) rows per user; countDistinct masked the
# duplication, but the intermediate result grew multiplicatively.

# Per user: distinct enrolled courses that have lessons, and the distinct
# lessons in those courses (the engagement denominator).
_user_enrollment_stats = (
    enrollments.alias("e")
    .join(lessons.alias("l"), F.col("e.course_id") == F.col("l.course_id"), "inner")
    .groupBy(F.col("e.user_id").alias("user_id"))
    .agg(
        F.countDistinct("e.course_id").alias("enrollment_count"),
        F.countDistinct("l.id").alias("enrolled_lessons_count"),
    )
)

# Per user: distinct viewed lessons and the time span of their views.
_user_view_stats = lesson_views.groupBy("user_id").agg(
    F.countDistinct("lesson_id").alias("viewed_lessons_count"),
    F.max("viewed_at").alias("last_view"),
    F.min("viewed_at").alias("first_view"),
)

user_activity_summary = (
    users.alias("u")
    .join(_user_enrollment_stats.alias("es"), F.col("u.id") == F.col("es.user_id"), "inner")
    .join(_user_view_stats.alias("vs"), F.col("u.id") == F.col("vs.user_id"), "inner")
    .select(
        F.col("u.id").alias("user_id"),
        F.col("u.name"),
        F.col("es.enrollment_count"),
        F.col("vs.viewed_lessons_count"),
        F.col("vs.last_view"),
        F.col("vs.first_view"),
        # Percentage string such as "42.5%". The guard yields null instead of
        # a divide-by-zero error under ANSI mode when the denominator is 0.
        # NOTE(review): views are not restricted to enrolled courses, so this
        # can exceed 100% if a user viewed lessons outside their enrollments;
        # confirm whether that is intended.
        F.concat(
            F.round(
                F.when(
                    F.col("es.enrolled_lessons_count") > 0,
                    F.col("vs.viewed_lessons_count") * 100.0
                    / F.col("es.enrolled_lessons_count"),
                ),
                2,
            ),
            F.lit("%"),
        ).alias("engagement_rate"),
    )
)
# Number of lesson views per course. Each view row is matched to its lesson
# and counted under that lesson's course_id; courses with no views are absent.
# Output columns: course_id, total_views.
total_views_per_course = (
    lessons.alias("l")
    .join(lesson_views.alias("lv"), on=F.col("l.id") == F.col("lv.lesson_id"), how="inner")
    .groupBy(F.col("l.course_id"))
    .agg(F.count(F.col("lv.lesson_id")).alias("total_views"))
)
# Per-course mart, ordered by course_id. A course appears only if it has at
# least one enrollment AND at least one viewed lesson (the same row set the
# original all-inner-join query produced).
#
# Each metric is aggregated from its own source before joining. The previous
# single join of courses x enrollments x lessons x lesson_views multiplied
# every enrollment by every view of the course. It also computed
# lessons_count after the inner join with lesson_views, so never-viewed
# lessons were silently left out. lessons_count now counts all lessons of
# the course.

# Distinct enrolled users per course.
_course_enrollment_stats = enrollments.groupBy("course_id").agg(
    F.countDistinct("user_id").alias("unique_users")
)

# All lessons of each course, viewed or not.
_course_lesson_stats = lessons.groupBy("course_id").agg(
    F.countDistinct("id").alias("lessons_count")
)

# First and last view timestamps across all lessons of each course.
_course_view_bounds = (
    lessons.alias("l")
    .join(lesson_views.alias("lv"), F.col("l.id") == F.col("lv.lesson_id"), "inner")
    .groupBy(F.col("l.course_id").alias("course_id"))
    .agg(
        F.min("lv.viewed_at").alias("first_viewed_at"),
        F.max("lv.viewed_at").alias("last_viewed_at"),
    )
)

course_summary = (
    courses.alias("c")
    .join(_course_enrollment_stats.alias("es"), F.col("c.id") == F.col("es.course_id"), "inner")
    .join(total_views_per_course.alias("tv"), F.col("c.id") == F.col("tv.course_id"), "inner")
    .join(_course_view_bounds.alias("vb"), F.col("c.id") == F.col("vb.course_id"), "inner")
    .join(_course_lesson_stats.alias("ls"), F.col("c.id") == F.col("ls.course_id"), "inner")
    .select(
        F.col("c.id").alias("course_id"),
        F.col("c.title").alias("course_title"),
        F.col("tv.total_views"),
        F.col("es.unique_users"),
        F.col("ls.lessons_count"),
        # Null rather than a divide-by-zero error (ANSI mode) when a course's
        # enrollment rows carry no user_id.
        F.round(
            F.when(
                F.col("es.unique_users") > 0,
                F.col("tv.total_views") * 1.0 / F.col("es.unique_users"),
            ),
            2,
        ).alias("avg_views_per_user"),
        F.col("vb.first_viewed_at"),
        F.col("vb.last_viewed_at"),
    )
    .orderBy("course_id")
)
# Platform-wide mart: a single row of totals and averages.
#
# Every metric is computed from its own source table, and the one-row
# aggregates are cross-joined. The previous version chained
# users/enrollments/courses/lessons/lesson_views into one left/full join whose
# rows multiplied per user. That broke two metrics:
#   * avg_views_per_lesson used a plain count("lv.id"), so each view was
#     counted once per (enrollment, lesson) row of its user, inflating it;
#   * total_courses only saw courses reached through an enrollment, while
#     total_lessons counted every lesson, skewing avg_lessons_per_course.
# As before, only views by users present in `users` are counted.

# Views whose user_id matches an existing user (left_semi keeps only the
# lesson_views columns and never duplicates rows).
_known_user_views = lesson_views.alias("lv").join(
    users.alias("u"), F.col("lv.user_id") == F.col("u.id"), "left_semi"
)

_platform_view_stats = _known_user_views.agg(
    F.countDistinct("user_id").alias("users_with_views"),
    F.count("id").alias("view_count"),
    F.countDistinct("lesson_id").alias("viewed_lessons"),
)

platform_summary = (
    users.agg(F.countDistinct("id").alias("total_users"))
    .crossJoin(courses.agg(F.countDistinct("id").alias("total_courses")))
    .crossJoin(lessons.agg(F.countDistinct("id").alias("total_lessons")))
    .crossJoin(_platform_view_stats)
    .select(
        "total_users",
        "total_courses",
        "total_lessons",
        "users_with_views",
        # The F.when guards give null instead of a divide-by-zero error
        # (ANSI mode) when there are no courses / no viewed lessons.
        F.round(
            F.when(
                F.col("total_courses") > 0,
                F.col("total_lessons") * 1.0 / F.col("total_courses"),
            ),
            2,
        ).alias("avg_lessons_per_course"),
        F.round(
            F.when(
                F.col("viewed_lessons") > 0,
                F.col("view_count") * 1.0 / F.col("viewed_lessons"),
            ),
            2,
        ).alias("avg_views_per_lesson"),
    )
)
# Persist every mart as a CSV folder with a header row, replacing any output
# from a previous run, then report completion.
output_dir = "data/marts/postgresql"
os.makedirs(output_dir, exist_ok=True)
for mart_name, mart_df in (
    ("user_activity_summary", user_activity_summary),
    ("course_summary", course_summary),
    ("platform_summary", platform_summary),
):
    mart_writer = mart_df.write.mode("overwrite").option("header", "true")
    mart_writer.csv(f"{output_dir}/{mart_name}")
print(".............витрины postgresql созданы")