本文整理汇总了Python中pyspark.mllib.recommendation.ALS类的典型用法代码示例。如果您正苦于以下问题：Python ALS类的具体用法？Python ALS怎么用？Python ALS使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ALS类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: evaluate
def evaluate(sc, raw_user_movies, raw_hot_movies):
    """Grid-search explicit and implicit ALS models and print their training RMSE.

    Args:
        sc: Spark context. Accepted for interface compatibility; not used here.
        raw_user_movies: RDD of comma-separated text lines whose first field
            is the raw user id.
        raw_hot_movies: raw movie data, passed straight to ``build_movies``.

    The error is measured on the same ratings the model was trained on, so it
    is a training error, not a held-out error. Nothing is returned; results
    are printed.
    """
    # The return value is not needed below; the call is kept in case
    # build_movies has side effects (its definition is not visible here).
    build_movies(raw_hot_movies)

    # ALS needs integer user ids, so map every distinct raw id to a unique int.
    user_id_to_int = (raw_user_movies
                      .map(lambda line: line.split(',')[0])
                      .distinct()
                      .zipWithUniqueId()
                      .collectAsMap())
    ratings = build_ratings(raw_user_movies, user_id_to_int)
    num_iterations = 10

    # These projections do not depend on the hyper-parameters: build them once.
    user_movies = ratings.map(lambda tokens: (tokens[0], tokens[1]))
    keyed_ratings = ratings.map(lambda tokens: ((tokens[0], tokens[1]), tokens[2]))

    # Explicit-feedback models.
    for rank in [10, 50]:
        for lam in [1.0, 0.01, 0.0001]:
            model = ALS.train(ratings, rank, num_iterations, lam)
            predictions = model.predictAll(user_movies).map(lambda r: ((r[0], r[1]), r[2]))
            print(predictions.take(3))
            rates_and_preds = keyed_ratings.join(predictions)
            print(rates_and_preds.take(3))
            # Square root of the mean squared error, i.e. the RMSE.
            rmse = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
            print("(rank:%d, lambda: %f,) Root Mean Squared Error = %f" % (rank, lam, rmse))

    # Implicit-feedback models, additionally sweeping the confidence weight alpha.
    for rank in [10, 50]:
        for lam in [1.0, 0.01, 0.0001]:
            for alpha in [1.0, 40.0]:
                model = ALS.trainImplicit(ratings, rank, num_iterations, lam, alpha=alpha)
                predictions = model.predictAll(user_movies).map(lambda r: ((r[0], r[1]), r[2]))
                rates_and_preds = keyed_ratings.join(predictions)
                print(rates_and_preds.take(3))
                rmse = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
                print("(rank:%d, lambda: %f, alpha: %f, implicit ) Root Mean Squared Error = %f" % (rank, lam, alpha, rmse))
开发者ID:dengshilong,项目名称:douban_recommender,代码行数:25,代码来源:recommender.py
示例2: _recommend
def _recommend(self, train_ratings, users):
    """Fit an ALS model on ``train_ratings`` and score every item for ``users``.

    Args:
        train_ratings: pandas DataFrame with ``user``, ``item`` and ``rating``
            columns. The ``user`` and ``item`` columns are converted to
            categoricals in place.
        users: iterable of user identifiers to score; each is paired with
            every distinct item code.
            NOTE(review): these are passed to the model unchanged, so they
            presumably must already be integer category codes -- confirm
            against the caller.

    Returns:
        DataFrame with columns ``user``, ``item`` and ``rating`` holding the
        predictions, with codes mapped back to the original labels.
    """
    from pyspark.mllib.recommendation import ALS, Rating

    # Spark's ALS implementation only works with integer ids, so both id
    # columns become pandas categoricals and their integer codes are used.
    for column in ('user', 'item'):
        train_ratings[column] = train_ratings[column].astype('category')
    user_cat = train_ratings['user'].cat
    item_cat = train_ratings['item'].cat
    self.user_cat, self.item_cat = user_cat, item_cat
    self.train_ratings = train_ratings

    # Train on (user code, item code, rating) triples.
    triples = zip(user_cat.codes, item_cat.codes, train_ratings.rating)
    self.ratings = self.sc.parallelize(Rating(u, i, rating) for u, i, rating in triples)
    trainer = ALS.trainImplicit if self.implicit else ALS.train
    model = trainer(self.ratings, **self.spark_args)

    # Score the cross product of the requested users and all known items.
    item_codes = item_cat.codes.unique()
    self.ratings_to_predict = self.sc.parallelize(
        (user, item) for user in users for item in item_codes)
    self.predictions = model.predictAll(self.ratings_to_predict).collect()

    # Translate the integer codes back to labels and tabulate the result.
    user_labels, item_labels = user_cat.categories, item_cat.categories
    self.predictions = [(user_labels[p.user], item_labels[p.product], p.rating)
                        for p in self.predictions]
    self.predictions_df = pd.DataFrame(self.predictions, columns=['user', 'item', 'rating'])
    return self.predictions_df
开发者ID:halflings,项目名称:receval,代码行数:25,代码来源:recommender.py
示例3: train
def train(self, rank=3, iterations=20, lambda_=0.01, alpha=None, blocks=-1):
    """Train a matrix-factorization model on ``self.train_data``.

    Args:
        rank: number of latent factors.
        iterations: number of ALS iterations.
        lambda_: regularization parameter.
        alpha: confidence weight for implicit feedback. Any truthy value
            selects ``ALS.trainImplicit``; ``None`` (or 0) selects the
            explicit-feedback ``ALS.train``.
        blocks: value for the ``blocks`` argument of ALS; -1 is the ALS
            default.

    Returns:
        The trained model returned by ALS.
    """
    if alpha:
        return ALS.trainImplicit(self.train_data, rank, iterations, lambda_, blocks, alpha)
    # `blocks` was previously dropped in this branch; forward it so the
    # parameter is honoured for explicit models as well.
    return ALS.train(self.train_data, rank, iterations, lambda_, blocks)
开发者ID:farcryzry,项目名称:spala,代码行数:10,代码来源:MatrixFactorization.py
示例4: find_best_model
def find_best_model(data):
    """Grid-search ALS hyper-parameters and store the best ones in globals.

    ``data`` is split 60/20/20 (seed 6) into training, validation and test
    parts; only the first two are used here. Every combination of rank,
    lambda and iteration count is trained on the training part and scored by
    RMSE on the validation part. The winning values are written to the
    module-level ``bestRank``, ``bestLambda`` and ``bestNumIter``.

    Args:
        data: RDD of records whose first three fields are user id, item id
            and rating, each convertible with ``int``/``float``.
    """
    global bestRank
    global bestLambda
    global bestNumIter
    bestRank = 0
    bestLambda = -1.0
    bestNumIter = -1
    ranks = [8, 12]
    lambdas = [0.1, 10.0]
    numIters = [10, 20]
    min_error = float('inf')
    training, validation, _ = data.randomSplit([0.6, 0.2, 0.2], 6)

    # None of the following depends on the hyper-parameters: do it once
    # instead of once per combination.
    ALS.checkpointInterval = 2
    training_data = training.map(lambda xs: [int(x) for x in xs])
    validation_data = validation.map(lambda p: (int(p[0]), int(p[1])))
    validation_truth = validation.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))

    for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        model = ALS.train(training_data, rank, numIter, lmbda)
        predictions = model.predictAll(validation_data).map(lambda r: ((r[0], r[1]), r[2]))
        ratings_and_predictions = validation_truth.join(predictions)
        error = sqrt(ratings_and_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean())
        print('For rank %s the RMSE is %s' % (rank, error))
        if error < min_error:
            min_error = error
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter
    print('The best model was trained with rank %s' % bestRank)
开发者ID:MiguelPeralvo,项目名称:mongodb-spark,代码行数:27,代码来源:movie-recommendations.py
示例5: main
def main():
    """Train an ALS recommender and print its MSE on the training data.

    Ratings are read from ``./data/ratings.dat`` and parsed with
    ``parse_rating``. The model is evaluated on the very ratings it was
    trained on, so the printed value is a training error.
    """
    # Set up environment
    sc = SparkContext("local[*]", "RecSys")
    try:
        # Load and parse the data
        data = sc.textFile("./data/ratings.dat")
        ratings = data.map(parse_rating)

        # Build the recommendation model using Alternating Least Squares
        rank = 10
        iterations = 20
        model = ALS.train(ratings, rank, iterations)

        # Evaluate the model on training data
        testdata = ratings.map(lambda p: (p[0], p[1]))
        predictions = model.predictAll(testdata)\
            .map(lambda r: ((r[0], r[1]), r[2]))
        rates_and_preds = ratings.map(lambda r: ((r[0], r[1]), r[2]))\
            .join(predictions)
        MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
        print("Mean Squared Error = " + str(MSE))
    finally:
        # Release the context even if loading, training or evaluation fails.
        sc.stop()
开发者ID:randidwiputra,项目名称:python-spark-recsys,代码行数:25,代码来源:recsys.py
示例6: main
def main(argv):
    """Build hash lookups in Cassandra and train an implicit ALS model.

    Reads a parquet file of three-column rows, writes user -> hash and
    product -> hash lookup tables to Cassandra, trains an implicit-feedback
    ALS model on the hashed ids and saves it to HDFS.

    Args:
        argv: command-line arguments. Not read by this function; the input
            and output locations are hard-coded below.
    """
    conf = SparkConf().setAppName("recommendation")
    sc = SparkContext(conf=conf)
    try:
        sqlContext = SQLContext(sc)
        dirPath = "hdfs://ec2-52-71-113-80.compute-1.amazonaws.com:9000/reddit/recommend/data/sr_userCount.parquet"
        rawDF = sqlContext.read.parquet(dirPath).persist(StorageLevel.MEMORY_AND_DISK_SER)
        # Go through the underlying RDD explicitly; DataFrame.map only exists
        # in Spark 1.x. Rows are indexed rather than unpacked in the lambda
        # signature because tuple parameters are Python 2 only.
        rows = rawDF.rdd

        # User hash lookup stored into Cassandra
        distinctUser = rows.map(lambda row: (row[0], hashFunction(row[0]))).distinct()
        userHashDF = sqlContext.createDataFrame(distinctUser, ["user", "hash"])
        userHashDF.write.format("org.apache.spark.sql.cassandra").options(table="userhash", keyspace=keyspace).save(mode="append")

        # Product hash lookup stored into Cassandra
        distinctProduct = rows.map(lambda row: (row[1], hashFunction(row[1]))).distinct()
        productHashDF = sqlContext.createDataFrame(distinctProduct, ["product", "hash"])
        productHashDF.write.format("org.apache.spark.sql.cassandra").options(table="producthash", keyspace=keyspace).save(mode="append")

        # Ratings for training: ALS needs integer ids, so the ids are hashed
        # with hashFunction and wrapped in Rating objects.
        ratings = rows.map(lambda row: Rating(hashFunction(row[0]), hashFunction(row[1]), float(row[2])))
        model = ALS.trainImplicit(ratings, 10, 10, alpha=0.01, seed=5)
        model.save(sc, "hdfs://ec2-52-71-113-80.compute-1.amazonaws.com:9000/reddit/recommend/model")
    finally:
        # Always release the context, including when a write or training fails.
        sc.stop()
开发者ID:Swebask,项目名称:RedditR--Insight-Data-Engineering-Project,代码行数:34,代码来源:engine.py
示例7: grid_search
def grid_search(train_df, test_df, X_test_df, y_test):
    """Sweep ALS hyper-parameters and return the model with the lowest RMSE.

    Each combination of rank, lambda and iteration count is trained on
    ``train_df`` and scored with ``computeRMSE`` against ``test_df`` /
    ``X_test_df``. The RMSE reported at the end as the "test set" score is
    computed on that same data.

    Returns:
        The best model found, or ``None`` if no candidate had a finite RMSE
        below infinity.
    """
    rank_options = [6]  # , 8, 12, 18]
    lambda_options = list(np.arange(0.1, 0.5, 0.1))
    iteration_options = [20]
    n_test = len(y_test)

    bestModel = None
    bestValidationRmse = float("inf")
    bestRank, bestLambda, bestNumIter = 0, -1.0, -1

    for rank, lmbda, numIter in itertools.product(rank_options, lambda_options, iteration_options):
        candidate = ALS.train(train_df, rank, numIter, lmbda)
        validationRmse = computeRMSE(candidate, test_df, X_test_df, n_test)
        head = "RMSE (validation) = %f for the model trained with " % validationRmse
        tail = "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
        print(head + tail)
        if validationRmse >= bestValidationRmse:
            continue
        # Strictly better than anything seen so far: remember it.
        bestModel = candidate
        bestValidationRmse = validationRmse
        bestRank, bestLambda, bestNumIter = rank, lmbda, numIter

    # evaluate the best model on the test set
    testRmse = computeRMSE(bestModel, test_df, X_test_df, n_test)
    head = "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda)
    tail = "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)
    print(head + tail)
    return bestModel
开发者ID:azstein,项目名称:new-food,代码行数:35,代码来源:recommender.py
示例8: train
def train(self, rank, iterations=10, lambda_=0.01, seed=0, **kwargs):
    """
    Train the model.

    Parameters
    ----------
    rank : int
        The number of factors in the underlying model. Generally, larger numbers of factors
        lead to better models, but increase the memory required. A rank in the range of 10 to 200
        is usually reasonable.
    iterations : int, optional
        The number of iterations to perform. ALS typically converges quickly,
        so a value of 10 is recommended.
    lambda_ : float, optional
        Controls regularization, which limits overfitting. Higher values apply
        more regularization. The appropriate value depends on the problem and
        needs to be tuned with train/test techniques.
    seed : int, optional
        Random seed forwarded to ``ALS.train``.
    **kwargs
        Any further keyword arguments are forwarded to ``ALS.train`` unchanged.

    Returns
    -------
    out : MatrixFactorizationModel
        Wraps the trained ALS model together with ``self.ratings`` and the
        user, item and rating column names.
    """
    ratings_rdd = self._prepare_ratings().to_rdd()
    als_options = dict(iterations=iterations, lambda_=lambda_, seed=seed)
    trained = ALS.train(ratings_rdd, rank, **dict(als_options, **kwargs))
    return MatrixFactorizationModel(trained, self.ratings, self.user_col, self.item_col, self.rating_col)
开发者ID:Atigeo,项目名称:xpatterns-xframe,代码行数:34,代码来源:recommend.py
示例9: build_ALS_model
def build_ALS_model(ratings):
    """Train and return an ALS model on ``ratings`` (rank 10, 20 iterations)."""
    # Build the recommendation model using Alternating Least Squares
    return ALS.train(ratings, rank=10, iterations=20)
开发者ID:JJconde,项目名称:video-game-recommendation,代码行数:7,代码来源:build_recommendation.py
示例10: alq_spark
def alq_spark(A, k, sc, **kwargs):
    """
    Factorize ``A`` with Spark ALS and return the two factor matrices.

    Args:
    - A: sign matrix (csr_matrix, per the original author), converted to
      ALS input by ``indexed_entries``
    - k: rank passed to ALS (number of clusters)
    - sc: the spark context
    - kwargs: parameters for ALS.train except for ratings
      https://spark.apache.org/docs/1.5.1/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS.train

    Return:
    X: np.ndarray, one row of user features per user id, ascending by id
    Y: np.ndarray, transpose of the product-feature rows, which are likewise
       sorted ascending by id
    """
    def _rows_sorted_by_id(features):
        # Collect (id, vector) pairs ordered by id and keep only the vectors.
        pairs = features.sortByKey(ascending=True).collect()
        return np.array(list(zip(*pairs))[1])

    edges_rdd = sc.parallelize(indexed_entries(A))
    model = ALS.train(edges_rdd, rank=k, **kwargs)
    X = _rows_sorted_by_id(model.userFeatures())
    Y = np.transpose(_rows_sorted_by_id(model.productFeatures()))
    return X, Y
开发者ID:xiaohan2012,项目名称:snpp,代码行数:30,代码来源:lowrank.py
示例11: als
def als(data):
    """Pick the best ALS regularization value and save that model's predictions.

    ``data`` is split 80/20 into train and test parts. One non-negative ALS
    model (rank 7, 10 iterations) is trained per lambda in 0.1 .. 1.0 and
    scored by MSE on the test part. The MSE-vs-lambda curve is saved to
    ``cross_validation_p.png`` and the best model's test predictions are
    written to the ``prediction`` directory via ``toCVSLine``.

    Args:
        data: RDD of records whose first three fields are user, product and
            rating.

    Returns:
        ``(bestModel, error)``: the model with the lowest test MSE and that MSE.
    """
    train, test = data.randomSplit(weights=[0.8, 0.2])
    X_train = train.map(lambda r: Rating(r[0], r[1], r[2]))
    y = test.map(lambda r: ((r[0], r[1]), r[2]))
    X_test = test.map(lambda r: (r[0], r[1]))
    rank = 7
    X_train.cache()
    X_test.cache()
    lambdas = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
    numIterations = 10
    nonnegative = True
    bestModel = None
    error = float('Inf')
    errors = []

    # Use ALS to predict play time for test users and choose the best lambda
    for lmbda in lambdas:
        model = ALS.train(X_train, rank, numIterations, lmbda, nonnegative=nonnegative)
        y_hat = model.predictAll(X_test).map(lambda r: ((r[0], r[1]), r[2]))
        ratesAndPreds = y.join(y_hat)
        MSE = ratesAndPreds.map(lambda r: ((r[1][0]) - (r[1][1]))**2).mean()
        errors.append(MSE)
        if MSE < error:
            bestModel = model
            error = MSE

    # Plot mean square error v.s. lambda
    plt.plot(lambdas, errors, 'ro')
    plt.xlabel(r'$\lambda$')
    plt.ylabel(r'$MSE$')
    plt.title(r'MSE v.s. $\lambda$')
    plt.savefig('cross_validation_p.png')

    # Make predictions with the best model. The original used `model`, i.e.
    # whichever lambda happened to be trained last. Fall back to that last
    # model only if no MSE compared below infinity (e.g. all NaN).
    final_model = bestModel if bestModel is not None else model
    y_hat = final_model.predictAll(X_test).map(lambda r: (r[0], r[1], r[2]))
    y_hat.map(toCVSLine).saveAsTextFile('prediction')
    return bestModel, error
开发者ID:DataLAUSDEclassProject,项目名称:spark,代码行数:34,代码来源:spark_als.py
示例12: train
def train(self):
    """Train the ALS model on the current ratings and save it to disk.

    Builds ``self.ratings_RDD`` from every ``UserCourse`` object that has an
    interest rating, reports training/test error on it, trains ALS with the
    values in ``_PARAMS``, replaces any previously saved model directory and
    finally reports metrics.
    """
    user_lookup, course_lookup = self.__prepare_data()

    def _has_interest(user_course):
        return user_course.course_review.interest is not None

    def _to_triple(user_course):
        # (user_id, course_id, rating) triple with ids mapped through the lookups
        return (user_lookup[str(user_course.user_id)],
                course_lookup[user_course.course_id],
                float(user_course.course_review.interest))

    # send list of (user_id, course_id, rating) triples to the ML algorithm
    log.info('Loading ratings data')
    ratings_RDD_raw = self.sc.parallelize(m.UserCourse.objects)
    self.ratings_RDD = ratings_RDD_raw.filter(_has_interest).map(_to_triple).cache()
    training_error, test_error = self._report_error(self.ratings_RDD)

    log.info('Training model')
    rank = _PARAMS['rank']
    num_iterations = _PARAMS['num_iterations']
    reg_param = _PARAMS['reg_param']
    model = ALS.train(self.ratings_RDD, rank, num_iterations, reg_param)
    log.info('Model trained!')

    # Remove an existing model directory before saving the new model there.
    here = os.path.dirname(__file__)
    model_path = os.path.join(here, '%s/trained_model' % c.RECOMMENDATION_DIR)
    if os.path.isdir(model_path):
        rmtree(model_path)
    model.save(self.sc, model_path)

    self._report_metrics(num_courses=self.ratings_RDD.count(),
                         training_error=training_error,
                         test_error=test_error)
开发者ID:JGulbronson,项目名称:rmc,代码行数:32,代码来源:engine.py
示例13: fit_final_model
def fit_final_model(train):
    """Train a non-negative ALS model on ``train`` with fixed hyper-parameters.

    Uses rank 6, 20 iterations and a regularization value of 0.0875. The
    first three fields of each row of ``train.rdd`` are used as the rating
    triple.
    """
    # model params
    als_params = dict(rank=6, nonnegative=True, iterations=20, lambda_=0.0875)
    triples = train.rdd.map(lambda x: (x[0], x[1], x[2]))
    return ALS.train(triples, **als_params)
开发者ID:JohnRenshaw,项目名称:BeerSleuth,代码行数:7,代码来源:beer_spark.py
示例14: main
def main(sc):
    """Train an implicit ALS model and return the top predictions for new data.

    Ratings are read as ``user,product,rating`` CSV lines from two files.
    The model is trained on their union and then asked to score the
    (user, product) pairs of the second file.

    Args:
        sc: an active SparkContext.

    Returns:
        A list of at most two ``((user, product), rating)`` tuples, ordered
        by descending predicted rating.
    """
    def parse(rdd):
        return rdd.map(lambda l: l.split(',')).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

    # Plain int rather than the Python 2 only long literal `5L`.
    seed = 5
    iterations = 10
    regularization_parameter = 0.1
    rank = 4

    ratings = parse(sc.textFile("file:///Expedia/data/train1.csv")).cache()
    new_ratings = parse(sc.textFile("file:///Expedia/data/new_set.csv")).cache()
    new_ratings_for_predict_RDD = new_ratings.map(lambda x: (x[0], x[1])).cache()
    complete_data = ratings.union(new_ratings).cache()
    new_ratings_model = ALS.trainImplicit(complete_data, rank, seed=seed,
                                          iterations=iterations, lambda_=regularization_parameter)

    # predictAll expects an RDD of (user, product) pairs; for a single pair
    # use model.predict(user, product) instead.
    predictions = (new_ratings_model
                   .predictAll(new_ratings_for_predict_RDD)
                   .map(lambda r: ((r[0], r[1]), r[2]))
                   .collect())

    # Each element is ((user, product), rating), so the rating sits at index 1
    # (index 2 does not exist). The result is a plain list, so it is returned
    # directly instead of calling the RDD-only `.take()` on it.
    recommendations = sorted(predictions, key=lambda x: x[1], reverse=True)[:2]
    return recommendations
开发者ID:aaabed,项目名称:Capstone_Hotel_recommendation_system,代码行数:29,代码来源:cluster-recommend.py
示例15: model_param_sweep
def model_param_sweep(train, test):
    """Sweep ALS rank and regularization and print the best combination.

    For every rank in (4, 6, 8) and five regularization values between 0.05
    and 0.2, a non-negative ALS model is trained on ``train`` and evaluated
    on ``test``. Two scores are printed per model: the RMSE, and a "correct
    classification rate" that counts a prediction as correct when it is
    within 1 of the true rating or when both values are below 6. The
    combination with the highest classification rate is printed at the end.

    Args:
        train: object exposing ``.rdd`` whose rows start with user, item, rating.
        test: same layout as ``train``.
    """
    # model params
    iterations = 20
    regularization_param_list = np.linspace(0.05, 0.2, 5)
    rank_list = [4, 6, 8]

    # RMSE of every trained model, in sweep order
    errors = np.zeros(len(regularization_param_list) * len(rank_list))
    err = 0
    max_class_rate = 0
    best_rank = -1

    # These projections do not depend on the hyper-parameters: build them once.
    train_triples = train.rdd.map(lambda x: (x[0], x[1], x[2]))
    test_pairs = test.rdd.map(lambda r: (r[0], r[1]))
    test_truth = test.rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))

    for rank in rank_list:
        for reg in regularization_param_list:
            model = ALS.train(train_triples, rank=rank, nonnegative=True, iterations=iterations, lambda_=reg)
            predictions = model.predictAll(test_pairs).map(lambda x: ((int(x[0]), int(x[1])), float(x[2])))
            rates_and_preds = test_truth.join(predictions)
            correct_count = rates_and_preds.filter(
                lambda r: (abs(r[1][0] - r[1][1]) < 1) or (r[1][0] < 6 and r[1][1] < 6)).count()
            total_count = rates_and_preds.count()
            class_rate = correct_count * 1. / total_count
            error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
            errors[err] = error
            err += 1
            print('For rank=%s, regParam=%s the RMSE is %s with a correct classification rate of %0.3f' % (rank, reg, error, class_rate))
            if class_rate > max_class_rate:
                max_class_rate = class_rate
                best_rank = (rank, reg)
    print('The best model was trained with (rank, regParam): %s and had class rate %0.3f' % (str(best_rank), max_class_rate))
开发者ID:JohnRenshaw,项目名称:BeerSleuth,代码行数:30,代码来源:beer_spark.py
示例16: __train_model
def __train_model(self, ratings_RDD):
    """Train and return an ALS model on ``ratings_RDD``.

    Rank, seed, iteration count and regularization are taken from the
    instance attributes of the same names.
    """
    als_options = {
        'seed': self.seed,
        'iterations': self.iterations,
        'lambda_': self.regularization_parameter,
    }
    return ALS.train(ratings_RDD, self.rank, **als_options)
开发者ID:KevinDocel,项目名称:bigdata_pingxin,代码行数:7,代码来源:engine.py
示例17: __train_model
def __train_model(self):
    """Train an ALS model on ``self.ratings_RDD`` and store it in ``self.model``.

    Rank, seed, iteration count and regularization are taken from the
    instance attributes of the same names.
    """
    als_options = {
        'seed': self.seed,
        'iterations': self.iterations,
        'lambda_': self.regularization_parameter,
    }
    logger.info("Training the ALS model...")
    self.model = ALS.train(self.ratings_RDD, self.rank, **als_options)
    logger.info("ALS model built!")
开发者ID:TZstatsADS,项目名称:project4-team-7,代码行数:7,代码来源:engine.py
示例18: main
def main():
    """Plot test RMSE against ALS rank for two regularization values.

    Training and testing ratings are read from the module-level
    ``training_inputs`` / ``testing_inputs`` paths and parsed with
    ``get_tuple``. For each regularization value a model is trained per rank
    and its RMSE on the testing ratings is drawn as one labelled curve.
    """
    training_data = sc.textFile(training_inputs)
    testing_data = sc.textFile(testing_inputs)
    training_ratings = training_data.map(get_tuple).cache()
    testing_ratings = testing_data.map(get_tuple).cache()
    # Index instead of unpacking in the lambda signature: tuple parameters
    # are Python 2 only.
    testing_all = testing_ratings.map(lambda t: (t[0], t[1])).cache()
    ratings = testing_ratings.map(to_Rating)
    # The keyed ground truth is the same for every model: build it once.
    truth = ratings.map(lambda r: ((r[0], r[1]), r[2]))

    ranks = [2, 4, 8, 16, 32, 64, 128, 256]
    reg_params = [0.1, 0.01]
    positions = range(len(ranks))
    for reg_param in reg_params:
        rmses = []
        for rank in ranks:
            model = ALS.train(training_ratings, rank=rank, lambda_=reg_param, seed=10)
            predictions = model.predictAll(testing_all).map(lambda r: ((r[0], r[1]), r[2]))
            ratesAndPreds = truth.join(predictions)
            MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
            rmses.append(math.sqrt(MSE))
        # One curve per regularization value; ranks are evenly spaced on x.
        plt.plot(positions, rmses, label=str(reg_param))
    plt.xticks(positions, ranks, size='small')
    plt.legend()
    plt.show()
开发者ID:Veterun,项目名称:SparkPythonHanhan,代码行数:27,代码来源:als.py
示例19: train_als
def train_als(data):
    """Convert ``data`` to ``Rating`` objects and train ALS on them.

    Returns:
        ``(model, ratings)``: the trained model (rank 10, 10 iterations) and
        the RDD of ``Rating`` objects it was trained on.
    """
    def _as_rating(fields):
        # [user, movie, rating] -> Rating(int, int, float)
        return Rating(int(fields[0]), int(fields[1]), float(fields[2]))

    ratings = data.map(_as_rating)
    model = ALS.train(ratings, rank=10, iterations=10)
    return model, ratings
开发者ID:mzw4,项目名称:datascience,代码行数:8,代码来源:hw22.py
示例20: train_and_predict
def train_and_predict():
    """Train an ALS model on ``member_item_file`` and print the model object.

    The file is read through the module-level ``sc`` and parsed with
    ``parseRating``. Despite the name, no predictions are made here.
    """
    training = sc.textFile('member_item_file').map(parseRating).cache()
    # now train the model using ALS
    model = ALS.train(training, rank=10, iterations=10)
    print(model)
开发者ID:nischalhp,项目名称:Experimentz-RM,代码行数:8,代码来源:collaborative_filtering.py
注：本文中的pyspark.mllib.recommendation.ALS类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论