在rx-java中的套接字看门狗

 昆仑神奇_325 发布于 2023-01-19 14:08

我目前正在努力尝试使用rx实现tcp看门狗/重试系统,您的帮助将不胜感激.

有一个Observable,我想通过定期检查我们是否仍然可以写入套接字来获得一个Observable.很简单,我可以做这样的事情:

class SocketSubscribeFunc implements Observable.OnSubscribeFunc {
  private final String hostname;
  private final int port;
  private Socket socket;

  SocketSubscribeFunc(String hostname, int port) {
    this.hostname = hostname;
    this.port = port;
  }

  public Subscription onSubscribe(final Observer observer) {
    try {
      log.debug("Trying to connect...");
      socket = new Socket(hostname, port);
      observer.onNext(socket);
    } catch (IOException e) {
      observer.onError(e);
    }
    return new Subscription() {
      public void unsubscribe() {
        try {
          socket.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    };
  }
}

Observable socketObservable = Observable.create(new SocketSubscribeFunc(hostname,port));
Observable watchdog = Observable.combineLatest(socketObservable, Observable.interval(1, TimeUnit.SECONDS), new Func2() {

  public Boolean call(final Socket socket, final Long aLong) {
    try {
      socket.getOutputStream().write("ping\n".getBytes());
      return true;
    } catch (IOException e) {
     return false;
    }
  }
});

现在,我想重新连接如果可以获取套接字(服务器/链接在创建时关闭)或变得不可写(连接成功后服务器/链接无法访问).理想情况下,通过重新订阅套接字Observable,其OnSubscribeFunc使用重试运算符创建连接.正如您所看到的,这将在套接字和监视器Observables之间引入循环依赖关系.我用switchMap/materialize玩了一会儿......为了传播最终的错误无济于事.

我接近放弃这个想法并使用副作用代码中的主题.但是在全球范围内应该有一个更好的方式:)

提前致谢!

1 个回答
  • 首先,我会避免Observable.create大部分时间,因为它通常是不必要的,并引入了不必要的复杂性.在这种情况下,Rx有一个调用的运算符using,它允许您创建一个在Observable的生命周期中存在的资源对象.它会自动捕获运行时错误,并提供一个dispose操作,因此这对于此用例中的套接字来说是完美的.我正在使用Java8 lambdas,因为它们更容易伪代码.

    Observable.using(
        // Resource (socket) factory
        () -> {
          try {
            return new Socket(hostname, port);
          } catch (IOException e) {
            // Rx will propagate this as an onError event.
            throw new RuntimeException(e);
          }
        },
        // Observable factory
        (socket) -> {
          return Observable.interval(1, TimeUnit.SECONDS)
              .map((unusedTick) {
                try {
                  socket.getOutputStream().write("ping\n".getBytes());
                  return true;
                } catch (IOException e) {
                  throw new RuntimeException(e);
                }
              })
              // Retry the inner job up to 3 times before propagating.
              .retry(3);
        },
        // Dispose action for socket.
        // In real life the close probably needs a try/catch.
        (socket) -> socket.close())
        // Retry the outer job up to 3 times.
        .retry(3)
        // If we propagate all errors, emit a 'false', signaling service is not available.
        .onErrorResumeNext(Observable.just(false));
    

    请注意,如果内部作业传播(在3次失败之后),这将重试外部作业.为了解决这个问题,您应该使用谓词和retryWhen检查重试时的文档.如果这不是内部作业传播的类型,则可以抛出特殊的RuntimeException并仅重试外部作业.

    usingdocs:http: //reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,% 20rx.functions.Action1)

    2023-01-19 14:12 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有