概述
Alink提供了一系列与推荐相关的组件,从组件使用得角度来看,需要重点关注如下三个方面:
- 算法选择
推荐领域有很多算法,常用的有基于物品/用户的协同过滤、ALS、FM算法等。对于不同的数据场景,算法也会在计算方式上有很大的变化。
- 推荐方式
输入信息可以有多种选择,输入结果也有多种情况。
- 同时输入一个用户信息和一个物品信息,计算用户对此物品的评分。
- 输入用户的信息,可以推荐适合此用户的相关物品,也可以计算与其相似的用户。
- 输入物品的信息,推荐给可能喜欢该物品的用户,也可以计算与其相似的物品。
- 使用方法
在应用推荐引擎时,可能是在离线任务中进行批量推荐,也可能是在实时任务中对流式数据进行推荐,还可以通过使用Alink Java SDK将推荐引擎嵌入用户的应用系统。
推荐算法
基于物品的协同过滤
基于物品的协同过滤算法(Item-based Collaborative Filtering),Alink提供了相关的组件。模型训练为离线批式训练,对应组件为ItemCfTrainBatchOp,得到ItemCf模型。基于此模型可以进行多种推荐,但不是每种推荐方式使用该ItemCf算法都可以得到较好的效果。Alink只提供了适合该算法的推荐方式:评分预测(ItemCfRate)、根据用户推荐物品(ItemCfItemsPerUser)、计算相似物品(ItemCfSimilarItems)。考虑到每种推荐需要支持多种使用方式,每种方法都提供了3种组件——批式推荐(RecommBatchOp)、流式推荐(RecommStreamOp)和Pipeline节点。如下所示:

交替最小二乘法
基本思路为交替固定用户特征向量和物品特征向量的值,每次求解一个最小二乘问题,直到满足求解条件。根据用户-物品矩阵中的值额含义是评分值还是行为次数、观看/收听时长,分别选用显示反馈算法与隐式反馈算法。两种计算方式得到ALS模型格式是一样的,后面可以选用五种推荐方式,并且每种方法都提供了3种组件——批式推荐、流式推荐、Pipeline节点。如下所示:

关于最小二乘法可以参考 交替最小二乘法
Alink组件支持
Alink在推荐组件方面提供的组件是比较多的。详细说明如下
现支持的算法如下:
- 基于物品的协同过滤(ItemCf)
- 基于用户的协同过滤(UseCf)
- ALS显式反馈算法(Als)
- ALS隐式反馈算法(AlsImplicit)
- FM算法
推荐方法如下:
- 评分预测(Rate)
- 根据物品推荐用户(UserPerItem)
- 根据用户推荐物品(ItemsPerUser)
- 计算相似物品(SimilarItems)
- 计算相似用户(SimilarUsers)
使用方法如下:
- 批式推荐
- 流式推荐
- Pipeline节点
Alink实现推荐系统
实现概览
基于物品的协同过滤推荐实现概览
static TsvSourceBatchOp getSourceRatings() { return new TsvSourceBatchOp() .setFilePath(DATA_DIR + RATING_FILE) .setSchemaStr(RATING_SCHEMA_STRING); } /** * 基于ItemCf算法做推荐 * 1.基于ItemCfTrainBatchOp算子做协同过滤模型的训练,并将训练好的模型保存 * 2.基于ItemCfItemsPerUserRecommender算子的推荐过程;包括推荐、查找物品名称、选择列并排序 * */ static void c_5() throws Exception { if (!new File(DATA_DIR + ITEMCF_MODEL_FILE).exists()) { getSourceRatings() .link( new ItemCfTrainBatchOp() .setUserCol(USER_COL) .setItemCol(ITEM_COL) .setRateCol(RATING_COL) ) .link( new AkSinkBatchOp() .setFilePath(DATA_DIR + ITEMCF_MODEL_FILE) ); BatchOperator.execute(); } MemSourceBatchOp test_data = new MemSourceBatchOp(new Long[]{1L}, "user_id"); new ItemCfItemsPerUserRecommender() .setUserCol(USER_COL) .setRecommCol(RECOMM_COL) .setModelData( new AkSourceBatchOp() .setFilePath(DATA_DIR + ITEMCF_MODEL_FILE) ) .transform(test_data) .print(); LocalPredictor recomm_predictor = new ItemCfItemsPerUserRecommender() .setUserCol(USER_COL) .setRecommCol(RECOMM_COL) .setK(20) .setModelData( new AkSourceBatchOp() .setFilePath(DATA_DIR + ITEMCF_MODEL_FILE) ) .collectLocalPredictor("user_id long"); System.out.println(recomm_predictor.getOutputSchema()); LocalPredictor kv_predictor = new Lookup() .setSelectedCols(ITEM_COL) .setOutputCols("item_name") .setModelData(getSourceItems()) .setMapKeyCols("item_id") .setMapValueCols("title") .collectLocalPredictor("item_id long"); System.out.println(kv_predictor.getOutputSchema()); MTable recommResult = (MTable) recomm_predictor.map(Row.of(1L)).getField(1); System.out.println(recommResult); new Lookup() .setSelectedCols(ITEM_COL) .setOutputCols("item_name") .setModelData(getSourceItems()) .setMapKeyCols("item_id") .setMapValueCols("title") .transform( getSourceRatings().filter("user_id=1 AND rating>4") ) .select("item_name") .orderBy("item_name", 1000) .lazyPrint(-1); LocalPredictor recomm_predictor_2 = new ItemCfItemsPerUserRecommender() .setUserCol(USER_COL) .setRecommCol(RECOMM_COL) .setK(20) .setExcludeKnown(true) .setModelData( new AkSourceBatchOp() .setFilePath(DATA_DIR + ITEMCF_MODEL_FILE) ) .collectLocalPredictor("user_id long"); recommResult = (MTable) recomm_predictor_2.map(Row.of(1L)).getField(1); System.out.println(recommResult); }
ALS推荐实现概览
/** * 基于ALS算法做推荐 * 1.基于AlsTrainBatchOp算子做协同过滤模型的训练,并将训练好的模型保存 * 2.基于AlsRateRecommender算子的推荐过程;包括推荐、查找物品名称、选择列并排序 * */ static void c_4() throws Exception { TsvSourceBatchOp train_set = new TsvSourceBatchOp() .setFilePath(DATA_DIR + RATING_TRAIN_FILE) .setSchemaStr(RATING_SCHEMA_STRING); TsvSourceBatchOp test_set = new TsvSourceBatchOp() .setFilePath(DATA_DIR + RATING_TEST_FILE) .setSchemaStr(RATING_SCHEMA_STRING); train_set.lazyPrint(10); if (!new File(DATA_DIR + ALS_MODEL_FILE).exists()) { train_set .link( new AlsTrainBatchOp() .setUserCol(USER_COL) .setItemCol(ITEM_COL) .setRateCol(RATING_COL) .setLambda(0.1) .setRank(10) .setNumIter(10) ) .link( new AkSinkBatchOp() .setFilePath(DATA_DIR + ALS_MODEL_FILE) ); BatchOperator.execute(); } new PipelineModel ( new AlsRateRecommender() .setUserCol(USER_COL) .setItemCol(ITEM_COL) .setRecommCol(RECOMM_COL) .setModelData( new AkSourceBatchOp() .setFilePath(DATA_DIR + ALS_MODEL_FILE) ), new Lookup() .setSelectedCols(ITEM_COL) .setOutputCols("item_name") .setModelData(getSourceItems()) .setMapKeyCols("item_id") .setMapValueCols("title") ) .transform( test_set.filter("user_id=1") ) .select("user_id, rating, recomm, item_name") .orderBy("rating, recomm", 1000) .lazyPrint(-1); BatchOperator.execute(); new AlsRateRecommender() .setUserCol(USER_COL) .setItemCol(ITEM_COL) .setRecommCol(RECOMM_COL) .setModelData( new AkSourceBatchOp() .setFilePath(DATA_DIR + ALS_MODEL_FILE) ) .transform(test_set) .link( new EvalRegressionBatchOp() .setLabelCol(RATING_COL) .setPredictionCol(RECOMM_COL) .lazyPrintMetrics() ); BatchOperator.execute(); }