From 3a7624b12f49d8e93aa52e35e56f63f9b6e6b8f6 Mon Sep 17 00:00:00 2001
From: tdroberto
Date: Mon, 30 Dec 2024 20:16:04 +0900
Subject: [PATCH] add deterministic probabilistic deduplication solution

---
 .../README.md           |  24 ++
 .../config/params.yaml  |  11 +
 .../dm_pm.dig           |  50 ++++
 .../queries/cleanse.sql | 176 ++++++++++++++
 .../scripts/dm.py       | 221 ++++++++++++++++++
 .../scripts/pm.py       | 218 +++++++++++++++++
 6 files changed, 700 insertions(+)
 create mode 100644 data-box/deduplication_deterministic_probabilistic/README.md
 create mode 100644 data-box/deduplication_deterministic_probabilistic/config/params.yaml
 create mode 100644 data-box/deduplication_deterministic_probabilistic/dm_pm.dig
 create mode 100644 data-box/deduplication_deterministic_probabilistic/queries/cleanse.sql
 create mode 100644 data-box/deduplication_deterministic_probabilistic/scripts/dm.py
 create mode 100644 data-box/deduplication_deterministic_probabilistic/scripts/pm.py

diff --git a/data-box/deduplication_deterministic_probabilistic/README.md b/data-box/deduplication_deterministic_probabilistic/README.md
new file mode 100644
index 00000000..93b23e6d
--- /dev/null
+++ b/data-box/deduplication_deterministic_probabilistic/README.md
@@ -0,0 +1,24 @@
+# Deterministic and probabilistic deduplication
+
+----
+## Overview
+
+This project provides a solution to deterministically and probabilistically deduplicate a given dataset that has no reliable identifier.
+
+----
+## Implementation
+1. Copy and paste this code into Treasure Workflows or run it with the TD toolbelt.
+2. Set your TD master key as the workflow secret.
+3. Change the database and table names in the config/params.yaml file.
+
+----
+## Considerations
+
+This project was developed for a Vietnamese automobile customer. Adjust the character normalization in cleanse.sql and the variables in the scripts to suit your own data.
+
+The probabilistic matching script (pm.py) uses multiprocessing; adjust the partition, process, weight, and threshold settings to match your dataset size (a scoring example follows at the end of this README).
+
+----
+## Questions
+
+Please feel free to reach out to apac-se@treasure-data.com with any questions you have about using this code.
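+
+----
+## Scoring example
+
+The snippet below is a minimal, stand-alone sketch of the scoring used in scripts/pm.py: each field is compared with four similarity libraries, the two field scores are combined using `name_weight` and `address_weight`, and the result is classified against `positive_threshold` (defaults from config/params.yaml). The sample records are illustrative only, and the libraries must already be installed (fuzzywuzzy, textdistance, python-Levenshtein, rapidfuzz).
+
+```python
+from fuzzywuzzy import fuzz
+import textdistance as tedi
+import Levenshtein as lev
+import rapidfuzz as rf
+
+NAME_WEIGHT, ADDRESS_WEIGHT, POSITIVE_THRESHOLD = 1, 1, 80  # defaults from config/params.yaml
+
+def field_score(a: str, b: str) -> float:
+    # Average of four similarity measures, all scaled to 0-100.
+    return (fuzz.ratio(a, b)
+            + tedi.jaccard.normalized_similarity(a, b) * 100
+            + lev.ratio(a, b) * 100
+            + rf.fuzz.ratio(a, b)) / 4
+
+def classify(name_a, addr_a, name_b, addr_b):
+    # Weighted average of the name and address scores, then thresholding.
+    score = (field_score(name_a, name_b) * NAME_WEIGHT
+             + field_score(addr_a, addr_b) * ADDRESS_WEIGHT) / (NAME_WEIGHT + ADDRESS_WEIGHT)
+    if score == 100:
+        return 'Matched', score
+    return ('Positive' if score >= POSITIVE_THRESHOLD else 'Negative'), score
+
+# Hypothetical, already-cleansed records (illustrative only).
+print(classify('nguyen van an', '12 le loi;district 1;ho chi minh',
+               'nguyen van a', '12 le loi;district 1;ho chi minh city'))
+```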
diff --git a/data-box/deduplication_deterministic_probabilistic/config/params.yaml b/data-box/deduplication_deterministic_probabilistic/config/params.yaml
new file mode 100644
index 00000000..09c1ffdd
--- /dev/null
+++ b/data-box/deduplication_deterministic_probabilistic/config/params.yaml
@@ -0,0 +1,11 @@
+db: rotd_dm_pm
+source_tbl: customers
+clean_tbl: clean_sap_customer
+dm_tbl: dm_dedup
+pm_tbl: pm_dedup_eval
+api_endpoint: https://api.treasuredata.com
+positive_threshold: 80
+name_weight: 1
+address_weight: 1
+partition_cnt: 16
+process_cnt: 8
\ No newline at end of file
diff --git a/data-box/deduplication_deterministic_probabilistic/dm_pm.dig b/data-box/deduplication_deterministic_probabilistic/dm_pm.dig
new file mode 100644
index 00000000..ecfb6413
--- /dev/null
+++ b/data-box/deduplication_deterministic_probabilistic/dm_pm.dig
@@ -0,0 +1,50 @@
+timezone: Asia/Tokyo
+
+_export:
+  !include : config/params.yaml
+  td:
+    engine: presto
+    database: ${db}
+
++create_db_tbl_if_not_exists:
+  td_ddl>:
+  create_databases: [ "${db}" ]
+  create_tables: [ "${clean_tbl}", "${dm_tbl}", "${pm_tbl}" ]
+  empty_tables: [ "${clean_tbl}", "${dm_tbl}", "${pm_tbl}" ]
+
++cleansing:
+  td>: queries/cleanse.sql
+  create_table: ${clean_tbl}
+
+# Deterministic Matching at tier 4
++dm:
+  py>: scripts.dm.main
+  _env:
+    TD_API_KEY: ${secret:td.apikey}
+    TD_API_ENDPOINT: ${api_endpoint}
+  docker:
+    image: "digdag/digdag-python:3.10"
+    resource_level: "4"
+
+# Probabilistic Matching with multi-processing & multiple tasks at tier 4
++pm:
+  loop>: ${partition_cnt}
+  _parallel: false
+  _do:
+    +run_serial:
+      py>: scripts.pm.main
+      _env:
+        TD_API_KEY: ${secret:td.apikey}
+        TD_API_ENDPOINT: ${api_endpoint}
+        PROCESS_CNT: ${process_cnt}
+        PART: ${i}
+        PARTS: ${partition_cnt}
+      docker:
+        image: "digdag/digdag-python:3.10"
+        resource_level: "4"
diff --git a/data-box/deduplication_deterministic_probabilistic/queries/cleanse.sql b/data-box/deduplication_deterministic_probabilistic/queries/cleanse.sql new file mode 100644 index 00000000..36fa5156 --- /dev/null +++ b/data-box/deduplication_deterministic_probabilistic/queries/cleanse.sql @@ -0,0 +1,176 @@ +SELECT + TRIM(CAST(customer_code AS VARCHAR)) AS cl_cc, + regexp_replace(TRIM(mobile_phone), '\s+', '') AS cl_phone, + regexp_replace(TRIM(identification_number), '\s+', '') AS cl_vin, + LOWER(TRIM(email)) AS cl_email, + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + LOWER(TRIM(customer_name)), + '\s+', + ' ' + ), + '[áàảãạăắằẳẵặâấầẩẫậ]', + 'a' + ), + '[éèẻẽẹêếềểễệ]', + 'e' + ), + '[íìỉĩị]', + 'i' + ), + '[óòỏõọôốồổỗộơớờởỡợ]', + 'o' + ), + '[úùủũụưứừửữự]', + 'u' + ), + '[ýỳỷỹỵ]', + 'y' + ), + 'đ', + 'd' + ) AS cl_customer_name, + CONCAT( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + LOWER(TRIM(city)), + '\s+', + ' ' + ), + '[áàảãạăắằẳẵặâấầẩẫậ]', + 'a' + ), + '[éèẻẽẹêếềểễệ]', + 'e' + ), + '[íìỉĩị]', + 'i' + ), + '[óòỏõọôốồổỗộơớờởỡợ]', + 'o' + ), + '[úùủũụưứừửữự]', + 'u' + ), + '[ýỳỷỹỵ]', + 'y' + ), + 'đ', + 'd' + ), + ';', + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + LOWER(TRIM(district)), + '\s+', + ' ' + ),
'[áàảãạăắằẳẵặâấầẩẫậ]', + 'a' + ), + '[éèẻẽẹêếềểễệ]', + 'e' + ), + '[íìỉĩị]', + 'i' + ), + '[óòỏõọôốồổỗộơớờởỡợ]', + 'o' + ), + '[úùủũụưứừửữự]', + 'u' + ), + '[ýỳỷỹỵ]', + 'y' + ), + 'đ', + 'd' + ), + ';', + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + LOWER(TRIM(customer_address)), + '\s+', + ' ' + ), + '[áàảãạăắằẳẵặâấầẩẫậ]', + 'a' + ), + '[éèẻẽẹêếềểễệ]', + 'e' + ), + '[íìỉĩị]', + 'i' + ), + '[óòỏõọôốồổỗộơớờởỡợ]', + 'o' + ), + '[úùủũụưứừửữự]', + 'u' + ), + '[ýỳỷỹỵ]', + 'y' + ), + 'đ', + 'd' + )) AS cl_full_address, + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + regexp_replace( + LOWER(TRIM(city)), + '\s+', + ' ' + ), + '[áàảãạăắằẳẵặâấầẩẫậ]', + 'a' + ), + '[éèẻẽẹêếềểễệ]', + 'e' + ), + '[íìỉĩị]', + 'i' + ), + '[óòỏõọôốồổỗộơớờởỡợ]', + 'o' + ), + '[úùủũụưứừửữự]', + 'u' + ), + '[ýỳỷỹỵ]', + 'y' + ), + 'đ', + 'd' + ) AS cl_city, + * +FROM + ${db}.${source_tbl} \ No newline at end of file diff --git a/data-box/deduplication_deterministic_probabilistic/scripts/dm.py b/data-box/deduplication_deterministic_probabilistic/scripts/dm.py new file mode 100644 index 00000000..6e06f456 --- /dev/null +++ b/data-box/deduplication_deterministic_probabilistic/scripts/dm.py @@ -0,0 +1,221 @@ +import os + +import pandas as pd +import hashlib +import pytd + +def main(**kwargs): + tdAPIkey = os.getenv("TD_API_KEY") + tdAPIendpoint = os.getenv("TD_API_ENDPOINT") + database = kwargs.get('db') + src_table = kwargs.get('clean_tbl') + out_table = kwargs.get('dm_tbl') + + td = pytd.Client(apikey=tdAPIkey, + endpoint=tdAPIendpoint, + database=database, + default_engine='presto') + + # Create a dataframe by importing data from Treasure Data database. 
+ res = td.query(f'SELECT cl_cc, cl_phone, cl_email, cl_customer_name, cl_full_address, cl_city, cl_vin FROM {database}.{src_table}') + df = pd.DataFrame(**res) + + # Init + dedup_list = [] + counts = [] + customer_code_col = 'cl_cc' + phone_col = 'cl_phone' + email_col = 'cl_email' + name_col = 'cl_customer_name' + address_col = 'cl_full_address' + canonical_id_col = 'td_id' + step_col = 'step' + city_col = 'cl_city' + vin_col = 'cl_vin' + other_cc_col = 'cc_other' + other_vin_col = 'vin_other' + hash_cols = [customer_code_col, phone_col, email_col, name_col, address_col] + + # Unique, not null phones + def check_phone_dup(phone): + if pd.isna(phone): + return True + else: + return len(df[df[phone_col] == phone]) > 1 + + # Non-unique, not null phones; not null emails; phone isn't used in any other phone + email combination + def check_phone_email_dup(phone, email): + if pd.isna(email) or pd.isna(phone): + return True + else: + if len(df[df[phone_col] == phone]) == 1: + return True + else: + df_multi_ph_check = df[df[phone_col] == phone] + df_multi_ph_em_check = df[(df[phone_col] == phone) & (df[email_col] == email)] + if len(df_multi_ph_check) != len(df_multi_ph_em_check): + return True + else: + return False + + # Non-unique, not null phones; not null names and addresses; emails don't matter + def check_name_address_dup(phone, name, address): + if pd.isna(name) or pd.isna(address) or pd.isna(phone): + return True + else: + if len(df[df[phone_col] == phone]) == 1: + return True + else: + df_multi_ph_check = df[df[phone_col] == phone] + df_multi_ph_name_addr_check = df[(df[phone_col] == phone) & (df[address_col] == address) & (df[name_col] == name)] + if len(df_multi_ph_check) != len(df_multi_ph_name_addr_check): + return True + else: + return False + + # Function to generate unique identifier. + # In: row of data; column names + # Out: unique identifier + def generate_canonical_id(row, columns): + concatenated_values = ''.join(str(row[col]) for col in columns if pd.notna(row[col])) + canonical_id = hashlib.sha256(concatenated_values.encode()).hexdigest() + return canonical_id + + # Function to transform row, append unique identifier and step, delete processed rows in base dataframe. + # In: row of data; step #; array of other customer codes; array of other vehicle identification numbers + def store_row(row, step, occ, ov): + dedup_list.append({ + step_col: step, + canonical_id_col: generate_canonical_id(row, hash_cols), + customer_code_col: row[customer_code_col], + name_col: row[name_col], + address_col: row[address_col], + phone_col: row[phone_col], + email_col: row[email_col], + city_col: row[city_col], + vin_col: row[vin_col], + other_cc_col: occ, + other_vin_col: ov + }) + df.drop(df[df[customer_code_col] == row[customer_code_col]].index, inplace=True) + if occ is not None: + for oc in occ: + df.drop(df[df[customer_code_col] == oc].index, inplace=True) + + # Function to create arrays of other customer codes in merged records. + # In: temp dataframe; highest customer code + # Out: array of other customer codes + def collect_other_cc(t_df, max_cc): + other_ccs = [] + for i, r in t_df.iterrows(): + if max_cc != r[customer_code_col]: + other_ccs.append(r[customer_code_col]) + return other_ccs + + # Function to create arrays of other vehicle identification numbers in merged records. 
+ # In: temp dataframe; highest customer code + # Out: array of other vehicle identification numbers + def collect_other_vin(t_df, max_cc): + other_vins = [] + for i, r in t_df.iterrows(): + if max_cc != r[customer_code_col]: + other_vins.append(r[vin_col]) + return other_vins + + # Step 1 - Unique, not null phone rows + s = 'S1' + print('Start ', s) + cnt_s1 = 0 + for index, row in df.iterrows(): + if not check_phone_dup(row[phone_col]): + store_row(row, s, None, None) + print(row.to_frame().T) + cnt_s1 += 1 + df = df.reset_index(drop=True) + counts.append({s: cnt_s1}) + + # Step 2a - Duplicated phone rows, merge on phone + email + s = 'S2a' + print('Start ', s) + cnt_s2a_bf = cnt_s2a_af = 0 + for index, row in df.iterrows(): + if not check_phone_email_dup(row[phone_col], row[email_col]) and index in df.index: + df_ph_em = df[(df[phone_col] == row[phone_col]) & (df[email_col] == row[email_col])] + cnt_s2a_bf += len(df_ph_em) + max_cc = df_ph_em[customer_code_col].max() + other_ccs = collect_other_cc(df_ph_em, max_cc) + other_vins = collect_other_vin(df_ph_em, max_cc) + print(df_ph_em[[customer_code_col, phone_col, email_col, name_col, address_col, vin_col]], other_ccs, other_vins) + print(df_ph_em[df_ph_em[customer_code_col] == max_cc][[customer_code_col, phone_col, email_col, name_col, address_col, vin_col]], other_ccs, other_vins) + store_row(df_ph_em[df_ph_em[customer_code_col] == max_cc].iloc[0], s, other_ccs, other_vins) + cnt_s2a_af += 1 + df = df.reset_index(drop=True) + counts.append({s: cnt_s2a_bf}) + counts.append({s: cnt_s2a_af}) + + #Step 2b - Duplicated phone rows, merge on name + address, ignore email + s = 'S2b' + print('Start ', s) + cnt_s2b_bf = cnt_s2b_af = 0 + for index, row in df.iterrows(): + if not check_name_address_dup(row[phone_col], row[name_col], row[address_col]) and index in df.index: + df_ph_em = df[(df[phone_col] == row[phone_col]) & (df[name_col] == row[name_col]) & (df[address_col] == row[address_col])] + cnt_s2b_bf += len(df_ph_em) + max_cc = df_ph_em[customer_code_col].max() + other_ccs = collect_other_cc(df_ph_em, max_cc) + other_vins = collect_other_vin(df_ph_em, max_cc) + print(df_ph_em[[customer_code_col, phone_col, email_col, name_col, address_col, vin_col]]) + print(df_ph_em[df_ph_em[customer_code_col] == max_cc][[customer_code_col, phone_col, email_col, name_col, address_col, vin_col]]) + store_row(df_ph_em[df_ph_em[customer_code_col] == max_cc].iloc[0], s, other_ccs, other_vins) + cnt_s2b_af += 1 + df = df.reset_index(drop=True) + counts.append({s: cnt_s2b_bf}) + counts.append({s: cnt_s2b_af}) + + #Step 3 - Phone is null, email isn't null + s = 'S3' + print('Start ', s) + cnt_s3 = 0 + for index, row in df.iterrows(): + if pd.isna(row[phone_col]) and not pd.isna(row[email_col]) and index in df.index: + store_row(row, s, None, None) + print(row.to_frame().T) + cnt_s3 += 1 + df = df.reset_index(drop=True) + counts.append({s: cnt_s3}) + + #Step 4 - Phone and email are null + s = 'S4' + print('Start ', s) + cnt_s4 = 0 + for index, row in df.iterrows(): + if pd.isna(row[phone_col]) and pd.isna(row[email_col]) and index in df.index: + store_row(row, s, None, None) + print(row.to_frame().T) + cnt_s4 += 1 + df = df.reset_index(drop=True) + counts.append({s: cnt_s4}) + + #Step 2c - Duplicated phone rows, no PII to merge on + s = 'S2c' + print('Start ', s) + cnt_s2c = 0 + for index, row in df.iterrows(): + if index in df.index: + store_row(row, s, None, None) + print(row.to_frame().T) + cnt_s2c += 1 + counts.append({s: cnt_s2c}) + + # Write results to TD 
table. + result_df = pd.DataFrame(dedup_list) + print(result_df) + td.load_table_from_dataframe(result_df,f'{database}.{out_table}',writer='bulk_import',if_exists='overwrite') + + # Counts + print(counts) + print('Fin') + +# Main +if __name__ == "__main__": + main() diff --git a/data-box/deduplication_deterministic_probabilistic/scripts/pm.py b/data-box/deduplication_deterministic_probabilistic/scripts/pm.py new file mode 100644 index 00000000..54b1329c --- /dev/null +++ b/data-box/deduplication_deterministic_probabilistic/scripts/pm.py @@ -0,0 +1,218 @@ +import os +import sys +os.system(f"{sys.executable} -m pip install fuzzywuzzy textdistance python-Levenshtein rapidfuzz") + +import pandas as pd +from fuzzywuzzy import fuzz +import textdistance as tedi +import Levenshtein as lev +import rapidfuzz as rf +import pytd +import multiprocessing + +def main(**kwargs): + tdAPIkey = os.getenv("TD_API_KEY") + tdAPIendpoint = os.getenv("TD_API_ENDPOINT") + process_cnt = int(os.getenv("PROCESS_CNT")) + parts = int(os.getenv("PARTS")) + part = int(os.getenv("PART")) + database = kwargs.get('db') + src_table = kwargs.get('dm_tbl') + out_table = kwargs.get('pm_tbl') + name_weight = int(kwargs.get('name_weight')) + address_weight = int(kwargs.get('address_weight')) + positive_threshold = int(kwargs.get('positive_threshold')) + + td = pytd.Client(apikey=tdAPIkey, + endpoint=tdAPIendpoint, + database=database, + default_engine='presto') + + # Init + customer_code_col = 'cl_cc' + phone_col = 'cl_phone' + email_col = 'cl_email' + name_col = 'cl_customer_name' + address_col = 'cl_full_address' + canonical_id_col = 'td_id' + vin_col = 'cl_vin' + cc_other_col = 'cc_other' + vin_other_col = 'vin_other' + + # Create a dataframe by importing data from Treasure Data database. + res = td.query(f'WITH with_rn AS (SELECT {customer_code_col}, {phone_col}, {email_col}, {name_col}, {address_col}, {canonical_id_col}, {vin_col}, {cc_other_col}, {vin_other_col}, ROW_NUMBER() OVER (ORDER BY {canonical_id_col}) AS rn FROM {database}.{src_table}) SELECT * FROM with_rn WHERE rn % {parts} = {part}') + base_df = pd.DataFrame(**res) + print(base_df) + + # Number of processes (vCPUs) + num_processes = process_cnt + print(f"Starting data processing using {num_processes} processes...") + + # Function to process data and store the result in the shared list. 
+ # In: chunk of dataframe, full dataframe, process's name, result dataframe, result index + def process_data(data_chunk, base_df, process_name, results, index): + print(f"{process_name} is processing...") + pm_results = [] + + for i, row in data_chunk.iterrows(): + print(process_name, ':', i) + pm_temp_results = [] + for j, candidate in base_df.iterrows(): + if pd.notna(row[name_col]) and pd.notna(row[address_col]) and row[name_col] != '' and row[address_col] != '' and pd.notna(candidate[name_col]) and pd.notna(candidate[address_col]) and candidate[name_col] != '' and candidate[address_col] != '' and row[customer_code_col] != candidate[customer_code_col]: + score_fuzz_name = fuzz.ratio(row[name_col], candidate[name_col]) + score_textdist_jaccard_name = tedi.jaccard.normalized_similarity(row[name_col], candidate[name_col]) * 100 + score_levenshtein_name = lev.ratio(row[name_col], candidate[name_col]) * 100 + score_rapid_name = rf.fuzz.ratio(row[name_col], candidate[name_col]) + + score_fuzz_address = fuzz.ratio(row[address_col], candidate[address_col]) + score_textdist_jaccard_address = tedi.jaccard.normalized_similarity(row[address_col], candidate[address_col]) * 100 + score_levenshtein_address = lev.ratio(row[address_col], candidate[address_col]) * 100 + score_rapid_address = rf.fuzz.ratio(row[address_col], candidate[address_col]) + + average_score_name = (score_fuzz_name + score_textdist_jaccard_name + score_levenshtein_name + score_rapid_name) / 4 + average_score_address = (score_fuzz_address + score_textdist_jaccard_address + score_levenshtein_address + score_rapid_address) / 4 + + average_score = (average_score_name * name_weight + average_score_address * address_weight) / (name_weight + address_weight) + + if average_score == 100: + similar = 'Matched' + elif average_score >= positive_threshold: + similar = 'Positive' + else: + similar = 'Negative' + + if similar != 'Negative': + pm_temp_results.append({ + 'td_id': row[canonical_id_col], + 'candidate_td_id': candidate[canonical_id_col], + 'customer_code': row[customer_code_col], + 'candidate_customer_code': candidate[customer_code_col], + 'name': row[name_col], + 'candidate_name': candidate[name_col], + 'address': row[address_col], + 'candidate_address': candidate[address_col], + 'phone': str(row[phone_col]), + 'candidate_phone': str(candidate[phone_col]), + 'email': row[email_col], + 'candidate_email': candidate[email_col], + 'vin': row[vin_col], + 'candidate_vin': candidate[vin_col], + 'cc_other': row[cc_other_col], + 'candidate_cc_other': candidate[cc_other_col], + 'vin_other': row[vin_other_col], + 'candidate_vin_other': candidate[vin_other_col], + 'similarity': similar, + 'average_score': average_score, + 'average_score_name': average_score_name, + 'average_score_address': average_score_address, + 'score_fuzz_name': score_fuzz_name, + 'score_textdist_jaccard_name': score_textdist_jaccard_name, + 'score_levenshtein_name': score_levenshtein_name, + 'score_rapid_name': score_rapid_name, + 'score_fuzz_address': score_fuzz_address, + 'score_textdist_jaccard_address': score_textdist_jaccard_address, + 'score_levenshtein_address': score_levenshtein_address, + 'score_rapid_address': score_rapid_address + }) + df_group = pd.DataFrame(pm_temp_results) + if len(df_group) > 0: + df_eval = df_group.groupby('td_id').agg( + best_score=('average_score', 'max'), + best_c_td_id=('candidate_td_id', lambda x: x[df_group['average_score'].idxmax()]), + best_proba=('similarity', lambda x: x[df_group['average_score'].idxmax()]), + 
best_name_score=('average_score_name', lambda x: x[df_group['average_score'].idxmax()]), + best_addr_score=('average_score_address', lambda x: x[df_group['average_score'].idxmax()]), + phone=('phone', lambda x: str(x[df_group['average_score'].idxmax()])), + c_phone=('candidate_phone', lambda x: str(x[df_group['average_score'].idxmax()])), + email=('email', lambda x: x[df_group['average_score'].idxmax()]), + c_email=('candidate_email', lambda x: x[df_group['average_score'].idxmax()]), + cc=('customer_code', lambda x: x[df_group['average_score'].idxmax()]), + c_cc=('candidate_customer_code', lambda x: x[df_group['average_score'].idxmax()]), + customer_name=('name', lambda x: x[df_group['average_score'].idxmax()]), + c_customer_name=('candidate_name', lambda x: x[df_group['average_score'].idxmax()]), + address=('address', lambda x: x[df_group['average_score'].idxmax()]), + c_address=('candidate_address', lambda x: x[df_group['average_score'].idxmax()]), + vin=('vin', lambda x: x[df_group['average_score'].idxmax()]), + c_vin=('candidate_vin', lambda x: x[df_group['average_score'].idxmax()]), + cc_other=('cc_other', lambda x: x[df_group['average_score'].idxmax()]), + c_cc_other=('candidate_cc_other', lambda x: x[df_group['average_score'].idxmax()]), + vin_other=('vin_other', lambda x: x[df_group['average_score'].idxmax()]), + c_vin_other=('candidate_vin_other', lambda x: x[df_group['average_score'].idxmax()]), + all_proba=('similarity', lambda x: list(x.sort_values(ascending=False))), + all_score=('average_score', lambda x: list(x.sort_values(ascending=False))), + all_c_td_id=('candidate_td_id', lambda x: list(x.sort_values(ascending=False))) + ).reset_index() + pm_results.append(df_eval.iloc[0].tolist()) + + print(f"{process_name} finished processing.") + + # Store result in the corresponding index of the results list + results[index] = pm_results + + # Function to create and start processes + # In: full dataframe, number of processes + def run_in_processes(base_df, num_processes): + def gen_chunks(df, column_name, num_subsets): + # Sort the DataFrame by the specified column + df = df.sort_values(by=column_name) + # Split the DataFrame into the required number of subsets + subsets = [] + subset_size = len(df) // num_subsets + remainder = len(df) % num_subsets + + start_idx = 0 + for i in range(num_subsets): + # Calculate the size of the current subset + current_subset_size = subset_size + (1 if i < remainder else 0) + end_idx = start_idx + current_subset_size + # Get the current subset as a dataframe + subset = df.iloc[start_idx:end_idx].copy() + # Append the subset to the list + subsets.append(subset) + # Update the start index for the next iteration + start_idx = end_idx + return subsets + + print("Generating chunks...") + list_df = gen_chunks(base_df, customer_code_col, num_processes) + print("Finished generating chunks!") + processes = [] + + # Using Manager to create a shared list for collecting results + with multiprocessing.Manager() as manager: + results = manager.list([None] * num_processes) # Shared list to collect results + + # Create processes and assign each one a chunk of the data + for i in range(num_processes): + data_chunk = list_df[i] + print("data chunk ", i, " ", data_chunk) + + # Pass the shared results list and the index to store each process's result + process = multiprocessing.Process(target=process_data, args=(data_chunk, base_df, f"Process-{i}", results, i)) + processes.append(process) + + # Start processes + for process in processes: + process.start() + + # Wait for 
all processes to complete + for process in processes: + process.join() + + # Combine results + combined_results = [item for sublist in results for item in sublist] + return combined_results + + # Run the multiprocessing computation. + final_result = run_in_processes(base_df, num_processes) + result_df = pd.DataFrame(final_result) + result_df.columns = ['td_id', 'best_score', 'best_candidate_td_id', 'best_proba', 'best_name_score', 'best_addr_score', 'phone', 'best_candidate_phone', 'email', 'best_candidate_email', 'customer_code', 'best_candidate_customer_code', 'name', 'best_candidate_name', 'address', 'best_candidate_address', 'vin', 'best_candidate_vin', 'cc_other', 'best_candidate_cc_other', 'vin_other', 'best_candidate_vin_other', 'all_proba', 'all_score', 'all_candidate_td_id'] + print("All processes have finished processing.") + + # Write results to TD table. + td.load_table_from_dataframe(result_df,f'{database}.{out_table}',writer='bulk_import',if_exists='append') + print(f"Fin {part}") + +# Main +if __name__ == "__main__": + main() \ No newline at end of file
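
After the workflow completes, the probabilistic candidates can be reviewed before any records are actually merged. The snippet below is a minimal sketch of such a review query, assuming the default database and table names from config/params.yaml and that TD_API_KEY and TD_API_ENDPOINT are set in the environment; the column names follow the schema written by scripts/pm.py.

```python
import os
import pandas as pd
import pytd

# Connect with the same pytd settings the scripts use (defaults from config/params.yaml).
client = pytd.Client(apikey=os.environ["TD_API_KEY"],
                     endpoint=os.environ.get("TD_API_ENDPOINT", "https://api.treasuredata.com"),
                     database="rotd_dm_pm",
                     default_engine="presto")

# Pull the non-exact ("Positive") candidate pairs for manual review, best scores first.
res = client.query(
    "SELECT td_id, best_candidate_td_id, best_score, "
    "name, best_candidate_name, address, best_candidate_address "
    "FROM rotd_dm_pm.pm_dedup_eval "
    "WHERE best_proba = 'Positive' "
    "ORDER BY best_score DESC"
)
pairs = pd.DataFrame(**res)  # pytd returns {'columns': [...], 'data': [...]}
print(pairs.head(20))
```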