在Akka中,不是在使用?创建的未来响应中使用onComplete,而是尝试使用pipeTo,因为这应该是首选模式.但是,当未来超时时,我似乎没有收到任何Throwables或Failures.如果在使用pipeTo时发生超时,我应该在演员中收到什么?什么时候抛出不同的异常?示例代码:
class Simple(otherActor : ActorRef) extends Actor{ def receive = { case "some_msg" => { val implicit timeout = Timeout(1 seconds) val response = otherActor ? "hello" response pipeTo self } // case ??? // How do I handle timeouts? } }
如果超时发生时没有自动发送消息,我应该如何使用pipeTo处理超时?
如果你的例子与你的实际代码非常匹配,那么我不确定pipeTo
你想要的是什么.对我来说,将消息管道传递回给自己,并没有太多意义,并且对于一个演员将消息发送给另一个演员然后等待响应的情况有更好的解决方案.首先,我们来谈谈pipeTo
.我认为何时使用的一个很好的例子pipeTo
是你有三个演员,A,B和C. A向B发送一条消息,B又向C发送消息,并且B的响应应该在B之后返回给A.首先是别的东西.在那个例子中,你可以在B里面做这样的事情:
val fut = actorC ? someMessage fut map(someMapFunc) pipeTo sender
在这里,该pipeTo
函数有助于防止您意外关闭mutable sender
var,如果您改为使用类似的东西onComplete
并响应sender
该回调的内部.
现在,对于你的情况,如果你只想让A与B交谈,然后等待B的响应(并处理潜在的超时),你可以尝试这样的事情:
class ActorA extends Actor{ import context._ val myB = context.actorOf(Props[ActorB]) def receive = { case msg => myB ! msg setReceiveTimeout(2 seconds) become(waitingForResponse) } def waitingForResponse:Receive = { case ReceiveTimeout => println("got a receive timeout") cancelReceiveTimeout case response => println("got my response back") cancelReceiveTimeout } def cancelReceiveTimeout = setReceiveTimeout(Duration.Undefined) }
在此示例中,A以默认的receive
部分函数开始.当它收到一条消息时,它会向B发送另一条消息,设置一个接收超时,用于接收来自B的响应,然后将其receive
功能切换为特定于等待来自B的响应的内容.在新的接收功能中,我可以及时收到我的回复,或者我可以得到一个ReceiveTimeout
,表明我没有及时得到我的答复.在任何一种情况下,我都会取消我的接收超时,因为它会重复.
现在这是非常简化的,但我只是试图展示一种方法来在两个演员之间做一个来回,这似乎是你的例子所展示的.
未来的失败将作为akka.actor.Status.Failure
包含异常的消息发送.超时的例外是akka.pattern.AskTimeoutException
.