Sum Validation
설명
하이브와 소스의 숫자열의 합을 비교하는 프로그램.
사용법
%pyspark
# oracle connector
def ora_select(sql:str) -> spark:
return spark.read \
.format("jdbc") \
.option("url", "jdbc:oracle:thin:@IP:PORT:SID") \
.option("user", "USERID").option("password", "PASSWORD") \
.option("query", sql) \
.load()
# oracle number type collector
def ora_num_type_collector(table_name:str,
schema_name:str) -> spark:
sql = f"SELECT COLUMN_NAME \
FROM ALL_TAB_COLUMNS \
WHERE TABLE_NAME = '{table_name.upper()}' \
AND OWNER = '{schema_name.upper()}' \
AND DATA_TYPE LIKE 'NUMBER%'"
return list(map(lambda x : x[0], ora_select(sql).collect()))
# oracle sum select
def ora_sum_select(col_name:list,
table_name:str,
schema_name:str) -> spark:
sql = f"SELECT {','.join(col_name)} \
FROM {schema_name.upper()}.{table_name.upper()}"
return ora_select(sql)
# oracle partition sum select
def ora_part_sum_select(col_name:list,
table_name:str,
schema_name:str,
pt_key:str,
start_date:str,
end_date:str) -> spark:
sql = f"SELECT {','.join(col_name)} \
FROM {schema_name.upper()}.{table_name.upper()} \
WHERE {pt_key} >= '{start_date}' \
AND {pt_key} <= '{end_date}'"
return ora_select(sql)
# hive sum select
def hive_sum_select(col_name:list,
table_name:str,
schema_name:str) -> spark:
hql = f"SELECT {','.join(col_name)} \
FROM {schema_name}.{table_name}"
return spark.sql(hql)
# hive pratition sum select
def hive_part_sum_select(col_name:list,
table_name:str,
schema_name:str,
pt_key:str,
start_date:str,
end_date:str) -> spark:
hql = f"SELECT {','.join(col_name)} \
FROM {schema_name}.{table_name} \
WHERE {pt_key} >= '{start_date}' \
AND {pt_key} <= '{end_date}'"
return spark.sql(hql)
# oracle partition check
def ora_partition(table_name:str,
schema_name:str)->str:
sql = f"SELECT COLUMN_NAME\
FROM ALL_PART_KEY_COLUMNS\
WHERE NAME = '{table_name.upper()}'\
AND OWNER = '{schema_name.upper()}'"
pt_k_meta = ora_select(sql)
return list(map(lambda x : x[0], pt_k_meta.collect()))
# Compare Data
def compare_sum_val(df_1:spark.sql,
df_2:spark.sql,
col_name:list)-> None:
for col in col_name:
comp1 = df_1.select(col).collect()[0]
comp2 = df_2.select(col).collect()[0]
status = 'O' if comp1 == comp2 else 'X'
print(f"{status} \t", end='')
print('oracle : ', comp1 , '\t', 'hive : ', comp2 , '\n')
return
# Table & Partition 분류
'''
* pt_key 기준 Branch
* Partition 경우 start, end date 변수
'''
def tb_branch(col_name:str,
table_name:str,
schema_name:str,
lake_schema_name:str,
start_date:str = '202210',
end_date:str = '202211'
)->str:
pt_key = ora_partition(table_name=table_name, schema_name=schema_name)
sum_col_name = list(map(lambda x : ''.join(['SUM(',x,') AS ' , x ]), col_name))
hive_df:spark.sql
oracle_df:spark.sql
if pt_key:
if pt_key[0].upper() != 'BASE_DT':
start_date = ''.join([start_date, '01070000000000'])
end_date = ''.join([end_date, '01070000000000'])
hive_df = hive_part_sum_select(col_name=sum_col_name,
table_name=table_name,
schema_name=lake_schema_name,
pt_key = pt_key[0],
start_date = start_date,
end_date = end_date)
oracle_df = ora_part_sum_select(col_name=sum_col_name,
table_name=table_name,
schema_name=schema_name,
pt_key = pt_key[0],
start_date = start_date,
end_date = end_date)
else:
hive_df = hive_sum_select(col_name=sum_col_name,
table_name=table_name,
schema_name=lake_schema_name)
oracle_df = ora_sum_select(col_name=sum_col_name,
table_name=table_name,
schema_name=lake_schema_name)
compare_sum_val(df_1 = hive_df,
df_2 = oracle_df,
col_name = col_name)
# ======================================= main =======================================
table_name = 'TABLE_NAME'
schema_name = 'USER_NAME'
lake_schema_name = 'HADOOP_MANAGED_PATH'
start_date = '202010'
end_date = '202209'
data_type_lst = ora_num_type_collector(table_name=table_name,
schema_name=schema_name)
tb_branch(col_name=data_type_lst,
table_name=table_name,
schema_name=schema_name,
lake_schema_name = lake_schema_name,
start_date = start_date,
end_date = end_date)