Files
pyspark/extract_csv.py
2026-02-24 07:43:09 +03:00

96 lines
3.3 KiB
Python

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from typing import Optional, List
# Local Spark session used by every extraction below.  The JDBC drivers for
# Postgres/Greenplum and ClickHouse are put on the classpath via spark.jars;
# adaptive query execution is enabled so small output partitions get coalesced.
spark = (
    SparkSession.builder
    .appName("extract_csv")
    .config("spark.jars", "jars/postgresql-42.6.0.jar,jars/clickhouse-jdbc-0.3.2-shaded.jar")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .master("local[*]")
    .getOrCreate()
)
def extract_to_csv_dir(
    url: str,
    table: str,
    user: str,
    password: str,
    driver: str,
    schema: Optional[str] = None,
    int_columns: Optional[List[str]] = None,
    output_dir: str = "data"
) -> None:
    """Read one table over JDBC and dump it as headered CSV files.

    The table is loaded through the module-level ``spark`` session and
    written (mode=overwrite) to ``<output_dir>/<table>/``.

    Args:
        url: JDBC connection URL.
        table: Table name to read.
        user: Database user.
        password: Database password.
        driver: Fully qualified JDBC driver class name.
        schema: Optional schema; when given, the source is ``schema.table``.
        int_columns: Columns to cast to ``int`` after loading (columns not
            present in the frame are silently skipped).
        output_dir: Root directory for the per-table CSV folders.
    """
    qualified = table if schema is None else f"{schema}.{table}"
    frame = (
        spark.read.format("jdbc")
        .options(
            url=url,
            dbtable=qualified,
            user=user,
            password=password,
            driver=driver,
        )
        .load()
    )
    # Some sources hand back wider numeric types; normalize requested columns.
    for name in int_columns or []:
        if name in frame.columns:
            frame = frame.withColumn(name, col(name).cast("int"))
    target = f"{output_dir}/{table}"
    os.makedirs(target, exist_ok=True)
    frame.write.mode("overwrite").option("header", "true").csv(target)
    print(f"загружена {qualified} в {target}")
#PostgreSQL
# Connection settings. SECURITY: credentials were hardcoded; they now come
# from the environment, falling back to the previous literals so existing
# runs behave identically. Rotate the leaked password and drop the defaults.
postgres_url = os.environ.get(
    "POSTGRES_URL", "jdbc:postgresql://postgres.de-infra.servehttp.com:9002/mydb"
)
postgres_user = os.environ.get("POSTGRES_USER", "gsavelev")
postgres_pass = os.environ.get("POSTGRES_PASSWORD", "zBYuwrebwrQmM4E8")
postgres_driver = "org.postgresql.Driver"
postgres_schema = "gsavelev"
# Source tables extracted under data/postgresql/<table>/.
postgres_tables = ["users", "courses", "lessons", "enrollments", "lesson_views"]
print("PostgreSQL")
for table in postgres_tables:
    extract_to_csv_dir(postgres_url, table, postgres_user, postgres_pass, postgres_driver,
                       schema=postgres_schema,
                       output_dir="data/postgresql")
#Greenplum
# Greenplum speaks the Postgres wire protocol, hence the same JDBC driver.
# SECURITY: credentials were hardcoded; they now come from the environment,
# falling back to the previous literals so existing runs behave identically.
greenplum_url = os.environ.get(
    "GREENPLUM_URL", "jdbc:postgresql://greenplum.de-infra.servehttp.com:9003/gsavelev"
)
greenplum_user = os.environ.get("GREENPLUM_USER", "gsavelev")
greenplum_pass = os.environ.get("GREENPLUM_PASSWORD", "zBYuwrebwrQmM4E8")
greenplum_driver = "org.postgresql.Driver"
greenplum_schema = "core"
# Dimension/fact tables extracted under data/greenplum/<table>/.
greenplum_tables = ["dim_user", "dim_course", "dim_lesson", "fact_lesson_views", "fact_enrollments"]
print("\nGreenplum")
for table in greenplum_tables:
    extract_to_csv_dir(greenplum_url, table, greenplum_user, greenplum_pass, greenplum_driver,
                       schema=greenplum_schema,
                       output_dir="data/greenplum")
#ClickHouse
# SECURITY: credentials were hardcoded; they now come from the environment,
# falling back to the previous literals so existing runs behave identically.
clickhouse_url = os.environ.get(
    "CLICKHOUSE_URL", "jdbc:clickhouse://clickhouse.de-infra.servehttp.com:9004/gsavelev"
)
clickhouse_user = os.environ.get("CLICKHOUSE_USER", "gsavelev")
clickhouse_pass = os.environ.get("CLICKHOUSE_PASSWORD", "zBYuwrebwrQmM4E8")
clickhouse_driver = "com.clickhouse.jdbc.ClickHouseDriver"
clickhouse_tables = ["users", "courses", "lessons", "enrollments", "lesson_views"]
# Per-table id columns that the ClickHouse driver returns as wider numeric
# types; they are cast to int after loading (see extract_to_csv_dir).
clickhouse_int_cols = {
    "users": ["id"],
    "courses": ["id"],
    "lessons": ["id", "course_id"],
    "enrollments": ["id", "user_id", "course_id"],
    "lesson_views": ["id", "user_id", "lesson_id"]
}
print("\nClickHouse")
for table in clickhouse_tables:
    extract_to_csv_dir(clickhouse_url, table, clickhouse_user, clickhouse_pass, clickhouse_driver,
                       int_columns=clickhouse_int_cols.get(table),
                       output_dir="data/clickhouse")