将RX Observable传递给actor(scala)是否安全?

 虽虽___Hui 发布于 2023-01-20 15:56

我一直在为RX Java使用scala绑定一段时间了,我正在考虑将它与Akka Actors结合起来.我想知道Observable在Akka Actors 之间传递RX是否安全/可能.例如,一个程序可以打印最多20个(每秒)整数的正方形:

/* producer creates an observable and sends it to the worker */
object Producer extends Actor {
  val toTwenty : Observable[Int] = Observable.interval(1 second).take(20)

  def receive = {
    case o : Observable[Int] =>
      o.subscribe( onNext => println )
  }

  worker ! toTwenty
}


/* worker which returns squares of even numbers */
object Worker extends Actor {
  def receive = {
    case o : Observable[Int] => 
       sender ! o filter { _ % 2 == 0 } map { _^2 }
  }
}

(请将此视为伪代码;它不会编译).注意我是send从一个actor到另一个actor的Observable.我想了解:

Akka和RX会自动同步对Observable的访问吗?

Observable不能在分布式系统发送-这是在本地内存中的对象的引用.但是,它会在本地工作吗?

假设在这个简单的例子中,工作将在subscribe电话中安排Producer.我可以拆分工作,以便分别在每个演员身上完成吗?

题外话:我看过一些看起来要结合RX和Actors的项目:

http://jmhofer.johoop.de/?p=507和 https://github.com/jmhofer/rxjava-akka

但这些不同之处在于它们并不是简单地Observable在演员之间传递消息.他们首先调用subscribe()获取值,然后将这些值发送到actor邮箱,并创建一个新Observable的.还是我弄错了?

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有