我一直在为RX Java使用scala绑定一段时间了,我正在考虑将它与Akka Actors结合起来.我想知道Observable
在Akka Actor
s 之间传递RX是否安全/可能.例如,一个程序可以打印最多20个(每秒)整数的正方形:
/* producer creates an observable and sends it to the worker */
object Producer extends Actor {
val toTwenty : Observable[Int] = Observable.interval(1 second).take(20)
def receive = {
case o : Observable[Int] =>
o.subscribe( onNext => println )
}
worker ! toTwenty
}
/* worker which returns squares of even numbers */
object Worker extends Actor {
def receive = {
case o : Observable[Int] =>
sender ! o filter { _ % 2 == 0 } map { _^2 }
}
}
(请将此视为伪代码;它不会编译).注意我是send
从一个actor到另一个actor的Observable.我想了解:
Akka和RX会自动同步对Observable的访问吗?
将Observable
不能在分布式系统发送-这是在本地内存中的对象的引用.但是,它会在本地工作吗?
假设在这个简单的例子中,工作将在subscribe
电话中安排Producer
.我可以拆分工作,以便分别在每个演员身上完成吗?
题外话:我看过一些看起来要结合RX和Actors的项目:
http://jmhofer.johoop.de/?p=507和 https://github.com/jmhofer/rxjava-akka
但这些不同之处在于它们并不是简单地Observable
在演员之间传递消息.他们首先调用subscribe()
获取值,然后将这些值发送到actor邮箱,并创建一个新Observable
的.还是我弄错了?