热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark综合练习题

请注意,本文编写于169天前,最后修改于169天前,其中某些信息可能已经过时。写在前面:博主是一名软件工程系大数据应用开发专业大二的学生,昵称来源于《爱丽丝梦游仙境》中的Alice


请注意,本文编写于 169 天前,最后修改于 169 天前,其中某些信息可能已经过时。



写在前面:博主是一名软件工程系大数据应用开发专业大二的学生,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白, 写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新 。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站: http://alices.ibilibili.xyz/ , 博客主页: https://alice.blog.csdn.net/


尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为 一天的生活就是一生的缩影。 我希望 在最美的年华,做最好的自己!


之前刚学Spark时分享过一篇磨炼基础的练习题,➤ Ta来了,Ta来了,Spark基础能力测试题Ta来了! ,收到的反馈还是不错的。于是,在正式结课Spark之后,博主又为大家倾情奉献一道关于Spark的综合练习题,希望大家能有所收获✍




题目


以下是RNG S8 8强赛失败后,官微发表道歉微博下一级评论


数据说明:























































字段 字段含义
index 数据id
child_comment 回复数量
comment_time 评论时间
content 评论内容
da_v 微博个人认证
like_status
pic 图片评论url
user_id 微博用户id
user_name 微博用户名
vip_rank 微博会员等级
stamp 时间戳

<1> 在kafak中创建rng_comment主题,设置2个分区2个副本


<2>数据预处理,把空行和缺失字段的行过滤掉


<3>请把给出的文件写入到kafka中,根据数据id进行分区,id为奇数的发送到一个分区中,偶数的发送到另一个分区


<4>使用Spark Streaming对接kafka


<5>使用Spark Streaming对接kafka之后进行计算

在mysql中创建一个数据库rng_comment 在数据库rng_comment创建vip_rank表,字段为数据的所有字段 在数据库rng_comment创建like_status表,字段为数据的所有字段 在数据库rng_comment创建count_conmment表,字段为 时间,条数

<6>查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中


<7>查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中


<8>分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中




答案


<1> 创建Topic


在命令行窗口执行Kafka创建Topic的命令,并指定对应的分区数和副本数


/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 2 --topic rng_comment

<2> 读取文件,并对数据做过滤并输出到新文件


object test01_filter {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local[*]").appName("demo01").getOrCreate()
val sc: SparkCOntext= spark.sparkContext
// 读取数据
//testFile是多行数据
val rddInfo: RDD[String] = sc.textFile("E:\\rng_comment.txt")

// 对数据进行一个过滤
val RNG_INFO: RDD[String] = rddInfo.filter(data => {
// 判断长度:将每行的内容用tab键切割,判断最后的长度
// 判读是否为空字符: trim之后不为empty
data.split("\t").length == 11 && !data.trim.isEmpty
})

// // 如果想直接将数据写入到Kafka,而不通过输出文件的方式
// val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
//
// def saveToKafka(INFO:RDD[String]): Unit ={
//
// try {
//
// INFO.foreach(x=>{
// val record: ProducerRecord[String, String] = new ProducerRecord[String,String]("rng_test",x.split("\t")(0),x.toString)
//
// kafkaProducer.send(record)
// })
//
// }catch {
// case e:Exception => println("发送数据出错:"+e)
// }
//
// }

// 导入隐式转换
// 将RDD转换成DF
import spark.implicits._
val df: DataFrame = RNG_INFO.toDF()
// 输出数据【默认分区数为2,这里我们指定分区数为1】
df.repartition(1).write.text("E:\\outputtest")

// 关闭资源
sc.stop()
spark.stop()
}
}

<3>读取新文件,将数据按照题意发送到Kafka的不同分区


需要先写一个实现自定义分区逻辑的java类


/*
编写自定义分区逻辑
*/
public class ProducerPartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
/*
编写自定义分区代码
*/
//System.out.println(value.toString());
String[] str = value.toString().split("\t");
// 由题意可得,id为奇数的发送到一个分区中,偶数的发送到另一个分区
if (Integer.parseInt(str[0]) % 2 == 0){
return 0;
}else {
return 1;
}
}
@Override
public void close() {
}
@Override
public void configure(Map configs) {
}
}

然后在下面的程序中引用分区类的类路径


public class test02_send {
/*
程序的入口
*/
public static void main(String[] args) throws IOException {
//编写生产数据的程序
//1、配置kafka集群环境(设置)
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
//消息确认机制
props.put("acks", "all");
//重试机制
props.put("retries", 0);
//批量发送的大小
props.put("batch.size", 16384);
//消息延迟
props.put("linger.ms", 1);
//批量的缓冲区大小
props.put("buffer.memory", 33554432);
// kafka key 和value的序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 根据题意得,需要自定义分区
props.put("partitioner.class", "com.czxy.scala.demo12_0415.han.ProducerPartition");
KafkaProducer kafkaProducer = new KafkaProducer<>(props);
// 指定需要读取的文件
File file = new File("E:\\outputtest\\part-00000-fe536dc7-523d-4fdd-b0b5-1a045b8cb1ab-c000.txt");
// 创建对应的文件流,进行数据的读取
FileInputStream fileInputStream = new FileInputStream(file);
// 指定编码格式进行读取
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, "UTF-8");
// 创建缓冲流
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
// 创建一个变量,用来保存每次读取的数据
String tempString = null;
// 循环遍历读取文件内容
while ((tempString = bufferedReader.readLine()) != null) {
// 利用kafka对象发送数据
kafkaProducer.send(new ProducerRecord<>("rng_comment", tempString));
// 发送完成之后打印数据
System.out.println("已发送:" + tempString);
}
System.out.println("数据发送完毕!");
// 关闭kafka数据生产者
kafkaProducer.close();
}
}

<4> 先在数据库中创建好接收数据需要用到的表


create table vip_rank
(
`index` varchar(100) null comment '数据id',
child_comment varchar(100) null comment '回复数量',
comment_time DATE null comment '评论时间',
content TEXT null comment '评论内容',
da_v varchar(100) null comment '微博个人认证',
like_status varchar(100) null comment '赞',
pic varchar(100) null comment '图片评论url',
user_id varchar(100) null comment '微博用户id',
user_name varchar(100) null comment '微博用户名',
vip_rank int null comment '微博会员等级',
stamp varchar(100) null comment '时间戳'
);
create table like_status
(
`index` varchar(100) null comment '数据id',
child_comment varchar(100) null comment '回复数量',
comment_time DATE null comment '评论时间',
content varchar(10000) null comment '评论内容',
da_v varchar(100) null comment '微博个人认证',
like_status varchar(100) null comment '赞',
pic varchar(100) null comment '图片评论url',
user_id varchar(100) null comment '微博用户id',
user_name varchar(100) null comment '微博用户名',
vip_rank int null comment '微博会员等级',
stamp varchar(100) null comment '时间戳'
);
create table count_comment
(
time DATE null comment '时间',
count int null comment '出现的次数',
constraint rng_comment_pk
primary key (time)
);

<5> 使用Spark Streaming对接kafka之后进行计算

下面的代码完成了:查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中

object test03_calculate {
/*
将数据从kafka集群中读取,并将数据做进一步的处理过后,写入到mysql数据库中
*/
def ConnectToMysql() ={
// 连接驱动,设置需要连接的MySQL的位置以及数据库名 + 用户名 + 密码
DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_comment?characterEncoding=UTF-8", "root", "root")
}
/**
* 将数据写入到MySQL的方法
* @param tableName 表名
* @param data List类型的数据
*/
def saveDataToMysql(tableName:String,data:List[String]): Unit ={
// 获取连接
val connection: COnnection= ConnectToMysql()
// 创建一个变量用来保存sql语句
val sql = s"insert into ${tableName} (`index`, child_comment, comment_time, content, da_v,like_status,pic,user_id,user_name,vip_rank,stamp) values (?,?,?,?,?,?,?,?,?,?,?)"
// 将数据存入到mysql中
val ps: PreparedStatement = connection.prepareStatement(sql)
ps.setString(1,data.head)
ps.setString(2,data(1))
ps.setString(3,data(2))
ps.setString(4,data(3))
ps.setString(5,data(4))
ps.setString(6,data(5))
ps.setString(7,data(6))
ps.setString(8,data(7))
ps.setString(9,data(8))
ps.setString(10,data(9))
ps.setString(11,data(10))
// 提交[因为是插入数据,所以这里需要更新]
ps.executeUpdate()
// 关闭连接
connection.close()
}
def main(args: Array[String]): Unit = {
//1 创建sparkConf
var cOnf= new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")
//2 创建一个sparkcontext
var sc = new SparkContext(conf)
sc.setLogLevel("WARN")
//3 创建streamingcontext
var ssc = new StreamingContext(sc,Seconds(3))
//设置kafka对接参数
var kafkaParams= Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SparkKafkaDemo",
//earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
//none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
//这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费
"auto.offset.reset" -> "earliest",
//false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 设置检查点的位置
ssc.checkpoint("sparkstreaming/")
//kafkaDatas 含有key和value
//key是kafka成产数据时指定的key(可能为空)
//value是真实的数据(100%有数据)
val kafkaDatas: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
//设置位置策略 均衡
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array("rng_comment"), kafkaParams))
kafkaDatas.foreachRDD(rdd=>rdd.foreachPartition(line=>{
// 遍历每一个分区的数据
for (row <- line){
// 获取到行数据组成的array数组
val str: Array[String] = row.value().split("\t")
// 将数据转成List集合
val list: List[String] = str.toList
/* 查询出微博会员等级为5的用户,并把这些数据写入到mysql数据库中的vip_rank表中 */
if (list(9).equals("5")){
// 调用方法,将集合数据写入到指定的表中
saveDataToMysql("vip_rank",list)
}
/* 查询出评论赞的个数在10个以上的数据,并写入到mysql数据库中的like_status表中 */
if (Integer.parseInt(list(5))>10){
saveDataToMysql("like_status",list)
}
}
}))
//5 开启计算任务
ssc.start()
//6 等待关闭
ssc.awaitTermination()
}
}

运行成功后的效果


vip_rank




like_status



下面的代码完成了:分别计算出2018/10/20 ,2018/10/21,2018/10/22,2018/10/23这四天每一天的评论数是多少,并写入到mysql数据库中的count_conmment表中

object test04_count {
def ConnectToMysql() ={
// 连接驱动,设置需要连接的MySQL的位置以及数据库名 + 用户名 + 密码
DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_test?characterEncoding=UTF-8", "root", "root")
}
/**
* 将数据存入到mysql中
*
* @param time 时间
* @param count 数量
*/
def saveDataToMysql(time: String, count: Int): Unit = {
println(s"$time\t $count")
if (time.contains("2018/10/20") || time.contains("2018/10/21") || time.contains("2018/10/22") || time.contains("2018/10/23")) {
//获取连接
val connection: COnnection= ConnectToMysql()
//创建一个变量用来保存sql语句
val sql: String = "INSERT INTO count_comment (time,count) VALUES (?,?) ON DUPLICATE KEY UPDATE count = ?"
//将一条数据存入到mysql
val ps: PreparedStatement = connection.prepareStatement(sql)
ps.setString(1, time)
ps.setInt(2, count)
ps.setInt(3, count)
//提交
ps.executeUpdate()
//关闭连接
connection.close()
}
}
def main(args: Array[String]): Unit = {
//1 创建sparkConf
var conf: SparkCOnf=new SparkConf().setMaster("local[*]").setAppName("SparkStremingDemo1")
//2 创建一个sparkcontext
var sc: SparkCOntext=new SparkContext(conf)
sc.setLogLevel("WARN")
//3 创建StreamingContext
var ssc: StreamingCOntext=new StreamingContext(sc,Seconds(5))
//设置缓存数据的位置
ssc.checkpoint("./TmpCount")
// 设置kafka的参数
var kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", // 集群位置
"key.deserializer" -> classOf[StringDeserializer], // key序列化标准
"value.deserializer" -> classOf[StringDeserializer], // value序列化标准
"group.id" -> "SparkKafkaDemo", // 分组id
//earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
//none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
//这里配置latest自动重置偏移量为最新的偏移量,即如果有偏移量从偏移量位置开始消费,没有偏移量从新来的数据开始消费
"auto.offset.reset" -> "earliest",
//false表示关闭自动提交.由spark帮你提交到Checkpoint或程序员手动维护
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 接收Kafka的数据并根据业务逻辑进行计算
val kafkaDatas: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String,String](
ssc, // StreamingContext对象
LocationStrategies.PreferConsistent, // 位置策略
ConsumerStrategies.Subscribe[String,String](Array("rng_comment"),kafkaParams) // 设置需要消费的topic和kafka参数
)
// 2018/10/23 16:09 需要先获取到下标为2的数据,再按照空格进行切分,获取到年月日即可
val kafkaWordOne: DStream[(String, Int)] = kafkaDatas.map(z=>z.value().split("\t")(2).split(" ")(0)).map((_,1))
// 更新数据
val wordCounts: DStream[(String, Int)] = kafkaWordOne.updateStateByKey(updateFunc)
// 遍历RDD
wordCounts.foreachRDD(rdd=>rdd.foreachPartition(line=>{
for(row <- line){
saveDataToMysql(row._1,row._2)
//println("保存成功!")
}
}))
println("完毕!")
// 开启计算任务
ssc.start()
// 等待关闭
ssc.awaitTermination()
}
//currentValues:当前批次的value值,如:1,1,1 (以测试数据中的hadoop为例)
//historyValue:之前累计的历史值,第一次没有值是0,第二次是3
//目标是把当前数据+历史数据返回作为新的结果(下次的历史数据)
def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={
// currentValues当前值
// historyValue历史值
val result: Int = currentValues.sum + historyValue.getOrElse(0)
Some(result)
}
}

运行成功后的效果


count_comment






结语


本次的分享就到这里,因为博主还是一个萌新,能力有限,如果以上过程中出现了任何的纰漏错误,烦请大佬们指正。

受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波(^U^)ノ~YO





推荐阅读
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 本文详细介绍了MySQL表分区的创建、增加和删除方法,包括查看分区数据量和全库数据量的方法。欢迎大家阅读并给予点评。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文讨论了在shiro java配置中加入Shiro listener后启动失败的问题。作者引入了一系列jar包,并在web.xml中配置了相关内容,但启动后却无法正常运行。文章提供了具体引入的jar包和web.xml的配置内容,并指出可能的错误原因。该问题可能与jar包版本不兼容、web.xml配置错误等有关。 ... [详细]
  • 抽空写了一个ICON图标的转换程序
    抽空写了一个ICON图标的转换程序,支持png\jpe\bmp格式到ico的转换。具体的程序就在下面,如果看的人多,过两天再把思路写一下。 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 不同优化算法的比较分析及实验验证
    本文介绍了神经网络优化中常用的优化方法,包括学习率调整和梯度估计修正,并通过实验验证了不同优化算法的效果。实验结果表明,Adam算法在综合考虑学习率调整和梯度估计修正方面表现较好。该研究对于优化神经网络的训练过程具有指导意义。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 本文介绍了在Python中使用zlib模块进行字符串的压缩与解压缩的方法,并探讨了其在内存优化方面的应用。通过压缩存储URL等长字符串,可以大大降低内存消耗,虽然处理时间会增加,但是整体效果显著。同时,给出了参考链接,供进一步学习和应用。 ... [详细]
author-avatar
秋静222
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有