热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka滞销瓶颈解决方案

kafka滞销瓶颈解决方案,Go语言社区,Golang程序员人脉社



2019独角兽企业重金招聘Python工程师标准>>> hot3.png




一、问题背景

       kafka消费者程序在即时消费时,发现下午仍在消费当天上午数据,这或多或少发生了kafka消费程序的延时或阻塞,需要寻找相关解决方案,解放kafka消费程序。



二、解决方案

        1、方案1:解放kafka消费者,剥离kafka数据业务处理程序并异步调用。

        kafka消费者:

public class GateWayPushInoListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(GateWayPushInoListener.class);
@Autowired
private IPushService pushService;
@Override
public void onMessage(ConsumerRecord msg) {
String jsOnString= msg.value();
logger.warn("消费kafka数据:" + jsonString);

//异步调用业务方法
pushService.pushMsg(jsonString);
}
}

        业务实现类:

//public class PushService extends BaseService implements IPushService
@Async("jpushExecutor")
@Override
public void pushMsg(String jsonString) {
// 解析进出站包、gps包
/********************* 泰国警报推送业务代码新增 ********************/
// 解析报警异常信息
// 所有
List

allRes = findRegIdsByType();
// 所有
List

all = null;
// 异常开关门
List

openCloseDoors = null;
// 离线
List

leaveLines = null;
// 超速
List

overSpeeds = null;
// 疑似抛锚
List

breakDowns = null;
// 超载
List

overloadAlarms = null;
// 紧急报警
List

emergencyAlarms = null;
for (PushPlatformGroup pushPlatformGroup : allRes) {
if (pushPlatformGroup.getFlag() == 0) {
all = pushPlatformGroup.getPhonePlats();
} else if (pushPlatformGroup.getFlag() == 1) {
openCloseDoors = pushPlatformGroup.getPhonePlats();
} else if (pushPlatformGroup.getFlag() == 2) {
leaveLines = pushPlatformGroup.getPhonePlats();
} else if (pushPlatformGroup.getFlag() == 3) {
overSpeeds = pushPlatformGroup.getPhonePlats();
} else if (pushPlatformGroup.getFlag() == 4) {
breakDowns = pushPlatformGroup.getPhonePlats();
} else if (pushPlatformGroup.getFlag() == 5) {
overloadAlarms = pushPlatformGroup.getPhonePlats();
} else if (pushPlatformGroup.getFlag() == 99) {
emergencyAlarms = pushPlatformGroup.getPhonePlats();
}
}
JSONObject jsOnObj= JSONObject.parseObject(jsonString);
String type = null;
String lineName = null;
String carName = null;
String occurDate = null;
// 修订警报描述
type = alarmTypeRevise(jsonObj.getString("type"));
lineName = jsonObj.getString("lineName");
carName = jsonObj.getString("carName");
Date date = jsonObj.getDate("occurDate");
occurDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
StringBuilder sb = new StringBuilder();
sb.append("{"msg_content": "").append("Busline:").append(lineName).append(",").append("Vehicle number:")
.append(carName).append(",").append("Time:").append(occurDate).append(" trigger ").append(type)
.append(" alarm!").append("","title": "").append(type).append(""}");
String alarmMsg = sb.toString();
StringBuilder sb2 = new StringBuilder();
sb2.append("Busline:").append(lineName).append(",").append("Vehicle number:").append(carName).append(",")
.append("Time:").append(occurDate).append(" trigger ").append(type).append(" alarm!");
String alarmMsg2 = sb2.toString();
// 向设置所有推送消息
jpush2Group(all, type, alarmMsg, alarmMsg2);
// 向自定义设置推送消息
switch (type) {
case "off route":
jpush2Group(leaveLines, type, alarmMsg, alarmMsg2);
break;
case "overtime parking":
jpush2Group(breakDowns, type, alarmMsg, alarmMsg2);
break;
case "overspeed":
jpush2Group(overSpeeds, type, alarmMsg, alarmMsg2);
break;
case "abnormal door open":
jpush2Group(openCloseDoors, type, alarmMsg, alarmMsg2);
break;
case "overload":
jpush2Group(overloadAlarms, type, alarmMsg, alarmMsg2);
break;
case "SOS alarm":
jpush2Group(emergencyAlarms, type, alarmMsg, alarmMsg2);
break;
default:
break;
}
}
/**
*


* Title: 给用户组发送消息推送
*


*


* Description:
*


*/
private void jpush2Group(List

phonePlats, String title, String msgContent, String msgContent2) {
if (!CollectionUtils.isEmpty(phonePlats)) {
phonePlats.forEach(g -> {
if (g.getType() == 1 && !StringUtils.isEmpty(g.getRegId())) {
JpushHelper.sendMessageAndroidWithExtra(title, msgContent, g.getRegId());
} else if (g.getType() == 2 && !StringUtils.isEmpty(g.getRegId())) {
JpushHelper.sendNotifyIOSWithExtra(title, msgContent2, g.getRegId());
}
});
}
}
// 修订警报提示
private String alarmTypeRevise(String type) {
switch (type) {
case "leaveLine":
return "off route";
case "breakDown":
return "overtime parking";
case "overSpeed":
return "overspeed";
case "openCloseDoor":
return "abnormal door open";
case "overLoad":
return "overload";
case "emergencyAlarm":
return "SOS alarm";
default:
return "";
}
}
/*************************** end ***************************/

       线程池配置(仅配置1个活跃线程,极光推送免费版有限制):


queue-capacity="10" keep-alive="5" rejection-policy="DISCARD_OLDEST" />

       

        2、方案2:自定义一个ArrayBlockingQueue队列,  大小为10,kafka消费者消费时将数据存入此队列中,超过数量10则删除最先前的;另起一个单独线程,对此队列数据进行业务处理。    

         待续...

        



三、总结

        将kafka消费程序和业务处理程序分离,串行并行化,各司其职,提高了吞吐率。

          







转载于:https://my.oschina.net/Howard2016/blog/991558



推荐阅读
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 如何查询zone下的表的信息
    本文介绍了如何通过TcaplusDB知识库查询zone下的表的信息。包括请求地址、GET请求参数说明、返回参数说明等内容。通过curl方法发起请求,并提供了请求示例。 ... [详细]
  • 本文介绍了如何使用JSONObiect和Gson相关方法实现json数据与kotlin对象的相互转换。首先解释了JSON的概念和数据格式,然后详细介绍了相关API,包括JSONObject和Gson的使用方法。接着讲解了如何将json格式的字符串转换为kotlin对象或List,以及如何将kotlin对象转换为json字符串。最后提到了使用Map封装json对象的特殊情况。文章还对JSON和XML进行了比较,指出了JSON的优势和缺点。 ... [详细]
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • python3 nmap函数简介及使用方法
    本文介绍了python3 nmap函数的简介及使用方法,python-nmap是一个使用nmap进行端口扫描的python库,它可以生成nmap扫描报告,并帮助系统管理员进行自动化扫描任务和生成报告。同时,它也支持nmap脚本输出。文章详细介绍了python-nmap的几个py文件的功能和用途,包括__init__.py、nmap.py和test.py。__init__.py主要导入基本信息,nmap.py用于调用nmap的功能进行扫描,test.py用于测试是否可以利用nmap的扫描功能。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • Java实战之电影在线观看系统的实现
    本文介绍了Java实战之电影在线观看系统的实现过程。首先对项目进行了简述,然后展示了系统的效果图。接着介绍了系统的核心代码,包括后台用户管理控制器、电影管理控制器和前台电影控制器。最后对项目的环境配置和使用的技术进行了说明,包括JSP、Spring、SpringMVC、MyBatis、html、css、JavaScript、JQuery、Ajax、layui和maven等。 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • Firefox火狐浏览器关闭到http://detectportal.firefox.com的流量问题解决办法
    本文介绍了使用Firefox火狐浏览器时出现关闭到http://detectportal.firefox.com的流量问题,并提供了解决办法。问题的本质是因为火狐默认开启了Captive portal技术,当连接需要认证的WiFi时,火狐会跳出认证界面。通过修改about:config中的network.captive-portal-service.en的值为false,可以解决该问题。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
author-avatar
mobiledu2502858263
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有