# -*- coding: utf-8 -*-
"""
Classes that implement the process of semi-supervised learning experiments.
"""
from .datasets import data_manipulate, base
from .base import BaseExperiments
from .utils.log_utils import get_logger
from sklearn.model_selection import ParameterGrid
from copy import deepcopy
try:  # joblib ships as a standalone package in recent scikit-learn versions
    from joblib import Parallel, delayed
except ImportError:  # fall back for old scikit-learn installs
    from sklearn.externals.joblib import Parallel, delayed
import numpy as np
__all__ = ['SslExperimentsWithoutGraph', 'SslExperimentsWithGraph']
LOGGER = get_logger('Experiments')
class SslExperimentsWithoutGraph(BaseExperiments):
"""Semi-supervised learning experiments without graph.
This class implements a common process of SSL experiments in both
transductive and inductive settings. It optimizes the hyper-parameters
with a grid-search policy that is parallelized using multi-processing.
Parameters
----------
transductive : boolean, optional (default=True)
The experiment is transductive if True else inductive.
n_jobs : int, optional (default=1)
The number of jobs used to run the experiment.
all_class : boolean, optional (default=True)
Whether every split should contain all classes.
Attributes
----------
performance_metric_name : string, optional (default='accuracy_score')
The name of the metric.
metri_param : dict, optional (default={})
A dict storing the parameters for ``self.performance_metric``.
datasets : list
A list of tuples storing the information of the datasets to run.
configs : list
A list of tuples storing the information of the algorithms to evaluate.
performance_metric : callable
The callable object used to evaluate predictions.
Notes
-----
1. Multi-processing requires the estimator to be picklable. You may refer
to the `__getstate__` and `__setstate__` methods if your self-defined
estimator has problems with serialization.
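Examples
--------
A minimal sketch (not part of this library) of how a self-defined
estimator can control pickling for multi-processing; the attribute
``_handle`` stands for a hypothetical unpicklable member:
>>> class MyEstimator:
...     def __getstate__(self):
...         state = self.__dict__.copy()
...         state.pop('_handle', None)  # drop the unpicklable member
...         return state
...     def __setstate__(self, state):
...         self.__dict__.update(state)
...         self._handle = None  # rebuild lazily after unpickling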
"""
def __init__(self, transductive=True, n_jobs=1,
metri_param=dict(), all_class=True):
super(SslExperimentsWithoutGraph, self).__init__(
transductive=transductive, n_jobs=n_jobs, all_class=all_class)
def experiments_on_datasets(self, unlabel_ratio=0.8, test_ratio=0.3,
number_init=5):
"""
The datasets are split randomly or based on given splits. This function
obtains labeled/unlabeled splits for each dataset and conducts
experiments on them. Results are stored for each dataset.
Parameters
----------
unlabel_ratio : float
The ratio of unlabeled data for each dataset.
test_ratio : float
The ratio of test data for each dataset. Ignored when
`transductive` is True.
number_init : int
The number of different label initializations for each dataset.
Returns
-------
results : dict
{dataset_name: {config_name:[scores]} }
"""
if len(self.datasets) == 0:
LOGGER.debug("Haven't specified datasets.")
return
results = dict()
# dict: {dataset_name: {config_name:[scores]} }
best_estimators = dict()
# dict: {dataset_name: {config_name: [estimators]} }
for name, feature_file, label_file, split_path, _ in self.datasets:
# load dataset
X, y = base.load_dataset(name, feature_file, label_file)
LOGGER.debug("Load dataset {}".format(name))
if split_path: # provided split file
_, test_idxs, labeled_idxs, unlabeled_idxs = \
data_manipulate.split_load(split_path, name)
# load split_file from disk
else: # no provided split file
if self.transductive:
_, test_idxs, labeled_idxs, unlabeled_idxs = \
data_manipulate.inductive_split(X=X, y=y,
test_ratio=0.,
initial_label_rate=1 - unlabel_ratio,
split_count=number_init,
all_class=self.all_class
)
else:
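# Keep the labeled/unlabeled proportion within the non-test portion at
# (1 - unlabel_ratio)/unlabel_ratio by scaling the initial label rate
# with the non-test fraction (1 - test_ratio).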
_, test_idxs, labeled_idxs, unlabeled_idxs = \
data_manipulate.inductive_split(X=X, y=y,
test_ratio=test_ratio,
initial_label_rate=(
1 - test_ratio) * (1 - unlabel_ratio),
split_count=number_init,
all_class=self.all_class)
results[name], best_estimators[name], preds = \
self._experiment_on_single_dataset(X=X, y=y,
labeled_idxs=labeled_idxs, unlabeled_idxs=unlabeled_idxs,
test_idxs=test_idxs, number_init=number_init) # RUN
self._evaluate_selected_model(data_name=name, preds=preds,
y_truth=y, unlabeled_idxs=unlabeled_idxs)
#########
# Do something on results
#########
LOGGER.info(results)
return results
def _experiment_on_single_dataset(self, X, y, labeled_idxs, unlabeled_idxs,
test_idxs, number_init=5):
"""
Experiments with different label initializations on a single dataset.
Called by experiments_on_datasets method.
Parameters
----------
X : np.ndarray
Features
y : np.ndarray
Labels
labeled_idxs : list of [array-like]
List of labeled_idxs
unlabeled_idxs : list of [array-like]
List of unlabeled_idxs
test_idxs : list of [array-like]
List of testing_idxs
number_init : int
The number of different label initializations.
Returns
-------
AveScores : dict === {config_name : [float] }
A dict storing the score of each estimator for every label initialization
BestEstimators : dict === {config_name : [estimator] }
A dict storing the best estimator in each config
preds: dict === {config_name : [np.ndarray] }
A dict storing the prediction of the best estimator on testing data
"""
# label/unlabel initialization
number_init = len(labeled_idxs) if labeled_idxs is not None \
else number_init
# experiments on given configs
scores = dict()
# config_name -> [score_1, score_2,..., score_num_initial]
estimators = dict()
# config_name -> [estimator_1, ..., estimator]
preds = dict()
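# config_name -> [pred_1, pred_2, ..., pred_num_initial]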
for name, estimator, param_dict in self.configs: # for all configs
LOGGER.debug("Estimator: {}".format(name))
if self.transductive:
if self.n_jobs == 1 or self.n_jobs == 0:
scores[name] = [0.] * number_init
estimators[name] = [[]] * number_init
preds[name] = [[]] * number_init
for i in range(number_init): # for each data split
scores[name][i], estimators[name][i], preds[name][i] = \
SslExperimentsWithoutGraph._evaluate_on_single_split(
name=name, estimator=estimator,
param_dict=param_dict, X=X, y=y,
labeled_idx=labeled_idxs[i],
unlabeled_idx=unlabeled_idxs[i],
performance_metric=self.performance_metric,
metri_param=self.metri_param,
metri_large_better=self.metri_large_better,
transductive=self.transductive)
else: # Multiprocessing
tmp = Parallel(n_jobs=min(number_init, self.n_jobs),
verbose=30)(delayed(
SslExperimentsWithoutGraph._evaluate_on_single_split)(
name=name, estimator=deepcopy(estimator),
param_dict=param_dict, X=X, y=y,
labeled_idx=labeled_idxs[i],
unlabeled_idx=unlabeled_idxs[i],
performance_metric=self.performance_metric,
metri_param=self.metri_param,
metri_large_better=self.metri_large_better,
transductive=self.transductive)
for i in range(number_init))
scores[name] = [t[0] for t in tmp]
estimators[name] = [t[1] for t in tmp]
preds[name] = [t[2] for t in tmp]
else:
if self.n_jobs == 1 or self.n_jobs == 0:
scores[name] = [0.] * number_init
estimators[name] = [[]] * number_init
preds[name] = [[]] * number_init
for i in range(number_init): # for each data split
scores[name][i], estimators[name][i], preds[name][i] = \
SslExperimentsWithoutGraph._evaluate_on_single_split(
name=name, estimator=estimator,
param_dict=param_dict, X=X, y=y,
labeled_idx=labeled_idxs[i],
unlabeled_idx=unlabeled_idxs[i],
performance_metric=self.performance_metric,
metri_param=self.metri_param,
metri_large_better=self.metri_large_better,
transductive=self.transductive,
test_idx=test_idxs[i])
else: # Multiprocessing
tmp = Parallel(n_jobs=min(number_init, self.n_jobs),
verbose=30)(delayed(
SslExperimentsWithoutGraph._evaluate_on_single_split)(
name=name, estimator=deepcopy(estimator),
param_dict=param_dict, X=X, y=y,
labeled_idx=labeled_idxs[i],
unlabeled_idx=unlabeled_idxs[i],
performance_metric=self.performance_metric,
metri_param=self.metri_param,
metri_large_better=self.metri_large_better,
transductive=self.transductive,
test_idx=test_idxs[i])
for i in range(number_init))
scores[name] = [t[0] for t in tmp]
estimators[name] = [t[1] for t in tmp]
preds[name] = [t[2] for t in tmp]
return scores, estimators, preds
@classmethod
def _evaluate_on_single_split(cls, name, estimator, param_dict, X, y,
labeled_idx, unlabeled_idx, performance_metric,
metri_param, metri_large_better, transductive,
test_idx=None, k=3,
search_strategy="grid_search"):
"""
Run a single algorithm with the given parameters for one label
initialization on a single dataset and evaluate it with the given metric.
Implements grid-search hyper-parameter optimization.
This is a unit of the multi-processing job.
Parameters
----------
name : string
estimator : object
X : np.ndarray
y : np.ndarray
labeled_idx : np.ndarray
unlabeled_idx : np.ndarray
performance_metric : callable
metri_param : dict
metri_large_better : boolean
transductive : boolean
test_idx : np.ndarray or None, optional
Used in inductive setting. Should be None when
self.transductive=True.
k : int
k-fold
search_strategy : str
Returns
-------
score: float
Evaluation score on the test data.
estimator: object
The best model for the label initialization.
pred: np.ndarray
The prediction made by the best model.
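Examples
--------
The metric callable is invoked as ``performance_metric(y_true, y_pred,
metri_param)``. A minimal sketch of a compatible wrapper, using
``accuracy_score`` purely as an illustration:
>>> from sklearn.metrics import accuracy_score
>>> def my_metric(y_true, y_pred, metri_param):
...     return accuracy_score(y_true, y_pred, **metri_param)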
"""
param_list = list(ParameterGrid(param_dict))
best_param = None
best_score = -np.inf if metri_large_better else np.inf  # so the first averaged score is always accepted
# label data split
train_idxs, validation_idxs = data_manipulate.cv_split(X=X, y=y,
instance_indexes=labeled_idx, k=k,
split_count=1, all_class=True)
if transductive: # transductive setting
for param in param_list:
# hyper-parameter optimization with CV or other methods.
LOGGER.debug("Estimator: {} params: {}".format(name, param))
estimator.set_params(param) # need set_params
score = [0.] * len(train_idxs)
# PARALLEL HERE
for i in range(len(train_idxs)):
estimator.fit(X, y, train_idxs[i])
pred = estimator.predict(validation_idxs[i])
score[i] = performance_metric(y[validation_idxs[i]], pred,
metri_param)
LOGGER.debug("Training performance {}/{}-Fold: {}".format(i,
k, score[i]))
if metri_large_better:
if np.average(score) > best_score:
best_param = param
best_score = np.average(score)
else:
if np.average(score) < best_score:
best_param = param
best_score = np.average(score)
# Select the best_param, refit the estimator
estimator.set_params(best_param)
estimator.fit(X, y, labeled_idx)
pred = estimator.predict(unlabeled_idx)
best_score = performance_metric(y[unlabeled_idx], pred, metri_param)
LOGGER.debug("Validation performance: {}".format(best_score))
return best_score, estimator, pred
else: # inductive setting
for param in param_list:
# hyper-parameter optimization with CV or other methods.
LOGGER.debug("Estimator: {} params: {}".format(name, param))
estimator.set_params(param) # need set_params
score = [0.] * len(train_idxs)
for i in range(len(train_idxs)):
# train with instances in train_idx and unlabeled_idx
idx_tmp = np.concatenate([train_idxs[i], unlabeled_idx])
X_tmp = X[idx_tmp, :]
y_tmp = y[idx_tmp, :]
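# After concatenation the first len(train_idxs[i]) rows are the labeled
# fold, so their positions are passed as the labeled indices.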
estimator.fit(X_tmp, y_tmp, np.arange(len(train_idxs[i])))
pred = estimator.predict(X[validation_idxs[i], :])
score[i] = performance_metric(y[validation_idxs[i]],
pred, metri_param)
LOGGER.debug("Training performance {}/{}-Fold: {}".format(i,
k, score[i]))
if metri_large_better:
if np.average(score) > best_score:
best_param = param
best_score = np.average(score)
else:
if np.average(score) < best_score:
best_param = param
best_score = np.average(score)
# Select the best_param, refit the estimator
estimator.set_params(best_param)
# train with instances in labeled_idx and unlabeled_idx
idx_tmp = np.concatenate([labeled_idx, unlabeled_idx])
X_tmp = X[idx_tmp, :]
y_tmp = y[idx_tmp, :]
estimator.fit(X_tmp, y_tmp, np.arange(len(labeled_idx)))
pred = estimator.predict(X[test_idx, :])
best_score = performance_metric(y[test_idx], pred, metri_param)
return best_score, estimator, pred
class SslExperimentsWithGraph(BaseExperiments):
"""Semi-supervised learning experiments with graph.
This class implements a common process of SSL experiments in both
transductive and inductive settings for graph-based methods. It
optimizes the hyper-parameters with a grid-search policy that is
parallelized using multi-processing.
Parameters
----------
transductive : boolean, optional (default=True)
The experiment is transductive if True else inductive.
n_jobs : int, optional (default=1)
The number of jobs used to run the experiment.
all_class : boolean, optional (default=True)
Whether every split should contain all classes.
Attributes
----------
performance_metric_name : string, optional (default='accuracy_score')
The name of the metric.
metri_param : dict, optional (default={})
A dict storing the parameters for ``self.performance_metric``.
datasets : list
A list of tuples storing the information of the datasets to run.
configs : list
A list of tuples storing the information of the algorithms to evaluate.
performance_metric : callable
The callable object used to evaluate predictions.
Notes
-----
1. Multi-processing requires the estimator to be picklable. You may refer
to the `__getstate__` and `__setstate__` methods if your self-defined
estimator has problems with serialization.
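Examples
--------
A minimal usage sketch; the file paths and the ``MyGraphModel``
estimator are placeholders, and filling ``datasets``/``configs``
directly is only for illustration (the registration helpers live in
``BaseExperiments`` and are not shown in this module):
>>> exp = SslExperimentsWithGraph(n_jobs=4)  # doctest: +SKIP
>>> exp.datasets = [('toy', 'toy_X.npy', 'toy_y.npy', None, 'toy_W.npy')]  # doctest: +SKIP
>>> exp.configs = [('LPA', MyGraphModel(), {'alpha': [0.5, 0.9]})]  # doctest: +SKIP
>>> results = exp.experiments_on_datasets(unlabel_ratio=0.8,
...                                       number_init=5)  # doctest: +SKIP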
"""
def __init__(self, n_jobs=1):
super(SslExperimentsWithGraph, self).__init__(
transductive=True, n_jobs=n_jobs)
def experiments_on_datasets(self, unlabel_ratio=0.8, test_ratio=0.3,
number_init=5):
"""
The datasets are split randomly or based on given splits. This function
obtains labeled/unlabeled splits for each dataset and conducts
experiments on them. Results are stored for each dataset.
Parameters
----------
unlabel_ratio : float
The ratio of unlabeled data for each dataset.
test_ratio : float
The ratio of test data for each dataset. Ignored when
`transductive` is True.
number_init : int
The number of different label initializations for each dataset.
Returns
-------
results : dict
{dataset_name: {config_name:[scores]} }
"""
if len(self.datasets) == 0:
LOGGER.debug("Haven't specified datasets.")
return
results = dict()
# dict === {dataset_name: {config_name:[scores]} }
best_estimators = dict()
# dict === {dataset_name: {config_name: [estimators]} }
for name, feature_file, label_file, split_path, graph_file \
in self.datasets:
# load dataset
X, y = base.load_dataset(name, feature_file, label_file)
W = base.load_graph(name, graph_file)
if split_path: # provided split file
_, test_idxs, labeled_idxs, unlabeled_idxs = \
data_manipulate.split_load(split_path, name)
# load split_file from disk
else: # no provided split file
if self.transductive:
_, test_idxs, labeled_idxs, unlabeled_idxs = \
data_manipulate.inductive_split(X=X, y=y, test_ratio=0.,
initial_label_rate=1 - unlabel_ratio,
split_count=number_init, all_class=True,
saving_path='.')
else:
_, test_idxs, labeled_idxs, unlabeled_idxs = \
data_manipulate.inductive_split(X=X, y=y,
test_ratio=test_ratio,
initial_label_rate=(1 - test_ratio
) * (1 - unlabel_ratio),
split_count=number_init, all_class=True,
saving_path='.')
results[name], best_estimators[name], preds = \
self._experiment_on_single_dataset(X=X, y=y, W=W,
labeled_idxs=labeled_idxs, unlabeled_idxs=unlabeled_idxs,
test_idxs=test_idxs, number_init=number_init)
self._evaluate_selected_model(data_name=name, preds=preds,
y_truth=y, unlabeled_idxs=unlabeled_idxs)
#########
# Do something on results
#########
LOGGER.info(results)
return results
def _experiment_on_single_dataset(self, X, y, W, labeled_idxs,
unlabeled_idxs, test_idxs, number_init=5):
"""Experiments with different label initialization on a single dataset.
Parameters
----------
X : np.ndarray
Features
y : np.ndarray
Labels
labeled_idxs : list of [array-like]
List of labeled_idxs
unlabeled_idxs : list of [array-like]
List of unlabeled_idxs
test_idxs : list of [array-like]
List of testing_idxs
number_init : int
The number of different label initializations.
Returns
-------
AveScores : dict === {config_name : [float] }
A dict storing the score of each estimator for every label initialization
BestEstimators : dict === {config_name : [estimator] }
A dict storing the best estimator in each config
preds: dict === {config_name : [np.ndarray] }
A dict storing the prediction of the best estimator on testing data
"""
# label/unlabel initialization
if not isinstance(labeled_idxs, list):
labeled_idxs = labeled_idxs.tolist()
number_init = len(labeled_idxs) if labeled_idxs else number_init
# experiments on given configs
scores = dict()
# config_name -> [score_1, score_2,..., score_num_initial]
estimators = dict()
# config_name -> [estimator_1, ..., estimator]
preds = dict()
for name, estimator, param_dict in self.configs: # for all configs
if self.n_jobs == 1 or self.n_jobs == 0:
scores[name] = [0.] * number_init
estimators[name] = [[]] * number_init
preds[name] = [[]] * number_init
for i in range(number_init): # for each data split
scores[name][i], estimators[name][i], preds[name][i] = \
SslExperimentsWithGraph._evaluate_on_single_split(
name=name, estimator=estimator,
param_dict=param_dict, X=X, y=y, W=W,
labeled_idx=labeled_idxs[i],
unlabeled_idx=unlabeled_idxs[i],
performance_metric=self.performance_metric,
metri_param=self.metri_param,
metri_large_better=self.metri_large_better,
transductive=self.transductive)
else: # Multiprocessing
tmp = Parallel(n_jobs=min(number_init, self.n_jobs),
verbose=30)(delayed(
SslExperimentsWithGraph._evaluate_on_single_split)(
name=name, estimator=estimator,
param_dict=param_dict, X=X, y=y, W=W,
labeled_idx=labeled_idxs[i],
unlabeled_idx=unlabeled_idxs[i],
performance_metric=self.performance_metric,
metri_param=self.metri_param,
metri_large_better=self.metri_large_better,
transductive=self.transductive)
for i in range(number_init))
scores[name] = [t[0] for t in tmp]
estimators[name] = [t[1] for t in tmp]
preds[name] = [t[2] for t in tmp]
return scores, estimators, preds
@classmethod
def _evaluate_on_single_split(cls, name, estimator, param_dict, X, y, W,
labeled_idx, unlabeled_idx, performance_metric,
metri_param, metri_large_better, transductive,
test_idx=None, k=3, search_strategy="grid_search"):
"""
Run a single algorithm with the given parameters for one label
initialization on a single dataset and evaluate it with the given metric.
Implements grid-search hyper-parameter optimization.
This is a unit of the multi-processing job.
Parameters
----------
name : string
estimator : object
X : np.ndarray
y : np.ndarray
W : np.ndarray
labeled_idx : np.ndarray
unlabeled_idx : np.ndarray
performance_metric : callable
metri_param : dict
metri_large_better : boolean
transductive : boolean
test_idx : np.ndarray or None, optional
Used in inductive setting. Should be None when
self.transductive=True.
k : int
k-fold
search_strategy : str
Returns
-------
score: float
Evaluation score on the test data.
estimator: object
The best model for the label initialization.
pred: np.ndarray
The prediction made by the best model.
Examples
--------
>>> param_grid = {'a': [1, 2], 'b': [True, False]}
>>> list(ParameterGrid(param_grid)) == (
...     [{'a': 1, 'b': True}, {'a': 1, 'b': False},
...      {'a': 2, 'b': True}, {'a': 2, 'b': False}])
True
>>> param_grid = {}
>>> list(ParameterGrid(param_grid)) == [{}]
True
"""
param_list = list(ParameterGrid(param_dict))
best_param = None
best_score = -np.inf if metri_large_better else np.inf  # so the first averaged score is always accepted
# label data split
train_idxs, validation_idxs = data_manipulate.cv_split(X=X, y=y,
instance_indexes=labeled_idx,
k=k, split_count=1, all_class=True)
for param in param_list:
# hyper-parameter optimization with CV or other methods.
estimator.set_params(param) # need set_params
score = [0.] * len(train_idxs)
for i in range(len(train_idxs)):
estimator.fit(X, y, train_idxs[i], W)
pred = estimator.predict(validation_idxs[i])
score[i] = performance_metric(y[validation_idxs[i]], pred,
metri_param)
if metri_large_better:
if np.average(score) > best_score:
best_param = param
best_score = np.average(score)
else:
if np.average(score) < best_score:
best_param = param
best_score = np.average(score)
# Select the best_param, refit the estimator
estimator.set_params(best_param)
estimator.fit(X, y, labeled_idx, W)
pred = estimator.predict(unlabeled_idx)
best_score = performance_metric(y[unlabeled_idx], pred, metri_param)
return best_score, estimator, pred