导入或创建训练集,设定K值 随机选取K个点作为初始质心(在数据集的范围内) repeat for i=1,2,...,m(m为样本个数)do 计算K个质心到所有样本的欧式距离 把样本中的点划分给距离最近的质心 end for for i=1,2,..,k do 求每一个簇的数据的平均值 将求出的平均值赋值给各质心 end for until 当前质心基本不变或者达到最大迭代次数
import matplotlib.pyplot as plt from random import uniform from math import sqrt import numpy as np
classOnlineKMeans: """ Online K Means Algorithm """
def__init__(self, num_features: int, num_clusters: int, lr: tuple = None): """ :param num_features: The dimension of the data :param num_clusters: The number of clusters to form as well as the number of centroids to generate. :param lr: The learning rate of the online k-means (c', t0). If None, then we will use the simplest update rule (c'=1, t0=0) as described in the lecture. """ if num_features < 1: raise ValueError(f"num_features must be greater or equal to 1!\nGet {num_features}") if num_clusters < 1: raise ValueError(f"num_clusters must be greater or equal to 1!\nGet {num_clusters}")
self.num_centroids = 0 self.centroid = np.zeros((num_clusters, num_features)) self.cluster_counter = np.zeros(num_clusters) # Count how many points have been assigned into this cluster
self.num_samples = 0 self.lr = lr
deffit(self, X): """ Receive a sample (or mini batch of samples) online, and update the centroids of the clusters :param X: (num_features,) or (num_samples, num_features) :return: """
for i inrange(num_samples): self.num_samples += 1 # Did not find enough samples, directly set it to mean if self.num_centroids < self.num_clusters: self.centroid[self.num_centroids] = X[i] self.cluster_counter[self.num_centroids] += 1 self.num_centroids += 1 else: # Determine the closest centroid for this sample sample = X[i] dist = np.linalg.norm(self.centroid - sample, axis=1) centroid_idx = np.argmin(dist)
defpredict(self, X): """ Predict the cluster labels for each sample in X :param X: (num_features,) or (num_samples, num_features) :return: Returned index starts from zero """ iflen(X.shape) == 1: X = X[np.newaxis, :] num_samples, num_features = X.shape
clusters = np.zeros(num_samples) for i inrange(num_samples): sample = X[i] dist = np.linalg.norm(self.centroid - sample, axis=1) clusters[i] = np.argmin(dist) return clusters
deffit_predict(self, X): """ Compute cluster centers and predict cluster index for each sample. :param X: (num_features,) or (num_samples, num_features) :return: """ # Because the centroid may change in the online setting, we cannot determine the cluster of each label until # we finish fitting. self.fit(X) return self.predict(X)
defcalculate_cost(self, X): """ Calculate the KMean cost on the dataset X The cost is defined in the L2 distance. :param X: (num_features,) or (num_samples, num_features) the dataset :return: The cost of this KMean """