RxJava - 终止无限流

 just_roshinn5 发布于 2023-02-09 13:48

我正在探索反应式编程和RxJava.这很有趣,但我遇到了一个无法找到答案的问题.我的基本问题:什么是一种反应性的方法来终止一个无限运行的Observable?我也欢迎有关我的代码的批评和反应性最佳实践.

作为练习,我正在编写一个日志文件尾实用程序.日志文件中的行流由a表示Observable.为了BufferedReader继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本.

但是当我可以使用Ob终止Observer时takeUntil,我需要找到一种干净的方法来终止无限运行的文件观察器.我可以编写自己的terminateWatcher方法/字段,但这会打破Observable/Observer封装 - 我希望尽可能严格遵守反应范式.

这是Observable代码:

public class FileWatcher implements OnSubscribeFunc {
    private Path path = . . .;

    @Override
    // The  generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

以下是Observer代码,用于打印新行:

public static void main(String... args) {
    . . .
    Observable lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

我的两个问题是:

    什么是终止无限运行流的反应一致方法?

    我的代码中有哪些其他错误会让你哭泣?:)

Mike Strobel.. 6

由于您在请求订阅时正在启动文件监视,因此在订阅结束时(即Subscription关闭时)终止它是有意义的.这解决了代码注释中的一个松散结束:您返回一个Subscription告诉FileWatcher停止观看文件的内容.然后替换循环条件,以便检查订阅是否已被取消,而不是检查当前线程是否已被中断.

问题是,如果你的onSubscribe()方法永远不会返回,那就没有用了.也许您FileWatcher应该要求指定调度程序,并在该调度程序上进行读取.

1 个回答
  • 由于您在请求订阅时正在启动文件监视,因此在订阅结束时(即Subscription关闭时)终止它是有意义的.这解决了代码注释中的一个松散结束:您返回一个Subscription告诉FileWatcher停止观看文件的内容.然后替换循环条件,以便检查订阅是否已被取消,而不是检查当前线程是否已被中断.

    问题是,如果你的onSubscribe()方法永远不会返回,那就没有用了.也许您FileWatcher应该要求指定调度程序,并在该调度程序上进行读取.

    2023-02-09 13:50 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有