Antonio G. Quintela
Support Vector Machines are generalizations of linear decision boundaries to linearly nonseparable cases. It is an optimization-based algorithm built on the idea of soft-constraint optimization, in which undesirable decisions are penalized through a cost function. Depending on the loss function, the optimization problem can be a convex optimization problem. Convex optimization problems are highly interesting for machine learning because of their simplicity and the number of methods available to solve them.
In the case of the SVM we have a linear model that we can define as:
$$f(x) = x^Tw + w_0$$and we can make predictions in binary classification by applying the $\mathrm{sign}$ function,
$$y_{pred} = \mathrm{sign}(x^Tw + w_0)$$The margin can be defined as $m(w) = y\cdot (x^Tw + w_0)$. If $m > 0$, it measures the margin of safety by which $f(x)$ is correct. If $m < 0$, it measures the margin by which $f(x)$ is wrong.
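As a quick toy illustration (the data and weights below are made up for this example, not taken from the exercise data), the scores, predictions and margins can be computed directly with numpy:
import numpy as np

# Toy example (made-up values): 3 samples, 2 features
x = np.array([[1., 2.], [0., -1.], [2., 0.5]])
y = np.array([1, -1, 1])              # true labels in {-1, 1}
w, w0 = np.array([0.5, -0.25]), 0.1   # some weight vector and bias

f = np.dot(x, w) + w0   # linear scores f(x)
y_pred = np.sign(f)     # predicted labels
m = y * f               # margins: positive means correct, negative means wrong
print(y_pred, m)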
The problem can be defined in the form,
$$w = \underset{w}{\mathrm{argmin}} \quad \mathbb{L}(w)$$The cost can be composed of different terms: one term represents the loss, and another term penalizes large weight parameters, called regularization. The loss term can be decomposed over the samples, which allows us to apply SGD.
$$ \mathbb{L}(w) = \sum_{i=1}^{n_{samp}}\ell(m_i(w)) + \frac{\lambda}{2} \| w \|^2$$We say that this optimization problem is an SVM when $\ell(m_i(w))$ is the hinge loss function.
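A minimal sketch of how this objective could be evaluated for a given $w$ (the data below are placeholders; the loss is summed over the samples as in the formula):
import numpy as np

def svm_objective(w, w0, X, y, lambda_par=1.):
    # Regularized hinge objective: sum of per-sample hinge losses
    # plus the L2 penalty (lambda/2)*||w||^2.
    margins = y * (np.dot(X, w) + w0)
    hinge = np.maximum(0., 1. - margins)
    return np.sum(hinge) + 0.5 * lambda_par * np.dot(w, w)

# Placeholder data just to show the call
X = np.random.random((5, 3))
y = np.random.randint(0, 2, 5)*2 - 1
print(svm_objective(np.zeros(3), 0., X, y))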
The hinge loss is the function used to define the margin optimization.
$${\displaystyle \ell(y) = \max(0, 1-y_{pred} \cdot y)}$$The interesting property of the hinge loss is that it is a convex function. It is not differentiable everywhere, but its (sub)gradient with respect to the weight parameters can be expressed analytically as a two-part function in which,
$${\frac {\partial \ell }{\partial w_{i}}}={\begin{cases}-y\cdot x_{i}&{\text{if }}y_{pred}\cdot y<1\\0&{\text{otherwise}}\end{cases}}$$Stochastic Gradient Descent, or SGD for short, is an iterative variant of Gradient Descent in which only a part of the sample is used at each step. The parameters are updated locally, following the gradient of the loss at the current point of the parameter space, in order to find a minimum (or maximum).
$${\displaystyle w:=w-\eta \nabla L_{i}(w)}$$in which $\eta$ is the learning rate and $L_i$ is the loss evaluated on the $i$-th sample or mini-batch.
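Putting the pieces together, a single SGD step on one mini-batch could look like the following sketch (toy random data; the full implementation comes below in svm.py):
import numpy as np

eta, lambda_par = 0.0001, 1.
X_batch = np.random.random((10, 4))
y_batch = np.random.randint(0, 2, 10)*2 - 1
w, w0 = np.random.randn(4), np.random.randn()

# Hinge subgradient over the batch (zero for samples outside the margin)
scores = np.dot(X_batch, w) + w0
in_margin = (y_batch * scores < 1.).astype(float)
grad_w = -np.dot(y_batch * in_margin, X_batch) + lambda_par * w
grad_w0 = -np.sum(y_batch * in_margin) + lambda_par * w0

# SGD update
w -= eta * grad_w
w0 -= eta * grad_w0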
import numpy as np
import matplotlib.pyplot as plt
svm.py
import numpy as np
import time


class SVM(object):
    """SVM with SGD optimizer."""

    def __init__(self, loss='Hinge', batch_size=10, n_epochs=0,
                 learning_rate=0.0001, stop_step=.000001, lambda_par=1.,
                 verbose=False, history=True):
        ## Setting loss
        loss_functions = {'hinge': Hinge}
        if type(loss) == str:
            self.lossf = loss_functions[loss.lower()]()
        else:
            self.lossf = loss
        ## Optimizer parameters
        self.optimizer = 'SGD'
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.stop_step = stop_step
        ## Model parameters
        self._reset_model()
        self.lambda_par = lambda_par
        ## Tracking results
        self.last_loss = 1e16
        self.change_loss = 1e16
        self.history = history
        self._reset_history()

    def predict(self, X):
        return np.dot(X, self.w) + self.w0

    def score(self, X, y):
        """Scoring with accuracy measure by default."""
        y_pred = self.predict(X)
        # Compare signed predictions against the {-1, 1} targets
        return accuracy(np.sign(y_pred), y)

    def compute_epoch_measures(self, X_train, y_train, X_test, y_test):
        testing_phase = (X_test is not None) and (y_test is not None)
        if not testing_phase:
            loss_test = None
            acc_test = None
        # Prediction train
        y_p_train = self.predict(X_train)
        # Hinge loss train
        reg_term = self._regularization()
        lossf_term = self.lossf.loss(y_p_train, y_train)
        losses_epoch = reg_term + lossf_term
        # Change loss
        self.change_loss = self.last_loss - losses_epoch
        self.last_loss = losses_epoch
        # Accuracy train
        train_acc = accuracy(np.sign(y_p_train), y_train)
        if testing_phase:
            # Prediction test
            y_p_test = self.predict(X_test)
            # Hinge loss test
            loss_test = self.lossf.loss(y_p_test, y_test)
            loss_test += reg_term
            # Accuracy test
            acc_test = accuracy(np.sign(y_p_test), y_test)
        # Add to history
        self._add_epoch_to_history(losses_epoch, train_acc,
                                   loss_test, acc_test)
        # Add new epoch to the counter
        self.epoch_learned += 1

    def fit(self, X_train, y_train, X_test=None, y_test=None):
        """Fit the model with mini-batch SGD, optionally tracking a test set."""
        # Setting administrative parameters
        self._reset_history()
        t0 = time.time()
        # Setting general parameters
        N_samples, n_feats = X_train.shape
        # Initialize model parameters
        self._initialization_weights(n_feats)
        # Setting loop epochs
        keep = True
        i = 0
        while keep:
            # Shuffling data
            data, labels = permut_data(X_train, y_train)
            # Loop over batches
            for x_batch, y_batch in self._batch_generator(data, labels):
                # Batch prediction
                y_batch_pred = self.predict(x_batch)
                # Compute gradient of the loss
                gradloss_w, gradloss_w0 =\
                    self.lossf.gradient_loss(y_batch_pred, y_batch, x_batch)
                # Add the gradient of the regularization term
                grad_w_reg, grad_w0_reg = self._gradient_regularization()
                gradloss_w += grad_w_reg
                gradloss_w0 += grad_w0_reg
                # Parameters update
                self.w -= self.learning_rate*gradloss_w
                self.w0 -= self.learning_rate*gradloss_w0
            # Tracking loss
            self.compute_epoch_measures(X_train, y_train, X_test, y_test)
            # Tracking loop
            i += 1
            keep = stop_condition(self.change_loss, i, self.n_epochs,
                                  self.stop_step)
        self.fit_time = time.time() - t0
        return self

    def report_results(self):
        report = "="*25 + "\n"
        report += "learning_rate = {}\n"
        report += "regularization = {:.2f}\n"
        report += "batch_size = {}\n"
        report += "="*25 + "\n"
        report += "time expended: {:.2f}s\n"
        report += "num. epochs: {}\n"
        report += "best epoch acc: {}\n"
        report += "accuracy train: {:.4f}\n"
        report += "accuracy test: {:.4f}\n"
        report += "="*25 + "\n"
        # The reported accuracies are the ones of the epoch with the best
        # test accuracy.
        best_i = np.argmax(self.test_accuracy_history)
        train_accuracy = self.train_accuracy_history[best_i]
        test_accuracy = self.test_accuracy_history[best_i]
        report = report.format(self.learning_rate, self.lambda_par,
                               self.batch_size, self.fit_time,
                               self.epoch_learned, best_i,
                               train_accuracy, test_accuracy)
        print(report)

    def _add_epoch_to_history(self, train_loss, train_accuracy,
                              test_loss=None, test_accuracy=None):
        if self.history:
            self.train_loss_history.append(train_loss)
            self.train_accuracy_history.append(train_accuracy)
            if test_loss is not None:
                self.test_loss_history.append(test_loss)
                self.test_accuracy_history.append(test_accuracy)

    def _reset_history(self):
        self.epoch_learned = 0
        self.fit_time = 0.
        self.train_loss_history = None
        self.test_loss_history = None
        self.train_accuracy_history = None
        self.test_accuracy_history = None
        if self.history:
            self.train_loss_history = []
            self.train_accuracy_history = []
            self.test_loss_history = []
            self.test_accuracy_history = []

    def _reset_model(self):
        self.w = None
        self.w0 = None

    def _initialization_weights(self, n_feats, init_type='gauss'):
        if init_type == 'zeros':
            self.w0 = 0.
            self.w = np.zeros(n_feats)
        elif init_type == 'gauss':
            self.w0 = np.random.randn()
            self.w = np.random.randn(n_feats)

    def _batch_generator(self, data, labels):
        """We can implement here different options to sample batches."""
        N_samples = len(data)
        for init, endit in batch_size_iter(N_samples, self.batch_size):
            x_batch = data[init:endit]
            y_batch = labels[init:endit]
            yield x_batch, y_batch

    def _regularization(self):
        """By default 'L2' is coded, but others could be implemented."""
        return 0.5*(np.dot(self.w, self.w) + self.w0**2)*self.lambda_par

    def _gradient_regularization(self):
        return self.lambda_par*self.w, self.lambda_par*self.w0


class LossFunction(object):
    """General object for loss functions."""

    def __init__(self):
        required_functions = ['loss', 'gradient_loss']
        for req in required_functions:
            assert(req in dir(self))


class Hinge(LossFunction):
    """Loss function for training binary linear classifiers with target
    {-1, 1}. The loss is averaged over the samples, while the gradient is
    aggregated over the batch.
    """

    def __init__(self, threshold=1.0):
        self.threshold = threshold
        super(Hinge, self).__init__()

    def loss(self, y_pred, y_true):
        z = y_pred * y_true
        loss = np.mean((self.threshold - z)*(z <= self.threshold).astype(float))
        return loss

    def gradient_loss(self, y_pred, y, x):
        """Derivative dL/dw. The output is separated for w and w0."""
        z = y_pred * y
        in_margin = (z <= self.threshold).astype(float)
        gradloss_w = -np.dot(y*in_margin, x)
        gradloss_w0 = -np.sum(y*in_margin)
        return gradloss_w, gradloss_w0


def accuracy(y_pred, y_true):
    return np.sum(y_true == y_pred) / float(y_true.shape[0])


def batch_size_iter(data_size, batch_size):
    init = 0
    keep = True
    while keep:
        endit = init + batch_size
        if endit >= data_size:
            endit = data_size
            keep = False
        yield init, endit
        init += batch_size


def stop_condition(loss_change, i, N, stop_step):
    if N != 0:
        if i >= N:
            return False
    if loss_change < stop_step:
        return False
    return True


def permut_data(data, labels=None):
    n_samples = len(data)
    reindices = np.random.permutation(n_samples)
    if labels is None:
        return data[reindices]
    else:
        return data[reindices], labels[reindices]
from svm import SVM
tests.py
import numpy as np
import unittest
from itertools import product

from svm import *


class PermutationDataTest(unittest.TestCase):

    def testpropershape(self):
        data = np.random.random((10, 4))
        labels = np.random.randint(0, 2, 10)*2-1
        data_per = permut_data(data)
        self.assertEqual(data_per.shape, data.shape)
        data_per, labels_per = permut_data(data, labels)
        self.assertEqual(data_per.shape, data.shape)
        self.assertEqual(labels_per.shape, labels.shape)


class StopperTest(unittest.TestCase):

    def test_stopbychange(self):
        N = 0
        i = 10
        loss_change = 0.5
        stop_step = 0.4
        ifkeep = stop_condition(loss_change, i, N, stop_step)
        self.assertTrue(ifkeep)
        stop_step = 0.6
        ifkeep = stop_condition(loss_change, i, N, stop_step)
        self.assertFalse(ifkeep)

    def test_stopbyiter(self):
        loss_change = 0.5
        stop_step = 0.4
        N = 100
        i = 10
        ifkeep = stop_condition(loss_change, i, N, stop_step)
        self.assertTrue(ifkeep)
        i = 100
        ifkeep = stop_condition(loss_change, i, N, stop_step)
        self.assertFalse(ifkeep)


class BatchCreatorTest(unittest.TestCase):

    def test_run_batch_iterator(self):
        data_size = 100
        batch_size = 9
        for init, endit in batch_size_iter(data_size, batch_size):
            self.assertTrue(init != endit)
            self.assertTrue(init < endit)
        self.assertEqual(endit, data_size)
        data_size = 100
        batch_size = 10
        for init, endit in batch_size_iter(data_size, batch_size):
            self.assertTrue(init != endit)
            self.assertTrue(init < endit)
        self.assertEqual(endit, data_size)


class AccuracyFunctionTest(unittest.TestCase):

    def test_order_independency(self):
        n = 10
        n_tests = 20
        for i in range(n_tests):
            y0 = np.random.randint(0, 2, n)
            y1 = np.random.randint(0, 2, n)
            reindices = np.random.permutation(n)
            self.assertEqual(accuracy(y0, y1),
                             accuracy(y0[reindices], y1[reindices]))

    def test_symetry(self):
        n = 10
        n_tests = 20
        for i in range(n_tests):
            y0 = np.random.randint(0, 2, n)
            y1 = np.random.randint(0, 2, n)
            self.assertEqual(accuracy(y0, y1), accuracy(y1, y0))


class HingeLossTest(unittest.TestCase):

    def _generator_labels(self, n):
        return np.random.randint(0, 2, n)*2-1

    def test_loss(self):
        n = 20
        y0 = np.random.random(n)*2-1
        y1 = self._generator_labels(n)
        thresholds = [0, 1, 2]
        for thr in thresholds:
            lossf = Hinge(thr)
            lossf.loss(y0, y1)

    def test_gradient(self):
        n, n_feats = 20, 10
        y0 = np.random.random(n)*2-1
        y1 = self._generator_labels(n)
        x = np.random.random((n, n_feats))
        thresholds = [0, 1, 2]
        for thr in thresholds:
            lossf = Hinge(thr)
            grad_w, grad_w0 = lossf.gradient_loss(y0, y1, x)
            self.assertEqual(len(grad_w), n_feats)


class SVMTest(unittest.TestCase):

    def _instantiator(self):
        loss = ['Hinge', Hinge()]
        batch_size = [10]
        n_epochs = [0, 100]
        learning_rate = [0.0001, 1.]
        stop_step = [.000001, 100]
        lambda_par = [0.01, 1., 10.]
        verbose = [True, False]
        history = [True, False]
        possibilities = [loss, batch_size, n_epochs, learning_rate,
                         stop_step, lambda_par, verbose, history]
        for p in product(*possibilities):
            model = SVM(*p)
            yield p, model

    def test_initialization(self):
        n, n_feats = 100, 20
        data = np.random.random((n, n_feats))
        labels = np.random.randint(0, 2, n)*2-1
        for p, model in self._instantiator():
            ## General asserts
            self.assertEqual(model.optimizer, 'SGD')
            self.assertEqual(model.batch_size, p[1])
            self.assertEqual(model.n_epochs, p[2])
            self.assertEqual(model.stop_step, p[4])
            self.assertEqual(model.lambda_par, p[5])
            self.assertIsNone(model.w)
            self.assertIsNone(model.w0)
            ## Special cases
            if not p[7]:
                self.assertIsNone(model.train_loss_history)
                self.assertIsNone(model.test_loss_history)
                self.assertIsNone(model.train_accuracy_history)
                self.assertIsNone(model.test_accuracy_history)
            ## Weights initialization
            model._initialization_weights(n_feats, init_type='gauss')
            model._reset_model()
            model._initialization_weights(n_feats, init_type='zeros')
            model._reset_model()
            model._reset_history()
            ## Batch creation testing
            for x_batch, y_batch in model._batch_generator(data, labels):
                self.assertTrue(len(x_batch) <= p[1])
            ## Computer functions
            if p[7]:
                model._initialization_weights(n_feats, init_type='gauss')
                model.compute_epoch_measures(data, labels, None, None)
                model.compute_epoch_measures(data, labels, data, labels)

    def test_fitmodel(self):
        n, n_feats = 100, 5
        data = np.random.random((n, n_feats))
        labels = np.random.randint(0, 2, n)*2-1
        for p, model in self._instantiator():
            model.n_epochs = 100
            model.fit(data, labels)
            model.fit(data, labels, data, labels)
            model.predict(data)
            model.score(data, labels)
            if p[7]:
                self.assertEqual(model.epoch_learned,
                                 len(model.train_loss_history))
                self.assertEqual(model.epoch_learned,
                                 len(model.train_accuracy_history))
                self.assertEqual(model.epoch_learned,
                                 len(model.test_loss_history))
                self.assertEqual(model.epoch_learned,
                                 len(model.test_accuracy_history))


if __name__ == '__main__':
    unittest.main()
## Apply the tests to the code
%run tests.py
data = np.loadtxt('data/features.txt', delimiter=',')
target = np.loadtxt('data/target.txt', delimiter=',')
def weights_histogram(model):
    title = r'Histogram of the distribution of weight values '
    title += r'($\eta=0.0001$, $\lambda=1.$, $b_s={}$)'.format(model.batch_size)
    fig = plt.figure(figsize=(10, 5))
    _ = plt.hist(model.w, bins=20)
    _ = plt.title(title)
    _ = plt.xlabel('weights')
    _ = plt.ylabel('Counts')
def losses_accross_training(model):
    title = r'Loss through epochs of SGD '
    title += r'($\eta=0.0001$, $\lambda=1.$, $b_s={}$)'.format(model.batch_size)
    fig = plt.figure(figsize=(14, 6))
    plt.xlim([0, len(model.train_loss_history)])
    _ = plt.plot(model.train_loss_history, 'b--', label='train_loss')
    _ = plt.plot(model.test_loss_history, 'r-', label='test_loss')
    _ = plt.title(title)
    _ = plt.xlabel("epochs")
    _ = plt.ylabel("Loss")
    legend = plt.legend(loc='upper right')
def acc_accross_training(model):
    title = r'Accuracy through epochs of SGD '
    title += r'($\eta=0.0001$, $\lambda=1.$, $b_s={}$)'.format(model.batch_size)
    fig = plt.figure(figsize=(14, 6))
    plt.xlim([0, len(model.train_accuracy_history)])
    _ = plt.plot(model.train_accuracy_history, 'b--', label='train_accuracy')
    _ = plt.plot(model.test_accuracy_history, 'r-', label='test_accuracy')
    _ = plt.title(title)
    _ = plt.xlabel("epochs")
    _ = plt.ylabel("Accuracy")
    legend = plt.legend(loc='lower right')
model = SVM(n_epochs=1000, batch_size=1, learning_rate=0.0001)
model = model.fit(data[:-1000], target[:-1000], data[-1000:], target[-1000:])
weights_histogram(model)
losses_accross_training(model)
acc_accross_training(model)
model.report_results()
model = SVM(n_epochs=1000, batch_size=10, learning_rate=0.0001)
model = model.fit(data[:-1000], target[:-1000], data[-1000:], target[-1000:])
weights_histogram(model)
losses_accross_training(model)
acc_accross_training(model)
model.report_results()
model = SVM(n_epochs=1000, batch_size=100, learning_rate=0.0001)
model = model.fit(data[:-1000], target[:-1000], data[-1000:], target[-1000:])
weights_histogram(model)
losses_accross_training(model)
acc_accross_training(model)
model.report_results()
The results are self-explanatory: smaller batch sizes converge more quickly but reach lower accuracy, while larger batch sizes need more epochs to converge but reach higher accuracy.
The code is not complete. The design is left open to improvements that are not included here because of time constraints and the scope of this exercise.
There are several possible improvements to this design. One is to control the size of the update step in order to keep numerical stability (there could be regions with a huge slope, since the hinge loss is not differentiable at certain points of the parameter space). Another is to make the learning step dynamic, so that we can move quickly through uninteresting regions of the loss landscape and put more effort into exploring the interesting ones.
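As a hypothetical sketch of those two ideas (the helper names and the decay schedule are illustrative, not part of the code above):
import numpy as np

def clip_step(step, max_norm=1.0):
    # Rescale the update step if its norm exceeds max_norm, to keep
    # numerical stability in regions with a huge slope.
    norm = np.linalg.norm(step)
    if norm > max_norm:
        step = step * (max_norm / norm)
    return step

def decayed_learning_rate(eta0, epoch, decay=0.01):
    # Simple inverse-time decay of the learning rate across epochs.
    return eta0 / (1. + decay * epoch)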
Other loss functions, regularization terms, or batch-sampling strategies could also be easily implemented within the same design.
If we wanted to use different optimizers, it would probably have been better to keep the optimizer separate from the model.
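For instance, a hypothetical refactoring (the names are illustrative) could put the update rule in its own object, so that the model only computes gradients and the optimizer decides how to apply them:
class SGDOptimizer(object):
    """Hypothetical stand-alone optimizer: it only applies the update rule."""

    def __init__(self, learning_rate=0.0001):
        self.learning_rate = learning_rate

    def update(self, w, w0, grad_w, grad_w0):
        return (w - self.learning_rate*grad_w,
                w0 - self.learning_rate*grad_w0)

# The fit loop of the model would then delegate the update:
#     self.w, self.w0 = self.optimizer.update(self.w, self.w0, grad_w, grad_w0)
# which would make it easy to swap in momentum, Adam, etc.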