A toy problem: implement training a Support Vector Machine with SGD

Antonio G. Quintela

SVM

Support Vectors Machines are generalizations of linear decision boundaries for linearly nonseparable cases. It is an optimization based algorithm based on the idea of soft constraint optimization in which nondesirable decisions are penalized following a cost function. Depending on the loss function, the optimization problem could be a convex optimization problem. That convex optimization problems are highly interesting for machine learning because of simplicity and the quantity of possiblities we have to optimize it.

The case of the SVM we have a linear model that we can define as:

$$f(x) = x^Tw + w_0$$

and we can do predictions in the binary classification by applying $sign$ function,

$$y_{pred} = sign(x^Tw + w_0)$$

The margin can be defined as $m(w) = y\cdot (x^Tw + w_0)$. If $m > 0$ it is the margin safety by which f(x) is correct. If $m < 0$ then $m$ is a measure of the margin by which f(x) is wrong.

The problem could be defined in the form,

$$w = \underset{w}{\mathrm{argmin}} \quad \mathbb{L}(w)$$

The part of the cost it could be composed for different terms. The term represented the loss and a term to penalize high weight parameters, called regularization. The term of the cost can be descomposed by the samples allowing us to apply SGD.

$$ \mathbb{L}(w) = \sum_{i=0}^{n\_samp}\ell(m_t(w)) + \frac{\lambda}{2} \| w \|^2$$

We could say that this optimization problem is a SVM when $\ell(m_t(w))$ is a Hinge loss function.

Hinge Loss

Hinge loss is the function used to define the margin optimization.

$${\displaystyle \ell(y) = \max(0, 1-y_{pred} \cdot y)}$$

The interesting of Hinge is that is a convex function. It is not differenciable but its gradient with respect the weight parameters can be analytical expressed in a two part function in which,

$${\frac {\partial \ell }{\partial w_{i}}}={\begin{cases}-y_{pred}\cdot x_{i}&{\text{if }}y_{pred}\cdot y<1\\0&{\text{otherwise}}\end{cases}}$$

Stochastic Gradient Descent

Stochastic Gradient Descent or SGD for short is an iterative Gradient Descent in which it is used some parts of the sample to optimize locally using the gradient of the curve in a given point of the parameter space trying to find the maxima or the minima.

$${\displaystyle w:=w-\eta \nabla L_{i}(w)}$$

in which $\eta$ is the learning rate.

Imports

In [1]:
import numpy as np
import matplotlib.pyplot as plt

SVM code file

svm.py

import numpy as np
import time


class SVM(object):
    """SVM with SGD optimizer.
    """
    def __init__(self, loss='Hinge', batch_size=10, n_epochs=0, learning_rate=0.0001,
                 stop_step=.000001, lambda_par=1., verbose=False, history=True):

        ## Setting loss
        loss_functions = {'hinge': Hinge}
        if type(loss) == str:
            self.lossf = loss_functions[loss.lower()]()
        else:
            self.lossf = loss

        ## Optimizer parameters
        self.optimizer = 'SGD'
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.stop_step = stop_step

        ## Model parameters
        self._reset_model()
        self.lambda_par = lambda_par

        ## Tracking results
        self.last_loss = 1e16
        self.change_loss = 1e16
        self.history = history
        self._reset_history()

    def predict(self, X):
        return np.dot(X, self.w)+self.w0

    def score(self, X, y):
        """Scoring with accuracy measure by default."""
        y_pred = self.predict(X)
        return accuracy(y_pred, y)

    def compute_epoch_measures(self, X_train, y_train, X_test, y_test):
        N_samples = len(X_train)
        testing_phase = (X_test is not None) and (y_test is not None)
        if not testing_phase:
            loss_test = None
            acc_test = None

        # Prediction train
        y_p_train = self.predict(X_train)

        # Hinge loss train
        reg_term = self._regularization()
        lossf_term = self.lossf.loss(y_p_train, y_train)
        losses_epoch = reg_term+lossf_term

        # Change loss
        self.change_loss = self.last_loss-losses_epoch
        self.last_loss = losses_epoch

        # Accuracy train
        train_acc = accuracy(np.sign(y_p_train), y_train)

        if testing_phase:
            # Prediction test
            y_p_test = self.predict(X_test)
            # Hinge loss test
            loss_test = self.lossf.loss(y_p_test, y_test)
            loss_test += reg_term
            #Accuracy test
            acc_test = accuracy(np.sign(y_p_test), y_test)

        # Add to history
        self._add_epoch_to_history(losses_epoch, train_acc,
                                   loss_test, acc_test)
        # Add new epoch to the counter
        self.epoch_learned += 1

    def fit(self, X_train, y_train, X_test=None, y_test=None):
        """"""
        # Setting administrative parameters
        self._reset_history()
        t0 = time.time()
        # Setting general paramaters
        N_samples, n_feats = X_train.shape
        # Initialize model parameters
        self._initialization_weights(n_feats)
        # Setting loop epochs
        keep = True
        i = 0
        while keep:
            # Shuffling data
            data, labels = permut_data(X_train, y_train)
            # Loop over batches
            for x_batch, y_batch in self._batch_generator(data, labels):
                # Batch prediction
                y_batch_pred = self.predict(x_batch)
                # Compute gradient
                gradloss_w, gradloss_w0 =\
                    self.lossf.gradient_loss(y_batch_pred, y_batch, x_batch)
                gradloss_w += self.lambda_par*self.w
                gradloss_w0 += self.lambda_par*self.w0
                # Paramters update
                self.w -= self.learning_rate*gradloss_w
                self.w0 -= self.learning_rate*gradloss_w0
                grad_w_reg, grad_w0_reg = self._gradient_regularization()
                gradloss_w += grad_w_reg
                gradloss_w0 += grad_w0_reg
            # Tracking loss
            self.compute_epoch_measures(X_train, y_train, X_test, y_test)

            # Tracking loop
            i += 1
            keep = stop_condition(self.change_loss, i, self.n_epochs, self.stop_step)
        self.fit_times = time.time()-t0
        return self

    def report_results(self):
        report = "="*25+"\n"
        report += "learning_rate = {}\n"
        report += "regularization = {:.2f}\n"
        report += "batch_size = {}\n"
        report += "="*25+"\n"
        report += "time expended: {:.2f}s\n"
        report += "num. epochs: {}\n"
        report += "best epoch acc: {}\n"
        report += "accuracy train: {:.4f}\n"
        report += "accuracy test: {:.4f}\n"
        report += "="*25+"\n"
        # The criteria of get the accuracy will be the accuracies in the epoch with
        # better test accuracy.
        best_i = np.argmax(self.test_accuracy_history)
        train_accuracy = self.train_accuracy_history[best_i]
        test_accuracy = self.test_accuracy_history[best_i]
        report = report.format(self.learning_rate, self.lambda_par, self.batch_size,
                               self.fit_time, self.epoch_learned, best_i,
                               train_accuracy, test_accuracy)
        print(report)

    def _add_epoch_to_history(self, train_loss, train_accuracy,
                              test_loss=None, test_accuracy=None):
        if self.history:
            self.train_loss_history.append(train_loss)
            self.train_accuracy_history.append(train_accuracy)
            if test_loss is not None:
                self.test_loss_history.append(test_loss)
                self.test_accuracy_history.append(test_accuracy)

    def _reset_history(self):
        self.epoch_learned = 0
        self.fit_time = 0.
        self.train_loss_history = None
        self.test_loss_history = None
        self.train_accuracy_history = None
        self.test_accuracy_history = None
        if self.history:
            self.train_loss_history = []
            self.train_accuracy_history = []
            self.test_loss_history = []
            self.test_accuracy_history = []

    def _reset_model(self):
        self.w = None
        self.w0 = None

    def _initialization_weights(self, n_feats, init_type='gauss'):
        if init_type == 'zeros':
            self.w0 = 0.
            self.w = np.zeros(n_feats)
        elif init_type == 'gauss':
            self.w0 = np.random.randn()
            self.w = np.random.randn(n_feats)

    def _batch_generator(self, data, labels):
        """We can implement here different options to sample batches."""
        N_samples = len(data)
        for init, endit in batch_size_iter(N_samples, self.batch_size):
            x_batch = data[init:endit]
            y_batch = labels[init:endit]
            yield x_batch, y_batch

    def _regularization(self):
        """By default it is coded 'L2' but it could be implemented others."""
        return 0.5*(np.dot(self.w, self.w)+self.w0**2)*self.lambda_par

    def _gradient_regularization(self):
        return self.lambda_par*self.w, self.lambda_par*self.w0


class LossFunction(object):
    """General object for loss functions."""
    def __init__(self):
        required_functions = ['loss', 'gradient_loss']
        for req in required_functions:
            assert(req in dir(self))


class Hinge(LossFunction):
    """Loss function for trainning binary linear classifiers with target
    {-1, 1}. It computes the aggregated loss, not the average per sample.
    """

    def __init__(self, threshold=1.0):
        self.threshold = threshold
        super(Hinge, self).__init__()

    def loss(self, y_pred, y_true):
        z = y_pred * y_true
        loss = np.mean((self.threshold - z)*(z <= self.threshold).astype(float))
        return loss

    def gradient_loss(self, y_pred, y, x):
        """Derivation dL/dw. It is separated the output for w and w0.
        """
        z = y_pred * y
        in_margin = (z <= self.threshold).astype(float)

        gradloss_w = -np.dot(y*in_margin, x)
        gradloss_w0 = -np.sum(y*in_margin)

        return gradloss_w, gradloss_w0


def accuracy(y_pred, y_true):
    return np.sum(y_true == y_pred) / float(y_true.shape[0])


def batch_size_iter(data_size, batch_size):
    init = 0
    keep = True
    while keep:
        endit = init+batch_size
        if endit >= data_size:
            endit = data_size
            keep = False
        yield init, endit
        init += batch_size


def stop_condition(loss_change, i, N, stop_step):
    if N != 0:
        if i >= N:
            return False
    if loss_change < stop_step:
        return False
    return True


def permut_data(data, labels=None):
    n_samples = len(data)
    reindices = np.random.permutation(n_samples)
    if labels is None:
        return data[reindices]
    else:
        return data[reindices], labels[reindices]
In [2]:
from svm import SVM

Test code file

tests.py

import numpy as np
import unittest
from itertools import product

from svm import *


class PermutationDataTest(unittest.TestCase):

    def testpropershape(self):
        data = np.random.random((10, 4))
        labels = np.random.randint(0, 2, 10)*2-1

        data_per = permut_data(data)
        self.assertEqual(data_per.shape, data.shape)

        data_per, labels_per = permut_data(data, labels)
        self.assertEqual(data_per.shape, data.shape)
        self.assertEqual(labels_per.shape, labels.shape)


class StopperTest(unittest.TestCase):

    def test_stopbychange(self):
        N = 0
        i = 10
        loss_change = 0.5

        stop_step = 0.4
        ifkeep = stop_condition(loss_change, i, N, stop_step)
        self.assertTrue(ifkeep)

        stop_step = 0.6
        ifkeep = stop_condition(loss_change, i, N, stop_step)
        self.assertFalse(ifkeep)

    def test_stopbyiter(self):
        loss_change = 0.5
        stop_step = 0.4
        N = 100

        i = 10
        ifkeep = stop_condition(loss_change, i, N, stop_step)
        self.assertTrue(ifkeep)

        i = 100
        ifkeep = stop_condition(loss_change, i, N, stop_step)
        self.assertFalse(ifkeep)


class BatchCreatorTest(unittest.TestCase):

    def test_run_batch_iterator(self):
        data_size = 100
        batch_size = 9
        for init, endit in batch_size_iter(data_size, batch_size):
            self.assertTrue(init != endit)
            self.assertTrue(init < endit)
        self.assertEqual(endit, data_size)

        data_size = 100
        batch_size = 10
        for init, endit in batch_size_iter(data_size, batch_size):
            self.assertTrue(init != endit)
            self.assertTrue(init < endit)
        self.assertEqual(endit, data_size)


class AccuracyFunctionTest(unittest.TestCase):

    def test_order_independency(self):
        n = 10
        n_tests = 20

        for i in range(n_tests):
            y0 = np.random.randint(0, 2, n)
            y1 = np.random.randint(0, 2, n)
            reindices = np.random.permutation(n)
            self.assertEqual((accuracy(y0, y1), accuracy(y0[reindices], y1[reindices]))

    def test_symetry(self):
        n = 10
        n_tests = 20

        for i in range(n_tests):
            y0 = np.random.randint(0, 2, n)
            y1 = np.random.randint(0, 2, n)
            self.assertEqual((accuracy(y0, y1), accuracy(y1, y0))


class HingeLossTest(unittest.TestCase):

    def _generator_labels(self, n):
        return np.random.randint(0, 2, n)*2-1

    def test_loss(self):
        n = 20
        y0 = np.random.random(n)*2-1
        y1 = self._generator_labels(n)

        thresholds = [0, 1, 2]
        for thr in thresholds:
            lossf = Hinge(thr)
            lossf.loss(y0, y1)

    def test_gradient(self):
        n, n_feats = 20, 10
        y0 = np.random.random(n)*2-1
        y1 = self._generator_labels(n)
        x = np.random.random((n, n_feats))

        thresholds = [0, 1, 2]
        for thr in thresholds:
            lossf = Hinge(thr)
            grad_w, grad_w0 = lossf.gradient_loss(y0, y1, x)
            self.assertEqual((len(grad_w), n_feats)

class SVMTest(unittest.TestCase):

    def _instantiator(self):
        loss=['Hinge', Hinge()]
        batch_size=[10]
        n_epochs=[0, 100]
        learning_rate=[0.0001, 1.]
        stop_step=[.000001, 100]
        lambda_par=[0.01, 1., 10.]
        verbose=[True, False]
        history=[True, False]
        possibilities = [loss, batch_size, n_epochs, learning_rate, stop_step,
                         lambda_par, verbose, history]
        for p in product(*possibilities):
            model = SVM(*p)
            yield p, model

    def test_initialization(self):
        n, n_feats = 100, 20
        data = np.random.random((n, n_feats))
        labels = np.random.randint(0, 2, n)*2-1

        for p, model in self._instantiator():
            ## General asserts
            self.assertEqual(model.optimizer, 'SGD')
            self.assertEqual(model.batch_size, p[1])
            self.assertEqual(model.n_epochs, p[2])
            self.assertEqual(model.stop_step, p[4])
            self.assertEqual(model.lambda_par, p[5])
            self.assertIsNone(model.w)
            self.assertIsNone(model.w0)

            ## Special cases
            if not p[7]:
                self.assertIsNone(model.train_loss_history)
                self.assertIsNone(model.test_loss_history)
                self.assertIsNone(model.train_accuracy_history)
                self.assertIsNone(model.test_accuracy_history)

            ## Weights initialization
            model._initialization_weights(n_feats, init_type='gauss')
            model._reset_model()
            model._initialization_weights(n_feats, init_type='zeros')
            model._reset_model()
            model._reset_history()

            ## Batch creation testing
            for x_batch, y_batch in model._batch_generator(data, labels):
                self.assertTrue(len(x_batch) >= p[1])

            ## Computer functions
            if p[7]:
                model._initialization_weights(n_feats, init_type='gauss')
                model.compute_epoch_measures(data, labels, None, None)
                model.compute_epoch_measures(data, labels, data, labels)

    def test_fitmodel(self):
        n, n_feats = 100, 5
        data = np.random.random((n, n_feats))
        labels = np.random.randint(0, 2, n)*2-1

        for p, model in self._instantiator():
            model.n_epochs = 100
            model.fit(data, labels)
            model.fit(data, labels, data, labels)
            model.predict(data)
            model.score(data, labels)
            if p[7]:
                self.assertEqual(model.epoch_learned, len(model.train_loss_history))
                self.assertEqual(model.epoch_learned, len(model.train_accuracy_history))
                self.assertEqual(model.epoch_learned, len(model.test_loss_history))
                self.assertEqual(model.epoch_learned, len(model.test_accuracy_history))


if __name__ == '__main__':
    unittest.main()
In [3]:
## Apply test to the code
%run tests.py
..........
----------------------------------------------------------------------
Ran 10 tests in 5.841s

OK

Load data

In [4]:
data = np.loadtxt('data/features.txt', delimiter=',')
target = np.loadtxt('data/target.txt', delimiter=',')

Function definitions

In [5]:
def weights_histogram(model):
    title = r'Histogram of the distribution of weight values '
    title += r'($\eta=0.0001$, $\lambda=1.$, $b_s={}$)'.format(model.batch_size)
    fig = plt.figure(figsize=(10, 5))
    _ = plt.hist(model.w, bins=20)
    _ = plt.title(title)
    _ = plt.xlabel('weights')
    _ = plt.ylabel('Counts')
In [6]:
def losses_accross_training(model):
    title = r'Loss through epochs of SGD '
    title += r'($\eta=0.0001$, $\lambda=1.$, $b_s={}$)'.format(model.batch_size)
    fig = plt.figure(figsize=(14, 6))
    plt.xlim([0, len(model.train_loss_history)])
    _ = plt.plot(model.train_loss_history, 'b--', label='train_loss')
    _ = plt.plot(model.test_loss_history, 'r-', label='test_loss')
    _ = plt.title(title)
    _ = plt.xlabel("epochs")
    _ = plt.ylabel("Loss")
    legend = plt.legend(loc='upper right')
In [7]:
def acc_accross_training(model):
    title = r'Accuracy through epochs of SGD '
    title += r'($\eta=0.0001$, $\lambda=1.$, $b_s={}$)'.format(model.batch_size)
    fig = plt.figure(figsize=(14, 6))
    plt.xlim([0, len(model.train_accuracy_history)])
    _ = plt.plot(model.train_accuracy_history, 'b--', label='train_loss')
    _ = plt.plot(model.test_accuracy_history, 'r-', label='test_loss')
    _ = plt.title(title)
    _ = plt.xlabel("epochs")
    _ = plt.ylabel("Accuracy")
    legend = plt.legend(loc='lower right')

Study with batch_size=1

In [8]:
model = SVM(n_epochs=1000, batch_size=1, learning_rate=0.0001)
model = model.fit(data[:-1000], target[:-1000], data[-1000:], target[-1000:])
In [9]:
weights_histogram(model)
In [10]:
losses_accross_training(model)
In [11]:
acc_accross_training(model)
In [12]:
model.report_results()
=========================
learning_rate = 0.0001
regularization = 1.00
batch_size = 1
=========================
time expended: 0.00s
num. epochs: 15
best epoch acc: 3
accuracy train: 0.7571
accuracy test: 0.7500
=========================

Study with batch_size=10

In [13]:
model = SVM(n_epochs=1000, batch_size=10, learning_rate=0.0001)
model = model.fit(data[:-1000], target[:-1000], data[-1000:], target[-1000:])
In [14]:
weights_histogram(model)
In [15]:
losses_accross_training(model)
In [16]:
acc_accross_training(model)
In [17]:
model.report_results()
=========================
learning_rate = 0.0001
regularization = 1.00
batch_size = 10
=========================
time expended: 0.00s
num. epochs: 70
best epoch acc: 28
accuracy train: 0.8146
accuracy test: 0.8100
=========================

Study with batch_size=100

In [18]:
model = SVM(n_epochs=1000, batch_size=100, learning_rate=0.0001)
model = model.fit(data[:-1000], target[:-1000], data[-1000:], target[-1000:])
In [19]:
weights_histogram(model)
In [20]:
losses_accross_training(model)
In [21]:
acc_accross_training(model)
In [22]:
model.report_results()
=========================
learning_rate = 0.0001
regularization = 1.00
batch_size = 100
=========================
time expended: 0.00s
num. epochs: 476
best epoch acc: 156
accuracy train: 0.8400
accuracy test: 0.8440
=========================

Comments

The results are self-conclusives. Quicker convergence for smaller batch size but lower accuracy reached. On the other hand, for higher batch_size, you need more epochs to converge but reach higher accuracy.

Comments about the code

The code has lack of completitude. The design is open to new improvements that are not in the code because of the lack of time and the purpose of that exercise.

There are several improvements over that code design. From control values of the step in order to keep numerical stability (it could be certain regions with huge slope, due to the nonderivative property of Hinge in certain points of the parameter space. Also we can make dynamic learning step in order to skip quickly uninteresting places of the loss landscape and put more effort in exploring the interesting ones.

It could be easily implmented other

If we would desire to use different optimizers probably it had been better to keep separated the optimizer and the model.