From 5da4df2f14b13b2d3dae4dc5db9a96096b247bf3 Mon Sep 17 00:00:00 2001 From: gsavelev Date: Tue, 24 Feb 2026 07:09:37 +0300 Subject: [PATCH] source files only --- README.md | 21 +++++++++ create_marts_clickhouse.py | 88 ++++++++++++++++++++++++++++++++++ create_marts_greenplum.py | 83 ++++++++++++++++++++++++++++++++ create_marts_postgresql.py | 78 +++++++++++++++++++++++++++++++ extract_csv.py | 96 ++++++++++++++++++++++++++++++++++++++ main.py | 6 +++ requirements.txt | 9 ++++ 7 files changed, 381 insertions(+) create mode 100644 README.md create mode 100644 create_marts_clickhouse.py create mode 100644 create_marts_greenplum.py create mode 100644 create_marts_postgresql.py create mode 100644 extract_csv.py create mode 100644 main.py create mode 100644 requirements.txt diff --git a/README.md b/README.md new file mode 100644 index 0000000..65cf665 --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +Установка зависимостей +1. Клонируем репозиторий (bash) + git clone https://gitea.de-infra.servehttp.com/gsavelev/pyspark.git + cd pyspark +2. Создаем и активируем виртуальное окружение (Python 3.10+) + python -m venv pyspark_env + source pyspark_env/bin/activate # Linux / WSL + # Windows CMD: pyspark_env\Scripts\activate.bat + # Windows PowerShell: pyspark_env\Scripts\Activate.ps1 +3. Устанавливаем зависимости + pip install -r requirements.txt +4. Требования + Python 3.10+ + Java JDK 11+ + PySpark 4.0.1 (requirements.txt) +5. Проверка версий + python --version + pip show pyspark + java -version +6. 
# create_marts_clickhouse.py -- build the ClickHouse data marts.
#
# Reads the raw CSV exports under data/clickhouse/ and writes three marts
# as CSV folders under data/marts/clickhouse/:
#   * lesson_popularity_summary -- views, distinct viewers, view window per lesson
#   * inactive_users_summary    -- users with no lesson views at all
#   * course_completion_rate    -- per (user, course) share of lessons viewed
import os
from typing import List

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("create_marts_clickhouse") \
    .master("local[*]") \
    .getOrCreate()


def read_csv_folder(path: str) -> DataFrame:
    """Read a directory of CSV part-files with a header row and inferred schema."""
    return spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(path)


def cast_int_columns(df: DataFrame, int_cols: List[str]) -> DataFrame:
    """Cast the listed id columns to int when present.

    inferSchema may read ids as strings depending on the exported data, so the
    cast is applied defensively, skipping columns that are absent.
    """
    for c in int_cols:
        if c in df.columns:
            df = df.withColumn(c, F.col(c).cast("int"))
    return df


users = cast_int_columns(read_csv_folder("data/clickhouse/users"), ["id"])
courses = cast_int_columns(read_csv_folder("data/clickhouse/courses"), ["id"])
lessons = cast_int_columns(read_csv_folder("data/clickhouse/lessons"), ["id", "course_id"])
enrollments = cast_int_columns(read_csv_folder("data/clickhouse/enrollments"), ["id", "user_id", "course_id"])
lesson_views = cast_int_columns(read_csv_folder("data/clickhouse/lesson_views"), ["id", "user_id", "lesson_id"])

# Mart 1: popularity of every lesson (total views, distinct viewers, first/last view).
lesson_popularity_summary = lesson_views.alias("lv") \
    .join(lessons.alias("l"), F.col("lv.lesson_id") == F.col("l.id")) \
    .join(courses.alias("c"), F.col("l.course_id") == F.col("c.id")) \
    .groupBy(F.col("l.id").alias("lesson_id"),
             F.col("l.title").alias("lesson_title"),
             F.col("c.id").alias("course_id"),
             F.col("c.title").alias("course_title")) \
    .agg(
        F.count("lv.lesson_id").alias("total_views"),
        F.countDistinct("lv.user_id").alias("unique_users"),
        F.min("lv.viewed_at").alias("first_view"),
        F.max("lv.viewed_at").alias("last_view")
    ).orderBy("lesson_id")

# Mart 2: users who never viewed any lesson (anti-join expressed as a left
# join + IS NULL filter), with how many distinct courses they enrolled in.
inactive_users_summary = users.alias("u") \
    .join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "left") \
    .join(lesson_views.alias("lv"), F.col("u.id") == F.col("lv.user_id"), "left") \
    .filter(F.col("lv.user_id").isNull()) \
    .groupBy(F.col("u.id").alias("user_id"),
             F.col("u.name").alias("user_name"),
             F.col("u.email").alias("user_email"),
             F.col("u.age").alias("user_age"),
             F.col("u.registration_date").alias("user_registration_date")) \
    .agg(
        F.countDistinct("e.course_id").alias("enrollments_count")
    ).orderBy("user_id")

# Mart 3: per (user, course) share of the course's lessons that were viewed.
# FIX: guard the division so a course contributing zero lessons yields 0.0
# instead of an undefined rate -- this mirrors the when/otherwise guard the
# sibling create_marts_greenplum.py already applies to the same calculation.
course_completion_rate = courses.alias("c") \
    .join(enrollments.alias("e"), F.col("c.id") == F.col("e.course_id")) \
    .join(users.alias("u"), F.col("e.user_id") == F.col("u.id")) \
    .join(lessons.alias("l"), F.col("c.id") == F.col("l.course_id")) \
    .join(lesson_views.alias("lv"),
          (F.col("u.id") == F.col("lv.user_id")) &
          (F.col("l.id") == F.col("lv.lesson_id")),
          "left") \
    .groupBy(F.col("u.id").alias("user_id"),
             F.col("u.name").alias("user_name"),
             F.col("c.id").alias("course_id"),
             F.col("c.title").alias("course_title")) \
    .agg(
        F.countDistinct("l.id").alias("lessons_in_course"),
        F.countDistinct("lv.lesson_id").alias("lessons_viewed"),
        F.round(
            F.when(F.countDistinct("l.id") == 0, F.lit(0.0))
             .otherwise(F.countDistinct("lv.lesson_id") * 1.0 / F.countDistinct("l.id")),
            2
        ).alias("completion_rate")
    ).orderBy("user_id", "course_id")

output_dir = "data/marts/clickhouse"
os.makedirs(output_dir, exist_ok=True)

lesson_popularity_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/lesson_popularity_summary")
inactive_users_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/inactive_users_summary")
course_completion_rate.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/course_completion_rate")

print(".............витрины clickhouse созданы")
# create_marts_greenplum.py -- assemble the Greenplum data marts.
#
# Input: star-schema CSV exports under data/greenplum/ (dim_* / fact_* tables).
# Output: three marts written as CSV folders under data/marts/greenplum/.
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

spark = (
    SparkSession.builder
    .appName("create_marts_greenplum")
    .master("local[*]")
    .getOrCreate()
)


def read_csv_folder(path: str) -> DataFrame:
    """Load a header-ed CSV directory, letting Spark infer the column types."""
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    return reader.csv(path)


dim_user = read_csv_folder("data/greenplum/dim_user")
dim_course = read_csv_folder("data/greenplum/dim_course")
dim_lesson = read_csv_folder("data/greenplum/dim_lesson")
fact_lesson_views = read_csv_folder("data/greenplum/fact_lesson_views")
fact_enrollments = read_csv_folder("data/greenplum/fact_enrollments")

# Mart 1: per-lesson view statistics, enriched with lesson and course titles.
lesson_popularity_summary = (
    fact_lesson_views.alias("flv")
    .join(dim_lesson.alias("dl"), F.col("flv.lesson_id") == F.col("dl.lesson_id"))
    .join(dim_course.alias("dc"), F.col("flv.course_id") == F.col("dc.course_id"))
    .groupBy(
        F.col("dl.lesson_id").alias("lesson_id"),
        F.col("dl.title").alias("lesson_title"),
        F.col("dc.course_id").alias("course_id"),
        F.col("dc.title").alias("course_title"),
    )
    .agg(
        F.count("flv.lesson_id").alias("total_views"),
        F.countDistinct("flv.user_id").alias("unique_users"),
        F.min("flv.viewed_at").alias("first_view"),
        F.max("flv.viewed_at").alias("last_view"),
    )
    .orderBy("lesson_id")
)

# Mart 2: users that appear in no fact_lesson_views row (left join + IS NULL
# filter acts as an anti-join), plus their distinct enrollment count.
inactive_users_summary = (
    dim_user.alias("du")
    .join(fact_lesson_views.alias("flv"), F.col("du.user_id") == F.col("flv.user_id"), "left")
    .join(fact_enrollments.alias("fe"), F.col("du.user_id") == F.col("fe.user_id"), "left")
    .filter(F.col("flv.user_id").isNull())
    .groupBy(
        F.col("du.user_id").alias("user_id"),
        F.col("du.name").alias("user_name"),
        F.col("du.email"),
        F.col("du.age"),
        F.col("du.registration_date"),
    )
    .agg(F.countDistinct("fe.course_id").alias("enrollments_count"))
    .orderBy("user_id")
)

# Mart 3: fraction of a course's lessons each enrolled user has viewed.
# The when/otherwise guard keeps a zero-lesson course at 0.0 completion.
course_completion_rate = (
    fact_enrollments.alias("fe")
    .join(dim_user.alias("du"), F.col("fe.user_id") == F.col("du.user_id"))
    .join(dim_course.alias("dc"), F.col("fe.course_id") == F.col("dc.course_id"))
    .join(dim_lesson.alias("dl"), F.col("dc.course_id") == F.col("dl.course_id"))
    .join(
        fact_lesson_views.alias("flv"),
        (F.col("du.user_id") == F.col("flv.user_id")) &
        (F.col("dl.lesson_id") == F.col("flv.lesson_id")),
        "left",
    )
    .groupBy(
        F.col("du.user_id").alias("user_id"),
        F.col("du.name").alias("user_name"),
        F.col("dc.course_id").alias("course_id"),
        F.col("dc.title").alias("course_title"),
    )
    .agg(
        F.countDistinct("dl.lesson_id").alias("lessons_in_course"),
        F.countDistinct("flv.lesson_id").alias("lessons_viewed"),
        F.when(F.countDistinct("dl.lesson_id") == 0, F.lit(0.0))
         .otherwise(F.countDistinct("flv.lesson_id") * 1.0 / F.countDistinct("dl.lesson_id"))
         .alias("completion_rate"),
    )
    .orderBy("user_id", "course_id")
)

output_dir = "data/marts/greenplum"
os.makedirs(output_dir, exist_ok=True)

lesson_popularity_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/lesson_popularity_summary")
inactive_users_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/inactive_users_summary")
course_completion_rate.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/course_completion_rate")

print(".............витрины greenplum созданы")
# create_marts_postgresql.py -- build the PostgreSQL data marts.
#
# Consumes the raw CSV exports under data/postgresql/ and produces three
# marts under data/marts/postgresql/:
#   * user_activity_summary -- per-user enrollment and viewing engagement
#   * course_summary        -- per-course audience and view statistics
#   * platform_summary      -- single-row platform-wide totals
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

spark = (
    SparkSession.builder
    .appName("create_marts_postgresql")
    .master("local[*]")
    .getOrCreate()
)


def read_csv_folder(path: str) -> DataFrame:
    """Load a CSV directory with a header row, inferring column types."""
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    return reader.csv(path)


users = read_csv_folder("data/postgresql/users")
courses = read_csv_folder("data/postgresql/courses")
lessons = read_csv_folder("data/postgresql/lessons")
enrollments = read_csv_folder("data/postgresql/enrollments")
lesson_views = read_csv_folder("data/postgresql/lesson_views")

# Mart 1: engagement per user -- distinct courses, distinct lessons viewed,
# first/last view, and the viewed-to-available lesson ratio as a "NN.NN%" string.
user_activity_summary = (
    users.alias("u")
    .join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "inner")
    .join(lesson_views.alias("lv"), F.col("u.id") == F.col("lv.user_id"), "inner")
    .join(lessons.alias("l"), F.col("e.course_id") == F.col("l.course_id"), "inner")
    .groupBy(F.col("u.id").alias("user_id"), F.col("u.name"))
    .agg(
        F.countDistinct("e.course_id").alias("enrollment_count"),
        F.countDistinct("lv.lesson_id").alias("viewed_lessons_count"),
        F.max("lv.viewed_at").alias("last_view"),
        F.min("lv.viewed_at").alias("first_view"),
        F.concat(
            F.round(F.countDistinct("lv.lesson_id") * 100.0 / F.countDistinct("l.id"), 2),
            F.lit("%"),
        ).alias("engagement_rate"),
    )
)

# Intermediate aggregate: raw view count per course, resolved via its lessons.
total_views_per_course = (
    lessons.alias("l")
    .join(lesson_views.alias("lv"), F.col("l.id") == F.col("lv.lesson_id"), "inner")
    .groupBy("l.course_id")
    .agg(F.count("lv.lesson_id").alias("total_views"))
)

# Mart 2: per-course audience size, lesson count and viewing statistics.
course_summary = (
    courses.alias("c")
    .join(enrollments.alias("e"), F.col("c.id") == F.col("e.course_id"), "inner")
    .join(lessons.alias("l"), F.col("c.id") == F.col("l.course_id"), "inner")
    .join(lesson_views.alias("lv"), F.col("l.id") == F.col("lv.lesson_id"), "inner")
    .join(total_views_per_course.alias("tv"), F.col("tv.course_id") == F.col("l.course_id"), "inner")
    .groupBy(
        F.col("c.id").alias("course_id"),
        F.col("c.title").alias("course_title"),
        F.col("tv.total_views"),
    )
    .agg(
        F.countDistinct("e.user_id").alias("unique_users"),
        F.countDistinct("l.id").alias("lessons_count"),
        F.round(F.col("tv.total_views") * 1.0 / F.countDistinct("e.user_id"), 2).alias("avg_views_per_user"),
        F.min("lv.viewed_at").alias("first_viewed_at"),
        F.max("lv.viewed_at").alias("last_viewed_at"),
    )
    .orderBy("course_id")
)

# Mart 3: one-row rollup across users, courses, lessons and views.
# NOTE(review): the mix of left/full join types here is preserved as-is --
# presumably intended to keep courses without lessons countable; confirm.
platform_summary = (
    users.alias("u")
    .join(enrollments.alias("e"), F.col("u.id") == F.col("e.user_id"), "left")
    .join(courses.alias("c"), F.col("c.id") == F.col("e.course_id"), "left")
    .join(lessons.alias("l"), F.col("l.course_id") == F.col("c.id"), "full")
    .join(lesson_views.alias("lv"), F.col("lv.user_id") == F.col("u.id"), "left")
    .agg(
        F.countDistinct("u.id").alias("total_users"),
        F.countDistinct("c.id").alias("total_courses"),
        F.countDistinct("l.id").alias("total_lessons"),
        F.countDistinct("lv.user_id").alias("users_with_views"),
        F.round(F.countDistinct("l.id") * 1.0 / F.countDistinct("c.id"), 2).alias("avg_lessons_per_course"),
        F.round(F.count("lv.id") * 1.0 / F.countDistinct("lv.lesson_id"), 2).alias("avg_views_per_lesson"),
    )
)

output_dir = "data/marts/postgresql"
os.makedirs(output_dir, exist_ok=True)

user_activity_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/user_activity_summary")
course_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/course_summary")
platform_summary.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/platform_summary")

print(".............витрины postgresql созданы")
# extract_csv.py -- extract source tables from PostgreSQL / Greenplum /
# ClickHouse over JDBC and land each one as a CSV folder under
# data/<source>/<table>.
import os
from typing import List, Optional

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("extract_csv") \
    .config("spark.jars", "jars/postgresql-42.6.0.jar,jars/clickhouse-jdbc-0.3.2-shaded.jar") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.bindAddress", "localhost") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .master("local[*]") \
    .getOrCreate()


def extract_to_csv_dir(
        url: str,
        table: str,
        user: str,
        password: str,
        driver: str,
        schema: Optional[str] = None,
        int_columns: Optional[List[str]] = None,
        output_dir: str = "data",
) -> None:
    """Read one table over JDBC and write it as a header-ed CSV folder.

    Args:
        url: JDBC connection URL.
        table: table name, without schema prefix.
        user: database login.
        password: database password.
        driver: fully-qualified JDBC driver class.
        schema: optional schema to qualify the table with.
        int_columns: columns to force-cast to int after the read (skips
            columns that are absent from the result).
        output_dir: parent directory; the table lands in <output_dir>/<table>.
    """
    dbtable = f"{schema}.{table}" if schema else table
    df = spark.read.format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .load()

    if int_columns:
        for c in int_columns:
            if c in df.columns:
                df = df.withColumn(c, col(c).cast("int"))

    os.makedirs(f"{output_dir}/{table}", exist_ok=True)
    df.write.mode("overwrite").option("header", "true").csv(f"{output_dir}/{table}")
    print(f"загружена {dbtable} в {output_dir}/{table}")


# FIX(security): credentials were hard-coded in source. They are now read
# from environment variables; the previous literals remain as defaults so
# existing runs keep working unchanged, but they should be rotated and the
# defaults removed once the environment is configured.

# PostgreSQL
postgres_url = "jdbc:postgresql://postgres.de-infra.servehttp.com:9002/mydb"
postgres_user = os.getenv("POSTGRES_USER", "gsavelev")
postgres_pass = os.getenv("POSTGRES_PASSWORD", "zBYuwrebwrQmM4E8")
postgres_driver = "org.postgresql.Driver"
postgres_schema = "gsavelev"
postgres_tables = ["users", "courses", "lessons", "enrollments", "lesson_views"]

print("PostgreSQL")
for table in postgres_tables:
    extract_to_csv_dir(postgres_url, table, postgres_user, postgres_pass, postgres_driver,
                       schema=postgres_schema,
                       output_dir="data/postgresql")


# Greenplum (speaks the PostgreSQL wire protocol, hence the same driver)
greenplum_url = "jdbc:postgresql://greenplum.de-infra.servehttp.com:9003/gsavelev"
greenplum_user = os.getenv("GREENPLUM_USER", "gsavelev")
greenplum_pass = os.getenv("GREENPLUM_PASSWORD", "zBYuwrebwrQmM4E8")
greenplum_driver = "org.postgresql.Driver"
greenplum_schema = "core"
greenplum_tables = ["dim_user", "dim_course", "dim_lesson", "fact_lesson_views", "fact_enrollments"]

print("\nGreenplum")
for table in greenplum_tables:
    extract_to_csv_dir(greenplum_url, table, greenplum_user, greenplum_pass, greenplum_driver,
                       schema=greenplum_schema,
                       output_dir="data/greenplum")


# ClickHouse
clickhouse_url = "jdbc:clickhouse://clickhouse.de-infra.servehttp.com:9004/gsavelev"
clickhouse_user = os.getenv("CLICKHOUSE_USER", "gsavelev")
clickhouse_pass = os.getenv("CLICKHOUSE_PASSWORD", "zBYuwrebwrQmM4E8")
clickhouse_driver = "com.clickhouse.jdbc.ClickHouseDriver"
clickhouse_tables = ["users", "courses", "lessons", "enrollments", "lesson_views"]

# ClickHouse id columns are exported unsigned; force-cast them to int.
clickhouse_int_cols = {
    "users": ["user_id"],
    "courses": ["course_id"],
    "lessons": ["lesson_id", "course_id"],
    "enrollments": ["enrollment_id", "user_id", "course_id"],
    "lesson_views": ["view_id", "user_id", "lesson_id"]
}

print("\nClickHouse")
for table in clickhouse_tables:
    extract_to_csv_dir(clickhouse_url, table, clickhouse_user, clickhouse_pass, clickhouse_driver,
                       int_columns=clickhouse_int_cols.get(table),
                       output_dir="data/clickhouse")