78 lines
3.8 KiB
Python
78 lines
3.8 KiB
Python
import os
|
|
from pyspark.sql import SparkSession
|
|
from pyspark.sql import functions as F
|
|
from pyspark.sql import DataFrame
|
|
|
|
# Spark session used by every read and write in this script; it runs in local
# mode on all available cores ("local[*]").
spark = (
    SparkSession.builder
    .appName("create_marts_postgresql")
    .master("local[*]")
    .getOrCreate()
)
|
|
|
|
def read_csv_folder(path: str) -> DataFrame:
    """Load the CSV data found at ``path`` into a DataFrame.

    The reader treats the first line of each file as a header and asks Spark
    to infer the column types from the data.

    Args:
        path: Location handed to Spark's CSV reader.

    Returns:
        The DataFrame produced by the reader.
    """
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    return reader.csv(path)
|
|
|
|
# Source tables, one CSV folder per table, loaded in the order listed.
users, courses, lessons, enrollments, lesson_views = map(
    read_csv_folder,
    (
        "data/postgresql/users",
        "data/postgresql/courses",
        "data/postgresql/lessons",
        "data/postgresql/enrollments",
        "data/postgresql/lesson_views",
    ),
)
|
|
|
|
# Per-user activity mart: one row per (user id, user name).
#
# All joins are inner, so a user appears only if they have at least one
# enrollment, at least one lesson view, and an enrolled course with at least
# one lesson. Enrollments and lesson views are both joined on the user alone,
# so every enrollment row is paired with every view row of that user; the
# distinct counts and the min/max below are unaffected by that duplication.
#
# NOTE(review): viewed lessons are not restricted to the enrolled courses, so
# engagement_rate can exceed 100% if a user viewed lessons of other courses.
user_activity_summary = (
    users.alias("u")
    .join(enrollments.alias("e"), on=F.col("u.id") == F.col("e.user_id"), how="inner")
    .join(lesson_views.alias("lv"), on=F.col("u.id") == F.col("lv.user_id"), how="inner")
    .join(lessons.alias("l"), on=F.col("e.course_id") == F.col("l.course_id"), how="inner")
    .groupBy(
        F.col("u.id").alias("user_id"),
        F.col("u.name"),
    )
    .agg(
        F.countDistinct("e.course_id").alias("enrollment_count"),
        F.countDistinct("lv.lesson_id").alias("viewed_lessons_count"),
        F.max("lv.viewed_at").alias("last_view"),
        F.min("lv.viewed_at").alias("first_view"),
        # Distinct viewed lessons as a percentage of distinct lessons in the
        # enrolled courses, rounded to 2 decimals and suffixed with "%".
        F.concat(
            F.round(
                F.countDistinct("lv.lesson_id") * 100.0 / F.countDistinct("l.id"),
                2,
            ),
            F.lit("%"),
        ).alias("engagement_rate"),
    )
)
|
|
|
|
# Number of lesson-view rows per course. Each view row matches lessons on the
# lesson id only, so a plain row count is used here.
total_views_per_course = (
    lessons.alias("l")
    .join(
        lesson_views.alias("lv"),
        on=F.col("l.id") == F.col("lv.lesson_id"),
        how="inner",
    )
    .groupBy("l.course_id")
    .agg(F.count("lv.lesson_id").alias("total_views"))
)
|
|
|
|
# Per-course mart, ordered by course id.
#
# All joins are inner, so a course appears only if it has at least one
# enrollment, at least one lesson, and at least one view of one of its lessons.
# Enrollments and lesson views are not linked to each other, so their rows are
# paired in every combination; the distinct counts and min/max are unaffected.
# total_views comes pre-aggregated from total_views_per_course and is carried
# through as a grouping column.
course_summary = (
    courses.alias("c")
    .join(enrollments.alias("e"), on=F.col("c.id") == F.col("e.course_id"), how="inner")
    .join(lessons.alias("l"), on=F.col("c.id") == F.col("l.course_id"), how="inner")
    .join(lesson_views.alias("lv"), on=F.col("l.id") == F.col("lv.lesson_id"), how="inner")
    .join(
        total_views_per_course.alias("tv"),
        on=F.col("tv.course_id") == F.col("l.course_id"),
        how="inner",
    )
    .groupBy(
        F.col("c.id").alias("course_id"),
        F.col("c.title").alias("course_title"),
        F.col("tv.total_views"),
    )
    .agg(
        F.countDistinct("e.user_id").alias("unique_users"),
        F.countDistinct("l.id").alias("lessons_count"),
        # Total views of the course divided by the number of enrolled users.
        F.round(
            F.col("tv.total_views") * 1.0 / F.countDistinct("e.user_id"),
            2,
        ).alias("avg_views_per_user"),
        F.min("lv.viewed_at").alias("first_viewed_at"),
        F.max("lv.viewed_at").alias("last_viewed_at"),
    )
    .orderBy("course_id")
)
|
|
|
|
# Platform-wide totals, produced as a single-row mart.
#
# Join shape:
#   * users -> enrollments is a left join, so users without enrollments are kept;
#   * courses and lessons are full-joined, so courses nobody enrolled in, and
#     the lessons of such courses, still reach the totals;
#   * lesson_views is attached by user only, so each view row is repeated once
#     for every enrollment/lesson row of that user.
# Because of that repetition, every metric counts distinct keys, never rows.
platform_summary = (
    users.alias("u")
    .join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "left")
    # Full (not left) join: a left join would drop courses without enrollments
    # from total_courses while their lessons were still counted below, which
    # skewed avg_lessons_per_course.
    .join(courses.alias("c"), F.col("c.id") == F.col("e.course_id"), "full")
    .join(lessons.alias("l"), F.col("l.course_id") == F.col("c.id"), "full")
    .join(lesson_views.alias("lv"), F.col("lv.user_id") == F.col("u.id"), "left")
    .agg(
        F.countDistinct("u.id").alias("total_users"),
        F.countDistinct("c.id").alias("total_courses"),
        F.countDistinct("l.id").alias("total_lessons"),
        F.countDistinct("lv.user_id").alias("users_with_views"),
        F.round(
            F.countDistinct("l.id") * 1.0 / F.countDistinct("c.id"),
            2,
        ).alias("avg_lessons_per_course"),
        # Distinct view ids, not a row count: a plain count would be inflated
        # by the repetition described above.
        # Assumes lesson_views.id uniquely identifies a view - TODO confirm.
        F.round(
            F.countDistinct("lv.id") * 1.0 / F.countDistinct("lv.lesson_id"),
            2,
        ).alias("avg_views_per_lesson"),
    )
)
|
|
|
|
# Destination folder for the marts; created up front if it does not exist.
output_dir = "data/marts/postgresql"
os.makedirs(output_dir, exist_ok=True)

# Each mart is written as CSV with a header row into its own sub-folder,
# replacing any previous output there.
for mart, target in (
    (user_activity_summary, f"{output_dir}/user_activity_summary"),
    (course_summary, f"{output_dir}/course_summary"),
    (platform_summary, f"{output_dir}/platform_summary"),
):
    mart.write.mode("overwrite").option("header", "true").csv(target)

# Completion message (Russian: "postgresql marts created").
print(".............витрины postgresql созданы")