Change point detection using the OPU online mode

Some machine learning algorithms are designed to be used online, processing one sample at a time. In this case, the optical transform can be optimized using the online mode introduced in lightonopu 1.2.

We are going to show how this works using NEWMA, an online change point detection method. For more information on the algorithm, see https://arxiv.org/abs/1805.08061.
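
In a nutshell, NEWMA maintains two exponentially weighted moving averages of a feature map Psi of the stream, updated with two different forgetting factors Lambda > lambda:

z_t  = (1 - Lambda) z_{t-1}  + Lambda * Psi(x_t)
z'_t = (1 - lambda) z'_{t-1} + lambda * Psi(x_t)

Because the two averages have different effective time windows, the distance ||z_t - z'_t|| spikes when the distribution of the stream changes; it is compared to an adaptive threshold to flag change points. Here Psi will be the random feature map computed optically by the OPU.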

The data

We prepare a time series whose samples are drawn from a mixture of Gaussians that changes every n timesteps. You can skip the details of the data generation and go directly to the next section if you are not interested.

[1]:
"""
@author: nkeriven, taken from https://github.com/lightonai/newma
"""
import numpy as np
from sklearn import mixture


def gmdraw(weights, mu, Sigma, n):
    k, d, p = weights.shape[0], mu.shape[1], np.cumsum(weights)
    label = np.random.rand(n)
    for i in range(n):
        label[i] = np.sum(label[i] > p)
    cSigma = np.zeros((k, d, d))
    for l in range(k):
        cSigma[l, :, :] = np.linalg.cholesky(Sigma[l, :, :])
    X = np.zeros((n, d))
    for i in range(n):
        j = int(label[i])
        X[i, :] = mu[j, :] + np.dot(np.random.randn(1, d), cSigma[j, :, :])
    return X, label


def generateGMM(d=10, k=10, n=1000, std_mean=1, concentration_wishart=30, concentration_dirichlet=5):
    concentration_wishart = np.max((concentration_wishart, 3))
    weights = np.random.dirichlet(concentration_dirichlet*np.ones(k))
    mu = std_mean*k**(1/d)*np.random.randn(k, d)
    Sigma = np.zeros((k, d))
    for l in range(k):
        Sigma[l, :] = (concentration_wishart - 2)/np.sum(np.random.randn(int(concentration_wishart), d)**2,
                                                         axis=0)
    clf = mixture.GaussianMixture(n_components=k, covariance_type='diag')
    clf.means_ = mu
    clf.covariances_ = Sigma
    clf.precisions_cholesky_ = mixture._gaussian_mixture._compute_precision_cholesky(
        Sigma, clf.covariance_type)
    clf.weights_ = weights
    X, label = clf.sample(n_samples=n)
    p = np.random.permutation(n)
    X, label = X[p, :], label[p]
    generated_data = {'data': X, 'weights': weights,
                      'means': mu, 'cov': Sigma,
                      'label': label, 'gmm': clf}
    return generated_data


def stream_GMM(d=10, k=10, n=1000, nb_change=50, std_mean=0.2,
               concentration_wishart=30, concentration_dirichlet=5):
    X = np.zeros((n*(nb_change), d))
    ground_truth = np.zeros(n*(nb_change))
    for i in range(nb_change):
        GM = generateGMM(d=d, k=k, n=n, std_mean=std_mean, concentration_wishart=concentration_wishart,
                         concentration_dirichlet=concentration_dirichlet)
        X[i*n:(i+1)*n, :] = GM['data']
        if i != 0:
            ground_truth[i*n] = 1
    return X, ground_truth
[17]:
d = 5000
n, nb_change = 250, 100

k = 10
std_mean = 0.15  # the bigger, the more change in means
wishart = 3  # the bigger, the less change in diagonal variances

X, ground_truth = stream_GMM(d=d, n=n, nb_change=nb_change, std_mean=std_mean, concentration_wishart=wishart,
                             k=k)
[18]:
# we encode the data into binary features using 38 threshold levels
n_levels = 38
Xencode = np.empty((X.shape[0], n_levels * X.shape[1]), dtype='uint8')
mi, Ma = np.min(X), np.max(X)  # rescale to [0, 255]
X = 255 * ((X - mi) / (Ma - mi))
X = X.astype('uint8')

for i in range(n_levels):
    Xencode[:, i * X.shape[1]:(i + 1) * X.shape[1]] = X > 65 + i * 5
del X
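
The OPU works on binary inputs, so each rescaled value is expanded into n_levels bits with a thermometer-like encoding: bit i is set when the value exceeds the threshold 65 + 5*i. As a quick sanity check of the encoding (a throwaway snippet, not needed for the rest of the notebook):

value = 130
bits = (value > 65 + 5 * np.arange(n_levels)).astype('uint8')
print(bits.sum())  # 13: the thresholds 65, 70, ..., 125 lie below 130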

Preparing the detector class

[4]:
from scipy import linalg
from scipy.stats import norm

class NEWMA():
    def __init__(self, init_sample, forget_factor=0.05, forget_factor2=0.1,
                 feat_func=lambda x: x, dist_func=lambda z1, z2: linalg.norm(z1 - z2),
                 thresholding_method='adapt', thresholding_quantile=0.95,
                 fixed_threshold=None, adapt_forget_factor=0.05,
                 store_values=True):
        self.statistic = 0  # Current value of the detection statistic
        self.thresholding_method = thresholding_method
        self.thresholding_mult = norm.ppf(thresholding_quantile)
        self.fixed_threshold = fixed_threshold
        self.adapt_forget_factor = adapt_forget_factor  # forgetting factor for the adaptive threshold
        # Current estimated mean and moment of order 2 of the statistic squared
        # NOTE: the adaptive threshold is based on the assumption that the squared statistic is approximately gaussian.
        self.adapt_mean = 0
        self.adapt_second_moment = 0
        # history of statistic
        self.store_values = store_values
        self.stat_stored = []
        self.ewma = feat_func(init_sample)  # current sketch
        self.ewma2 = feat_func(init_sample)  # second sketch, updated with a different forgetting factor
        self.forget_factor = forget_factor  # update coeff for sketch
        self.forget_factor2 = forget_factor2  # update coeff for sketch2
        self.feat_func = feat_func  # mapping Psi (identity, random features...)
        self.dist_func = dist_func  # function to compute the distance (may return an array for block distances)

    def apply_to_data(self, data):
        count = 0
        for d in data:
            self.update(d)
            count += 1
            if count % 5000 == 0:
                print(f"{count}/{len(data)}")

    def flag_sample(self):
        if self.thresholding_method == 'adapt':
            return self.statistic > np.sqrt(
                self.adapt_mean + self.thresholding_mult * np.sqrt(self.adapt_second_moment - self.adapt_mean ** 2))
        elif self.thresholding_method == 'fixed':
            return self.statistic > self.fixed_threshold
        else:
            raise ValueError('Thresholding method not recognised.')

    def update(self, new_sample):
        # compute the new detection statistic with the user-provided feature and distance functions
        self.statistic = self.update_stat(new_sample)

        # compute adaptive detection result
        self.adapt_mean = (
            1 - self.adapt_forget_factor) * self.adapt_mean + self.adapt_forget_factor * self.statistic ** 2
        self.adapt_second_moment = (
            1 - self.adapt_forget_factor) * self.adapt_second_moment + self.adapt_forget_factor * self.statistic ** 4

        res = self.flag_sample()
        # if history is stored
        if self.store_values:
            thres = np.sqrt(
                self.adapt_mean + self.thresholding_mult * np.sqrt(self.adapt_second_moment - self.adapt_mean ** 2))
            self.stat_stored.append((self.statistic, thres, res))

        return res  # return the result

    def update_stat(self, new_sample):
        temp = self.feat_func(new_sample)
        # sketches
        self.ewma = (1 - self.forget_factor) * self.ewma + self.forget_factor * temp
        self.ewma2 = (1 - self.forget_factor2) * self.ewma2 + self.forget_factor2 * temp
        # distance
        return self.dist_func(self.ewma, self.ewma2)

For hyperparameter selection:

[5]:
from scipy import linalg
from scipy import optimize as opt

def convert_parameters(window_size, forget_factor):
    """From the window_size and one forgetting factor, compute the other forgetting factor..
    """
    w_ = window_size
    C = forget_factor * (1 - forget_factor) ** w_

    # educated guess for initialization
    if forget_factor > 1 / (w_ + 1):
        init = 1 / (2 * (w_ + 1))
    else:
        init = 2 / (w_ + 1)

    def func(x):
        return (x * (1 - x) ** w_ - C) ** 2

    def grad(x):
        return ((1 - x) ** w_ - w_ * x * (1 - x) ** (w_ - 1)) * 2 * (x * (1 - x) ** w_ - C)

    return opt.minimize(func, jac=grad, x0=init, bounds=((0, 1),), tol=1e-20).x[0]


def select_optimal_parameters(window_size, grid_size=1000):
    """From the window_size, give the best newma parameters, w.r.t. the error bound in the paper.
    """
    def error_bound(L, l):
        numerator = (np.sqrt(L + l) + ((1 - l) ** (2 * window_size) - (1 - L) ** (2 * window_size)))
        denominator = ((1 - l) ** window_size - (1 - L) ** window_size)
        return numerator / denominator

    ax = np.exp(np.linspace(np.log(1.001 / (window_size + 1)), -0.01, grid_size))
    errors = np.zeros(grid_size)
    for ind, L in zip(range(grid_size), ax):
        l = convert_parameters(window_size, L)
        errors[ind] = error_bound(L, l)
    Lambda = (ax[np.argmin(errors)] + 1 / (window_size + 1)) / 2
    return Lambda, convert_parameters(window_size, Lambda)

Configuration of NEWMA

[13]:
# Newma config
B = 50  # window size
big_Lambda, small_lambda = select_optimal_parameters(B)  # forget factors chosen with heuristic in the paper
thres_ff = small_lambda
# number of random features is set automatically with this criterion
m_OPU = 10 * int((1 / 4) / (small_lambda + big_Lambda) ** 2)
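
m_OPU is the dimension of the random feature map, i.e. the number of output components we will request from the OPU below.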

Preparing the OPU in online mode

To optimize the OPU for usage with one sample at a time, you just need to pass online=True when you call the fit1d method.

[20]:
from lightonopu import OPU
opu = OPU(n_components=m_OPU)
opu.fit1d(n_features=Xencode.shape[1], online=True)
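
Once fit1d has been called with online=True, the device is optimized for transforming one encoded sample at a time, which is exactly how the detector will use it through feature_function below.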

Detecting change points

[8]:
def feature_function(x):
    return opu.transform(x).astype('float32')
[21]:
# OPU features are converted to float32 one sample at a time (online) to avoid memory errors
import time
mult = 1.5
detector = NEWMA(Xencode[0], forget_factor=big_Lambda, forget_factor2=small_lambda,
                 feat_func=feature_function, adapt_forget_factor=thres_ff*mult,
                 thresholding_quantile=0.95, dist_func=lambda z1, z2: np.linalg.norm(z1 - z2))
start = time.time()
detector.apply_to_data(Xencode)
print('NEWMA Online took:', time.time() - start)
5000/25000
10000/25000
15000/25000
20000/25000
25000/25000
NEWMA Online took: 138.3667573928833

Performance of the algorithm:

[22]:
def evaluate_detection(ground_truth, flagged):
    n = ground_truth.shape[0]
    if n != flagged.shape[0]:
        print('error', n, flagged.shape[0])
    cp = np.zeros(n, dtype=bool)
    for i in range(n-1):
        if not flagged[i] and flagged[i + 1]:
            cp[i] = 1
    EDD, not_detected, FA = 0, 0, 0
    num_change = int(ground_truth.sum())
    where_change = np.concatenate((np.argwhere(ground_truth).flatten(), np.array([n])))
    for i in range(num_change):
        begin_ind = where_change[i]
        end_ind = where_change[i + 1]
        middle_ind = int((begin_ind + end_ind) / 2)
        j = begin_ind
        while j <= middle_ind and not cp[j]:
            j = j + 1
        if cp[j]:
            EDD += j - begin_ind
        else:
            not_detected += 1
        FA += cp[middle_ind:end_ind].sum()
    results = {'EDD': EDD / np.max((num_change - not_detected, 1)),
               'not_detected': 100 * not_detected / num_change,
               'false_alarm': FA / num_change, 'cp': cp}
    return results


def compute_curves(ground_truth, dist,
                   num_points=50,
                   start_coeff=1.3, end_coeff=2,
                   thres_values=np.array([np.nan]),
                   thres_offset=0):
    if np.isnan(thres_values)[0]:
        thres_values = np.mean(dist)
    thres_levels = np.linspace(start_coeff, end_coeff, num_points)
    EDDs = np.zeros(num_points)
    FAs = np.zeros(num_points)
    NDs = np.zeros(num_points)
    for i in range(num_points):
        flagged_points = dist > thres_levels[i] * thres_values + thres_offset
        res = evaluate_detection(ground_truth, flagged_points)
        EDDs[i] = res['EDD']
        FAs[i] = res['false_alarm']
        NDs[i] = res['not_detected']
    return EDDs, FAs, NDs
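
In evaluate_detection, EDD is the expected detection delay: the average number of timesteps between a true change and the first flag raised in the first half of the following segment. not_detected is the percentage of changes with no such flag, and false_alarm is the average number of flags raised per segment in its second half, where no change occurs.
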
[23]:
detection_stat = np.array([i[0] for i in detector.stat_stored])[int(10 * n):]  # discard the initial transient (detector burn-in)
online_th = np.array([i[1] for i in detector.stat_stored])[int(10 * n):]
ground_truth = ground_truth[int(10 * n):]

# display perf
EDD, FA, ND = compute_curves(ground_truth, detection_stat, num_points=1,
                             thres_values=online_th, start_coeff=1, end_coeff=1)
[27]:
print("Using thresholding quantile\n")
print(f"False alarms: {FA[0]:.2f}")
print(f"Missed detections: {ND[0]:.2f} %")
print(f"Expected detection delay: {EDD[0]:.2f} timesteps")
Using thresholding quantile

False alarms: 0.89
Missed detections: 2.22 %
Expected detection delay: 31.60 timesteps
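
With n = 250 samples between consecutive change points, the detector misses only about 2% of the changes, raises fewer than one false alarm per segment on average, and flags a change roughly 32 timesteps after it occurs.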