热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink异步iomysql缓存_Flink异步IO访问外部数据(mysql篇)

最近看了大佬的博客,突然想起AsyncIO方式是Blink推给社区的一大重要功能,可以使用异步的方式获取外部数据,想着自己实现以下

最近看了大佬的博客,突然想起Async I/O方式是Blink 推给社区的一大重要功能,可以使用异步的方式获取外部数据,想着自己实现以下,项目上用的时候,可以不用现去找了。

最开始想用scala 实现一个读取 hbase数据的demo,参照官网demo:

/**

* An implementation of the 'AsyncFunction' that sends requests and sets the callback.

*/

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

/** The database specific client that can issue concurrent requests with callbacks */

lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

/** The context used for the future callbacks */

implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

// issue the asynchronous request, receive a future for the result

val resultFutureRequested: Future[String] = client.query(str)

// set the callback to be executed once the request by the client is complete

// the callback simply forwards the result to the result future

resultFutureRequested.onSuccess {

case result: String => resultFuture.complete(Iterable((str, result)))

}

}

}

// create the original stream

val stream: DataStream[String] = ...

// apply the async I/O transformation

val resultStream: DataStream[(String, String)] =

AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

失败了,上图标红的部分实现不了

1、Future 找不到可以用的实现类

2、unorderedWait 一直报错

源码example 里面也有Scala 的案例

def main(args: Array[String]) {

val timeout = 10000L

val env = StreamExecutionEnvironment.getExecutionEnvironment

val input = env.addSource(new SimpleSource())

val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {

(input, collector: ResultFuture[Int]) =>

Future {

collector.complete(Seq(input))

} (ExecutionContext.global)

}

asyncMapped.print()

env.execute("Async I/O job")

}

主要部分是这样的,菜鸡表示无力,想继承RichAsyncFunction,可以使用open 方法初始化链接。

网上博客翻了不少,大部分是翻译官网的原理,案例也没有可以执行的,苦恼。

失败了。

下面开始上mysql 版本 的 源码(hbase 的还没测试过,本机的hbase 挂了):

业务如下:

接收kafka数据,转为user对象,调用async,使用user.id 查询对应的phone,放回user对象,输出

主类:

import com.alibaba.fastjson.JSON;

import com.venn.common.Common;

import org.apache.flink.formats.json.JsonNodeDeserializationSchema;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.concurrent.TimeUnit;

public class AsyncMysqlRequest {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer source &#61; new FlinkKafkaConsumer<>("async", new JsonNodeDeserializationSchema(), Common.getProp());

// 接收kafka数据&#xff0c;转为User 对象

DataStream input &#61; env.addSource(source).map(value -> {

String id &#61; value.get("id").asText();

String username &#61; value.get("username").asText();

String password &#61; value.get("password").asText();

return new User(id, username, password);

});

// 异步IO 获取mysql数据, timeout 时间 1s&#xff0c;容量 100(超过100个请求&#xff0c;会反压上游节点)

DataStream async &#61; AsyncDataStream.unorderedWait(input, new AsyncFunctionForMysqlJava(), 1000, TimeUnit.MICROSECONDS, 100);

async.map(user -> {

return JSON.toJSON(user).toString();

})

.print();

env.execute("asyncForMysql");

}

}

函数类&#xff1a;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.async.ResultFuture;

import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.util.ArrayList;

import java.util.List;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction {

// 链接

private static String jdbcUrl &#61; "jdbc:mysql://192.168.229.128:3306?useSSL&#61;false";

private static String username &#61; "root";

private static String password &#61; "123456";

private static String driverName &#61; "com.mysql.jdbc.Driver";

java.sql.Connection conn;

PreparedStatement ps;

Logger logger &#61; LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

/**

* open 方法中初始化链接

* &#64;param parameters

* &#64;throws Exception

*/

&#64;Override

public void open(Configuration parameters) throws Exception {

logger.info("async function for hbase java open ...");

super.open(parameters);

Class.forName(driverName);

conn &#61; DriverManager.getConnection(jdbcUrl, username, password);

ps &#61; conn.prepareStatement("select phone from async.async_test where id &#61; ?");

}

/**

* use user.getId async get user phone

*

* &#64;param user

* &#64;param resultFuture

* &#64;throws Exception

*/

&#64;Override

public void asyncInvoke(User user, ResultFuture resultFuture) throws Exception {

// 使用 user id 查询

ps.setString(1, user.getId());

ResultSet rs &#61; ps.executeQuery();

String phone &#61; null;

if (rs.next()) {

phone &#61; rs.getString(1);

}

user.setPhone(phone);

List list &#61; new ArrayList();

list.add(user);

// 放回 result 队列

resultFuture.complete(list);

}

&#64;Override

public void timeout(User input, ResultFuture resultFuture) throws Exception {

logger.info("Async function for hbase timeout");

List list &#61; new ArrayList();

list.add(input);

resultFuture.complete(list);

}

/**

* close function

*

* &#64;throws Exception

*/

&#64;Override

public void close() throws Exception {

logger.info("async function for hbase java close ...");

super.close();

conn.close();

}

}

测试数据如下&#xff1a;

{"id" : 1, "username" : "venn", "password" : 1561709530935}

{"id" : 2, "username" : "venn", "password" : 1561709536029}

{"id" : 3, "username" : "venn", "password" : 1561709541033}

{"id" : 4, "username" : "venn", "password" : 1561709546037}

{"id" : 5, "username" : "venn", "password" : 1561709551040}

{"id" : 6, "username" : "venn", "password" : 1561709556044}

{"id" : 7, "username" : "venn", "password" : 1561709561048}

执行结果如下&#xff1a;

1> {"password":"1561709536029","phone":"12345678911","id":"2","username":"venn"}

1> {"password":"1561709541033","phone":"12345678912","id":"3","username":"venn"}

1> {"password":"1561709546037","phone":"12345678913","id":"4","username":"venn"}

1> {"password":"1561709551040","id":"5","username":"venn"} # 关联不上&#xff0c;原样返回

1> {"password":"1561709556044","id":"6","username":"venn"}

1> {"password":"1561709561048","id":"7","username":"venn"}

hbase、redis或其他实现类似&#xff0c;继承AsyncStreamFunction&#xff0c;实现

方法



推荐阅读
  • 本文介绍了Java后台Jsonp处理方法及其应用场景。首先解释了Jsonp是一个非官方的协议,它允许在服务器端通过Script tags返回至客户端,并通过javascript callback的形式实现跨域访问。然后介绍了JSON系统开发方法,它是一种面向数据结构的分析和设计方法,以活动为中心,将一连串的活动顺序组合成一个完整的工作进程。接着给出了一个客户端示例代码,使用了jQuery的ajax方法请求一个Jsonp数据。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • Todayatworksomeonetriedtoconvincemethat:今天在工作中有人试图说服我:{$obj->getTableInfo()}isfine ... [详细]
  • 1简介本文结合数字信号处理课程和Matlab程序设计课程的相关知识,给出了基于Matlab的音乐播放器的总体设计方案,介绍了播放器主要模块的功能,设计与实现方法.我们将该设 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • springmvc学习笔记(十):控制器业务方法中通过注解实现封装Javabean接收表单提交的数据
    本文介绍了在springmvc学习笔记系列的第十篇中,控制器的业务方法中如何通过注解实现封装Javabean来接收表单提交的数据。同时还讨论了当有多个注册表单且字段完全相同时,如何将其交给同一个控制器处理。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • 本文详细介绍了如何使用MySQL来显示SQL语句的执行时间,并通过MySQL Query Profiler获取CPU和内存使用量以及系统锁和表锁的时间。同时介绍了效能分析的三种方法:瓶颈分析、工作负载分析和基于比率的分析。 ... [详细]
  • Android自定义控件绘图篇之Paint函数大汇总
    本文介绍了Android自定义控件绘图篇中的Paint函数大汇总,包括重置画笔、设置颜色、设置透明度、设置样式、设置宽度、设置抗锯齿等功能。通过学习这些函数,可以更好地掌握Paint的用法。 ... [详细]
  • 本文总结了在编写JS代码时,不同浏览器间的兼容性差异,并提供了相应的解决方法。其中包括阻止默认事件的代码示例和猎取兄弟节点的函数。这些方法可以帮助开发者在不同浏览器上实现一致的功能。 ... [详细]
  • 开发笔记:图像识别基于主成分分析算法实现人脸二维码识别
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了图像识别基于主成分分析算法实现人脸二维码识别相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 时域|波形_语音处理基于matlab GUI音频数据处理含Matlab源码 1734期
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了语音处理基于matlabGUI音频数据处理含Matlab源码1734期相关的知识,希望对你有一定的参考价值。 ... [详细]
  • java多线程获取线程返回结果
    我们在使用java多线程编写相关业务代码时,往往有这样一种情况,某个线程依赖于其他线程执行结果。也就是说,我们需要在一个线程中获取另一个线程的信息。可以分为两种情况,一种是轮询,一 ... [详细]
  • 嵌套函数定义时先判断function_exists防止递归调用外部函数导致两次定义内部函数导致致命错误看一下PHP手册中是如何说的: ... [详细]
author-avatar
小梦茜呦_163
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有