# Source code for measurenary.agglomerative_best

"""
Agglomerative Best Similarity/Distance.
---------------------------------------

This module contains classes to find the best similarity/distance equation based on Agglomerative Clustering.
"""

import measurenary.similarity as sim
import measurenary.distance as dis
import measurenary.utility as util
import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix, homogeneity_completeness_v_measure
from sklearn.metrics.cluster import adjusted_rand_score
from scipy.cluster.hierarchy import fclusterdata
from tqdm.autonotebook import tqdm
from typing import Union

# class to get best similarity/distance measurement based on Agglomerative Clustering Algorithm
class AgglomerativeBestMeasure():
    """
    A class to get best usage of similarity/distance with Agglomerative Clustering.

    Every requested similarity/distance equation is used as the pairwise
    metric of a hierarchical clustering run, once per requested linkage, and
    the resulting labels are scored against the last column of the data.

    Parameters
    ----------
    show_result : bool, optional
        True if you want to show the result. The default is False.
    result_count : int, optional
        The number of result to print out. The default is 5.
    """

    def __init__(self, show_result: bool = False, result_count: int = 5):
        # Collect the callables exposed by the similarity / distance modules.
        # NOTE(review): the [6:] / [3:] slices presumably skip helper callables
        # defined or imported at the top of each module -- confirm against
        # those modules, since the slices depend on definition order.
        self.f_sim = [val for _, val in sim.__dict__.items() if callable(val)][6:]
        self.f_dis = [val for _, val in dis.__dict__.items() if callable(val)][3:]
        self.linkage = ['complete', 'average', 'single', 'weighted']
        # Placeholder until fit() stores the sorted result DataFrame here.
        self.linkage_df = []
        self.show_result = show_result
        self.result_count = result_count

    def fit(self, df: pd.DataFrame, n_clusters = 2, affinity = 'all', linkage = 'all',
            use_sampling = 'none', sample_rate = 0.1, **kwargs):
        """
        Fit data with Agglomerative Clustering.

        Parameters
        ----------
        df : pandas.DataFrame
            Dataframe to fit with Agglomerative Clustering. The last column is
            used as the true labels; all other columns are the features.
        n_clusters : int, optional
            Maximum number of flat clusters to generate, by default 2
        affinity : str or list of str, optional
            Name(s) of the similarity/distance equation to use, or 'all',
            by default 'all'
        linkage : str or list of str, optional
            Type of linkage to use, by default 'all'
        use_sampling : str, optional
            Sampling method that used to reduce computation time by sampling
            the data. It can be 'none' for not to implement sampling, 'random'
            to implement random sampling, and 'stratified' for implement
            stratified sampling
        sample_rate : float, optional
            Sampling rate to determine size of sample. Value from 0 to 1
        **kwargs
            show_result : bool
                Overrides the ``show_result`` given to the constructor.
            result_count : int
                Overrides the ``result_count`` given to the constructor.
            seed
                Passed as third argument to the sampling helper when given.
            sort_by : str
                Result column used for sorting (descending), by default
                'adjusted_rand_index'.

        Returns
        -------
        None

        Raises
        ------
        TypeError
            If ``df`` is not a DataFrame, or ``affinity`` / ``linkage`` has an
            unsupported type.
        ValueError
            If ``n_clusters``, ``sample_rate``, ``affinity``, ``linkage`` or
            ``use_sampling`` has an invalid value.
        """
        # set up show result and result count in kwargs
        if 'show_result' in kwargs:
            self.show_result = kwargs['show_result']
        if 'result_count' in kwargs:
            self.result_count = kwargs['result_count']

        # validate the plain arguments
        if not isinstance(df, pd.DataFrame):
            raise TypeError('df must be DataFrame')
        if n_clusters <= 1:
            raise ValueError('n_clusters must be greater than 1')
        if sample_rate < 0 or sample_rate > 1:
            raise ValueError('sample_rate must be between 0 and 1')

        affinity_sim, affinity_dis = self._resolve_affinity(affinity)
        linkage = self._resolve_linkage(linkage)

        # optionally shrink the data before the (expensive) clustering runs
        if use_sampling in ('random', 'stratified'):
            if use_sampling == 'random':
                sampler = util.random_sampling_data
            else:
                sampler = util.stratified_sampling_data
            if 'seed' in kwargs:
                df = sampler(df, sample_rate, kwargs['seed'])
            else:
                df = sampler(df, sample_rate)
        elif use_sampling not in ('none', 'None'):
            raise ValueError('Value must either "none", "stratified", or "random"')

        # last column holds the true labels, the rest are the features;
        # both are loop-invariant so they are extracted once
        features = df.iloc[:, :-1].values
        true_values = df.iloc[:, -1]

        # one result row per (linkage, equation) pair
        linkage_list = []
        for l in tqdm(linkage):
            for equation in tqdm(affinity_sim, leave=False):
                linkage_list.append(
                    self._score_equation(features, true_values, n_clusters, l, equation, True))
            for equation in tqdm(affinity_dis, leave=False):
                linkage_list.append(
                    self._score_equation(features, true_values, n_clusters, l, equation, False))

        # convert to dataframe
        linkage_df = pd.DataFrame(
            linkage_list,
            columns=['linkage', 'equation', 'homogeneity', 'completeness',
                     'v_measure', 'adjusted_rand_index'])

        # sort by adjusted_rand_index unless the caller picked another column
        sort_by_column = kwargs.get('sort_by', 'adjusted_rand_index')
        self.linkage_df = linkage_df.sort_values(by=sort_by_column, ascending=False, ignore_index=True)

        # show result if show_result is True
        if self.show_result:
            print(self.linkage_df.head(self.result_count))

    def _resolve_affinity(self, affinity):
        """Return ``(similarity_functions, distance_functions)`` selected by ``affinity``."""
        if isinstance(affinity, str):
            if affinity == 'all':
                return list(self.f_sim), list(self.f_dis)
            # A single name resolves to the similarity function first; the
            # distance function is only used when no similarity matches.
            for f in self.f_sim:
                if affinity == f.__name__:
                    return [f], []
            for f in self.f_dis:
                if affinity == f.__name__:
                    return [], [f]
            raise ValueError('affinity must be at least one of the following string: ' + str(sim.get_all_functions_name() + dis.get_all_functions_name()))

        if isinstance(affinity, (list, tuple, set)):
            if len(affinity) == 0:
                raise ValueError('affinity must be at least one of the following string: ' + str(sim.get_all_functions_name() + dis.get_all_functions_name() + ['all']))
            # prevent double value
            names = set(affinity)
            if names == {'all'}:
                return list(self.f_sim), list(self.f_dis)
            affinity_sim = [f for f in self.f_sim if f.__name__ in names]
            affinity_dis = [f for f in self.f_dis if f.__name__ in names]
            # Compare names, not counts: a name present in both the similarity
            # and the distance list must not hide an unknown name.
            known = {f.__name__ for f in affinity_sim} | {f.__name__ for f in affinity_dis}
            if names - known:
                raise ValueError('you have affinity that is not in the list of similarity or distance')
            return affinity_sim, affinity_dis

        raise TypeError('affinity must be a string or a list of strings')

    def _resolve_linkage(self, linkage):
        """Return the list of linkage names selected by ``linkage``."""
        message = 'linkage must be one of the following: ' + str(self.linkage)
        if isinstance(linkage, str):
            if linkage == 'all':
                return list(self.linkage)
            if linkage in self.linkage:
                return [linkage]
            raise ValueError(message)
        if isinstance(linkage, (list, tuple)):
            if len(linkage) == 0 or any(name not in self.linkage for name in linkage):
                raise ValueError(message)
            return list(linkage)
        raise TypeError(message)

    def _score_equation(self, features, true_values, n_clusters, method, equation, is_similarity):
        """Cluster ``features`` with ``equation`` as metric and return one result row."""
        def eq(x, y):
            # NOTE(review): confusion_matrix is called without explicit labels,
            # so a pair of rows with fewer than two distinct values gives fewer
            # than 4 cells; the missing cells are zero-padded at the end --
            # confirm that this cell assignment is the intended one.
            cm = confusion_matrix(x, y).ravel()
            if len(cm) < 4:
                cm = np.append(cm, np.zeros(4 - len(cm)))
            eq_value = equation(cm[3], cm[2], cm[1], cm[0], sum(cm))
            # undefined results (None, NaN, inf) are treated as 0
            if eq_value is None or not np.isfinite(eq_value):
                eq_value = 0
            if is_similarity:
                # similarity values go through util.convertEquation before
                # being handed to scipy as the pairwise metric value
                eq_value = util.convertEquation(eq_value, type=2)
            return eq_value

        # 'maxclust' makes scipy cut the hierarchy into at most n_clusters
        # flat clusters, which is what the n_clusters argument promises.
        res = fclusterdata(features, t=n_clusters, criterion='maxclust', metric=eq, method=method)
        score = homogeneity_completeness_v_measure(true_values, res)
        adj_rand_score = adjusted_rand_score(true_values, res)
        prefix = 'sim ' if is_similarity else 'dis '
        return [method, prefix + equation.__name__, score[0], score[1], score[2], adj_rand_score]

    # function to return (and optionally export) the result of linkage
    def get_result(self, csv: bool = False) -> pd.DataFrame:
        """
        Return the result of best similarity equation that match with the best linkage

        Parameters
        ----------
        csv : bool, optional
            When True, also write the result to 'result.csv' (without the
            index) in the current working directory. The default is False.

        Returns
        -------
        result_df : pandas.DataFrame
            Dataframe that contains the result of best similarity equation
            that match with the best linkage

        Raises
        ------
        RuntimeError
            If no non-empty result is available, i.e. ``fit`` has not
            produced one yet.
        """
        result_df = getattr(self, 'linkage_df', None)
        # Before fit() the attribute is still the list placeholder, which has
        # no `.empty`; treat anything but a non-empty DataFrame as "not fitted".
        if not isinstance(result_df, pd.DataFrame) or result_df.empty:
            raise RuntimeError('Fit your data first with \'fit\' method')
        if csv:
            result_df.to_csv('result.csv', index=False)
        return result_df