本文整理了Java中reactor.core.publisher.Flux.mergeOrdered()
方法的一些代码示例,展示了Flux.mergeOrdered()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.mergeOrdered()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:mergeOrdered
[英]Merge data from provided Publisher sequences into an ordered merged sequence, by picking the smallest values from each source (as defined by the provided Comparator). This is not a #sort(Comparator), as it doesn't consider the whole of each sequences.
Instead, this operator considers only one value from each source and picks the smallest of all these values, then replenishes the slot for that picked source.
[中]通过从每个源(由提供的比较器定义)中选取最小值,将提供的发布者序列中的数据合并到有序的合并序列中。这不是一个γ类(比较器),因为它不考虑整个序列。
相反,该运算符只考虑每个源中的一个值,并从所有这些值中选取最小值,然后为该选取的源补充插槽。
代码示例来源:origin: reactor/reactor-core
/**
* Merge data from provided {@link Publisher} sequences into an ordered merged sequence,
* by picking the smallest values from each source (as defined by the provided
* {@link Comparator}). This is not a {@link #sort(Comparator)}, as it doesn't consider
* the whole of each sequences.
*
* Instead, this operator considers only one value from each source and picks the
* smallest of all these values, then replenishes the slot for that picked source.
*
*
*
* @param comparator the {@link Comparator} to use to find the smallest value
* @param sources {@link Publisher} sources to merge
* @param
* @return a merged {@link Flux} that , subscribing early but keeping the original ordering
*/
@SafeVarargs
public static
return mergeOrdered(Queues.SMALL_BUFFER_SIZE, comparator, sources);
}
代码示例来源:origin: reactor/reactor-core
/**
* Merge data from provided {@link Publisher} sequences into an ordered merged sequence,
* by picking the smallest values from each source (as defined by their natural order).
* This is not a {@link #sort()}, as it doesn't consider the whole of each sequences.
*
* Instead, this operator considers only one value from each source and picks the
* smallest of all these values, then replenishes the slot for that picked source.
*
*
*
* @param sources {@link Publisher} sources of {@link Comparable} to merge
* @param a {@link Comparable} merged type that has a {@link Comparator#naturalOrder() natural order}
* @return a merged {@link Flux} that , subscribing early but keeping the original ordering
*/
@SafeVarargs
public static > Flux mergeOrdered(Publisher extends I>... sources) {
return mergeOrdered(Queues.SMALL_BUFFER_SIZE, Comparator.naturalOrder(), sources);
}
代码示例来源:origin: reactor/reactor-core
/**
* Merge data from this {@link Flux} and a {@link Publisher} into a reordered merge
* sequence, by picking the smallest value from each sequence as defined by a provided
* {@link Comparator}. Note that subsequent calls are combined, and their comparators are
* in lexicographic order as defined by {@link Comparator#thenComparing(Comparator)}.
*
* The combination step is avoided if the two {@link Comparator Comparators} are
* {@link Comparator#equals(Object) equal} (which can easily be achieved by using the
* same reference, and is also always true of {@link Comparator#naturalOrder()}).
*
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
*
*
* @param other the {@link Publisher} to merge with
* @param otherComparator the {@link Comparator} to use for merging
*
* @return a new {@link Flux}
*/
public final Flux
Comparator super T> otherComparator) {
if (this instanceof FluxMergeOrdered) {
FluxMergeOrdered
return fluxMerge.mergeAdditionalSource(other, otherComparator);
}
return mergeOrdered(otherComparator, this, other);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void reorderingAPIZeroOrOneSource() {
Flux
Flux
Flux
Flux
assertThat(testZero).isSameAs(expectedZero);
assertThat(testOne).isSameAs(expectedOne);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void reorderingAPIWithDefaultPrefetch() {
Flux
Flux.just(1, 3, 5, 7), Flux.just(2, 4, 6, 8, 10));
assertThat(test.getPrefetch()).isEqualTo(Queues.SMALL_BUFFER_SIZE);
StepVerifier.create(test)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 10)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void reorderingAPINaturalOrder() {
Flux
assertThat(test.getPrefetch()).isEqualTo(Queues.SMALL_BUFFER_SIZE);
StepVerifier.create(test)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 10)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void reorderingAPI() {
Flux
Flux.just(1, 3, 5, 7),
Flux.just(2, 4, 6, 8, 10));
StepVerifier.create(test)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 10)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void reorderingAPISmallRequest() {
Flux
Flux.just(1, 3, 5, 7),
Flux.just(2, 4, 6, 8, 10));
StepVerifier.create(test, 5)
.expectNext(1, 2, 3, 4)
.expectNoEvent(Duration.ofMillis(50))
.thenRequest(5)
.expectNext(5, 6, 7, 8, 10)
.verifyComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void considersOnlyLatestElementInEachSource() {
final Flux
Flux.just("AAAAA", "BBBB"),
Flux.just("DD", "CCC"),
Flux.just("E"));
StepVerifier.create(flux)
.expectNext("E") // between E, DD and AAAAA => E, 3rd slot done
.expectNext("DD") // between DD and AAAAA => DD, replenish 2nd slot to CCC
.expectNext("CCC") // between CCC and AAAAA => CCC, 2nd slot done
.expectNext("AAAAA", "BBBB") // rest of first flux in 1st slot => AAAAA then BBBB
.verifyComplete();
}
代码示例来源:origin: io.projectreactor/reactor-core
/**
* Merge data from provided {@link Publisher} sequences into an ordered merged sequence,
* by picking the smallest values from each source (as defined by the provided
* {@link Comparator}). This is not a {@link #sort(Comparator)}, as it doesn't consider
* the whole of each sequences.
*
* Instead, this operator considers only one value from each source and picks the
* smallest of all these values, then replenishes the slot for that picked source.
*
*
*
* @param comparator the {@link Comparator} to use to find the smallest value
* @param sources {@link Publisher} sources to merge
* @param
* @return a merged {@link Flux} that , subscribing early but keeping the original ordering
*/
@SafeVarargs
public static
return mergeOrdered(Queues.SMALL_BUFFER_SIZE, comparator, sources);
}
代码示例来源:origin: io.projectreactor/reactor-core
/**
* Merge data from provided {@link Publisher} sequences into an ordered merged sequence,
* by picking the smallest values from each source (as defined by their natural order).
* This is not a {@link #sort()}, as it doesn't consider the whole of each sequences.
*
* Instead, this operator considers only one value from each source and picks the
* smallest of all these values, then replenishes the slot for that picked source.
*
*
*
* @param sources {@link Publisher} sources of {@link Comparable} to merge
* @param a {@link Comparable} merged type that has a {@link Comparator#naturalOrder() natural order}
* @return a merged {@link Flux} that , subscribing early but keeping the original ordering
*/
@SafeVarargs
public static > Flux mergeOrdered(Publisher extends I>... sources) {
return mergeOrdered(Queues.SMALL_BUFFER_SIZE, Comparator.naturalOrder(), sources);
}
代码示例来源:origin: io.projectreactor/reactor-core
/**
* Merge data from this {@link Flux} and a {@link Publisher} into a reordered merge
* sequence, by picking the smallest value from each sequence as defined by a provided
* {@link Comparator}. Note that subsequent calls are combined, and their comparators are
* in lexicographic order as defined by {@link Comparator#thenComparing(Comparator)}.
*
* The combination step is avoided if the two {@link Comparator Comparators} are
* {@link Comparator#equals(Object) equal} (which can easily be achieved by using the
* same reference, and is also always true of {@link Comparator#naturalOrder()}).
*
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
*
*
* @param other the {@link Publisher} to merge with
* @param otherComparator the {@link Comparator} to use for merging
*
* @return a new {@link Flux}
*/
public final Flux
Comparator super T> otherComparator) {
if (this instanceof FluxMergeOrdered) {
FluxMergeOrdered
return fluxMerge.mergeAdditionalSource(other, otherComparator);
}
return mergeOrdered(otherComparator, this, other);
}