热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

Akka2使用探索4(Actors)

ask异步发送一条消息并返回一个Future代表一个可能的回应。需要采用Future的处理模式。每一个消息发送者分别保证自己的消息的次序.try{Stringresultoperation();getSender().tell(result);}catch(Exceptione){getSender().tell(newakka.a

ask 异步发送一条消息并返回一个 Future 代表一个可能的回应。需要采用Future的处理模式。 每一个消息发送者分别保证自己的消息的次序. try { String result = operation(); getSender().tell(result); } catch (Exception e) { getSender().tell(new akka.a

ask 异步发送一条消息并返回一个 Future代表一个可能的回应。需要采用Future的处理模式。

每一个消息发送者分别保证自己的消息的次序. try {
String result = operation();
getSender().tell(result);
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e));
throw e;
}

ask使用方式如下:

    List futures = []
        AkkaClientNoReply client = new AkkaClientNoReply("akka://xw@127.0.0.1:8888/user/server")
        client.send("hello")
        0.upto(15) {
            futures <
        //模拟客户端给服务端发0——15消息,服务器处理(把数&#20540;&#43;1返回给客户端)
        }

        final Future aggregate = Futures.sequence(futures, client.system.dispatcher());
        final Future transformed = aggregate.map(new Mapper() {
            public Integer apply(Iterable coll) {
                final Iterator it = coll.iterator();
                int count = 0;
                while (it.hasNext()) {
                    int x = (Integer) it.next();
                    count = count &#43; x
                }
                return new Integer(count);
            }
        });

        AkkaServerApp app = new AkkaServerApp("resultHandler", "127.0.0.1", 6666, "result")
        app.messageProcessor = {msg, UntypedActorContext context ->
            log.info("1到16之和为" &#43; msg)
        }
        app.startup()

        akka.pattern.Patterns.pipe(transformed).to(app.serverActor)


如果服务端处理消息时发生了异常而导致没有给客户端回应,那么客户端收到的结果将会收到Timeout的Failure:Failure(akka.pattern.AskTimeoutException: Timed out)。可以将异常捕获用Failure封装异常发给客户端:actor.tell(new akka.actor.Status.Failure(e))。

Future的onComplete, onResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。

警告

在使用future回调如 onComplete, onSuccess, and onFailure时, 在actor内部你要小心避免捕捉该actor的引用, i.e. 不要在回调中调用该actor的方法或访问其可变状态。这会破坏actor的封装,会引用同步bug和race condition, 因为回调会与此actor一同被并发调度。 不幸的是目前还没有一种编译时的方法能够探测到这种非法访问。

转发消息

你可以将消息从一个actor转发给另一个。虽然经过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能类&#20284;路由器、负载均衡器、备份等的actor时会很有用。

myActor.forward(message, getContext());

回应消息

getSender().tell(replyMsg)

如果没有sender (不是从actor发送的消息或者没有future上下文) 那么 sender 缺省为“dead-letter” actor的引用.

初始化接收消息超时

设置receiveTimeout 属性并声明一个处理 ReceiveTimeout 对象的匹配分支。

public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
}
}
}

终止Actor

通过调用ActorRefFactory i.e. ActorContextActorSystemstop 方法来终止一个actor , 通常 context 用来终止子actor,而 system 用来终止顶级actor. 实际的终止操作是异步执行的, i.e. stop 可能在actor被终止之前返回。

如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem死信, 但是这取决于邮箱的实现。

actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己 (调用 postStop, 销毁邮箱, 向 DeathWatch 发布 Terminated, 通知其监管者). 这个过程保证actor系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应 (i.e. 由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。

ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。

postStop hook 是在actor被完全终止以后调用。

PoisonPill

你也可以向actor发送 akka.actor.PoisonPill 消息, 这个消息处理完成后actor会被终止。 PoisonPill 与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。

优雅地终止

如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:

try {

Future stopped = akka.pattern.Patterns.gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system);

Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));

// the actor has been stopped

} catch (ActorTimeoutException e) {

// the actor wasn't stopped within 5 seconds

}

热拔插 Become/Unbecome

升级 Upgrade

Akka支持在运行时对Actor消息循环 (e.g. 的实现)进行实时替换: 在actor中调用 context.become 方法。

Become 要求一个 akka.japi.Procedure 参数作为新的消息处理实现。 被替换的代码被存在一个栈中,可以被push和pop。

降级

由于被热替换掉的代码存在栈中,你也可以对代码进行降级,只需要在actor中调用 context.unbecome 方法。

Killing actor

发送Kill消息给actor

Actor 与 异常

在消息被actor处理的过程中可能会抛出异常,例如数据库异常。

消息会怎样

如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。

邮箱会怎样

如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。

actor会怎样

如果抛出了异常,actor实例将被丢弃而生成一个新的实例。这个新的实例会被该actor的引用所引用(所以这个过程对开发人员来说是不可见的)。注意这意味着如果你不在preRestart 回调中进行保存,并在postRestart回调中恢复,那么失败的actor实例的当前状态会被丢失。

推荐阅读
  • [翻译]微服务设计模式5. 服务发现服务端服务发现
    服务之间需要互相调用,在单体架构中,服务之间的互相调用直接通过编程语言层面的方法调用就搞定了。在传统的分布式应用的部署中,服务地 ... [详细]
  • 在单位的一台4cpu的服务器上部署了esxserver,挂载了6个虚拟机,目前运行正常。在安装部署过程中,得到了cnvz.net论坛精华区 ... [详细]
  • ZooKeeper 学习
    前言相信大家对ZooKeeper应该不算陌生。但是你真的了解ZooKeeper是个什么东西吗?如果别人面试官让你给他讲讲ZooKeeper是个什么东西, ... [详细]
  • 抖音服务器带宽有多大,才能供上亿人同时刷?
    最近看到一个有意思的提问:抖音服务器带宽有多大,为什么能够供那么多人同时刷?今天来给大家科普一下。 ... [详细]
  • LVS-DR直接路由实现负载均衡示例
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • pm2常用的命令用法介绍pm2是一个带有负载均衡功能的Node应用的进程管理器.当你要把你的独立代码利用全部的服务器上的所有CPU,并保证进程永远都活着,0秒的重载, ... [详细]
  • 域名解析系统DNS
    文章目录前言一、域名系统概述二、因特网的域名结构三、域名服务器1.根域名服务器2.顶级域名服务器(TLD,top-leveldomain)3.权威(Authoritative)域名 ... [详细]
  • php网站设计实验报告,php网站开发实训报告
    本文目录一览:1、php动态网站设计的关键技术有哪些软件,及搭建步骤需要哪些页面,分别完成 ... [详细]
  • 什么是网关服务器初学linux服务器开发时,我们的服务器是很简单的,只需要一个程序完成与客户端的连接,接收客户端数据,数据处理,向客户端发送数据。但是在处理量很大的情况下,一 ... [详细]
  • 搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的详细步骤
    本文详细介绍了搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的步骤,包括环境说明、相关软件下载的地址以及所需的插件下载地址。 ... [详细]
  • 本文介绍了使用AJAX的POST请求实现数据修改功能的方法。通过ajax-post技术,可以实现在输入某个id后,通过ajax技术调用post.jsp修改具有该id记录的姓名的值。文章还提到了AJAX的概念和作用,以及使用async参数和open()方法的注意事项。同时强调了不推荐使用async=false的情况,并解释了JavaScript等待服务器响应的机制。 ... [详细]
  • PHP设置MySQL字符集的方法及使用mysqli_set_charset函数
    本文介绍了PHP设置MySQL字符集的方法,详细介绍了使用mysqli_set_charset函数来规定与数据库服务器进行数据传送时要使用的字符集。通过示例代码演示了如何设置默认客户端字符集。 ... [详细]
  • 本文详细介绍了云服务器API接口的概念和作用,以及如何使用API接口管理云上资源和开发应用程序。通过创建实例API、调整实例配置API、关闭实例API和退还实例API等功能,可以实现云服务器的创建、配置修改和销毁等操作。对于想要学习云服务器API接口的人来说,本文提供了详细的入门指南和使用方法。如果想进一步了解相关知识或阅读更多相关文章,请关注编程笔记行业资讯频道。 ... [详细]
  • 目录Atlas介绍Atlas部署Atlas基本管理Atlas结合MHA故障恢复读写分离建议Atlas介绍Atlas是由Qihoo360Web平台部基础架构团队开发维护的一个基于My ... [详细]
  • Nginxgaodaima.comnginx属于七层架构,支持的是http协议,本身对tcp协议没有支持。所以不能代理mysql等实现负载均衡。但是lvs这个东西不熟悉,主要是公司 ... [详细]
author-avatar
心星Lover
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有