我目前正在努力尝试使用rx实现tcp看门狗/重试系统,您的帮助将不胜感激.
有一个Observable,我想通过定期检查我们是否仍然可以写入套接字来获得一个Observable.很简单,我可以做这样的事情:
class SocketSubscribeFunc implements Observable.OnSubscribeFunc{ private final String hostname; private final int port; private Socket socket; SocketSubscribeFunc(String hostname, int port) { this.hostname = hostname; this.port = port; } public Subscription onSubscribe(final Observer super Socket> observer) { try { log.debug("Trying to connect..."); socket = new Socket(hostname, port); observer.onNext(socket); } catch (IOException e) { observer.onError(e); } return new Subscription() { public void unsubscribe() { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } }; } } Observable socketObservable = Observable.create(new SocketSubscribeFunc(hostname,port)); Observable watchdog = Observable.combineLatest(socketObservable, Observable.interval(1, TimeUnit.SECONDS), new Func2 () { public Boolean call(final Socket socket, final Long aLong) { try { socket.getOutputStream().write("ping\n".getBytes()); return true; } catch (IOException e) { return false; } } });
现在,我想重新连接如果可以获取套接字(服务器/链接在创建时关闭)或变得不可写(连接成功后服务器/链接无法访问).理想情况下,通过重新订阅套接字Observable,其OnSubscribeFunc使用重试运算符创建连接.正如您所看到的,这将在套接字和监视器Observables之间引入循环依赖关系.我用switchMap/materialize玩了一会儿......为了传播最终的错误无济于事.
我接近放弃这个想法并使用副作用代码中的主题.但是在全球范围内应该有一个更好的方式:)
提前致谢!
首先,我会避免Observable.create
大部分时间,因为它通常是不必要的,并引入了不必要的复杂性.在这种情况下,Rx有一个调用的运算符using
,它允许您创建一个在Observable的生命周期中存在的资源对象.它会自动捕获运行时错误,并提供一个dispose操作,因此这对于此用例中的套接字来说是完美的.我正在使用Java8 lambdas,因为它们更容易伪代码.
Observable.using( // Resource (socket) factory () -> { try { return new Socket(hostname, port); } catch (IOException e) { // Rx will propagate this as an onError event. throw new RuntimeException(e); } }, // Observable factory (socket) -> { return Observable.interval(1, TimeUnit.SECONDS) .map((unusedTick) { try { socket.getOutputStream().write("ping\n".getBytes()); return true; } catch (IOException e) { throw new RuntimeException(e); } }) // Retry the inner job up to 3 times before propagating. .retry(3); }, // Dispose action for socket. // In real life the close probably needs a try/catch. (socket) -> socket.close()) // Retry the outer job up to 3 times. .retry(3) // If we propagate all errors, emit a 'false', signaling service is not available. .onErrorResumeNext(Observable.just(false));
请注意,如果内部作业传播(在3次失败之后),这将重试外部作业.为了解决这个问题,您应该使用谓词和retryWhen检查重试时的文档.如果这不是内部作业传播的类型,则可以抛出特殊的RuntimeException并仅重试外部作业.
using
docs:http: //reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,% 20rx.functions.Action1)