# Spark job: extract tables from PostgreSQL, Greenplum and ClickHouse into CSV files.
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from typing import Optional, List

# Local Spark session with the PostgreSQL and ClickHouse JDBC drivers on the classpath.
spark = (
    SparkSession.builder
    .appName("extract_csv")
    .config("spark.jars", "jars/postgresql-42.6.0.jar,jars/clickhouse-jdbc-0.3.2-shaded.jar")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .master("local[*]")
    .getOrCreate()
)
def extract_to_csv_dir(
    url: str,
    table: str,
    user: str,
    password: str,
    driver: str,
    schema: Optional[str] = None,
    int_columns: Optional[List[str]] = None,
    output_dir: str = "data"
) -> None:
    """Read one table over JDBC and dump it as headered CSV under ``output_dir/table``.

    Args:
        url: JDBC connection URL.
        table: Name of the table to read.
        user: Database user name.
        password: Database password.
        driver: Fully qualified JDBC driver class name.
        schema: Optional schema; when given, the source becomes ``schema.table``.
        int_columns: Column names to cast to 32-bit int if present in the frame.
        output_dir: Base directory for the per-table output folder.
    """
    dbtable = f"{schema}.{table}" if schema else table

    # Assemble the JDBC reader; options are applied from one place.
    reader = spark.read.format("jdbc")
    for key, value in (
        ("url", url),
        ("dbtable", dbtable),
        ("user", user),
        ("password", password),
        ("driver", driver),
    ):
        reader = reader.option(key, value)
    df = reader.load()

    # Normalize the requested columns to int; names absent from this table are skipped.
    for name in int_columns or []:
        if name in df.columns:
            df = df.withColumn(name, col(name).cast("int"))

    target = f"{output_dir}/{table}"
    os.makedirs(target, exist_ok=True)
    df.write.mode("overwrite").option("header", "true").csv(target)
    print(f"загружена {dbtable} в {output_dir}/{table}")
# --- PostgreSQL source ---
# NOTE(review): credentials are hard-coded; consider loading them from environment variables.
postgres_url = "jdbc:postgresql://postgres.de-infra.servehttp.com:9002/mydb"
postgres_user = "gsavelev"
postgres_pass = "zBYuwrebwrQmM4E8"
postgres_driver = "org.postgresql.Driver"
postgres_schema = "gsavelev"
postgres_tables = ["users", "courses", "lessons", "enrollments", "lesson_views"]

print("PostgreSQL")
for table in postgres_tables:
    extract_to_csv_dir(
        postgres_url,
        table,
        postgres_user,
        postgres_pass,
        postgres_driver,
        schema=postgres_schema,
        output_dir="data/postgresql",
    )
# --- Greenplum source (PostgreSQL wire protocol, so the same JDBC driver) ---
# NOTE(review): credentials are hard-coded; consider loading them from environment variables.
greenplum_url = "jdbc:postgresql://greenplum.de-infra.servehttp.com:9003/gsavelev"
greenplum_user = "gsavelev"
greenplum_pass = "zBYuwrebwrQmM4E8"
greenplum_driver = "org.postgresql.Driver"
greenplum_schema = "core"
greenplum_tables = ["dim_user", "dim_course", "dim_lesson", "fact_lesson_views", "fact_enrollments"]

print("\nGreenplum")
for table in greenplum_tables:
    extract_to_csv_dir(
        greenplum_url,
        table,
        greenplum_user,
        greenplum_pass,
        greenplum_driver,
        schema=greenplum_schema,
        output_dir="data/greenplum",
    )
# --- ClickHouse source ---
# NOTE(review): credentials are hard-coded; consider loading them from environment variables.
clickhouse_url = "jdbc:clickhouse://clickhouse.de-infra.servehttp.com:9004/gsavelev"
clickhouse_user = "gsavelev"
clickhouse_pass = "zBYuwrebwrQmM4E8"
clickhouse_driver = "com.clickhouse.jdbc.ClickHouseDriver"
clickhouse_tables = ["users", "courses", "lessons", "enrollments", "lesson_views"]

# Per-table ID columns cast to int after the JDBC read (ClickHouse unsigned types
# otherwise surface with wider/unsigned Spark types).
clickhouse_int_cols = {
    "users": ["id"],
    "courses": ["id"],
    "lessons": ["id", "course_id"],
    "enrollments": ["id", "user_id", "course_id"],
    "lesson_views": ["id", "user_id", "lesson_id"],
}

print("\nClickHouse")
for table in clickhouse_tables:
    extract_to_csv_dir(
        clickhouse_url,
        table,
        clickhouse_user,
        clickhouse_pass,
        clickhouse_driver,
        int_columns=clickhouse_int_cols.get(table),
        output_dir="data/clickhouse",
    )