热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

记一次spark数据倾斜的优化

记一次解决spark处理性

一、问题的背景

    最近在flink上开发了用户和物品的实时统计特征,线上推荐服务请求的时候通过查询redis进行数据交互,每次推荐结果返回的最终会落地到HDFS文件系统中,供离线训练模型使用。

    在离线训练的时候由于各种各样的原因(无能狂怒),无法在采样的过程中将样本和realtimeFeature直接抽取出来(待优化),实际采用的是采样样本和特征整合分离的两阶段方案。

    对于离线特征整合 join 得到, 再根据item作为key, join  最终得到, 由于用户特征和物品特征都是天级离线特征这样做虽然不够牛逼(正确做法应该还是通过返回结果附带特征的方案)但是也不会出现大的纰漏(小纰漏还是不少,有优化空间)。

    但是到了实时特征的时候会出现问题,不论是用户实时特征还是物品实时特征都不断在变化,不是一个feature, 对于样本需要找到user在t1时刻的userFeature,item在t1时刻的itemFeature。


二、不太合理但是勉强能用的方案

    举用户实时特征为例,样本关联实时特征。

    我们落地了如果直接拿user+time作为key进行关联岂不是美滋滋,还是由于上诉吐槽的各种各样的原因,user+sampleTime未必能完美关联上user+featureTime,需要查找样本sampleTime最近的用户特征featureTime。

    被拆解成为两份数据:

    1、根据user group by 的;

   2、以user+featureTime为key的特征数据;

    整体设计分成了两个阶段:1、查找最近时间,2、关联具体特征数据;


三、时间查找优化

  首先我们根据uid关联, 遍历featureTimeList 找到和sampleTime距离最近的featureTime。

    最开始版本非常low:

val featureTime = timeList
.map { t1 => (Math.abs(t1 - sampleTime), t1) }
.minBy(v => v._1)._2

    因为热门的用户和物品落地的实时特征特别多,导致timeList非常大,timeList理论上最大值是24*3600=86400, 在spark上处理的时候会发现卡在99/100的过程中(假设有100个partition)。

    优化的做法是将timeList进行排序,然后采用二分查找:

val featureTime = biSearch(featureTimeList, sampleTime)
def biSearch(data:Array[Long], input:Long):LOng= {
var start = 0
var end = data.length - 1
var retIndex = 0
if(data(start) <= input ){
data(start)
}else if(data(end) >= input){
data(end)
}else{
while(start <end){
val index = (start + end)/2
if(data(index) > input && data(index+1) > input){
start = index
}else if(data(index) data(index + 1)
end = index
}else if(data(index) > input && data(index + 1)
val c = data(index) + data(index + 1) -2*input
if(c > 0){
retIndex = index + 1
}else{
retIndex = index
}
start = end
}else if(data(index) == input){
retIndex = index
start = end
}else if(data(index + 1) == input){
retIndex = index + 1
start = end
}
}
data(retIndex)
}
}

    毕竟不是做ACM懒得考虑featureTimeList边界条件,在使用的时候只对size > 10的列表使用二分查找,其他的就直接遍历。


四、关联优化

    经过上述的二分查找后,查找效率提升了不少,接下来我们愉快的将进行关联希望得到 , 在实际使用的过程中又遇到了99/100的阻塞问题。

    经过排查发现并不是所有的用户或者物品都有实时特征,在时间查找过程中featureTimeList为空(其实连对应的user key也都没有)当时的做法是给这部分user+featureTime赋值为空,查看日志的时候这样做会导致大量的数据倾斜。

    无论是""还是user+featureTime的作用就只是关联,对后续的数据处理没有什么影响,因此就考虑将上一阶段关联不上被设置为""的key打散到100个分区里规避数据倾斜的问题:

def searchTimeKey_user(keyArrayJson: RDD[(String, (Array[String], JSONObject))],
onlineTime: RDD[(String, Array[Long])]): RDD[(String, (Array[String], JSONObject))] = {
keyArrayJson.leftOuterJoin(onlineTime, 100)
.map { case (id, ((arr, itemFeature), timeList)) =>
val t0 = TimeUtil.getSecondTimeStamp(arr(3))
var key = String.valueOf(Random.nextInt(100)) // 当关联不上的时候key反正没用,打散了防止数据倾斜
if (timeList.isDefined) {
if(timeList.get.length > 10){
val t1 = biSearch(timeList.get, t0)
key = id + String.valueOf(t1)
}else{
val t1 = timeList.get.map { t1 => (Math.abs(t1 - t0), t1) }.minBy(v => v._1)._2
key = id + String.valueOf(t1)
}
}
(key, (arr, itemFeature))
}
}


五、小结

    以上关于样本和特征这样关联是否合理和进一步改进优化我们不做讨论,本着先用为敬的原则后续再去优化。

    由于太久没有遇到spark执行性能的问题并且文章也好久没有更新了,这次有点小激动就随手记录一笔。

    spark在执行过程中遇到的性能问题大概率都是shuffle后的数据倾斜问题,解决方案也无非就是小表broadcast, 大表join小表,打散join key等等方法,美团技术博客在这方面已经讲得很透彻了,温故而知新吧,毕竟程序员这样的手艺人长时间没有操作容易忘记和手生。

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

Spark性能优化指南——基础篇


https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

Spark性能优化指南——高级篇




推荐阅读
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • 马蜂窝数据总监分享:从数仓到数据中台,大数据演进技术选型最优解
    大家好,今天分享的议题主要包括几大内容:带大家回顾一下大数据在国内的发展,从传统数仓到当前数据中台的演进过程;我个人认为数 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 本文介绍了关系型数据库和NoSQL数据库的概念和特点,列举了主流的关系型数据库和NoSQL数据库,同时描述了它们在新闻、电商抢购信息和微博热点信息等场景中的应用。此外,还提供了MySQL配置文件的相关内容。 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • MySQL多表数据库操作方法及子查询详解
    本文详细介绍了MySQL数据库的多表操作方法,包括增删改和单表查询,同时还解释了子查询的概念和用法。文章通过示例和步骤说明了如何进行数据的插入、删除和更新操作,以及如何执行单表查询和使用聚合函数进行统计。对于需要对MySQL数据库进行操作的读者来说,本文是一个非常实用的参考资料。 ... [详细]
  • 本文总结和分析了JDK核心源码(2)中lang包下的基础知识,包括常用的对象类型包和异常类型包。在对象类型包中,介绍了Object类、String类、StringBuilder类、StringBuffer类和基本元素的包装类。在异常类型包中,介绍了Throwable类、Error类型和Exception类型。这些基础知识对于理解和使用JDK核心源码具有重要意义。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • 基于分布式锁的防止重复请求解决方案
    一、前言关于重复请求,指的是我们服务端接收到很短的时间内的多个相同内容的重复请求。而这样的重复请求如果是幂等的(每次请求的结果都相同,如查 ... [详细]
  • 数据库异常智能分析与诊断
    数据库,异常, ... [详细]
  • 报错现象:从mysql5.5数据库导出的数据结构放到mysql5.7.10报错create_timetimestampNOTNULLDEFAULT‘0000-00-0 ... [详细]
  • 技术方案:Spark、kafka、opentsdb、Yahoo的egads模型静态训练:采用两种算法进行模型的训练:指数移动平均和HotWinters,模型一天训练一次,即每天0点开始训练, ... [详细]
  • 在计算机领域,数据仓库(DW或DWH),是一个用于报告和数据分析的零碎,被认为是商业智能的一个外围组成部分。它将以后和历史数据存储在一个中央,为整个企 ... [详细]
author-avatar
UTOB
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有