# 多路召回
所谓的“多路召回”策略,就是指采用不同的策略、特征或简单模型,分别召回一部分候选集,然后把候选集混合在一起供后续排序模型使用,可以明显的看出,“多路召回策略”是在“计算速度”和“召回率”之间进行权衡的结果。其中,各种简单策略保证候选集的快速召回,从不同角度设计的策略保证召回率接近理想的状态,不至于损伤排序效果。如下图是多路召回的一个示意图,在多路召回中,每个策略之间毫不相关,所以一般可以写并发多线程同时进行,这样可以更加高效。
上图只是一个多路召回的例子,也就是说可以使用多种不同的策略来获取用户排序的候选商品集合,而具体使用哪些召回策略其实是与业务强相关的 ,针对不同的任务就会有对于该业务真实场景下需要考虑的召回规则。例如新闻推荐,召回规则可以是“热门视频”、“导演召回”、“演员召回”、“最近上映“、”流行趋势“、”类型召回“等等。
导包 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import pandas as pd import numpy as npfrom tqdm import tqdm from collections import defaultdict import os, math, warnings, math, picklefrom tqdm import tqdmimport faissimport collectionsimport randomfrom sklearn.preprocessing import MinMaxScalerfrom sklearn.preprocessing import LabelEncoderfrom datetime import datetimefrom deepctr.feature_column import SparseFeat, VarLenSparseFeatfrom sklearn.preprocessing import LabelEncoderfrom tensorflow.python.keras import backend as Kfrom tensorflow.python.keras.models import Modelfrom tensorflow.python.keras.preprocessing.sequence import pad_sequencesfrom deepmatch.models import *from deepmatch.utils import sampledsoftmaxlosswarnings.filterwarnings('ignore' )
1 2 3 4 data_path = './data_raw/' save_path = './temp_results/' metric_recall = False
读取数据 在一般的推荐系统比赛中读取数据部分主要分为三种模式, 不同的模式对应的不同的数据集:
Debug模式: 这个的目的是帮助我们基于数据先搭建一个简易的baseline并跑通, 保证写的baseline代码没有什么问题。 由于推荐比赛的数据往往非常巨大, 如果一上来直接采用全部的数据进行分析,搭建baseline框架, 往往会带来时间和设备上的损耗, **所以这时候我们往往需要从海量数据的训练集中随机抽取一部分样本来进行调试(train_click_log_sample)**, 先跑通一个baseline。
线下验证模式: 这个的目的是帮助我们在线下基于已有的训练集数据, 来选择好合适的模型和一些超参数。 **所以我们这一块只需要加载整个训练集(train_click_log)**, 然后把整个训练集再分成训练集和验证集。 训练集是模型的训练数据, 验证集部分帮助我们调整模型的参数和其他的一些超参数。
线上模式: 我们用debug模式搭建起一个推荐系统比赛的baseline, 用线下验证模式选择好了模型和一些超参数, 这一部分就是真正的对于给定的测试集进行预测, 提交到线上, 所以这一块使用的训练数据集是全量的数据集(train_click_log+test_click_log)
下面就分别对这三种不同的数据读取模式先建立不同的代导入函数, 方便后面针对不同的模式下导入数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 def get_all_click_sample (data_path, sample_nums=10000 ): """ 训练集中采样一部分数据调试 data_path: 原数据的存储路径 sample_nums: 采样数目(这里由于机器的内存限制,可以采样用户做) """ all_click = pd.read_csv(data_path + 'train_click_log.csv' ) all_user_ids = all_click.user_id.unique() sample_user_ids = np.random.choice(all_user_ids, size=sample_nums, replace=False ) all_click = all_click[all_click['user_id' ].isin(sample_user_ids)] all_click = all_click.drop_duplicates((['user_id' , 'click_article_id' , 'click_timestamp' ])) return all_click def get_all_click_df (data_path='./data_raw/' , offline=True ): if offline: all_click = pd.read_csv(data_path + 'train_click_log.csv' ) else : trn_click = pd.read_csv(data_path + 'train_click_log.csv' ) tst_click = pd.read_csv(data_path + 'testA_click_log.csv' ) all_click = trn_click.append(tst_click) all_click = all_click.drop_duplicates((['user_id' , 'click_article_id' , 'click_timestamp' ])) return all_click
1 2 3 4 5 6 7 8 def get_item_info_df (data_path ): item_info_df = pd.read_csv(data_path + 'articles.csv' ) item_info_df = item_info_df.rename(columns={'article_id' : 'click_article_id' }) return item_info_df
1 2 3 4 5 6 7 8 9 10 11 12 13 def get_item_emb_dict (data_path ): item_emb_df = pd.read_csv(data_path + 'articles_emb.csv' ) item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x] item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols]) item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1 , keepdims=True ) item_emb_dict = dict (zip (item_emb_df['article_id' ], item_emb_np)) pickle.dump(item_emb_dict, open (save_path + 'item_content_emb.pkl' , 'wb' )) return item_emb_dict
1 max_min_scaler = lambda x : (x-np.min (x))/(np.max (x)-np.min (x))
1 2 3 4 5 6 7 8 all_click_df = get_all_click_df(offline=False ) all_click_df['click_timestamp' ] = all_click_df[['click_timestamp' ]].apply(max_min_scaler)
1 item_info_df = get_item_info_df(data_path)
1 item_emb_dict = get_item_emb_dict(data_path)
工具函数 获取用户-文章-时间函数 这个在基于关联规则的用户协同过滤的时候会用到
1 2 3 4 5 6 7 8 9 10 11 12 13 def get_user_item_time (click_df ): click_df = click_df.sort_values('click_timestamp' ) def make_item_time_pair (df ): return list (zip (df['click_article_id' ], df['click_timestamp' ])) user_item_time_df = click_df.groupby('user_id' )['click_article_id' , 'click_timestamp' ].apply(lambda x: make_item_time_pair(x))\ .reset_index().rename(columns={0 : 'item_time_list' }) user_item_time_dict = dict (zip (user_item_time_df['user_id' ], user_item_time_df['item_time_list' ])) return user_item_time_dict
获取文章-用户-时间函数 这个在基于关联规则的文章协同过滤的时候会用到
1 2 3 4 5 6 7 8 9 10 11 12 def get_item_user_time_dict (click_df ): def make_user_time_pair (df ): return list (zip (df['user_id' ], df['click_timestamp' ])) click_df = click_df.sort_values('click_timestamp' ) item_user_time_df = click_df.groupby('click_article_id' )['user_id' , 'click_timestamp' ].apply(lambda x: make_user_time_pair(x))\ .reset_index().rename(columns={0 : 'user_time_list' }) item_user_time_dict = dict (zip (item_user_time_df['click_article_id' ], item_user_time_df['user_time_list' ])) return item_user_time_dict
获取历史和最后一次点击 这个在评估召回结果, 特征工程和制作标签转成监督学习测试集的时候回用到
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def get_hist_and_last_click (all_click ): all_click = all_click.sort_values(by=['user_id' , 'click_timestamp' ]) click_last_df = all_click.groupby('user_id' ).tail(1 ) def hist_func (user_df ): if len (user_df) == 1 : return user_df else : return user_df[:-1 ] click_hist_df = all_click.groupby('user_id' ).apply(hist_func).reset_index(drop=True ) return click_hist_df, click_last_df
获取文章属性特征 1 2 3 4 5 6 7 8 9 10 def get_item_info_dict (item_info_df ): max_min_scaler = lambda x : (x-np.min (x))/(np.max (x)-np.min (x)) item_info_df['created_at_ts' ] = item_info_df[['created_at_ts' ]].apply(max_min_scaler) item_type_dict = dict (zip (item_info_df['click_article_id' ], item_info_df['category_id' ])) item_words_dict = dict (zip (item_info_df['click_article_id' ], item_info_df['words_count' ])) item_created_time_dict = dict (zip (item_info_df['click_article_id' ], item_info_df['created_at_ts' ])) return item_type_dict, item_words_dict, item_created_time_dict
获取用户历史点击的文章信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def get_user_hist_item_info_dict (all_click ): user_hist_item_typs = all_click.groupby('user_id' )['category_id' ].agg(set ).reset_index() user_hist_item_typs_dict = dict (zip (user_hist_item_typs['user_id' ], user_hist_item_typs['category_id' ])) user_hist_item_ids_dict = all_click.groupby('user_id' )['click_article_id' ].agg(set ).reset_index() user_hist_item_ids_dict = dict (zip (user_hist_item_ids_dict['user_id' ], user_hist_item_ids_dict['click_article_id' ])) user_hist_item_words = all_click.groupby('user_id' )['words_count' ].agg('mean' ).reset_index() user_hist_item_words_dict = dict (zip (user_hist_item_words['user_id' ], user_hist_item_words['words_count' ])) all_click_ = all_click.sort_values('click_timestamp' ) user_last_item_created_time = all_click_.groupby('user_id' )['created_at_ts' ].apply(lambda x: x.iloc[-1 ]).reset_index() max_min_scaler = lambda x : (x-np.min (x))/(np.max (x)-np.min (x)) user_last_item_created_time['created_at_ts' ] = user_last_item_created_time[['created_at_ts' ]].apply(max_min_scaler) user_last_item_created_time_dict = dict (zip (user_last_item_created_time['user_id' ], \ user_last_item_created_time['created_at_ts' ])) return user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict
获取点击次数最多的Top-k个文章 1 2 3 4 def get_item_topk_click (click_df, k ): topk_click = click_df['click_article_id' ].value_counts().index[:k] return topk_click
定义多路召回字典 1 2 item_type_dict, item_words_dict, item_created_time_dict = get_item_info_dict(item_info_df)
1 2 3 4 5 6 user_multi_recall_dict = {'itemcf_sim_itemcf_recall' : {}, 'embedding_sim_item_recall' : {}, 'youtubednn_recall' : {}, 'youtubednn_usercf_recall' : {}, 'cold_start_recall' : {}}
1 2 3 trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df)
召回效果评估 做完了召回有时候也需要对当前的召回方法或者参数进行调整以达到更好的召回效果,因为召回的结果决定了最终排序的上限,下面也会提供一个召回评估的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def metrics_recall (user_recall_items_dict, trn_last_click_df, topk=5 ): last_click_item_dict = dict (zip (trn_last_click_df['user_id' ], trn_last_click_df['click_article_id' ])) user_num = len (user_recall_items_dict) for k in range (10 , topk+1 , 10 ): hit_num = 0 for user, item_list in user_recall_items_dict.items(): tmp_recall_items = [x[0 ] for x in user_recall_items_dict[user][:k]] if last_click_item_dict[user] in set (tmp_recall_items): hit_num += 1 hit_rate = round (hit_num * 1.0 / user_num, 5 ) print(' topk: ' , k, ' : ' , 'hit_num: ' , hit_num, 'hit_rate: ' , hit_rate, 'user_num : ' , user_num)
计算相似性矩阵 这一部分主要是通过协同过滤以及向量检索得到相似性矩阵,相似性矩阵主要分为user2user和item2item,下面依次获取基于itemCF的item2item的相似性矩阵。
itemCF i2i_sim 借鉴KDD2020的去偏商品推荐,在计算item2item相似性矩阵时,使用关联规则,使得计算的文章的相似性还考虑到了:
用户点击的时间权重
用户点击的顺序权重
文章创建的时间权重
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 def itemcf_sim (df, item_created_time_dict ): """ 文章与文章之间的相似性矩阵计算 :param df: 数据表 :item_created_time_dict: 文章创建时间的字典 return : 文章与文章的相似性矩阵 思路: 基于物品的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则 """ user_item_time_dict = get_user_item_time(df) i2i_sim = {} item_cnt = defaultdict(int ) for user, item_time_list in tqdm(user_item_time_dict.items()): for loc1, (i, i_click_time) in enumerate (item_time_list): item_cnt[i] += 1 i2i_sim.setdefault(i, {}) for loc2, (j, j_click_time) in enumerate (item_time_list): if (i == j): continue loc_alpha = 1.0 if loc2 > loc1 else 0.7 loc_weight = loc_alpha * (0.9 ** (np.abs (loc2 - loc1) - 1 )) click_time_weight = np.exp(0.7 ** np.abs (i_click_time - j_click_time)) created_time_weight = np.exp(0.8 ** np.abs (item_created_time_dict[i] - item_created_time_dict[j])) i2i_sim[i].setdefault(j, 0 ) i2i_sim[i][j] += loc_weight * click_time_weight * created_time_weight / math.log(len (item_time_list) + 1 ) i2i_sim_ = i2i_sim.copy() for i, related_items in i2i_sim.items(): for j, wij in related_items.items(): i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j]) pickle.dump(i2i_sim_, open (save_path + 'itemcf_i2i_sim.pkl' , 'wb' )) return i2i_sim_
1 i2i_sim = itemcf_sim(all_click_df, item_created_time_dict)
100%|██████████| 250000/250000 [14:20<00:00, 290.38it/s]
userCF u2u_sim 在计算用户之间的相似度的时候,也可以使用一些简单的关联规则,比如用户活跃度权重,这里将用户的点击次数作为用户活跃度的指标
1 2 3 4 5 6 7 8 9 def get_user_activate_degree_dict (all_click_df ): all_click_df_ = all_click_df.groupby('user_id' )['click_article_id' ].count().reset_index() mm = MinMaxScaler() all_click_df_['click_article_id' ] = mm.fit_transform(all_click_df_[['click_article_id' ]]) user_activate_degree_dict = dict (zip (all_click_df_['user_id' ], all_click_df_['click_article_id' ])) return user_activate_degree_dict
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 def usercf_sim (all_click_df, user_activate_degree_dict ): """ 用户相似性矩阵计算 :param all_click_df: 数据表 :param user_activate_degree_dict: 用户活跃度的字典 return 用户相似性矩阵 思路: 基于用户的协同过滤(详细请参考上一期推荐系统基础的组队学习) + 关联规则 """ item_user_time_dict = get_item_user_time_dict(all_click_df) u2u_sim = {} user_cnt = defaultdict(int ) for item, user_time_list in tqdm(item_user_time_dict.items()): for u, click_time in user_time_list: user_cnt[u] += 1 u2u_sim.setdefault(u, {}) for v, click_time in user_time_list: u2u_sim[u].setdefault(v, 0 ) if u == v: continue activate_weight = 100 * 0.5 * (user_activate_degree_dict[u] + user_activate_degree_dict[v]) u2u_sim[u][v] += activate_weight / math.log(len (user_time_list) + 1 ) u2u_sim_ = u2u_sim.copy() for u, related_users in u2u_sim.items(): for v, wij in related_users.items(): u2u_sim_[u][v] = wij / math.sqrt(user_cnt[u] * user_cnt[v]) pickle.dump(u2u_sim_, open (save_path + 'usercf_u2u_sim.pkl' , 'wb' )) return u2u_sim_
1 2 3 4 user_activate_degree_dict = get_user_activate_degree_dict(all_click_df) u2u_sim = usercf_sim(all_click_df, user_activate_degree_dict)
item embedding sim 使用Embedding计算item之间的相似度是为了后续冷启动的时候可以获取未出现在点击数据中的文章,后面有对冷启动专门的介绍,这里简单的说一下faiss。
aiss是Facebook的AI团队开源的一套用于做聚类或者相似性搜索的软件库,底层是用C++实现。Faiss因为超级优越的性能,被广泛应用于推荐相关的业务当中.
faiss工具包一般使用在推荐系统中的向量召回部分。在做向量召回的时候要么是u2u,u2i或者i2i,这里的u和i指的是user和item.我们知道在实际的场景中user和item的数量都是海量的,我们最容易想到的基于向量相似度的召回就是使用两层循环遍历user列表或者item列表计算两个向量的相似度,但是这样做在面对海量数据是不切实际的,faiss就是用来加速计算某个查询向量最相似的topk个索引向量。
faiss查询的原理:
faiss使用了PCA和PQ(Product quantization乘积量化)两种技术进行向量压缩和编码,当然还使用了其他的技术进行优化,但是PCA和PQ是其中最核心部分。
PCA降维算法细节参考下面这个链接进行学习主成分分析(PCA)原理总结
PQ编码的细节下面这个链接进行学习实例理解product quantization算法
faiss使用
faiss官方教程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 def embdding_sim (click_df, item_emb_df, save_path, topk ): """ 基于内容的文章embedding相似性矩阵计算 :param click_df: 数据表 :param item_emb_df: 文章的embedding :param save_path: 保存路径 :patam topk: 找最相似的topk篇 return 文章相似性矩阵 思路: 对于每一篇文章, 基于embedding的相似性返回topk个与其最相似的文章, 只不过由于文章数量太多,这里用了faiss进行加速 """ item_idx_2_rawid_dict = dict (zip (item_emb_df.index, item_emb_df['article_id' ])) item_emb_cols = [x for x in item_emb_df.columns if 'emb' in x] item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32) item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1 , keepdims=True ) item_index = faiss.IndexFlatIP(item_emb_np.shape[1 ]) item_index.add(item_emb_np) sim, idx = item_index.search(item_emb_np, topk) item_sim_dict = collections.defaultdict(dict ) for target_idx, sim_value_list, rele_idx_list in tqdm(zip (range (len (item_emb_np)), sim, idx)): target_raw_id = item_idx_2_rawid_dict[target_idx] for rele_idx, sim_value in zip (rele_idx_list[1 :], sim_value_list[1 :]): rele_raw_id = item_idx_2_rawid_dict[rele_idx] item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0 ) + sim_value pickle.dump(item_sim_dict, open (save_path + 'emb_i2i_sim.pkl' , 'wb' )) return item_sim_dict
1 2 item_emb_df = pd.read_csv(data_path + '/articles_emb.csv' ) emb_i2i_sim = embdding_sim(all_click_df, item_emb_df, save_path, topk=10 )
364047it [00:23, 15292.14it/s]
召回 这个就是我们开篇提到的那个问题, 面的36万篇文章, 20多万用户的推荐, 我们又有哪些策略来缩减问题的规模? 我们就可以再召回阶段筛选出用户对于点击文章的候选集合, 从而降低问题的规模。召回常用的策略:
Youtube DNN 召回
基于文章的召回
基于用户的召回
上面的各种召回方式一部分在基于用户已经看得文章的基础上去召回与这些文章相似的一些文章, 而这个相似性的计算方式不同, 就得到了不同的召回方式, 比如文章的协同过滤, 文章内容的embedding等。还有一部分是根据用户的相似性进行推荐,对于某用户推荐与其相似的其他用户看过的文章,比如用户的协同过滤和用户embedding。 还有一种思路是类似矩阵分解的思路,先计算出用户和文章的embedding之后,就可以直接算用户和文章的相似度, 根据这个相似度进行推荐, 比如YouTube DNN。 我们下面详细来看一下每一个召回方法:
YoutubeDNN召回 (这一步是直接获取用户召回的候选文章列表)
论文下载地址
Youtubednn召回架构
关于YoutubeDNN原理和应用推荐看王喆的两篇博客:
重读Youtube深度学习推荐系统论文,字字珠玑,惊为神文
YouTube深度学习推荐系统的十大工程问题
参考文献:
https://zhuanlan.zhihu.com/p/52169807 (YouTubeDNN原理)
https://zhuanlan.zhihu.com/p/26306795 (Word2Vec知乎众赞文章) — word2vec放到排序中的w2v的介绍部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 def gen_data_set (data, negsample=0 ): data.sort_values("click_timestamp" , inplace=True ) item_ids = data['click_article_id' ].unique() train_set = [] test_set = [] for reviewerID, hist in tqdm(data.groupby('user_id' )): pos_list = hist['click_article_id' ].tolist() if negsample > 0 : candidate_set = list (set (item_ids) - set (pos_list)) neg_list = np.random.choice(candidate_set,size=len (pos_list)*negsample,replace=True ) if len (pos_list) == 1 : train_set.append((reviewerID, [pos_list[0 ]], pos_list[0 ],1 ,len (pos_list))) test_set.append((reviewerID, [pos_list[0 ]], pos_list[0 ],1 ,len (pos_list))) for i in range (1 , len (pos_list)): hist = pos_list[:i] if i != len (pos_list) - 1 : train_set.append((reviewerID, hist[::-1 ], pos_list[i], 1 , len (hist[::-1 ]))) for negi in range (negsample): train_set.append((reviewerID, hist[::-1 ], neg_list[i*negsample+negi], 0 ,len (hist[::-1 ]))) else : test_set.append((reviewerID, hist[::-1 ], pos_list[i],1 ,len (hist[::-1 ]))) random.shuffle(train_set) random.shuffle(test_set) return train_set, test_set def gen_model_input (train_set,user_profile,seq_max_len ): train_uid = np.array([line[0 ] for line in train_set]) train_seq = [line[1 ] for line in train_set] train_iid = np.array([line[2 ] for line in train_set]) train_label = np.array([line[3 ] for line in train_set]) train_hist_len = np.array([line[4 ] for line in train_set]) train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post' , truncating='post' , value=0 ) train_model_input = {"user_id" : train_uid, "click_article_id" : train_iid, "hist_article_id" : train_seq_pad, "hist_len" : train_hist_len} return train_model_input, train_label
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 def youtubednn_u2i_dict (data, topk=20 ): sparse_features = ["click_article_id" , "user_id" ] SEQ_LEN = 30 user_profile_ = data[["user_id" ]].drop_duplicates('user_id' ) item_profile_ = data[["click_article_id" ]].drop_duplicates('click_article_id' ) features = ["click_article_id" , "user_id" ] feature_max_idx = {} for feature in features: lbe = LabelEncoder() data[feature] = lbe.fit_transform(data[feature]) feature_max_idx[feature] = data[feature].max () + 1 user_profile = data[["user_id" ]].drop_duplicates('user_id' ) item_profile = data[["click_article_id" ]].drop_duplicates('click_article_id' ) user_index_2_rawid = dict (zip (user_profile['user_id' ], user_profile_['user_id' ])) item_index_2_rawid = dict (zip (item_profile['click_article_id' ], item_profile_['click_article_id' ])) train_set, test_set = gen_data_set(data, 0 ) train_model_input, train_label = gen_model_input(train_set, user_profile, SEQ_LEN) test_model_input, test_label = gen_model_input(test_set, user_profile, SEQ_LEN) embedding_dim = 16 user_feature_columns = [SparseFeat('user_id' , feature_max_idx['user_id' ], embedding_dim), VarLenSparseFeat(SparseFeat('hist_article_id' , feature_max_idx['click_article_id' ], embedding_dim, embedding_name="click_article_id" ), SEQ_LEN, 'mean' , 'hist_len' ),] item_feature_columns = [SparseFeat('click_article_id' , feature_max_idx['click_article_id' ], embedding_dim)] model = YoutubeDNN(user_feature_columns, item_feature_columns, num_sampled=5 , user_dnn_hidden_units=(64 , embedding_dim)) model.compile (optimizer="adam" , loss=sampledsoftmaxloss) history = model.fit(train_model_input, train_label, batch_size=256 , epochs=1 , verbose=1 , validation_split=0.0 ) test_user_model_input = test_model_input all_item_model_input = {"click_article_id" : item_profile['click_article_id' ].values} user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding) item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding) user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12 ) item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12 ) user_embs = user_embs / np.linalg.norm(user_embs, axis=1 , keepdims=True ) item_embs = item_embs / np.linalg.norm(item_embs, axis=1 , keepdims=True ) raw_user_id_emb_dict = {user_index_2_rawid[k]: \ v for k, v in zip (user_profile['user_id' ], user_embs)} raw_item_id_emb_dict = {item_index_2_rawid[k]: \ v for k, v in zip (item_profile['click_article_id' ], item_embs)} pickle.dump(raw_user_id_emb_dict, open (save_path + 'user_youtube_emb.pkl' , 'wb' )) pickle.dump(raw_item_id_emb_dict, open (save_path + 'item_youtube_emb.pkl' , 'wb' )) index = faiss.IndexFlatIP(embedding_dim) index.add(item_embs) sim, idx = index.search(np.ascontiguousarray(user_embs), topk) user_recall_items_dict = collections.defaultdict(dict ) for target_idx, sim_value_list, rele_idx_list in tqdm(zip (test_user_model_input['user_id' ], sim, idx)): target_raw_id = user_index_2_rawid[target_idx] for rele_idx, sim_value in zip (rele_idx_list[1 :], sim_value_list[1 :]): rele_raw_id = item_index_2_rawid[rele_idx] user_recall_items_dict[target_raw_id][rele_raw_id] = user_recall_items_dict.get(target_raw_id, {})\ .get(rele_raw_id, 0 ) + sim_value user_recall_items_dict = {k: sorted (v.items(), key=lambda x: x[1 ], reverse=True ) for k, v in user_recall_items_dict.items()} pickle.dump(user_recall_items_dict, open (save_path + 'youtube_u2i_dict.pkl' , 'wb' )) return user_recall_items_dict
1 2 3 4 5 6 7 8 if not metric_recall: user_multi_recall_dict['youtubednn_recall' ] = youtubednn_u2i_dict(all_click_df, topk=20 ) else : trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) user_multi_recall_dict['youtubednn_recall' ] = youtubednn_u2i_dict(trn_hist_click_df, topk=20 ) metrics_recall(user_multi_recall_dict['youtubednn_recall' ], trn_last_click_df, topk=20 )
100%|██████████| 250000/250000 [02:02<00:00, 2038.57it/s]
WARNING:tensorflow:From /home/ryluo/anaconda3/lib/python3.6/site-packages/tensorflow/python/keras/initializers.py:143: calling RandomNormal.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
WARNING:tensorflow:From /home/ryluo/anaconda3/lib/python3.6/site-packages/tensorflow/python/autograph/impl/api.py:253: calling reduce_sum_v1 (from tensorflow.python.ops.math_ops) with keep_dims is deprecated and will be removed in a future version.
Instructions for updating:
keep_dims is deprecated, use keepdims instead
WARNING:tensorflow:From /home/ryluo/anaconda3/lib/python3.6/site-packages/tensorflow/python/autograph/impl/api.py:253: div (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
WARNING:tensorflow:From /home/ryluo/anaconda3/lib/python3.6/site-packages/tensorflow/python/ops/init_ops.py:1288: calling VarianceScaling.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
1149673/1149673 [==============================] - 216s 188us/sample - loss: 0.1326
250000it [00:32, 7720.75it/s]
itemCF recall 上面已经通过协同过滤,Embedding检索的方式得到了文章的相似度矩阵,下面使用协同过滤的思想,给用户召回与其历史文章相似的文章。 这里在召回的时候,也是用了关联规则的方式:
考虑相似文章与历史点击文章顺序的权重(细节看代码)
考虑文章创建时间的权重,也就是考虑相似文章与历史点击文章创建时间差的权重
考虑文章内容相似度权重(使用Embedding计算相似文章相似度,但是这里需要注意,在Embedding的时候并没有计算所有商品两两之间的相似度,所以相似的文章与历史点击文章不存在相似度,需要做特殊处理)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def item_based_recommend (user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim ): """ 基于文章协同过滤的召回 :param user_id: 用户id :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列 {user1: {item1: time1, item2: time2..}...} :param i2i_sim: 字典,文章相似性矩阵 :param sim_item_topk: 整数, 选择与当前文章最相似的前k篇文章 :param recall_item_num: 整数, 最后的召回文章数量 :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全 :param emb_i2i_sim: 字典基于内容embedding算的文章相似矩阵 return: 召回的文章列表 {item1:score1, item2: score2...} """ user_hist_items = user_item_time_dict[user_id] item_rank = {} for loc, (i, click_time) in enumerate (user_hist_items): for j, wij in sorted (i2i_sim[i].items(), key=lambda x: x[1 ], reverse=True )[:sim_item_topk]: if j in user_hist_items: continue created_time_weight = np.exp(0.8 ** np.abs (item_created_time_dict[i] - item_created_time_dict[j])) loc_weight = (0.9 ** (len (user_hist_items) - loc)) content_weight = 1.0 if emb_i2i_sim.get(i, {}).get(j, None ) is not None : content_weight += emb_i2i_sim[i][j] if emb_i2i_sim.get(j, {}).get(i, None ) is not None : content_weight += emb_i2i_sim[j][i] item_rank.setdefault(j, 0 ) item_rank[j] += created_time_weight * loc_weight * content_weight * wij if len (item_rank) < recall_item_num: for i, item in enumerate (item_topk_click): if item in item_rank.items(): continue item_rank[item] = - i - 100 if len (item_rank) == recall_item_num: break item_rank = sorted (item_rank.items(), key=lambda x: x[1 ], reverse=True )[:recall_item_num] return item_rank
itemCF sim召回 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) else : trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict ) user_item_time_dict = get_user_item_time(trn_hist_click_df) i2i_sim = pickle.load(open (save_path + 'itemcf_i2i_sim.pkl' , 'rb' )) emb_i2i_sim = pickle.load(open (save_path + 'emb_i2i_sim.pkl' , 'rb' )) sim_item_topk = 20 recall_item_num = 10 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50 ) for user in tqdm(trn_hist_click_df['user_id' ].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, \ i2i_sim, sim_item_topk, recall_item_num, \ item_topk_click, item_created_time_dict, emb_i2i_sim) user_multi_recall_dict['itemcf_sim_itemcf_recall' ] = user_recall_items_dict pickle.dump(user_multi_recall_dict['itemcf_sim_itemcf_recall' ], open (save_path + 'itemcf_recall_dict.pkl' , 'wb' )) if metric_recall: metrics_recall(user_multi_recall_dict['itemcf_sim_itemcf_recall' ], trn_last_click_df, topk=recall_item_num)
100%|██████████| 250000/250000 [2:51:13<00:00, 24.33it/s]
embedding sim 召回 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) else : trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict ) user_item_time_dict = get_user_item_time(trn_hist_click_df) i2i_sim = pickle.load(open (save_path + 'emb_i2i_sim.pkl' ,'rb' )) sim_item_topk = 20 recall_item_num = 10 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50 ) for user in tqdm(trn_hist_click_df['user_id' ].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) user_multi_recall_dict['embedding_sim_item_recall' ] = user_recall_items_dict pickle.dump(user_multi_recall_dict['embedding_sim_item_recall' ], open (save_path + 'embedding_sim_item_recall.pkl' , 'wb' )) if metric_recall: metrics_recall(user_multi_recall_dict['embedding_sim_item_recall' ], trn_last_click_df, topk=recall_item_num)
100%|██████████| 250000/250000 [04:35<00:00, 905.85it/s]
userCF召回 基于用户协同过滤,核心思想是给用户推荐与其相似的用户历史点击文章,因为这里涉及到了相似用户的历史文章,这里仍然可以加上一些关联规则来给用户可能点击的文章进行加权,这里使用的关联规则主要是考虑相似用户的历史点击文章与被推荐用户历史点击商品的关系权重,而这里的关系就可以直接借鉴基于物品的协同过滤相似的做法,只不过这里是对被推荐物品关系的一个累加的过程,下面是使用的一些关系权重,及相关的代码:
计算被推荐用户历史点击文章与相似用户历史点击文章的相似度,文章创建时间差,相对位置的总和,作为各自的权重
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 def user_based_recommend (user_id, user_item_time_dict, u2u_sim, sim_user_topk, recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim ): """ 基于文章协同过滤的召回 :param user_id: 用户id :param user_item_time_dict: 字典, 根据点击时间获取用户的点击文章序列 {user1: {item1: time1, item2: time2..}...} :param u2u_sim: 字典,文章相似性矩阵 :param sim_user_topk: 整数, 选择与当前用户最相似的前k个用户 :param recall_item_num: 整数, 最后的召回文章数量 :param item_topk_click: 列表,点击次数最多的文章列表,用户召回补全 :param item_created_time_dict: 文章创建时间列表 :param emb_i2i_sim: 字典基于内容embedding算的文章相似矩阵 return: 召回的文章列表 {item1:score1, item2: score2...} """ user_item_time_list = user_item_time_dict[user_id] user_hist_items = set ([i for i, t in user_item_time_list]) items_rank = {} for sim_u, wuv in sorted (u2u_sim[user_id].items(), key=lambda x: x[1 ], reverse=True )[:sim_user_topk]: for i, click_time in user_item_time_dict[sim_u]: if i in user_hist_items: continue items_rank.setdefault(i, 0 ) loc_weight = 1.0 content_weight = 1.0 created_time_weight = 1.0 for loc, (j, click_time) in enumerate (user_item_time_list): loc_weight += 0.9 ** (len (user_item_time_list) - loc) if emb_i2i_sim.get(i, {}).get(j, None ) is not None : content_weight += emb_i2i_sim[i][j] if emb_i2i_sim.get(j, {}).get(i, None ) is not None : content_weight += emb_i2i_sim[j][i] created_time_weight += np.exp(0.8 * np.abs (item_created_time_dict[i] - item_created_time_dict[j])) items_rank[i] += loc_weight * content_weight * created_time_weight * wuv if len (items_rank) < recall_item_num: for i, item in enumerate (item_topk_click): if item in items_rank.items(): continue items_rank[item] = - i - 100 if len (items_rank) == recall_item_num: break items_rank = sorted (items_rank.items(), key=lambda x: x[1 ], reverse=True )[:recall_item_num] return items_rank
userCF sim召回 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) else : trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict ) user_item_time_dict = get_user_item_time(trn_hist_click_df) u2u_sim = pickle.load(open (save_path + 'usercf_u2u_sim.pkl' , 'rb' )) sim_user_topk = 20 recall_item_num = 10 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50 ) for user in tqdm(trn_hist_click_df['user_id' ].unique()): user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \ recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) pickle.dump(user_recall_items_dict, open (save_path + 'usercf_u2u2i_recall.pkl' , 'wb' )) if metric_recall: metrics_recall(user_recall_items_dict, trn_last_click_df, topk=recall_item_num)
user embedding sim召回 虽然没有直接跑usercf的计算用户之间的相似度,为了验证上述基于用户的协同过滤的代码,下面使用了YoutubeDNN过程中产生的user embedding来进行向量检索每个user最相似的topk个user,在使用这里得到的u2u的相似性矩阵,使用usercf进行召回,具体代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def u2u_embdding_sim (click_df, user_emb_dict, save_path, topk ): user_list = [] user_emb_list = [] for user_id, user_emb in user_emb_dict.items(): user_list.append(user_id) user_emb_list.append(user_emb) user_index_2_rawid_dict = {k: v for k, v in zip (range (len (user_list)), user_list)} user_emb_np = np.array(user_emb_list, dtype=np.float32) user_index = faiss.IndexFlatIP(user_emb_np.shape[1 ]) user_index.add(user_emb_np) sim, idx = user_index.search(user_emb_np, topk) user_sim_dict = collections.defaultdict(dict ) for target_idx, sim_value_list, rele_idx_list in tqdm(zip (range (len (user_emb_np)), sim, idx)): target_raw_id = user_index_2_rawid_dict[target_idx] for rele_idx, sim_value in zip (rele_idx_list[1 :], sim_value_list[1 :]): rele_raw_id = user_index_2_rawid_dict[rele_idx] user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0 ) + sim_value pickle.dump(user_sim_dict, open (save_path + 'youtube_u2u_sim.pkl' , 'wb' )) return user_sim_dict
1 2 3 4 5 user_emb_dict = pickle.load(open (save_path + 'user_youtube_emb.pkl' , 'rb' )) u2u_sim = u2u_embdding_sim(all_click_df, user_emb_dict, save_path, topk=10 )
250000it [00:23, 10507.45it/s]
通过YoutubeDNN得到的user_embedding
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 if metric_recall: trn_hist_click_df, trn_last_click_df = get_hist_and_last_click(all_click_df) else : trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict ) user_item_time_dict = get_user_item_time(trn_hist_click_df) u2u_sim = pickle.load(open (save_path + 'youtube_u2u_sim.pkl' , 'rb' )) sim_user_topk = 20 recall_item_num = 10 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50 ) for user in tqdm(trn_hist_click_df['user_id' ].unique()): user_recall_items_dict[user] = user_based_recommend(user, user_item_time_dict, u2u_sim, sim_user_topk, \ recall_item_num, item_topk_click, item_created_time_dict, emb_i2i_sim) user_multi_recall_dict['youtubednn_usercf_recall' ] = user_recall_items_dict pickle.dump(user_multi_recall_dict['youtubednn_usercf_recall' ], open (save_path + 'youtubednn_usercf_recall.pkl' , 'wb' )) if metric_recall: metrics_recall(user_multi_recall_dict['youtubednn_usercf_recall' ], trn_last_click_df, topk=recall_item_num)
100%|██████████| 250000/250000 [19:43<00:00, 211.22it/s]
冷启动问题 冷启动问题可以分成三类:文章冷启动,用户冷启动,系统冷启动。
文章冷启动:对于一个平台系统新加入的文章,该文章没有任何的交互记录,如何推荐给用户的问题。(对于我们场景可以认为是,日志数据中没有出现过的文章都可以认为是冷启动的文章)
用户冷启动:对于一个平台系统新来的用户,该用户还没有文章的交互信息,如何给该用户进行推荐。(对于我们场景就是,测试集中的用户是否在测试集对应的log数据中出现过,如果没有出现过,那么可以认为该用户是冷启动用户。但是有时候并没有这么严格,我们也可以自己设定某些指标来判别哪些用户是冷启动用户,比如通过使用时长,点击率,留存率等等)
系统冷启动:就是对于一个平台刚上线,还没有任何的相关历史数据,此时就是系统冷启动,其实也就是前面两种的一个综合。
当前场景下冷启动问题的分析:
对当前的数据进行分析会发现,日志中所有出现过的点击文章只有3w多个,而整个文章库中却有30多万,那么测试集中的用户最后一次点击是否会点击没有出现在日志中的文章呢?如果存在这种情况,说明用户点击的文章之前没有任何的交互信息,这也就是我们所说的文章冷启动。通过数据分析还可以发现,测试集用户只有一次点击的数据占得比例还不少,其实仅仅通过用户的一次点击就给用户推荐文章使用模型的方式也是比较难的,这里其实也可以考虑用户冷启动的问题,但是这里只给出物品冷启动的一些解决方案及代码,关于用户冷启动的话提一些可行性的做法。
文章冷启动(没有冷启动的探索问题) 其实我们这里不是为了做文章的冷启动而做冷启动,而是猜测用户可能会点击一些没有在log数据中出现的文章,我们要做的就是如何从将近27万的文章中选择一些文章作为用户冷启动的文章,这里其实也可以看成是一种召回策略,我们这里就采用简单的比较好理解的基于规则的召回策略来获取用户可能点击的未出现在log数据中的文章。 现在的问题变成了:如何给每个用户考虑从27万个商品中获取一小部分商品?随机选一些可能是一种方案。下面给出一些参考的方案。
首先基于Embedding召回一部分与用户历史相似的文章
从基于Embedding召回的文章中通过一些规则过滤掉一些文章,使得留下的文章用户更可能点击。我们这里的规则,可以是,留下那些与用户历史点击文章主题相同的文章,或者字数相差不大的文章。并且留下的文章尽量是与测试集用户最后一次点击时间更接近的文章,或者是当天的文章也行。
用户冷启动 这里对测试集中的用户点击数据进行分析会发现,测试集中有百分之20的用户只有一次点击,那么这些点击特别少的用户的召回是不是可以单独做一些策略上的补充呢?或者是在排序后直接基于规则加上一些文章呢?这些都可以去尝试,这里没有提供具体的做法。
注意:
这里看似和基于embedding计算的item之间相似度然后做itemcf是一致的,但是现在我们的目的不一样,我们这里的目的是找到相似的向量,并且还没有出现在log日志中的商品,再加上一些其他的冷启动的策略,这里需要找回的数量会偏多一点,不然被筛选完之后可能都没有文章了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 trn_hist_click_df = all_click_df user_recall_items_dict = collections.defaultdict(dict ) user_item_time_dict = get_user_item_time(trn_hist_click_df) i2i_sim = pickle.load(open (save_path + 'emb_i2i_sim.pkl' ,'rb' )) sim_item_topk = 150 recall_item_num = 100 item_topk_click = get_item_topk_click(trn_hist_click_df, k=50 ) for user in tqdm(trn_hist_click_df['user_id' ].unique()): user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_click,item_created_time_dict, emb_i2i_sim) pickle.dump(user_recall_items_dict, open (save_path + 'cold_start_items_raw_dict.pkl' , 'wb' ))
100%|██████████| 250000/250000 [05:01<00:00, 828.60it/s]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 def get_click_article_ids_set (all_click_df ): return set (all_click_df.click_article_id.values) def cold_start_items (user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \ user_last_item_created_time_dict, item_type_dict, item_words_dict, item_created_time_dict, click_article_ids_set, recall_item_num ): """ 冷启动的情况下召回一些文章 :param user_recall_items_dict: 基于内容embedding相似性召回来的很多文章, 字典, {user1: [item1, item2, ..], } :param user_hist_item_typs_dict: 字典, 用户点击的文章的主题映射 :param user_hist_item_words_dict: 字典, 用户点击的历史文章的字数映射 :param user_last_item_created_time_idct: 字典,用户点击的历史文章创建时间映射 :param item_tpye_idct: 字典,文章主题映射 :param item_words_dict: 字典,文章字数映射 :param item_created_time_dict: 字典, 文章创建时间映射 :param click_article_ids_set: 集合,用户点击过得文章, 也就是日志里面出现过的文章 :param recall_item_num: 召回文章的数量, 这个指的是没有出现在日志里面的文章数量 """ cold_start_user_items_dict = {} for user, item_list in tqdm(user_recall_items_dict.items()): cold_start_user_items_dict.setdefault(user, []) for item, score in item_list: hist_item_type_set = user_hist_item_typs_dict[user] hist_mean_words = user_hist_item_words_dict[user] hist_last_item_created_time = user_last_item_created_time_dict[user] hist_last_item_created_time = datetime.fromtimestamp(hist_last_item_created_time) curr_item_type = item_type_dict[item] curr_item_words = item_words_dict[item] curr_item_created_time = item_created_time_dict[item] curr_item_created_time = datetime.fromtimestamp(curr_item_created_time) if curr_item_type not in hist_item_type_set or \ item in click_article_ids_set or \ abs (curr_item_words - hist_mean_words) > 200 or \ abs ((curr_item_created_time - hist_last_item_created_time).days) > 90 : continue cold_start_user_items_dict[user].append((item, score)) cold_start_user_items_dict = {k: sorted (v, key=lambda x:x[1 ], reverse=True )[:recall_item_num] \ for k, v in cold_start_user_items_dict.items()} pickle.dump(cold_start_user_items_dict, open (save_path + 'cold_start_user_items_dict.pkl' , 'wb' )) return cold_start_user_items_dict
1 2 3 4 5 6 7 8 9 10 11 all_click_df_ = all_click_df.copy() all_click_df_ = all_click_df_.merge(item_info_df, how='left' , on='click_article_id' ) user_hist_item_typs_dict, user_hist_item_ids_dict, user_hist_item_words_dict, user_last_item_created_time_dict = get_user_hist_item_info_dict(all_click_df_) click_article_ids_set = get_click_article_ids_set(all_click_df) cold_start_user_items_dict = cold_start_items(user_recall_items_dict, user_hist_item_typs_dict, user_hist_item_words_dict, \ user_last_item_created_time_dict, item_type_dict, item_words_dict, \ item_created_time_dict, click_article_ids_set, recall_item_num) user_multi_recall_dict['cold_start_recall' ] = cold_start_user_items_dict
100%|██████████| 250000/250000 [01:49<00:00, 2293.37it/s]
多路召回合并 多路召回合并就是将前面所有的召回策略得到的用户文章列表合并起来,下面是对前面所有召回结果的汇总
基于itemcf计算的item之间的相似度sim进行的召回
基于embedding搜索得到的item之间的相似度进行的召回
YoutubeDNN召回
YoutubeDNN得到的user之间的相似度进行的召回
基于冷启动策略的召回
注意: 在做召回评估的时候就会发现有些召回的效果不错有些召回的效果很差,所以对每一路召回的结果,我们可以认为的定义一些权重,来做最终的相似度融合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 def combine_recall_results (user_multi_recall_dict, weight_dict=None , topk=25 ): final_recall_items_dict = {} def norm_user_recall_items_sim (sorted_item_list ): if len (sorted_item_list) < 2 : return sorted_item_list min_sim = sorted_item_list[-1 ][1 ] max_sim = sorted_item_list[0 ][1 ] norm_sorted_item_list = [] for item, score in sorted_item_list: if max_sim > 0 : norm_score = 1.0 * (score - min_sim) / (max_sim - min_sim) if max_sim > min_sim else 1.0 else : norm_score = 0.0 norm_sorted_item_list.append((item, norm_score)) return norm_sorted_item_list print('多路召回合并...' ) for method, user_recall_items in tqdm(user_multi_recall_dict.items()): print(method + '...' ) if weight_dict == None : recall_method_weight = 1 else : recall_method_weight = weight_dict[method] for user_id, sorted_item_list in user_recall_items.items(): user_recall_items[user_id] = norm_user_recall_items_sim(sorted_item_list) for user_id, sorted_item_list in user_recall_items.items(): final_recall_items_dict.setdefault(user_id, {}) for item, score in sorted_item_list: final_recall_items_dict[user_id].setdefault(item, 0 ) final_recall_items_dict[user_id][item] += recall_method_weight * score final_recall_items_dict_rank = {} for user, recall_item_dict in final_recall_items_dict.items(): final_recall_items_dict_rank[user] = sorted (recall_item_dict.items(), key=lambda x: x[1 ], reverse=True )[:topk] pickle.dump(final_recall_items_dict, open (os.path.join(save_path, 'final_recall_items_dict.pkl' ),'wb' )) return final_recall_items_dict_rank
1 2 3 4 5 6 weight_dict = {'itemcf_sim_itemcf_recall' : 1.0 , 'embedding_sim_item_recall' : 1.0 , 'youtubednn_recall' : 1.0 , 'youtubednn_usercf_recall' : 1.0 , 'cold_start_recall' : 1.0 }
1 2 final_recall_items_dict_rank = combine_recall_results(user_multi_recall_dict, weight_dict, topk=150 )
0%| | 0/5 [00:00<?, ?it/s]
多路召回合并...
itemcf_sim_itemcf_recall...
20%|██ | 1/5 [00:08<00:34, 8.66s/it]
embedding_sim_item_recall...
40%|████ | 2/5 [00:16<00:24, 8.29s/it]
youtubednn_recall...
youtubednn_usercf_recall...
80%|████████ | 4/5 [00:23<00:06, 6.98s/it]
cold_start_recall...
100%|██████████| 5/5 [00:42<00:00, 8.40s/it]
总结 上述实现了如下召回策略:
基于关联规则的itemcf
基于关联规则的usercf
youtubednn召回
冷启动召回
对于上述实现的召回策略其实都不是最优的结果,我们只是做了个简单的尝试,其中还有很多地方可以优化,包括已经实现的这些召回策略的参数或者新加一些,修改一些关联规则都可以。当然还可以尝试更多的召回策略,比如对新闻进行热度召回等等。