热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分布式开源任务调度框架TBSchedule深度解析与应用实践

本文深入解析了分布式开源任务调度框架TBSchedule的核心原理与应用场景,并通过实际案例详细介绍了其部署与使用方法。首先,从源码下载开始,详细阐述了TBSchedule的安装步骤和配置要点。接着,探讨了该框架在大规模分布式环境中的性能优化策略,以及如何通过灵活的任务调度机制提升系统效率。最后,结合具体实例,展示了TBSchedule在实际项目中的应用效果,为开发者提供了宝贵的实践经验。

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

TBSchedule使用

一、下载TBSchedule 源码

http://code.taobao.org/svn/tbschedule/

二、编译TBSchedule源码形成jar包

mvn package/直接mvn deploye/install 一步完成

三、安装到本地的maven仓库

mvn deploy/install

四、在项目pom配置文件中引用这个依赖文件

com.taobao.pamirs.schedule

tbschedule

3.3.3.2

五、配置TBSchedule依赖的zookeeper配置,TBSchedule的调度依赖于zookeeper的节点配置和心跳链接。

 

# zooker service address

schedule.zookeeper.address=120.25.87.176:2181

# root path

schedule.root.path=/bbb/dd

# session timeout

schedule.session.timeout=60000

# userName

schedule.zookeeper.username=ScheduleAdmin

# password

schedule.zookeeper.password=password

 

init-method="init">

//在控制台连接zookeeper的路径要和该配置的路径一致,由于配置时默认要检查父节点是否为空,所以最好先检查该路径是否存在。

六、配置任务项和任务策略,将任务分片,查询任务

六-一、通过代码实现任务的分配

private Logger log = LoggerFactory

.getLogger(getClass());

TBScheduleManagerFactory scheduleManagerFactory;

/**

* 心跳连接时间

*/

private int heartBeatRate = 5*1000;

 

private int judgeDeadInterval = 1*60*1000;

 

private String taskParameter="";

 

private String strategyTaskParameter="";

 

/**

* jvm最大单线程数量

*/

private int numOfSingleServer = 2;

 

/**

* 最大线程组数量 总 最大线程数量 = jvm最大单线程数量 * 最大的线程组总量

* 将任务列表分成几个队列来处理

*/

private int assignNum = 2;

 

private String[] iPLists={"127.0.0.1"};

 

private String[] taskItems={"0","1","2","3","4","5","6","7","8","9"};

 

private String[] baseTaskTypeNames;

 

private String[] dealBeanNames;

 

/**

* 开始时间

*/

private String permitRunStartTime;

 

/**

* 结束时间

*/

private String permitRunEndTime;

 

/**

* 批处理时 每次处理任务的数量

*/

private int executeNumber = 1;

 

/**

* 处理完一批数据后的休息时间

*/

private int sleepTimeInterval = 0;

 

/**

* 采集不到数据的休眠时间

*/

private int sleepTimeNoData = 500;

 

/**

* 每次采集任务的数量

*/

private int fetchDataNumber = 500;

 

@Autowired

public void setScheduleManagerFactory(

TBScheduleManagerFactory tbScheduleManagerFactory) {

this.scheduleManagerFactory = tbScheduleManagerFactory;

}

 

@Test

public void initialConfigData() throws Exception {

}

 

@Override

public void afterPropertiesSet() throws Exception {

if(baseTaskTypeNames.length != dealBeanNames.length) {

throw new RuntimeException("task definition error, baseTaskTypeNames length not equals dealBeanNames length");

}

String baseTaskTypeName;

String dealBeanName;

for (int i = 0; i

baseTaskTypeName = baseTaskTypeNames[i];

dealBeanName= dealBeanNames[i];

configTasks(baseTaskTypeName, dealBeanName);

}

}

 

private void configTasks(String baseTaskTypeName, String dealBeanName)

throws Exception, InterruptedException {

while(this.scheduleManagerFactory.isZookeeperInitialSucess() == false){

Thread.sleep(1000);

}

scheduleManagerFactory.stopServer(null);

Thread.sleep(1000);

try {

this.scheduleManagerFactory.getScheduleDataManager()

.deleteTaskType(baseTaskTypeName);

} catch (Exception e) {

 

}

ScheduleTaskType baseTaskType = new ScheduleTaskType();

baseTaskType.setBaseTaskType(baseTaskTypeName);

baseTaskType.setDealBeanName(dealBeanName);

baseTaskType.setHeartBeatRate(heartBeatRate);

baseTaskType.setJudgeDeadInterval(judgeDeadInterval);

baseTaskType.setTaskParameter(taskParameter);

baseTaskType.setTaskItems(taskItems);

baseTaskType.setPermitRunStartTime(permitRunStartTime);

baseTaskType.setPermitRunEndTime(permitRunEndTime);

baseTaskType.setExecuteNumber(executeNumber);

baseTaskType.setSleepTimeInterval(sleepTimeInterval);

baseTaskType.setSleepTimeNoData(sleepTimeNoData);

baseTaskType.setFetchDataNumber(fetchDataNumber);

this.scheduleManagerFactory.getScheduleDataManager()

.createBaseTaskType(baseTaskType);

String taskName = baseTaskTypeName + "$TEST";

String strategyName = baseTaskTypeName +"-Strategy";

try {

this.scheduleManagerFactory.getScheduleStrategyManager()

.deleteMachineStrategy(strategyName,true);

} catch (Exception e) {

e.printStackTrace();

}

ScheduleStrategy strategy = new ScheduleStrategy();

strategy.setStrategyName(strategyName);

strategy.setKind(ScheduleStrategy.Kind.Schedule);

strategy.setTaskName(taskName);

strategy.setTaskParameter(strategyTaskParameter);

strategy.setNumOfSingleServer(numOfSingleServer);

strategy.setAssignNum(assignNum);

strategy.setIPList(iPLists);

this.scheduleManagerFactory.getScheduleStrategyManager()

.createScheduleStrategy(strategy);

log.info("创建调度任务成功" + strategy.toString());

}

六-二、通过TBSchedule自带的控制台来实现任务的配置

 

七、任务的理解

1、同一个jvm中,不同线程之间如何防止任务被重复执行?一个scheduleServer的内部线程间如何进行任务分片?

答复:1、数据分片是在不同的jvm,获知同一个jvm中不同的线程组间起作用。在同一个线程组内的10个线程,是通过一个同步的任务队列来实现的。2、每个线程从队列中取任务执行,如果没有任务了,则由一个线程负责调用selectTasks方法再获取一批新的任务。

主要是设置休眠时间:即selectTasks方法返回列表的size为0后,进入休眠。休眠完成之后重新执行该定时任务

2、任务项设置的意义和selectTasks方法的参数含义

答复:1、 任务项(0,1,2,3,4,5,6,7,8,9)就是任务分片的策略。这个配置就是把数据分成10片。可以表示 ID的最后一位,也可以是一个独立的字段。根据你的业务来定。

2、 如果只有1组线程,则所有的任务片都分配给他。这时selectTasks方法的参数:taskItemNum =10, queryCondition由10个元素,分别对应0,1,2,3,4,5,6,7,8,9。

a) 如果只有2组线程,则任务片被分成两份。这时

b) 一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(0,2,4,6,8)

3、 另外一个线程组的selectTasks方法的参数:taskItemNum =10, queryCondition有5个元素(1,3,5,7,9)

4、 如果有10个线程组。则每组线程只会获取到1个任务片。这时selectTasks方法的参数:taskItemNum =10, queryCondition只有一个元素,对应0到9中的一个。

1、 执行期间和时间的修改功能 a) 在创建任务和修改任务的时候,有两个属性(执行开始时间,执行结算时间)用于控制任务的执行时间。 b) 时间格式遵循标准的cron格式 http://dogstar.javaeye.com/blog/116130 还增强了原来不支持的倒数第几天的能力。 c) 当时间到底开始时间的时候,就开始执行任务,到达结束时间则终止调度(不管是否所有的任务都处理完)。如果没有设置执行结束时间。则一直运行,直到selectTasks返回的记录数为0,就终止执行。等待下个开始运行时间在启动。 d) 如果要动态修改任务的执行时间区间,则先 点击“暂停”按钮,等所有的服务器都停止完毕(大概需要几秒时间)。当再次单击任务,出现如下情形表示停止完毕。然后修改执行开始时间,执行结算时间。在恢复任务调度,就可以实现调度时间的修改 2、 任务处理的问题 a) Schedule主要是提供任务调度的分配管理。每一个任务是否执行成功,是通过业务方的bean来实现的。 b) 你需求的例子,我理解的解决方案如下: i. 你从云梯拉下来100万数据放到保险应用的数据库中。这个表中有两个关键字段USER_ID和STS(状态 0-未发送,1-已发送) ii. 在bean的selectTasks方法的查询sql中除了根据任务进行分片外,还需要增加状态条件。例如 USER_ID % 10 in( ?,?,?) AND sts =0 iii. 在bean的execute方法中,在发送完消息后,你还需要 修改数据状态 update table STS =1 where USER_ID =? 。这样下次就不会取到这条数据了。 iv. 这样就可以保障机器重新启动后,也不会出现问题。你可以参考DBDemoSingle.java的实现模式。你使用的接口应该是IScheduleTaskDealSingle。 如果旺旺的接口支持批量发送消息的时候,你才需要使用IScheduleTaskDealMulti接口。


转:https://my.oschina.net/u/1867229/blog/850598



推荐阅读
  • 使用Vultr云服务器和Namesilo域名搭建个人网站
    本文详细介绍了如何通过Vultr云服务器和Namesilo域名搭建一个功能齐全的个人网站,包括购买、配置服务器以及绑定域名的具体步骤。文章还提供了详细的命令行操作指南,帮助读者顺利完成建站过程。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • 本文介绍如何使用 Python 将一个字符串按照指定的行和元素分隔符进行两次拆分,最终将字符串转换为矩阵形式。通过两种不同的方法实现这一功能:一种是使用循环与 split() 方法,另一种是利用列表推导式。 ... [详细]
  • 如何配置Unturned服务器及其消息设置
    本文详细介绍了Unturned服务器的配置方法和消息设置技巧,帮助用户了解并优化服务器管理。同时,提供了关于云服务资源操作记录、远程登录设置以及文件传输的相关补充信息。 ... [详细]
  • DNN Community 和 Professional 版本的主要差异
    本文详细解析了 DotNetNuke (DNN) 的两种主要版本:Community 和 Professional。通过对比两者的功能和附加组件,帮助用户选择最适合其需求的版本。 ... [详细]
  • 将Web服务部署到Tomcat
    本文介绍了如何在JDeveloper 12c中创建一个Java项目,并将其打包为Web服务,然后部署到Tomcat服务器。内容涵盖从项目创建、编写Web服务代码、配置相关XML文件到最终的本地部署和验证。 ... [详细]
  • UNP 第9章:主机名与地址转换
    本章探讨了用于在主机名和数值地址之间进行转换的函数,如gethostbyname和gethostbyaddr。此外,还介绍了getservbyname和getservbyport函数,用于在服务器名和端口号之间进行转换。 ... [详细]
  • 本文探讨了如何在给定整数N的情况下,找到两个不同的整数a和b,使得它们的和最大,并且满足特定的数学条件。 ... [详细]
  • 本文详细介绍了Java中org.w3c.dom.Text类的splitText()方法,通过多个代码示例展示了其实际应用。该方法用于将文本节点在指定位置拆分为两个节点,并保持在文档树中。 ... [详细]
  • 本文详细介绍如何使用Samba软件配置CIFS文件共享服务,涵盖安装、配置、权限管理及多用户挂载等关键步骤。通过具体示例和命令行操作,帮助读者快速搭建并优化Samba服务器。 ... [详细]
  • 本文介绍了如何通过 Maven 依赖引入 SQLiteJDBC 和 HikariCP 包,从而在 Java 应用中高效地连接和操作 SQLite 数据库。文章提供了详细的代码示例,并解释了每个步骤的实现细节。 ... [详细]
  • Scala 实现 UTF-8 编码属性文件读取与克隆
    本文介绍如何使用 Scala 以 UTF-8 编码方式读取属性文件,并实现属性文件的克隆功能。通过这种方式,可以确保配置文件在多线程环境下的一致性和高效性。 ... [详细]
  • 本文详细探讨了JDBC(Java数据库连接)的内部机制,重点分析其作为服务提供者接口(SPI)框架的应用。通过类图和代码示例,展示了JDBC如何注册驱动程序、建立数据库连接以及执行SQL查询的过程。 ... [详细]
  • 本文介绍了如何利用npm脚本和concurrently工具,实现本地开发环境中多个监听服务的同时启动,包括HTTP服务、自动刷新、Sass和ES6支持。 ... [详细]
  • 本文介绍如何在现有网络中部署基于Linux系统的透明防火墙(网桥模式),以实现灵活的时间段控制、流量限制等功能。通过详细的步骤和配置说明,确保内部网络的安全性和稳定性。 ... [详细]
author-avatar
手浪用户2602914837
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有