热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

四、Azkaban各种类型的Job编写

一、概述原生的Azkaban支持的plugin类型有以下这些:command:Linuxshell命令行任务gobblin:通用数据采集工具hadoopJava:运行hadoopM

一、概述

原生的 Azkaban 支持的plugin类型有以下这些:

  1. command:Linux shell命令行任务
  2. gobblin:通用数据采集工具
  3. hadoopJava:运行hadoopMR任务
  4. java:原生java任务
  5. hive:支持执行hiveSQL
  6. pig:pig脚本任务
  7. spark:spark任务
  8. hdfsToTeradata:把数据从hdfs导入Teradata
  9. teradataToHdfs:把数据从Teradata导入hdfs

其中最简单而且最常用的是command类型,我们在上一篇文章中已经描述了如何编写一个command的job任务。如果使用command类型,效果其实跟在本地执行Linux shell命令一样,这样的话,还不如把shell放到crontable 中运行。所以我们把重点放到Azkaban支持的比较常用的四种类型:java、hadoopJava、hive、spark

二、java类型

1、代码编写:MyJavaJob.java

package com.dataeye.java;
public class MyJavaJob {
public static void main(String[] args) {
System.out.println("#################################");
System.out.println("#### MyJavaJob class exec... ###");
System.out.println("#################################");
}
}

2、打包成jar文件:使用maven或者eclipse导出为jar文件

3、编写job文件:java.job

type=javaprocess
classpath=./lib/*,${azkaban.home}/lib/*
java.class=com.dataeye.java.MyJavaJob

4、组成一个完整的运行包
新建一个目录,在该目录下创建一个lib文件夹,把第二步打包的jar文件放到这里,把job文件放到和lib文件夹同一级的目录下,如下所示:

《四、Azkaban各种类型的Job编写》 完整的运行包

5、打包成zip文件

把lib目录和job文件打包成zip文件,如下的java.zip:

《四、Azkaban各种类型的Job编写》 zip文件

6、提交运行,过程跟之前文章介绍的步骤一样,不再详述,执行结果如下:

《四、Azkaban各种类型的Job编写》 执行结果

从输出日志可以看出,代码已经正常执行。

以上是java类型的任务编写和执行的过程。接下来介绍其他任务编写的时候,只会介绍代码的编写和job的编写,其他过程与上面的一致。

三、hadoopJava类型

1、数据准备

以下内容是运行wordcount任务时需要的输入文件input.txt:

1 Ross male 33 3674
2 Julie male 42 2019
3 Gloria female 45 3567
4 Carol female 36 2813
5 Malcolm male 42 2856
6 Joan female 22 2235
7 Niki female 27 3682
8 Betty female 20 3001
9 Linda male 21 2511
10 Whitney male 35 3075
11 Lily male 27 3645
12 Fred female 39 2202
13 Gary male 28 3925
14 William female 38 2056
15 Charles male 48 2981
16 Michael male 25 2606
17 Karl female 32 2260
18 Barbara male 39 2743
19 Elizabeth female 26 2726
20 Helen female 47 2457
21 Katharine male 45 3638
22 Lee female 43 3050
23 Ann male 35 2874
24 Diana male 37 3929
25 Fiona female 45 2955
26 Bob female 21 3382
27 John male 48 3677
28 Thomas female 22 2784
29 Dean male 38 2266
30 Paul female 31 2679

把input.txt文件拷贝到hdfs的 /data/yann/input 目录下

2、代码准备:

package azkaban.jobtype.examples.java;
import azkaban.jobtype.javautils.AbstractHadoopJob;
import azkaban.utils.Props;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;
public class WordCount extends AbstractHadoopJob
{
private static final Logger logger = Logger.getLogger(WordCount.class);
private final String inputPath;
private final String outputPath;
private boolean forceOutputOverrite;
public WordCount(String name, Props props)
{
super(name, props);
this.inputPath = props.getString("input.path");
this.outputPath = props.getString("output.path");
this.forceOutputOverrite = props.getBoolean("force.output.overwrite", false);
}
public void run()
throws Exception
{
logger.info(String.format("Starting %s", new Object[] { getClass().getSimpleName() }));
JobConf jobcOnf= getJobConf();
jobconf.setJarByClass(WordCount.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setOutputValueClass(IntWritable.class);
jobconf.setMapperClass(Map.class);
jobconf.setReducerClass(Reduce.class);
jobconf.setInputFormat(TextInputFormat.class);
jobconf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.addInputPath(jobconf, new Path(this.inputPath));
FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));
if (this.forceOutputOverrite)
{
FileSystem fs = FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);
fs.delete(FileOutputFormat.getOutputPath(jobconf), true);
}
super.run();
}
public static class Map extends MapReduceBase
implements Mapper
{
private static final IntWritable One= new IntWritable(1);
private Text word = new Text();
private long numRecords = 0L;
public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
this.word.set(tokenizer.nextToken());
output.collect(this.word, one);
reporter.incrCounter(Counters.INPUT_WORDS, 1L);
}
if (++this.numRecords % 100L == 0L)
reporter.setStatus("Finished processing " + this.numRecords + " records " + "from the input file");
}
static enum Counters
{
INPUT_WORDS;
}
}
public static class Reduce extends MapReduceBase
implements Reducer
{
public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter)
throws IOException
{
int sum = 0;
while (values.hasNext()) {
sum += ((IntWritable)values.next()).get();
}
output.collect(key, new IntWritable(sum));
}
}
}

3、编写job文件

wordcount.job文件内容如下:

type=hadoopJava
job.extend=false
job.class=azkaban.jobtype.examples.java.WordCount
classpath=./lib/*,${azkaban.home}/lib/*
force.output.overwrite=true
input.path=/data/yann/input
output.path=/data/yann/output

这样hadoopJava类型的任务已经完成,打包提交到Azkaban中执行即可

四、hive类型

1、编写 hive.sql文件

use azkaban;
INSERT OVERWRITE TABLE
user_table1 PARTITION (day_p='2017-02-08')
SELECT appid,uid,country,province,city
FROM user_table0 where adType=1;

以上是标准的hive的sql脚本,首先切换到azkaban数据库,然后把user_table0 的数据插入到user_table1 表的指定day_p分区。需要先准备好 user_table0 和 user_table1 表结构和数据。

编写完成后,把文件放入 res 文件夹中。

2、编写hive.job文件

type=hive
user.to.proxy=azkaban
classpath=./lib/*,${azkaban.home}/lib/*
azk.hive.action=execute.query
hive.script=res/hive.sql

关键的参数是 hive.script,该参数指定使用的sql脚本在 res目录下的hive.sql文件

五、spark类型

spark任务有两种运行方式,一种是command类型,另一种是spark类型

首先准备好spark任务的代码

package com.dataeye.template.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext}
object WordCount {
def main(args: Array[String]) {
if (args.length <1) {
System.err.println("Usage:WordCount ")
System.exit(1)
}
System.out.println("get first param ==> " + args(0))
System.out.println("get second param ==> " + args(1))
/** spark 2.0的方式
* val spark = SparkSession.builder().appName("WordCount").getOrCreate()
*/
val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
val spark = new SQLContext(sc)
val file = spark.sparkContext.textFile(args(0))
val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// 数据collect 到driver端打印
wordCounts.collect().foreach(println _)
}
}

然后准备数据,数据就使用前面hadoopJava中的数据即可。

最后打包成jar文件:spark-template-1.0-SNAPSHOT.jar

1、command类型

command类型的配置方式比较简单,spark.job文件如下:

type=command
command=${spark.home}/bin/spark-submit --master yarn-cluster --class com.dataeye.template.spark.WordCount lib/spark-template-1.0-SNAPSHOT.jar hdfs://de-hdfs/data/yann/info.txt paramtest

2、spark类型

type=spark
master=yarn-cluster
execution-jar=lib/spark-template-1.0-SNAPSHOT.jar
class=com.dataeye.template.spark.WordCount
params=hdfs://de-hdfs/data/yann/info.txt paramtest

以上就是Azkaban支持的几种常用的任务类型。


推荐阅读
  • 本文记录了在vue cli 3.x中移除console的一些采坑经验,通过使用uglifyjs-webpack-plugin插件,在vue.config.js中进行相关配置,包括设置minimizer、UglifyJsPlugin和compress等参数,最终成功移除了console。同时,还包括了一些可能出现的报错情况和解决方法。 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 近年来,大数据成为互联网世界的新宠儿,被列入阿里巴巴、谷歌等公司的战略规划中,也在政府报告中频繁提及。据《大数据人才报告》显示,目前全国大数据人才仅46万,未来3-5年将出现高达150万的人才缺口。根据领英报告,数据剖析人才供应指数最低,且跳槽速度最快。中国商业结合会数据剖析专业委员会统计显示,未来中国基础性数据剖析人才缺口将高达1400万。目前BAT企业中,60%以上的招聘职位都是针对大数据人才的。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 本文介绍了在RHEL 7中的系统日志管理和网络管理。系统日志管理包括rsyslog和systemd-journal两种日志服务,分别介绍了它们的特点、配置文件和日志查询方式。网络管理主要介绍了使用nmcli命令查看和配置网络接口的方法,包括查看网卡信息、添加、修改和删除配置文件等操作。 ... [详细]
  • 本文整理了Java中org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc.getTypeInfo()方法的一些代码示例,展 ... [详细]
  •     这里使用自己编译的hadoop-2.7.0版本部署在windows上,记得几年前,部署hadoop需要借助于cygwin,还需要开启ssh服务,最近发现,原来不需要借助cy ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • Maven构建Hadoop,
    Maven构建Hadoop工程阅读目录序Maven安装构建示例下载系列索引 序  上一篇,我们编写了第一个MapReduce,并且成功的运行了Job,Hadoop1.x是通过ant ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 2018-02-1420:07:13,610ERROR[main]regionserver.HRegionServerCommandLine:Regionserverexiting ... [详细]
author-avatar
黄宗翰琼琦莉雯
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有