Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>Spark</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<name>Archetype - Spark</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hadoop</groupId>-->
<!-- <artifactId>hadoop-client</artifactId>-->
<!-- <version>3.1.3</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>3.0.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- 该插件用于将 Scala 代码编译成 class 文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!-- 声明绑定到 maven 的 compile 阶段 -->
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
WordCount练习
package wordCount
import org.apache.spark.{SparkConf, SparkContext}
object __51CTO__wordCount01 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("testRdd")
val sc = new SparkContext(conf)
val data=sc.textFile("DataSets/hello.txt") //读取本地文件
println(data)
println("=============================================================")
data.flatMap(_.split(" ")) //下划线是占位符,flatMap是对行操作的方法,对读入的数据进行分割
.map((_, 1)) //将每一项转换为key-value,数据是key,value是1
.reduceByKey(_ + _) //将具有相同key的项相加合并成一个
.collect() //将分布式的RDD返回一个单机的scala array,在这个数组上运用scala的函数操作,并返回结果到驱动程序
.foreach(println) //循环打印
}
}
/**
* Created by Administrator on 2017/4/20.
* xudong
*/
/**
*
*/
// object WordCountLocal
// def main(args: Array[String]) {
// /**
// * SparkContext 的初始化需要一个SparkConf对象
// * SparkConf包含了Spark集群的配置的各种参数
// */
// val conf=new SparkConf()
// .setMaster("local")//启动本地化计算
// .setAppName("testRdd")//设置本程序名称
//
// //Spark程序的编写都是从SparkContext开始的
// val sc=new SparkContext(conf)
// //以上的语句等价与val sc=new SparkContext("local","testRdd")
// val data=sc.textFile("e://hello.txt")//读取本地文件
// data.flatMap(_.split(" "))//下划线是占位符,flatMap是对行操作的方法,对读入的数据进行分割
// .map((_,1))//将每一项转换为key-value,数据是key,value是1
// .reduceByKey(_+_)//将具有相同key的项相加合并成一个
// .collect()//将分布式的RDD返回一个单机的scala array,在这个数组上运用scala的函数操作,并返回结果到驱动程序
// .foreach(println)//循环打印
ALSRec
package RecAlgri
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.sql.SparkSession
object ALSRecommend {
def main(args: Array[String]): Unit = {
val spark:SparkSession = SparkSession
.builder
.appName(this.getClass.getName)
.master("local[5]")
.getOrCreate()
//设置日志等级
spark.sparkContext.setLogLevel("WARN") //ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
import spark.implicits._
val movieRatings = spark.read.textFile("F:/wzideng/ML机器学习/推荐系统/ml-1m/ratings.csv")
.map( line => {
val field = line.split(",")
Rating(field(0).toInt,field(1).toInt,field(2).toFloat) //这里调了一个ml.recommendation.ALS里的Rating方法
}).toDF("user","item","rate")
movieRatings.show(20) //Integer.MAX_VALUE
val movieData = spark.read.textFile("F:/wzideng/ML机器学习/推荐系统/ml-1m/movies.dat")
.map(line => {
val field = line.split("::")
(field(0).toInt, field(1).toString, field(2).toString)
}).toDF("item", "movie_name", "genres")
movieData.show(20) //Integer.MAX_VALUE
println("*****************************************************************************************************************************************************")
movieRatings.join(movieData,"item").where("user = 417").select("user","item","movie_name","genres")
.show(20,false) //Integer.MAX_VALUE,不用truncate截断
/**
* 2.从源数据集中,拆分出一部分作为训练集
*/
val Array(traing,test) = movieRatings.randomSplit(Array(0.8,0.2))
/**
* 3.用训练集来训练一个模型
*/
val als = new ALS()
.setMaxIter(15)
.setUserCol("user")
.setItemCol("item")
.setRatingCol("rate")
.setRegParam(0.15)
.setRank(5)
val model = als.fit(traing)
/**
* 在打分数据里面,有这样的情况,这就是数据集切割之后,用户对电影的评分有不完整的情况,就是测试集里面的数据可能会出现训练集当中
* 没有出现过的电影或者用户,这个叫做冷启动
* ALS有两种策略,一种是标识为NAN,还有一种是丢弃掉。我们这里选择丢掉
*/
model.setColdStartStrategy("drop")
/**
* 4.通过模型预测我们的测试集
*/
val prediction = model.transform(test)
/**
* 进行模型评估
*/
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rate")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(prediction)
println("评估值: "+rmse)
/**
* 6.完成模型物品的推荐
*/
println("6.完成模型物品的推荐******************************************************************************************************************************************")
val forUserRecommendItem = model.recommendForAllUsers(10)
println("为每个商品推荐100个用户******************************************************************************************************************************************")
forUserRecommendItem.show(100,false)
//为指定的用户推荐商品
val users = movieRatings.select(als.getUserCol).distinct().limit(5)
val userSubset = model.recommendForUserSubset(users,100)
println("指定用户推荐100个商品******************************************************************************************************************************************")
userSubset.show(100,false)
// 为指定的商品推荐用户
val items = movieRatings.select(als.getItemCol).distinct().limit(5)
val itemSubset = model.recommendForItemSubset(items, 100)
println("指定商品推荐100个商品******************************************************************************************************************************************")
itemSubset.show(100,false)
}
}
文章来源地址https://www.toymoban.com/news/detail-614158.html
文章来源:https://www.toymoban.com/news/detail-614158.html
到了这里,关于基于模型的协同过滤推荐--IDEA+Scala实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!