Recommendation System

推荐系统架构

recommendation_system_procedure

Matching

召回算法Match:包含多个渠道的召回模型,希望从资源库中选取 多样性偏好内容,缩小排序目标

  • 协同过滤
  • 主题模型
  • 内容召回
  • 热点召回

Ranking

排序:对多个召回渠道内容打分、排序、选出最优的少量结果

  • 若召回结果仍包含大量数据,可以考虑分为两个阶段
    • 粗排:进一步剔除召回结果
    • 精排:对粗排结果再次打分、排序,得到最终推荐结果

Collaborative Filtering-Based Recommendation

基于协同过滤推荐算法:推荐算法中主流

  • 模型一般为n个物品、m个用户的表

    • 只有部分用户、物品之间有评分数据
    • 要用已有部分稀疏数据预测空白物品、数据之间评分关系, 推荐高评分物品
  • 无需太多特定领域的知识,可通过基于统计的机器学习算法得到 较好推荐效果,可以分为

    • 基于用户
    • 基于物品
    • 基于模型
  • 现在指推荐算法一般指协同过滤,其他基于内容、规则、人口 统计信息等都被包含/忽略

User-based

基于用户协同过滤:主要考虑用户之间相似度,找出相似用户、相似 用户喜欢的物品,预测目标用户对对应物品的评分,推荐高评分物品

  • 特点:(相较于Item-Based)推荐更社会化

    • 反映用户所在小型兴趣群体中物品热门程度
    • 可帮助用户找到新类别、惊喜物品
  • 适合场景

    • 用户数量较少、变化慢场合,否则更新、计算用户相似度矩阵 代价大
    • 时效性强、用户个性化兴趣不明显领域
    • 无需给出推荐解释
    • 示例
      • 新闻推荐:注重热门、时效、item更新快
      • 热点视频推荐
  • 方法

    • 基于规则:大众型推荐方法,如:最多用户点击、浏览
    • 基于人口统计信息:简单根据用户基本信息发现用户相关 程度、推荐
    • 混合推荐
      • 结合多个推荐算法,集成算法推荐结果
      • 复杂度高

Item-Based Collaborative Filtering

基于项目协同过滤:考虑物品和物品之间的相似度,找到目标用户 对某些物品的评分,预测用户对相似度高的类似物品评分,推荐高 评分相似物品

  • 特点:(相较于User-Based)推荐更个性化

    • 反映用户自身的兴趣传承
    • 可帮助用户深入挖掘自身兴趣
    • 准确度一般
    • 推荐多样性弱,难以带来惊喜
  • 适合场景

    • 物品数量较少、变化慢场合,否则更新、计算物品相似度 矩阵代价大
    • 长尾物品丰富、个性化需求不明显
    • 需要向用户给出推荐理由
    • 示例
      • 电商
      • 电影:兴趣持久、更个性化

Model-Based Collaborative Filtering

基于模型:目前最主流的协同过滤类型

  • 关联算法:找出用户-物品数据里频繁出现的项集,作频繁集 挖掘,推荐频繁集、序列中其他物品

    • Apriori
    • FPTree
    • PrefixSpan
  • 聚类算法:按照用户、物品基于一定距离度量聚类,推荐高评分 同类物品、同类人群 (类似于基于用户、物品协同过滤)

    • K-means
    • BIRCH
    • DBSCAN
    • Spectral Clustering
  • 分类算法:使用分类模型划分物品

    • 逻辑回归
    • 朴素贝叶斯
  • 回归算法:使用回归模型给物品预测打分,较分类更平滑

    • 线性回归
    • 决策树
    • SVM
  • 矩阵分解:对用户-物品评分矩阵进行分解

    • FunkSVD
    • BiasSVD
    • SVD++
  • 还有基于图模型、神经网络等新模型
  • 还有依赖于自然语言处理NLP,通过挖掘文本内容特征,得到 用户的偏好,进而做推荐,同样可以找到用户独特的小众喜好

Spark MLLib

MLLib

Spark MLLib:Spark平台的机器学习库

  • 能直接操作RDD数据集,可以和其他BDAS其他组件无缝集成, 使得在全量数据上进行学习成为可能

  • 实现包括以下算法

    • Classification
    • Regression
    • Clustering
    • Collaborative Filtering
    • Dimensionality Reduction
  • MLLib是MLBase中的一部分

    • MLLib
    • MLI
    • MLOptimizer
    • MLRuntime
  • 从Spark1.2起被分为两个模块

    • spark.mllib:包含基于RDD的原始算法API
    • spark.ml:包含基于DataFrame的高层次API
      • 可以用于构建机器学习PipLine
      • ML PipLine API可以方便的进行数据处理、特征转换、 正则化、联合多个机器算法,构建单一完整的机器学习 流水线
  • MLLib算法代码可以在examples目录下找到,数据则在data 目录下
  • 机器学习算法往往需要多次迭代到收敛为止,Spark内存计算、 DAG执行引擎象相较MapReduce更理想
  • 由于Spark核心模块的高性能、通用性,Mahout已经放弃 MapReduce计算模型,选择Spark作为执行引擎

mllib.classification

Classification

Logistic Regression

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.mllib.classification import \
LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabledPoint

def parse_point(line):
value = [float(i) for i line.split(", \r\n\t")

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsed_data = data.map(parse_point)
# map `parse_point` to all data

model = LogisticRegressionWithLBFGS.train(parsed_data)
labels_and_preds = parsed_data.map(lambda p: (p.label, model.predict(p.features)))
train_err = labels_and_preds \
.filter(lambda lp: lp[0] != lp[1]) \
.count() / float(parsed_data.count())

model.save(sc, "model_path")
same_model = LogisticRegressionModel.load(sc, "model.path")
  • Decision Tree
  • Random Forest
  • Gradient
  • boosted tree
  • Multilaye Perceptron
  • Support Vector Machine
  • One-vs-Rest Classifier
  • Naive Bayes

Clustering

K-means

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import numpy as np
from pyspark.mllib.clustering import KMeans, KMeansModel

data = sc.textFile("data/mllib/kmeans_data.txt")
parsed_data = data.map(lambda line: np.array([float(i) for i in line.split()]))

cluster_model = KMeans.train(
parsed_data,
maxIteration=10,
initializationMode="random"
)
def error(point):
center = cluster_model.centers[cluster.predict(point)]
return np.sqrt(sum([i**2 for i in (point - center)]))
WSSSE = parsed_data \
.map(lambda point.error(point)) \
.reduce(lambd x, y: x + y)

cluster_model.save(sc, "model_path")
same_model = KMeansModel.load(sc, "model_path")

Gaussian Mixture Model(GMM)

  • 混合密度模型
    • 有限混合模型:正态分布混合模型可以模拟所有分布
    • 迪利克莱混合模型:类似于泊松过程
  • 应用
    • 聚类:检验聚类结果是否合适
    • 预测:

      todo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import numpy as np
from pyspark.mllib.clustering import GussianMixture, \
GussianMixtureModel

data = sc.textFile("data/mllib/gmm_data.txt")
parsed_data = data.map(lambda line: np.array[float(i) for i in line.strip()]))

gmm = GaussianMixture.train(parsed_data, 2)
for w, g in zip(gmm.weights, gmm.gaussians):
print("weight = ", w,
"mu = ", g.mu,
"sigma = ", g.sigma.toArray())

gmm.save(sc, "model_path")
same_model = GussainMixtureModel.load(sc, "model_path")

Latent Dirichlet Allocation(LDA)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

data = sc.textFile("data/mllib/sample_lda_data.txt")
parsed_data = data.map(lambda line: Vector.dense([float(i) for i in line.strip()]))

corpus = parsed_data.zipWithIndex() \
.map(lambda x: [x[1], x[0]).cache()
ldaModel = LDA.train(corpus, k=3)

topics = ldaModel.topicsMatrix()

for word in range(0, ldaModel.vocabSize()):
for topic in word:
print(topic)

ldaModel.save(sc, "model_path")
same_model = LDAModel.load("model_path")
  • Disecting K-means

Regression

Linear Regression

  • 耗时长、无法计算解析解(无意义)
  • 使用MSE作为极小化目标函数,使用SGD算法求解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from pyspark.mllib.regression import LabledPoint, \
LinearRegressionWithSGD, LinearRegressionModel

def parse_point(line):
value = [float(i) for i line.split(", \r\n\t")

data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsed_data = data.map(parse_point)
# map `parse_point` to all data

model = LinearRegressionWithSGD.train(
parsed_data,
iteration=100,
step=0.00000001
)
values_and_preds = parsed_data.map(lambda p:(p.label, model.predict(p.features)))
MSE = values_and_preds \
.map(lambda vp: (vp[0] - vp[1]) ** 2) \
.reduce(lambda x, y: x + y) / values_and_preds.count()

model.save(sc, "model_path")
# save model
same_model = LinearRegressionModel.load(sc, "model_path")
# load saved model
  • Generalized Linear Regression
  • Decision Tree Regression
  • Random Forest Regression
  • Gradient-boosted Tree Regression
  • Survival Regression
  • Isotonic Regression

Collaborative Filtering