项目概述

本项目旨在构建一个基于 Hadoop 和 MapReduce 的电影推荐系统,利用大规模数据处理技术和多种推荐算法,为用户提供个性化的电影推荐。项目分为两部分:

  • MapReduce部分:负责数据预处理和分布式计算
  • Python部分:负责推荐算法的实现与性能评估

项目目标

  • 使用 Hadoop 和 MapReduce 处理大规模电影评分数据
  • 实现多种推荐算法(基于用户的协同过滤、基于物品的协同过滤、矩阵分解)
  • 对比不同算法的性能,选择最优算法
  • 提供用户友好的推荐结果

技术栈

  • 数据处理:Hadoop、MapReduce(Java)
  • 推荐算法:Python(NumPy、pandas、scikit-learn)
  • 评估指标:RMSE(均方根误差)、MAE(平均绝对误差)

项目流程

  • 数据预处理:使用 MapReduce 清洗和格式化原始数据
  • 推荐算法实现:使用 Python 实现多种推荐算法
  • 算法对比:评估不同算法的性能,选择最优算法
  • 结果生成:为用户生成个性化推荐结果

系统架构与模块设计

系统架构

系统架构分为三层:

  • 数据层:存储原始数据(MovieLens数据集)以及 MapReduce 处理后的中间数据(用户-电影-平均评分)
  • 计算层:利用 MapReduce 进行分布式数据处理,利用 Python 实现推荐算法并进行性能评估
  • 应用层:生成推荐结果并展示给用户

模块设计

MapReduce模块

该模块专注于数据预处理和分布式计算,解决大数据环境下的性能瓶颈。
输入是原始电影评分数据,输出是用户-电影-平均评分数据。该模块主要包含的功能有:

  • 数据清洗:去除无效数据
  • 数据转换:生成键值对格式
  • 分布式计算:计算用户对电影的平均评分

Python模块

该模块专注于算法实现和性能优化,解决推荐系统的核心逻辑。
输入是 MapReduce 输出的用户-电影-平均评分数据,输出是推荐结果;该模块主要包含的功能有:

  • 数据加载:将数据加载为适合算法处理的矩阵格式
  • 推荐算法实现:基于用户的协同过滤、基于物品的协同过滤、矩阵分解
  • 算法对比:计算 RMSE,选择最优算法

详细功能代码

MapReduce数据清洗与处理

Mapper

在 Mapper 中,添加数据清洗逻辑,包括检查数据是否完整,检查是否有空值或无效值。在 Mapper 中,每条记录的键是 <用户ID , 电影ID>,值是 <评分>。Hadoop 的 MapReduce 框架会自动合并相同键的值,因此在 Reducer 中,相同用户对同一部电影的多个评分会被合并。Reducer 计算这些评分的平均值,确保输出结果中没有重复值。

import java.io.IOException;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MovieRatingMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {
private Text userMovie = new Text();
private FloatWritable rating = new FloatWritable();

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] parts = line.split(",");

// 数据清洗:检查数据是否完整
if (parts.length >= 3) { // 确保有用户ID、电影ID、评分
String userId = parts[0].trim();
String movieId = parts[1].trim();
String ratingStr = parts[2].trim();

// 检查用户ID、电影ID、评分是否为空
if (!userId.isEmpty() && !movieId.isEmpty() && !ratingStr.isEmpty()) {
try {
float ratingValue = Float.parseFloat(ratingStr);
// 生成键值对:<用户ID,电影ID> -> <评分>
userMovie.set(userId + "," + movieId);
rating.set(ratingValue);
context.write(userMovie, rating);
} catch (NumberFormatException e) {
// 如果评分不是数字,跳过该记录
System.err.println("Invalid rating value: " + ratingStr);
}
}
}
}
}

Reducer

在 Reducer 中,计算每个用户对每部电影的平均评分,并确保输出结果没有重复值。

import java.io.IOException;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MovieRatingReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
private FloatWritable result = new FloatWritable();

public void reduce(Text key, Iterable<FloatWritable> values, Context context) throws IOException, InterruptedException {
float sum = 0;
int count = 0;

// 计算每个用户对每部电影的平均评分
for (FloatWritable val : values) {
sum += val.get();
count++;
}

if (count > 0) {
float averageRating = sum / count;
result.set(averageRating);
context.write(key, result); // 输出:<用户ID,电影ID> -> <平均评分>
}
}
}

Driver

用于配置和启动 MapReducer 任务。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MovieRatingDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Movie Rating Average");
job.setJarByClass(MovieRatingDriver.class);
job.setMapperClass(MovieRatingMapper.class);
job.setReducerClass(MovieRatingReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Python推荐算法实现

数据加载

import numpy as np
import pandas as pd
# Load data from MapReduce output
data = pd.read_csv('hadoop_output.csv', header=None, names=['user_id', 'movie_id', 'rating'])
user_movie_matrix = data.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)

基于用户的协同过滤

from sklearn.metrics.pairwise import cosine_similarity

def user_based_collaborative_filtering(user_movie_matrix, user_id, top_n=5):
# Calculate user similarity
user_similarity = cosine_similarity(user_movie_matrix)
user_similarity = pd.DataFrame(user_similarity, index=user_movie_matrix.index, columns=user_movie_matrix.index)

# Get similar users
similar_users = user_similarity[user_id].sort_values(ascending=False).index[1:top_n+1]

# Recommend movies
recommendations = user_movie_matrix.loc[similar_users].mean(axis=0).sort_values(ascending=False)
return recommendations.head(top_n)

基于物品的协同过滤

def item_based_collaborative_filtering(user_movie_matrix, movie_id, top_n=5):
# Calculate item similarity
item_similarity = cosine_similarity(user_movie_matrix.T)
item_similarity = pd.DataFrame(item_similarity, index=user_movie_matrix.columns, columns=user_movie_matrix.columns)

# Get similar movies
similar_movies = item_similarity[movie_id].sort_values(ascending=False).index[1:top_n+1]
return similar_movies

矩阵分解

def matrix_factorization(R, K=10, steps=5000, alpha=0.0002, beta=0.02):
num_users, num_items = R.shape
P = np.random.rand(num_users, K)
Q = np.random.rand(num_items, K)

for step in range(steps):
for i in range(num_users):
for j in range(num_items):
if R[i][j] > 0:
eij = R[i][j] - np.dot(P[i,:], Q[j,:].T)
for k in range(K):
P[i][k] += alpha * (2 * eij * Q[j][k] - beta * P[i][k])
Q[j][k] += alpha * (2 * eij * P[i][k] - beta * Q[j][k])

# Calculate total error
error = 0
for i in range(num_users):
for j in range(num_items):
if R[i][j] > 0:
error += (R[i][j] - np.dot(P[i,:], Q[j,:].T)) ** 2
for k in range(K):
error += (beta/2) * (P[i][k]**2 + Q[j][k]**2)
if error < 0.001:
break
return P, Q

# Predict ratings
P, Q = matrix_factorization(user_movie_matrix.values)
predicted_ratings = np.dot(P, Q.T)

算法对比

from sklearn.metrics import mean_squared_error

# Calculate RMSE
def calculate_rmse(actual, predicted):
return np.sqrt(mean_squared_error(actual, predicted))

# Compare algorithms
actual_ratings = user_movie_matrix.values[user_movie_matrix.values > 0]
predicted_ratings_mf = predicted_ratings[user_movie_matrix.values > 0]

rmse_mf = calculate_rmse(actual_ratings, predicted_ratings_mf)
print(f"Matrix Factorization RMSE: {rmse_mf}")

项目运行

环境准备

Hadoop集群

下载 Hadoop 安装包,解压并配置环境变量:

export HADOOP_HOME=/path/to/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

编辑 $HADOOP_HOME/etc/hadoop/core-site.xml:

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

编辑 $HADOOP_HOME/etc/hadoop/hdfs-site.xml:

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/path/to/hadoop/data/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/path/to/hadoop/data/datanode</value>
</property>
</configuration>

格式化 HDFS 并启动 Hadoop:

hdfs namenode -format
start-dfs.sh
start-yarn.sh

Python环境

安装 Python 以及依赖库:

pip install numpy pandas scikit-learn

数据上传

下载 MovieLens 数据集(如ratings.csv)。
创建 HDFS 输入目录并将数据上传到 HDFS:

hdfs dfs -mkdir -p /input
hdfs dfs -put ratings.csv /input

运行MapReduce任务

将 MapReduce 相关代码编译并打包为 JAR 文件:

javac -classpath `hadoop classpath` -d . MovieRatingMapper.java MovieRatingReducer.java MovieRatingDriver.java
jar -cvf MovieRating.jar -C . .

在 Hadoop 集群上运行任务:

hadoop jar MovieRating.jar MovieRatingDriver /input/ratings.csv /output

运行Python推荐模块

将 HDFS 输出文件下载到本地:

hdfs dfs -get /output/part-r-00000 user_movie_rating.csv

使用 Python 加载数据:

import pandas as pd

data = pd.read_csv('user_movie_rating.csv', header=None, names=['user_id', 'movie_id', 'rating'])
user_movie_matrix = data.pivot(index='user_id', columns='movie_id', values='rating').fillna(0)

基于用户的协同过滤:

from sklearn.metrics.pairwise import cosine_similarity

def user_based_collaborative_filtering(user_movie_matrix, user_id, top_n=5):
user_similarity = cosine_similarity(user_movie_matrix)
user_similarity = pd.DataFrame(user_similarity, index=user_movie_matrix.index, columns=user_movie_matrix.index)
similar_users = user_similarity[user_id].sort_values(ascending=False).index[1:top_n+1]
recommendations = user_movie_matrix.loc[similar_users].mean(axis=0).sort_values(ascending=False)
return recommendations.head(top_n)

基于物品的协同过滤:

def item_based_collaborative_filtering(user_movie_matrix, movie_id, top_n=5):
item_similarity = cosine_similarity(user_movie_matrix.T)
item_similarity = pd.DataFrame(item_similarity, index=user_movie_matrix.columns, columns=user_movie_matrix.columns)
similar_movies = item_similarity[movie_id].sort_values(ascending=False).index[1:top_n+1]
return similar_movies

矩阵分解:

import numpy as np

def matrix_factorization(R, K=10, steps=5000, alpha=0.0002, beta=0.02):
num_users, num_items = R.shape
P = np.random.rand(num_users, K)
Q = np.random.rand(num_items, K)
for step in range(steps):
for i in range(num_users):
for j in range(num_items):
if R[i][j] > 0:
eij = R[i][j] - np.dot(P[i,:], Q[j,:].T)
for k in range(K):
P[i][k] += alpha * (2 * eij * Q[j][k] - beta * P[i][k])
Q[j][k] += alpha * (2 * eij * P[i][k] - beta * Q[j][k])
error = 0
for i in range(num_users):
for j in range(num_items):
if R[i][j] > 0:
error += (R[i][j] - np.dot(P[i,:], Q[j,:].T)) ** 2
for k in range(K):
error += (beta/2) * (P[i][k]**2 + Q[j][k]**2)
if error < 0.001:
break
return P, Q

P, Q = matrix_factorization(user_movie_matrix.values)
predicted_ratings = np.dot(P, Q.T)

算法对比:

from sklearn.metrics import mean_squared_error

def calculate_rmse(actual, predicted):
return np.sqrt(mean_squared_error(actual, predicted))

actual_ratings = user_movie_matrix.values[user_movie_matrix.values > 0]
predicted_ratings_mf = predicted_ratings[user_movie_matrix.values > 0]
rmse_mf = calculate_rmse(actual_ratings, predicted_ratings_mf)
print(f"Matrix Factorization RMSE: {rmse_mf}")