source files only
This commit is contained in:
78
create_marts_postgresql.py
Normal file
78
create_marts_postgresql.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import os
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql import functions as F
|
||||
from pyspark.sql import DataFrame
|
||||
|
||||
spark = SparkSession.builder \
|
||||
.appName("create_marts_postgresql") \
|
||||
.master("local[*]") \
|
||||
.getOrCreate()
|
||||
|
||||
def read_csv_folder(path: str) -> DataFrame:
|
||||
return spark.read \
|
||||
.option("header", "true") \
|
||||
.option("inferSchema", "true") \
|
||||
.csv(path)
|
||||
|
||||
users = read_csv_folder("data/postgresql/users")
|
||||
courses = read_csv_folder("data/postgresql/courses")
|
||||
lessons = read_csv_folder("data/postgresql/lessons")
|
||||
enrollments = read_csv_folder("data/postgresql/enrollments")
|
||||
lesson_views = read_csv_folder("data/postgresql/lesson_views")
|
||||
|
||||
user_activity_summary = users.alias("u") \
|
||||
.join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "inner") \
|
||||
.join(lesson_views.alias("lv"), F.col("u.id") == F.col("lv.user_id"), "inner") \
|
||||
.join(lessons.alias("l"), F.col("e.course_id") == F.col("l.course_id"), "inner") \
|
||||
.groupBy(F.col("u.id").alias("user_id"), F.col("u.name")) \
|
||||
.agg(
|
||||
F.countDistinct("e.course_id").alias("enrollment_count"),
|
||||
F.countDistinct("lv.lesson_id").alias("viewed_lessons_count"),
|
||||
F.max("lv.viewed_at").alias("last_view"),
|
||||
F.min("lv.viewed_at").alias("first_view"),
|
||||
F.concat(F.round(F.countDistinct("lv.lesson_id") * 100.0 / F.countDistinct("l.id"), 2), F.lit("%")).alias("engagement_rate")
|
||||
)
|
||||
|
||||
total_views_per_course = lessons.alias("l") \
|
||||
.join(lesson_views.alias("lv"), F.col("l.id") == F.col("lv.lesson_id"), "inner") \
|
||||
.groupBy("l.course_id") \
|
||||
.agg(F.count("lv.lesson_id").alias("total_views"))
|
||||
|
||||
course_summary = courses.alias("c") \
|
||||
.join(enrollments.alias("e"), F.col("c.id") == F.col("e.course_id"), "inner") \
|
||||
.join(lessons.alias("l"), F.col("c.id") == F.col("l.course_id"), "inner") \
|
||||
.join(lesson_views.alias("lv"), F.col("l.id") == F.col("lv.lesson_id"), "inner") \
|
||||
.join(total_views_per_course.alias("tv"), F.col("tv.course_id") == F.col("l.course_id"), "inner") \
|
||||
.groupBy(F.col("c.id").alias("course_id"),
|
||||
F.col("c.title").alias("course_title"),
|
||||
F.col("tv.total_views")) \
|
||||
.agg(
|
||||
F.countDistinct("e.user_id").alias("unique_users"),
|
||||
F.countDistinct("l.id").alias("lessons_count"),
|
||||
F.round(F.col("tv.total_views") * 1.0 / F.countDistinct("e.user_id"), 2).alias("avg_views_per_user"),
|
||||
F.min("lv.viewed_at").alias("first_viewed_at"),
|
||||
F.max("lv.viewed_at").alias("last_viewed_at")
|
||||
).orderBy("course_id")
|
||||
|
||||
platform_summary = users.alias("u") \
|
||||
.join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "left") \
|
||||
.join(courses.alias("c"), F.col("c.id") == F.col("e.course_id"), "left") \
|
||||
.join(lessons.alias("l"), F.col("l.course_id") == F.col("c.id"), "full") \
|
||||
.join(lesson_views.alias("lv"), F.col("lv.user_id") == F.col("u.id"), "left") \
|
||||
.agg(
|
||||
F.countDistinct("u.id").alias("total_users"),
|
||||
F.countDistinct("c.id").alias("total_courses"),
|
||||
F.countDistinct("l.id").alias("total_lessons"),
|
||||
F.countDistinct("lv.user_id").alias("users_with_views"),
|
||||
F.round(F.countDistinct("l.id") * 1.0 / F.countDistinct("c.id"), 2).alias("avg_lessons_per_course"),
|
||||
F.round(F.count("lv.id") * 1.0 / F.countDistinct("lv.lesson_id"), 2).alias("avg_views_per_lesson")
|
||||
)
|
||||
|
||||
output_dir = "data/marts/postgresql"
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
user_activity_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/user_activity_summary")
|
||||
course_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/course_summary")
|
||||
platform_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/platform_summary")
|
||||
|
||||
print(".............витрины postgresql созданы")
|
||||
Reference in New Issue
Block a user