"""
Pair Best Similarity/Distance.
---------------------------------------
This module contains classes to find the best similarity/distance equation based on the target match.
"""
import random
import warnings
from typing import Iterator

import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix, roc_auc_score
from tqdm.autonotebook import tqdm

import measurenary.similarity as sim
import measurenary.distance as dis
from measurenary.utility import *

# disable runtime warning
warnings.filterwarnings('ignore', category=RuntimeWarning)
class PairBestMeasure:
    """
    Class to get the best similarity and distance based on target pairs.

    Every pair of rows of a binary dataframe is compared with each available
    similarity/distance equation; the equations are then ranked by how well
    their scores separate pairs whose reference (last) column matches from
    pairs whose reference column differs (ROC AUC).

    Parameters
    ----------
    show_result : bool, optional
        Set True to show result. The default is False.
    result_count : int, optional
        Set the number of result to show. The default is 5.
    """

    def __init__(self, show_result: bool = False, result_count: int = 5):
        # set up all similarity and dissimilarity equation
        # NOTE(review): the [6:] / [3:] slices skip the first callables found
        # in each module -- presumably imported helpers rather than measures.
        # They depend on definition order in measurenary; confirm before
        # changing either module.
        self.f_sim = [val for val in vars(sim).values() if callable(val)][6:]
        self.f_dis = [val for val in vars(dis).values() if callable(val)][3:]
        self.name_f_sim = [eq.__name__ + ' similarity' for eq in self.f_sim]
        self.name_f_dis = [eq.__name__ + ' distance' for eq in self.f_dis]
        self.result_count = result_count
        self.show_result = show_result
        # populated by fit(); None means "not fitted yet"
        self.res_df = None
        self.sim_df = None
        self.auc_df = None
        self.seed_list = None
        self.nan_value_equation = None

    @staticmethod
    def _evaluate(func, cm, total, as_distance: bool = False):
        """
        Evaluate one equation on a raveled 2x2 confusion matrix.

        Returns ``np.nan`` when the equation raises or returns None. With
        ``as_distance`` the raw value ``v`` is turned into ``1 - v ** 2``.
        """
        try:
            value = func(cm[3], cm[1], cm[2], cm[0], total)
            if as_distance:
                value = 1 - value ** 2
        except Exception:
            # a failing equation must not abort the whole pairwise scan
            return np.nan
        return np.nan if value is None else value

    def _find_confusion_matrix(self, df_source: pd.DataFrame, index_start: int, index_end: int) -> Iterator[list]:
        """
        Generate a confusion matrix and the similarity/distance results for
        every record pair of a binary dataframe.

        Rows are addressed by position, so the dataframe index may be of any
        kind. Each row at position ``i`` in ``[index_start, index_end)`` is
        paired with every later row ``j > i`` up to the end of the dataframe.

        Parameters
        ----------
        df_source : pandas.DataFrame
            dataframe source; the last column is the reference column
        index_start : int
            start position (inclusive)
        index_end : int
            end position (exclusive)

        Yields
        ------
        list
            ``[i, j, confusion_matrix, is_match, *similarity values,
            *distance values]`` where the confusion matrix is the raveled
            2x2 array ``[both 0, (0, 1), (1, 0), both 1]``.
        """
        n_rows = df_source.shape[0]
        # Materialise the rows once instead of building a Series per pair.
        rows = df_source.to_numpy().tolist()
        targets = df_source.iloc[:, -1].to_list()
        stop = min(index_end, n_rows)

        for i in tqdm(range(index_start, stop)):
            for j in range(i + 1, n_rows):
                # labels=[0, 1] guarantees a 2x2 matrix even when both rows
                # hold a single distinct value (e.g. two all-ones rows).
                # NOTE(review): the row values still include the reference
                # (last) column, as in the original implementation -- confirm
                # whether it should be excluded from the comparison.
                cm = confusion_matrix(rows[i], rows[j], labels=[0, 1]).ravel()
                total = cm.sum()
                # match the reference column: True if equal, False otherwise
                is_match = int(targets[i]) == int(targets[j])
                scores = [self._evaluate(f, cm, total) for f in self.f_sim]
                scores += [self._evaluate(f, cm, total, as_distance=True) for f in self.f_dis]
                # combine confusion matrix and similarity/distance result
                yield [i, j, cm, is_match] + scores

    def fit(self, df: pd.DataFrame, use_seed: bool = False, num_sample: int = 20, **kwargs):
        """
        Train data to generate a suitable similarity/distance equation.

        Sets ``res_df`` (all pair results), ``sim_df`` (usable equation
        scores plus ``is_match``), ``auc_df`` (AUC per iteration and mean,
        best first), ``seed_list`` and ``nan_value_equation`` (equations
        excluded because they produced NaN/inf) on the instance.

        Parameters
        ----------
        df : pandas.DataFrame
            dataframe source
        use_seed : bool, optional
            Set True to use seed. The default is False.
        num_sample : int, optional
            Set the number of sample to generate. The default is 20.
        **kwargs
            ``show_result`` and ``result_count`` override the values given
            to the constructor.

        Returns
        -------
        None

        Raises
        ------
        TypeError
            If an argument has the wrong type.
        ValueError
            If ``num_sample`` is not positive, ``df`` has fewer than two
            rows, or the pairs do not contain both matches and non-matches.
        """
        # validate before touching any state (TypeError/ValueError are
        # Exception subclasses, so existing ``except Exception`` still works)
        if not isinstance(use_seed, bool):
            raise TypeError('use_seed must be boolean')
        if not isinstance(df, pd.DataFrame):
            raise TypeError('df must be pandas dataframe')
        if not isinstance(num_sample, int):
            raise TypeError('num_sample must be integer')
        if num_sample <= 0:
            raise ValueError('num_sample must be greater than 0')

        # set up show result and result count in kwargs
        if 'show_result' in kwargs:
            self.show_result = kwargs['show_result']
        if 'result_count' in kwargs:
            self.result_count = kwargs['result_count']

        pairs = list(self._find_confusion_matrix(df, 0, df.shape[0]))
        if not pairs:
            raise ValueError('df must contain at least two rows')

        # create a dataframe from _find_confusion_matrix list result
        res_df = pd.DataFrame(
            np.array(pairs, dtype=object),
            columns=['i', 'j', 'confussion_matrix', 'is_match'] + self.name_f_sim + self.name_f_dis,
        )

        # similarity/distance values only, with inf and -inf treated as NaN
        sim_df = res_df.iloc[:, 4:].replace([np.inf, -np.inf], np.nan)
        # equations that produced NaN for any pair are excluded entirely
        nan_value_equation = sim_df.columns[sim_df.isnull().any()].tolist()
        sim_df = sim_df.drop(columns=nan_value_equation)
        res_df = res_df.drop(columns=nan_value_equation)
        sim_df = pd.concat([sim_df, res_df['is_match']], axis=1)

        # separate matching pairs from non-matching pairs
        match_mask = sim_df['is_match'].astype(bool)
        true_df = sim_df[match_mask]
        false_df = sim_df[~match_mask]
        n_true = len(true_df)
        n_false = len(false_df)
        if n_true == 0 or n_false == 0:
            raise ValueError('df must produce both matching and non-matching pairs')

        measure_columns = sim_df.columns.drop('is_match').to_list()
        auc_per_iter = []
        seed_list = []
        for i in tqdm(range(num_sample)):
            if use_seed:
                seed = i * 10
                # private generator: same draws as random.seed(seed) without
                # reseeding the process-wide random module
                rng = random.Random(seed)
                seed_list.append(seed)
            else:
                rng = random
            # handle imbalance data: down-sample the larger class
            if n_true <= n_false:
                picked = rng.sample(range(n_false), n_true)
                combined_df = pd.concat([true_df, false_df.iloc[picked, :]])
            else:
                picked = rng.sample(range(n_true), n_false)
                combined_df = pd.concat([true_df.iloc[picked, :], false_df])
            labels = combined_df['is_match'].astype(int)
            auc_per_iter.append([
                roc_auc_score(labels, minMaxNormalization(combined_df[column]))
                for column in measure_columns
            ])

        # one row per equation, one float column per iteration
        auc_df = pd.DataFrame(
            np.array(auc_per_iter, dtype=float).T,
            columns=['iter %d' % (i + 1) for i in range(num_sample)],
        )
        auc_df.insert(0, 'sim/dis name', measure_columns)
        auc_df['mean_auc'] = auc_df.iloc[:, 1:].mean(axis=1)
        auc_df = auc_df.sort_values(by='mean_auc', ascending=False, ignore_index=True)

        self.res_df = res_df
        self.sim_df = sim_df
        self.auc_df = auc_df
        self.seed_list = seed_list
        self.nan_value_equation = nan_value_equation

        # print result if self.show_result is True
        if self.show_result:
            print('\nfinal {} best similarity: '.format(str(self.result_count)))
            print(auc_df[['sim/dis name', 'mean_auc']].head(self.result_count))

    def get_result(self, csv: bool = False, csv_path: str = 'result.csv') -> pd.DataFrame:
        """
        Get result of training.

        Parameters
        ----------
        csv : bool, optional
            Set True to also write the result as csv. The default is False.
        csv_path : str, optional
            Destination used when ``csv`` is True. The default is
            'result.csv'.

        Returns
        -------
        pandas.DataFrame
            dataframe result

        Raises
        ------
        Exception
            If method fit() is not called before this function.
        """
        # getattr keeps the documented error even on instances that never
        # had auc_df assigned
        auc_df = getattr(self, 'auc_df', None)
        if auc_df is None or auc_df.empty:
            raise Exception('Fit your data first with \'fit\' method')
        if csv:
            auc_df.to_csv(csv_path, index=False)
        return auc_df