项目概述
本项目旨在构建一个基于 Hadoop 和 MapReduce 的电影推荐系统,利用大规模数据处理技术和多种推荐算法,为用户提供个性化的电影推荐。项目分为两部分:
- MapReduce部分:负责数据预处理和分布式计算
- Python部分:负责推荐算法的实现与性能评估
项目目标
- 使用 Hadoop 和 MapReduce 处理大规模电影评分数据
- 实现多种推荐算法(基于用户的协同过滤、基于物品的协同过滤、矩阵分解)
- 对比不同算法的性能,选择最优算法
- 提供用户友好的推荐结果
技术栈
- 数据处理:Hadoop、MapReduce(Java)
- 推荐算法:Python(NumPy、pandas、scikit-learn)
- 评估指标:RMSE(均方根误差)、MAE(平均绝对误差)
项目流程
- 数据预处理:使用 MapReduce 清洗和格式化原始数据
- 推荐算法实现:使用 Python 实现多种推荐算法
- 算法对比:评估不同算法的性能,选择最优算法
- 结果生成:为用户生成个性化推荐结果
系统架构与模块设计
系统架构
系统架构分为三层:
- 数据层:存储原始数据(MovieLens数据集)以及 MapReduce 处理后的中间数据(用户-电影-平均评分)
- 计算层:利用 MapReduce 进行分布式数据处理,利用 Python 实现推荐算法并进行性能评估
- 应用层:生成推荐结果并展示给用户
模块设计
MapReduce模块
该模块专注于数据预处理和分布式计算,解决大数据环境下的性能瓶颈。
输入是原始电影评分数据,输出是用户-电影-平均评分数据。该模块主要包含的功能有:
- 数据清洗:去除无效数据
- 数据转换:生成键值对格式
- 分布式计算:计算用户对电影的平均评分
Python模块
该模块专注于算法实现和性能优化,解决推荐系统的核心逻辑。
输入是 MapReduce 输出的用户-电影-平均评分数据,输出是推荐结果;该模块主要包含的功能有:
- 数据加载:将数据加载为适合算法处理的矩阵格式
- 推荐算法实现:基于用户的协同过滤、基于物品的协同过滤、矩阵分解
- 算法对比:计算 RMSE,选择最优算法
详细功能代码
MapReduce数据清洗与处理
Mapper
在 Mapper 中,添加数据清洗逻辑,包括检查数据是否完整,检查是否有空值或无效值。在 Mapper 中,每条记录的键是 <用户ID , 电影ID>,值是 <评分>。Hadoop 的 MapReduce 框架会自动合并相同键的值,因此在 Reducer 中,相同用户对同一部电影的多个评分会被合并。Reducer 计算这些评分的平均值,确保输出结果中没有重复值。
import java.io.IOException; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
public class MovieRatingMapper extends Mapper<LongWritable, Text, Text, FloatWritable> { private Text userMovie = new Text(); private FloatWritable rating = new FloatWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] parts = line.split(",");
if (parts.length >= 3) { String userId = parts[0].trim(); String movieId = parts[1].trim(); String ratingStr = parts[2].trim();
if (!userId.isEmpty() && !movieId.isEmpty() && !ratingStr.isEmpty()) { try { float ratingValue = Float.parseFloat(ratingStr); userMovie.set(userId + "," + movieId); rating.set(ratingValue); context.write(userMovie, rating); } catch (NumberFormatException e) { System.err.println("Invalid rating value: " + ratingStr); } } } } }
|
Reducer
在 Reducer 中,计算每个用户对每部电影的平均评分,并确保输出结果没有重复值。
import java.io.IOException; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class MovieRatingReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> { private FloatWritable result = new FloatWritable();
public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException { float sum = 0; int count = 0;
for (FloatWritable val : values) { sum += val.get(); count++; }
if (count > 0) { float averageRating = sum / count; result.set(averageRating); context.write(key, result); } } }
|
Driver
用于配置和启动 MapReducer 任务。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MovieRatingDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Movie Rating Average"); job.setJarByClass(MovieRatingDriver.class); job.setMapperClass(MovieRatingMapper.class); job.setReducerClass(MovieRatingReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FloatWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
|
Python推荐算法实现
数据加载
import numpy as np import pandas as pd
data = pd.read_csv('hadoop_output.csv', header=None, names=['user_id', 'movie_id', 'rating']) user_movie_matrix = data.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)
|
基于用户的协同过滤
from sklearn.metrics.pairwise import cosine_similarity
def user_based_collaborative_filtering(user_movie_matrix, user_id, top_n=5): user_similarity = cosine_similarity(user_movie_matrix) user_similarity = pd.DataFrame(user_similarity, index=user_movie_matrix.index, columns=user_movie_matrix.index) similar_users = user_similarity[user_id].sort_values(ascending=False).index[1:top_n+1] recommendations = user_movie_matrix.loc[similar_users].mean(axis=0).sort_values(ascending=False) return recommendations.head(top_n)
|
基于物品的协同过滤
def item_based_collaborative_filtering(user_movie_matrix, movie_id, top_n=5): item_similarity = cosine_similarity(user_movie_matrix.T) item_similarity = pd.DataFrame(item_similarity, index=user_movie_matrix.columns, columns=user_movie_matrix.columns) similar_movies = item_similarity[movie_id].sort_values(ascending=False).index[1:top_n+1] return similar_movies
|
矩阵分解
def matrix_factorization(R, K=10, steps=5000, alpha=0.0002, beta=0.02): num_users, num_items = R.shape P = np.random.rand(num_users, K) Q = np.random.rand(num_items, K) for step in range(steps): for i in range(num_users): for j in range(num_items): if R[i][j] > 0: eij = R[i][j] - np.dot(P[i,:], Q[j,:].T) for k in range(K): P[i][k] += alpha * (2 * eij * Q[j][k] - beta * P[i][k]) Q[j][k] += alpha * (2 * eij * P[i][k] - beta * Q[j][k]) error = 0 for i in range(num_users): for j in range(num_items): if R[i][j] > 0: error += (R[i][j] - np.dot(P[i,:], Q[j,:].T)) ** 2 for k in range(K): error += (beta/2) * (P[i][k]**2 + Q[j][k]**2) if error < 0.001: break return P, Q
P, Q = matrix_factorization(user_movie_matrix.values) predicted_ratings = np.dot(P, Q.T)
|
算法对比
from sklearn.metrics import mean_squared_error
def calculate_rmse(actual, predicted): return np.sqrt(mean_squared_error(actual, predicted))
actual_ratings = user_movie_matrix.values[user_movie_matrix.values > 0] predicted_ratings_mf = predicted_ratings[user_movie_matrix.values > 0]
rmse_mf = calculate_rmse(actual_ratings, predicted_ratings_mf) print(f"Matrix Factorization RMSE: {rmse_mf}")
|
项目运行
环境准备
Hadoop集群
下载 Hadoop 安装包,解压并配置环境变量:
export HADOOP_HOME=/path/to/hadoop export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
|
编辑 $HADOOP_HOME/etc/hadoop/core-site.xml:
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property> </configuration>
|
编辑 $HADOOP_HOME/etc/hadoop/hdfs-site.xml:
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>/path/to/hadoop/data/namenode</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>/path/to/hadoop/data/datanode</value> </property> </configuration>
|
格式化 HDFS 并启动 Hadoop:
hdfs namenode -format start-dfs.sh start-yarn.sh
|
Python环境
安装 Python 以及依赖库:
pip install numpy pandas scikit-learn
|
数据上传
下载 MovieLens 数据集(如ratings.csv)。
创建 HDFS 输入目录并将数据上传到 HDFS:
hdfs dfs -mkdir -p /input hdfs dfs -put ratings.csv /input
|
运行MapReduce任务
将 MapReduce 相关代码编译并打包为 JAR 文件:
javac -classpath `hadoop classpath` -d . MovieRatingMapper.java MovieRatingReducer.java MovieRatingDriver.java jar -cvf MovieRating.jar -C . .
|
在 Hadoop 集群上运行任务:
hadoop jar MovieRating.jar MovieRatingDriver /input/ratings.csv /output
|
运行Python推荐模块
将 HDFS 输出文件下载到本地:
hdfs dfs -get /output/part-r-00000 user_movie_rating.csv
|
使用 Python 加载数据:
import pandas as pd
data = pd.read_csv('user_movie_rating.csv', header=None, names=['user_id', 'movie_id', 'rating']) user_movie_matrix = data.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)
|
基于用户的协同过滤:
from sklearn.metrics.pairwise import cosine_similarity
def user_based_collaborative_filtering(user_movie_matrix, user_id, top_n=5): user_similarity = cosine_similarity(user_movie_matrix) user_similarity = pd.DataFrame(user_similarity, index=user_movie_matrix.index, columns=user_movie_matrix.index) similar_users = user_similarity[user_id].sort_values(ascending=False).index[1:top_n+1] recommendations = user_movie_matrix.loc[similar_users].mean(axis=0).sort_values(ascending=False) return recommendations.head(top_n)
|
基于物品的协同过滤:
def item_based_collaborative_filtering(user_movie_matrix, movie_id, top_n=5): item_similarity = cosine_similarity(user_movie_matrix.T) item_similarity = pd.DataFrame(item_similarity, index=user_movie_matrix.columns, columns=user_movie_matrix.columns) similar_movies = item_similarity[movie_id].sort_values(ascending=False).index[1:top_n+1] return similar_movies
|
矩阵分解:
import numpy as np
def matrix_factorization(R, K=10, steps=5000, alpha=0.0002, beta=0.02): num_users, num_items = R.shape P = np.random.rand(num_users, K) Q = np.random.rand(num_items, K) for step in range(steps): for i in range(num_users): for j in range(num_items): if R[i][j] > 0: eij = R[i][j] - np.dot(P[i,:], Q[j,:].T) for k in range(K): P[i][k] += alpha * (2 * eij * Q[j][k] - beta * P[i][k]) Q[j][k] += alpha * (2 * eij * P[i][k] - beta * Q[j][k]) error = 0 for i in range(num_users): for j in range(num_items): if R[i][j] > 0: error += (R[i][j] - np.dot(P[i,:], Q[j,:].T)) ** 2 for k in range(K): error += (beta/2) * (P[i][k]**2 + Q[j][k]**2) if error < 0.001: break return P, Q
P, Q = matrix_factorization(user_movie_matrix.values) predicted_ratings = np.dot(P, Q.T)
|
算法对比:
from sklearn.metrics import mean_squared_error
def calculate_rmse(actual, predicted): return np.sqrt(mean_squared_error(actual, predicted))
actual_ratings = user_movie_matrix.values[user_movie_matrix.values > 0] predicted_ratings_mf = predicted_ratings[user_movie_matrix.values > 0] rmse_mf = calculate_rmse(actual_ratings, predicted_ratings_mf) print(f"Matrix Factorization RMSE: {rmse_mf}")
|