我正在探索反应式编程和RxJava.这很有趣,但我遇到了一个无法找到答案的问题.我的基本问题:什么是一种反应性的方法来终止一个无限运行的Observable?我也欢迎有关我的代码的批评和反应性最佳实践.
作为练习,我正在编写一个日志文件尾实用程序.日志文件中的行流由a表示Observable
.为了BufferedReader
继续阅读添加到文件中的文本,我忽略了通常的reader.readLine() == null
终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本.
但是当我可以使用Ob终止Observer时takeUntil
,我需要找到一种干净的方法来终止无限运行的文件观察器.我可以编写自己的terminateWatcher
方法/字段,但这会打破Observable/Observer封装 - 我希望尽可能严格遵守反应范式.
这是Observable
代码:
public class FileWatcher implements OnSubscribeFunc{ private Path path = . . .; @Override // The super String> generic is pointless but required by the compiler public Subscription onSubscribe(Observer super String> observer) { try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) { String newLine = ""; while (!Thread.interrupted()) { // How do I terminate this reactively? if ((newLine = reader.readLine()) != null) observer.onNext(newLine); else try { // Wait for more text Thread.sleep(250); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } observer.onCompleted(); } catch (Exception e) { observer.onError(e); } return null; // Not sure what Subscription I should return } }
以下是Observer代码,用于打印新行:
public static void main(String... args) { . . . Observablelines = Observable.create(createWatcher(file)); lines = lines.takeWhile(new Func1 () { @Override public Boolean call(String line) { // Predicate for which to continue processing return !line.contains("shutdown"); } }).subscribeOn(Schedulers.threadPoolForIO()) .observeOn(Schedulers.currentThread()); // Seems like I should use subscribeOn() and observeOn(), but they // make my tailer terminate without reading any text. Subscription subscription = lines.subscribe(new Action1 () { @Override public void call(String line) { System.out.printf("%20s\t%s\n", file, line); } }); }
我的两个问题是:
什么是终止无限运行流的反应一致方法?
我的代码中有哪些其他错误会让你哭泣?:)
Mike Strobel.. 6
由于您在请求订阅时正在启动文件监视,因此在订阅结束时(即Subscription
关闭时)终止它是有意义的.这解决了代码注释中的一个松散结束:您返回一个Subscription
告诉FileWatcher
停止观看文件的内容.然后替换循环条件,以便检查订阅是否已被取消,而不是检查当前线程是否已被中断.
问题是,如果你的onSubscribe()
方法永远不会返回,那就没有用了.也许您FileWatcher
应该要求指定调度程序,并在该调度程序上进行读取.
由于您在请求订阅时正在启动文件监视,因此在订阅结束时(即Subscription
关闭时)终止它是有意义的.这解决了代码注释中的一个松散结束:您返回一个Subscription
告诉FileWatcher
停止观看文件的内容.然后替换循环条件,以便检查订阅是否已被取消,而不是检查当前线程是否已被中断.
问题是,如果你的onSubscribe()
方法永远不会返回,那就没有用了.也许您FileWatcher
应该要求指定调度程序,并在该调度程序上进行读取.