From 48489dcf4be78bfa782f51eece33bbeaebb2e57e Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 29 Mar 2017 10:00:14 +0200 Subject: [PATCH 001/524] Release 2.0.8 --- CHANGES.md | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 2fe8a70ba7..762fb67a97 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,41 @@ -# RxJava Releases # +# RxJava 2 Releases # -Version 1.x can be found at https://github.com/ReactiveX/RxJava/blob/1.x/CHANGES.md +The cnagelog of version 1.x can be found at https://github.com/ReactiveX/RxJava/blob/1.x/CHANGES.md + +### Version 2.0.8 - March 29, 2017 ([Maven](http://search.maven.org/#artifactdetails%7Cio.reactivex.rxjava2%7Crxjava%7C2.0.8%7C)) + +**API enhancements** +- [Pull 5161](https://github.com/ReactiveX/RxJava/pull/5161): Add `Observable.switchMapSingle()` +- [Pull 5184](https://github.com/ReactiveX/RxJava/pull/5184): Add `offer()` method to `PublishProcessor` & `BehaviorProcessor`. +- [Pull 5197](https://github.com/ReactiveX/RxJava/pull/5197): Add `ParallelTransformer` interface. +- [Pull 5217](https://github.com/ReactiveX/RxJava/pull/5217): `UnicastSubject` fail-fast support. +- [Pull 5202](https://github.com/ReactiveX/RxJava/pull/5202): Add resilient versions of parallel `map()`, `filter()` and `doOnNext()`. +- [Pull 5226](https://github.com/ReactiveX/RxJava/pull/5226): `UnicastProcessor` fail-fast support. + +**Bugfixes** +- [Pull 5163](https://github.com/ReactiveX/RxJava/pull/5163): `Single.subscribe()` to report `isDisposed()` true on success/error. +- [Pull 5170](https://github.com/ReactiveX/RxJava/pull/5170): Fix `LambdaObserver` not cancelling the upstream. +- [Pull 5182](https://github.com/ReactiveX/RxJava/pull/5182): Fix `replay().refCount()` leaking items between connections. +- [Pull 5188](https://github.com/ReactiveX/RxJava/pull/5188): Fix `flatMap` emitting the terminal exception indicator on cancel. +- [Pull 5207](https://github.com/ReactiveX/RxJava/pull/5207): Prevent tasks to self interrupt on the standard schedulers. +- [Pull 5213](https://github.com/ReactiveX/RxJava/pull/5213): Fix `window()` with time+size emission problems. +- [Pull 5240](https://github.com/ReactiveX/RxJava/pull/5240): fix `CallbackCompletableObserver` calling `onError`. + +**Documentation** +- [Pull 5189](https://github.com/ReactiveX/RxJava/pull/5189): Declare `concatMapEager` requires positive prefetch amount. +- [Pull 5191](https://github.com/ReactiveX/RxJava/pull/5191): Correct java doc for `refCount()` return type. +- [Pull 5208](https://github.com/ReactiveX/RxJava/pull/5208): Fix images of `firstElement`, `flattenAsX`, `flatMapIterable`, `UnicastSubject` and `UnicastProcessor`. +- [Pull 5210](https://github.com/ReactiveX/RxJava/pull/5210): Better documentation on the abstract consumer classes (such as `DisposableSubscriber`). +- [Pull 5223](https://github.com/ReactiveX/RxJava/pull/5223): Improve the documentation of `Schedulers` utility class. +- [Pull 5230](https://github.com/ReactiveX/RxJava/pull/5230): Fix wrong comments in `Functions` “Function3” -> “BiFunction, Function3” + +**Other** +- Remove anonymous inner classes. + - [Pull 5174](https://github.com/ReactiveX/RxJava/pull/5174) + - [Pull 5177](https://github.com/ReactiveX/RxJava/pull/5177) +- [Pull 5183](https://github.com/ReactiveX/RxJava/pull/5183): Test to disallow anonymous inner classes. +- [Pull 5187](https://github.com/ReactiveX/RxJava/pull/5187): Reflection-based parameter validator & fixes. +- [Pull 5196](https://github.com/ReactiveX/RxJava/pull/5196): Add a few more `@Nullable` & `@NonNull` annotations to public interfaces. ### Version 2.0.7 - March 7, 2017 ([Maven](http://search.maven.org/#artifactdetails%7Cio.reactivex.rxjava2%7Crxjava%7C2.0.7%7C)) From 7c95808f077537428f2ae80fffd15e2848a2de31 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 29 Mar 2017 14:20:58 +0200 Subject: [PATCH 002/524] 2.x: fix DisposableX copy-paste error in javadoc --- src/main/java/io/reactivex/observers/DisposableObserver.java | 2 +- .../java/io/reactivex/subscribers/DisposableSubscriber.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/observers/DisposableObserver.java b/src/main/java/io/reactivex/observers/DisposableObserver.java index 15b90ccb0f..da863b1fa5 100644 --- a/src/main/java/io/reactivex/observers/DisposableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableObserver.java @@ -27,7 +27,7 @@ *

Use the public {@link #dispose()} method to dispose the sequence from within an * {@code onNext} implementation. * - *

Like all other consumers, {@code DefaultObserver} can be subscribed only once. + *

Like all other consumers, {@code DisposableObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an * {@link IllegalStateException} with message {@code "Disposable already set!"}. * diff --git a/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java b/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java index 8ad8cc96af..cebac57baa 100644 --- a/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java @@ -37,7 +37,7 @@ * Calling {@link #request(long)} inside {@link #onNext(Object)} can happen at any time * because by design, {@code onNext} calls from upstream are non-reentrant and non-overlapping. * - *

Like all other consumers, {@code DefaultSubscriber} can be subscribed only once. + *

Like all other consumers, {@code DisposableSubscriber} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an * {@link IllegalStateException} with message {@code "Subscription already set!"}. * From 6c8b0efade6e2b82b32daa18fec7b045aabb3f6c Mon Sep 17 00:00:00 2001 From: David Karnok Date: Fri, 31 Mar 2017 13:16:01 +0200 Subject: [PATCH 003/524] 2.x: fix Flowable.toList() onNext/cancel race (#5247) --- .../operators/flowable/FlowableToList.java | 5 +- .../flowable/FlowableToListTest.java | 81 ++++++++++++++++++- 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableToList.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableToList.java index 8ed41afff9..75ef64ce2f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableToList.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableToList.java @@ -69,7 +69,10 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - value.add(t); + U v = value; + if (v != null) { + v.add(t); + } } @Override diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java index b8557827e4..8c97ae5475 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -23,7 +24,6 @@ import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.Flowable; import io.reactivex.exceptions.TestException; import io.reactivex.observers.TestObserver; import io.reactivex.processors.PublishProcessor; @@ -389,4 +389,83 @@ public Collection call() throws Exception { .assertFailure(NullPointerException.class) .assertErrorMessage("The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources."); } + + @Test + public void onNextCancelRace() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final TestObserver> ts = pp.toList().test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + + } + + @Test + public void onNextCancelRaceFlowable() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final TestSubscriber> ts = pp.toList().toFlowable().test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + + } + + @Test + public void onCompleteCancelRaceFlowable() { + for (int i = 0; i < 1000; i++) { + final PublishProcessor pp = PublishProcessor.create(); + final TestSubscriber> ts = pp.toList().toFlowable().test(); + + pp.onNext(1); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + if (ts.valueCount() != 0) { + ts.assertValue(Arrays.asList(1)) + .assertNoErrors(); + } + } + + } } From ba5edc9bf28cc9cc4abab492dbf1daecf34c74f7 Mon Sep 17 00:00:00 2001 From: Jihong Park Date: Sun, 2 Apr 2017 00:49:26 +0900 Subject: [PATCH 004/524] Add nullPointerException comments and ObjectHelper test code. (#5255) * Add NullPointerException comments for doc. * Add test code for ObjectHelper.java --- src/main/java/io/reactivex/Observable.java | 3 +++ .../internal/functions/ObjectHelperTest.java | 21 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 3ccd3b01fa..5b01d6610a 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -3595,6 +3595,9 @@ public static Observable timer(long delay, TimeUnit unit) { * time units to use for {@code delay} * @param scheduler * the {@link Scheduler} to use for scheduling the item + * @throws NullPointerException + * if {@code unit} is null, or + * if {@code scheduler} is null * @return an Observable that emits {@code 0L} after a specified delay, on a specified Scheduler, and then * completes * @see ReactiveX operators documentation: Timer diff --git a/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java b/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java index ffcbb065ec..91912323d9 100644 --- a/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java +++ b/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java @@ -33,8 +33,27 @@ public void hashCodeOf() { } @Test - public void compare() { + public void verifyPositiveInt() throws Exception{ + assertEquals(1, ObjectHelper.verifyPositive(1, "param")); + } + + @Test + public void verifyPositiveLong() throws Exception{ + assertEquals(1L, ObjectHelper.verifyPositive(1L, "param")); + } + @Test(expected = IllegalArgumentException.class) + public void verifyPositiveIntFail() throws Exception{ + assertEquals(-1, ObjectHelper.verifyPositive(-1, "param")); + } + + @Test(expected = IllegalArgumentException.class) + public void verifyPositiveLongFail() throws Exception{ + assertEquals(-1L, ObjectHelper.verifyPositive(-1L, "param")); + } + + @Test + public void compare() { assertEquals(-1, ObjectHelper.compare(0, 2)); assertEquals(0, ObjectHelper.compare(0, 0)); assertEquals(1, ObjectHelper.compare(2, 0)); From 5ec4f761b7de39bb3b37939709ad2bacdc34ad0a Mon Sep 17 00:00:00 2001 From: David Karnok Date: Sat, 1 Apr 2017 20:00:20 +0200 Subject: [PATCH 005/524] 2.x: fix flatMapIterable appearing to be empty when fused (#5256) --- .../flowable/FlowableFlattenIterable.java | 5 +- .../flowable/FlowableFlattenIterableTest.java | 51 +++++++++++++++++++ .../flowable/FlowableToListTest.java | 20 ++++---- 3 files changed, 64 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java index 2a8a9f66fd..460279af7f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java @@ -411,7 +411,10 @@ public void clear() { @Override public boolean isEmpty() { Iterator it = current; - return (it != null && !it.hasNext()) || queue.isEmpty(); + if (it == null) { + return queue.isEmpty(); + } + return !it.hasNext(); } @Nullable diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java index 51de692dff..a865475acf 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java @@ -863,4 +863,55 @@ public void remove() { ts.assertResult(1); } + + @Test + public void doubleShare() { + Iterable it = Flowable.range(1, 300).blockingIterable(); + Flowable.just(it, it) + .flatMapIterable(Functions.>identity()) + .share() + .share() + .count() + .test() + .assertResult(600L); + } + + @Test + public void multiShare() { + Iterable it = Flowable.range(1, 300).blockingIterable(); + for (int i = 0; i < 5; i++) { + Flowable f = Flowable.just(it, it) + .flatMapIterable(Functions.>identity()); + + for (int j = 0; j < i; j++) { + f = f.share(); + } + + f + .count() + .test() + .withTag("Share: " + i) + .assertResult(600L); + } + } + + @Test + public void multiShareHidden() { + Iterable it = Flowable.range(1, 300).blockingIterable(); + for (int i = 0; i < 5; i++) { + Flowable f = Flowable.just(it, it) + .flatMapIterable(Functions.>identity()) + .hide(); + + for (int j = 0; j < i; j++) { + f = f.share(); + } + + f + .count() + .test() + .withTag("Share: " + i) + .assertResult(600L); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java index 8c97ae5475..2bb37c2f18 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java @@ -395,7 +395,7 @@ public void onNextCancelRace() { for (int i = 0; i < 1000; i++) { final PublishProcessor pp = PublishProcessor.create(); final TestObserver> ts = pp.toList().test(); - + Runnable r1 = new Runnable() { @Override public void run() { @@ -408,10 +408,9 @@ public void run() { ts.cancel(); } }; - + TestHelper.race(r1, r2); } - } @Test @@ -419,7 +418,7 @@ public void onNextCancelRaceFlowable() { for (int i = 0; i < 1000; i++) { final PublishProcessor pp = PublishProcessor.create(); final TestSubscriber> ts = pp.toList().toFlowable().test(); - + Runnable r1 = new Runnable() { @Override public void run() { @@ -432,10 +431,10 @@ public void run() { ts.cancel(); } }; - + TestHelper.race(r1, r2); } - + } @Test @@ -443,9 +442,9 @@ public void onCompleteCancelRaceFlowable() { for (int i = 0; i < 1000; i++) { final PublishProcessor pp = PublishProcessor.create(); final TestSubscriber> ts = pp.toList().toFlowable().test(); - + pp.onNext(1); - + Runnable r1 = new Runnable() { @Override public void run() { @@ -458,14 +457,13 @@ public void run() { ts.cancel(); } }; - + TestHelper.race(r1, r2); - + if (ts.valueCount() != 0) { ts.assertValue(Arrays.asList(1)) .assertNoErrors(); } } - } } From fa58d36375ecc084b21bca299ca4780946b15dc5 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 1 Apr 2017 21:11:23 +0200 Subject: [PATCH 006/524] More nullability annotations (#5251) * More nullability annotations * Refactored imports * Changes based on akarnokd's review * A few more annotations * Changes based on akarnokd's 2nd review --- src/main/java/io/reactivex/Notification.java | 10 ++++- src/main/java/io/reactivex/Observer.java | 7 ++-- src/main/java/io/reactivex/Single.java | 5 +-- .../disposables/ActionDisposable.java | 3 +- .../disposables/CompositeDisposable.java | 13 +++--- .../io/reactivex/disposables/Disposables.java | 21 ++++++---- .../disposables/ReferenceDisposable.java | 3 +- .../disposables/RunnableDisposable.java | 4 +- .../disposables/SerialDisposable.java | 10 +++-- .../disposables/SubscriptionDisposable.java | 3 +- .../exceptions/CompositeException.java | 9 ++++- .../io/reactivex/exceptions/Exceptions.java | 6 ++- .../OnErrorNotImplementedException.java | 6 +-- .../flowables/ConnectableFlowable.java | 9 ++++- .../reactivex/flowables/GroupedFlowable.java | 6 ++- .../disposables/DisposableHelper.java | 1 + .../observables/ConnectableObservable.java | 9 ++++- .../observables/GroupedObservable.java | 6 ++- .../reactivex/observers/DefaultObserver.java | 3 +- .../DisposableCompletableObserver.java | 3 +- .../observers/DisposableMaybeObserver.java | 4 +- .../observers/DisposableObserver.java | 6 ++- .../observers/DisposableSingleObserver.java | 4 +- .../ResourceCompletableObserver.java | 8 ++-- .../observers/ResourceMaybeObserver.java | 8 ++-- .../reactivex/observers/ResourceObserver.java | 3 +- .../observers/ResourceSingleObserver.java | 8 ++-- .../io/reactivex/observers/SafeObserver.java | 9 +++-- .../observers/SerializedObserver.java | 11 ++--- .../reactivex/parallel/ParallelFlowable.java | 40 ++++++++++++++++++- .../reactivex/processors/AsyncProcessor.java | 6 +-- .../processors/FlowableProcessor.java | 5 ++- .../java/io/reactivex/schedulers/Timed.java | 7 +++- .../io/reactivex/subjects/SingleSubject.java | 15 ++++--- .../java/io/reactivex/subjects/Subject.java | 3 ++ 35 files changed, 187 insertions(+), 87 deletions(-) diff --git a/src/main/java/io/reactivex/Notification.java b/src/main/java/io/reactivex/Notification.java index c0ad0b6f7b..84ceb38a88 100644 --- a/src/main/java/io/reactivex/Notification.java +++ b/src/main/java/io/reactivex/Notification.java @@ -13,6 +13,7 @@ package io.reactivex; +import io.reactivex.annotations.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.util.NotificationLite; @@ -66,6 +67,7 @@ public boolean isOnNext() { * @see #isOnNext() */ @SuppressWarnings("unchecked") + @Nullable public T getValue() { Object o = value; if (o != null && !NotificationLite.isError(o)) { @@ -80,6 +82,7 @@ public T getValue() { * @return the Throwable error contained or null * @see #isOnError() */ + @Nullable public Throwable getError() { Object o = value; if (NotificationLite.isError(o)) { @@ -122,7 +125,8 @@ public String toString() { * @return the new Notification instance * @throws NullPointerException if value is null */ - public static Notification createOnNext(T value) { + @NonNull + public static Notification createOnNext(@NonNull T value) { ObjectHelper.requireNonNull(value, "value is null"); return new Notification(value); } @@ -134,7 +138,8 @@ public static Notification createOnNext(T value) { * @return the new Notification instance * @throws NullPointerException if error is null */ - public static Notification createOnError(Throwable error) { + @NonNull + public static Notification createOnError(@NonNull Throwable error) { ObjectHelper.requireNonNull(error, "error is null"); return new Notification(NotificationLite.error(error)); } @@ -146,6 +151,7 @@ public static Notification createOnError(Throwable error) { * @return the shared Notification instance representing an onComplete signal */ @SuppressWarnings("unchecked") + @NonNull public static Notification createOnComplete() { return (Notification)COMPLETE; } diff --git a/src/main/java/io/reactivex/Observer.java b/src/main/java/io/reactivex/Observer.java index 55b38dc0d7..a383d04a27 100644 --- a/src/main/java/io/reactivex/Observer.java +++ b/src/main/java/io/reactivex/Observer.java @@ -13,6 +13,7 @@ package io.reactivex; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; /** @@ -40,7 +41,7 @@ public interface Observer { * be called anytime to cancel the connection * @since 2.0 */ - void onSubscribe(Disposable d); + void onSubscribe(@NonNull Disposable d); /** * Provides the Observer with a new item to observe. @@ -53,7 +54,7 @@ public interface Observer { * @param t * the item emitted by the Observable */ - void onNext(T t); + void onNext(@NonNull T t); /** * Notifies the Observer that the {@link Observable} has experienced an error condition. @@ -64,7 +65,7 @@ public interface Observer { * @param e * the exception encountered by the Observable */ - void onError(Throwable e); + void onError(@NonNull Throwable e); /** * Notifies the Observer that the {@link Observable} has finished sending push-based notifications. diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index f628e463b4..3cef7c9aa4 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -16,8 +16,6 @@ import java.util.NoSuchElementException; import java.util.concurrent.*; -import org.reactivestreams.Publisher; - import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; @@ -34,6 +32,7 @@ import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; +import org.reactivestreams.Publisher; /** * The Single class implements the Reactive Pattern for a single value response. @@ -2714,7 +2713,7 @@ public final void subscribe(SingleObserver subscriber) { * Override this method in subclasses to handle the incoming SingleObservers. * @param observer the SingleObserver to handle, not null */ - protected abstract void subscribeActual(SingleObserver observer); + protected abstract void subscribeActual(@NonNull SingleObserver observer); /** * Subscribes a given SingleObserver (subclass) to this Single and returns the given diff --git a/src/main/java/io/reactivex/disposables/ActionDisposable.java b/src/main/java/io/reactivex/disposables/ActionDisposable.java index d112a4e3a1..447dfe2e34 100644 --- a/src/main/java/io/reactivex/disposables/ActionDisposable.java +++ b/src/main/java/io/reactivex/disposables/ActionDisposable.java @@ -12,6 +12,7 @@ */ package io.reactivex.disposables; +import io.reactivex.annotations.NonNull; import io.reactivex.functions.Action; import io.reactivex.internal.util.ExceptionHelper; @@ -24,7 +25,7 @@ final class ActionDisposable extends ReferenceDisposable { } @Override - protected void onDisposed(Action value) { + protected void onDisposed(@NonNull Action value) { try { value.run(); } catch (Throwable ex) { diff --git a/src/main/java/io/reactivex/disposables/CompositeDisposable.java b/src/main/java/io/reactivex/disposables/CompositeDisposable.java index 3e02288f2e..f9bf2d9e84 100644 --- a/src/main/java/io/reactivex/disposables/CompositeDisposable.java +++ b/src/main/java/io/reactivex/disposables/CompositeDisposable.java @@ -14,6 +14,7 @@ import java.util.*; +import io.reactivex.annotations.NonNull; import io.reactivex.exceptions.*; import io.reactivex.internal.disposables.DisposableContainer; import io.reactivex.internal.functions.ObjectHelper; @@ -39,7 +40,7 @@ public CompositeDisposable() { * Creates a CompositeDisposables with the given array of initial elements. * @param resources the array of Disposables to start with */ - public CompositeDisposable(Disposable... resources) { + public CompositeDisposable(@NonNull Disposable... resources) { ObjectHelper.requireNonNull(resources, "resources is null"); this.resources = new OpenHashSet(resources.length + 1); for (Disposable d : resources) { @@ -52,7 +53,7 @@ public CompositeDisposable(Disposable... resources) { * Creates a CompositeDisposables with the given Iterable sequence of initial elements. * @param resources the Iterable sequence of Disposables to start with */ - public CompositeDisposable(Iterable resources) { + public CompositeDisposable(@NonNull Iterable resources) { ObjectHelper.requireNonNull(resources, "resources is null"); this.resources = new OpenHashSet(); for (Disposable d : resources) { @@ -85,7 +86,7 @@ public boolean isDisposed() { } @Override - public boolean add(Disposable d) { + public boolean add(@NonNull Disposable d) { ObjectHelper.requireNonNull(d, "d is null"); if (!disposed) { synchronized (this) { @@ -110,7 +111,7 @@ public boolean add(Disposable d) { * @param ds the array of Disposables * @return true if the operation was successful, false if the container has been disposed */ - public boolean addAll(Disposable... ds) { + public boolean addAll(@NonNull Disposable... ds) { ObjectHelper.requireNonNull(ds, "ds is null"); if (!disposed) { synchronized (this) { @@ -135,7 +136,7 @@ public boolean addAll(Disposable... ds) { } @Override - public boolean remove(Disposable d) { + public boolean remove(@NonNull Disposable d) { if (delete(d)) { d.dispose(); return true; @@ -144,7 +145,7 @@ public boolean remove(Disposable d) { } @Override - public boolean delete(Disposable d) { + public boolean delete(@NonNull Disposable d) { ObjectHelper.requireNonNull(d, "Disposable item is null"); if (disposed) { return false; diff --git a/src/main/java/io/reactivex/disposables/Disposables.java b/src/main/java/io/reactivex/disposables/Disposables.java index cac17cad7a..7fbac0f63e 100644 --- a/src/main/java/io/reactivex/disposables/Disposables.java +++ b/src/main/java/io/reactivex/disposables/Disposables.java @@ -15,11 +15,11 @@ import java.util.concurrent.Future; -import org.reactivestreams.Subscription; - +import io.reactivex.annotations.NonNull; import io.reactivex.functions.Action; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.*; +import org.reactivestreams.Subscription; /** * Utility class to help create disposables by wrapping @@ -38,7 +38,8 @@ private Disposables() { * @param run the Runnable to wrap * @return the new Disposable instance */ - public static Disposable fromRunnable(Runnable run) { + @NonNull + public static Disposable fromRunnable(@NonNull Runnable run) { ObjectHelper.requireNonNull(run, "run is null"); return new RunnableDisposable(run); } @@ -49,7 +50,8 @@ public static Disposable fromRunnable(Runnable run) { * @param run the Action to wrap * @return the new Disposable instance */ - public static Disposable fromAction(Action run) { + @NonNull + public static Disposable fromAction(@NonNull Action run) { ObjectHelper.requireNonNull(run, "run is null"); return new ActionDisposable(run); } @@ -60,7 +62,8 @@ public static Disposable fromAction(Action run) { * @param future the Future to wrap * @return the new Disposable instance */ - public static Disposable fromFuture(Future future) { + @NonNull + public static Disposable fromFuture(@NonNull Future future) { ObjectHelper.requireNonNull(future, "future is null"); return fromFuture(future, true); } @@ -72,7 +75,8 @@ public static Disposable fromFuture(Future future) { * @param allowInterrupt if true, the future cancel happens via Future.cancel(true) * @return the new Disposable instance */ - public static Disposable fromFuture(Future future, boolean allowInterrupt) { + @NonNull + public static Disposable fromFuture(@NonNull Future future, boolean allowInterrupt) { ObjectHelper.requireNonNull(future, "future is null"); return new FutureDisposable(future, allowInterrupt); } @@ -83,7 +87,8 @@ public static Disposable fromFuture(Future future, boolean allowInterrupt) { * @param subscription the Runnable to wrap * @return the new Disposable instance */ - public static Disposable fromSubscription(Subscription subscription) { + @NonNull + public static Disposable fromSubscription(@NonNull Subscription subscription) { ObjectHelper.requireNonNull(subscription, "subscription is null"); return new SubscriptionDisposable(subscription); } @@ -92,6 +97,7 @@ public static Disposable fromSubscription(Subscription subscription) { * Returns a new, non-disposed Disposable instance. * @return a new, non-disposed Disposable instance */ + @NonNull public static Disposable empty() { return fromRunnable(Functions.EMPTY_RUNNABLE); } @@ -100,6 +106,7 @@ public static Disposable empty() { * Returns a disposed Disposable instance. * @return a disposed Disposable instance */ + @NonNull public static Disposable disposed() { return EmptyDisposable.INSTANCE; } diff --git a/src/main/java/io/reactivex/disposables/ReferenceDisposable.java b/src/main/java/io/reactivex/disposables/ReferenceDisposable.java index a21449a403..100b1848ed 100644 --- a/src/main/java/io/reactivex/disposables/ReferenceDisposable.java +++ b/src/main/java/io/reactivex/disposables/ReferenceDisposable.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.annotations.NonNull; import io.reactivex.internal.functions.ObjectHelper; /** @@ -31,7 +32,7 @@ abstract class ReferenceDisposable extends AtomicReference implements Disp super(ObjectHelper.requireNonNull(value, "value is null")); } - protected abstract void onDisposed(T value); + protected abstract void onDisposed(@NonNull T value); @Override public final void dispose() { diff --git a/src/main/java/io/reactivex/disposables/RunnableDisposable.java b/src/main/java/io/reactivex/disposables/RunnableDisposable.java index 417d55088c..a70df60317 100644 --- a/src/main/java/io/reactivex/disposables/RunnableDisposable.java +++ b/src/main/java/io/reactivex/disposables/RunnableDisposable.java @@ -12,6 +12,8 @@ */ package io.reactivex.disposables; +import io.reactivex.annotations.NonNull; + /** * A disposable container that manages a Runnable instance. */ @@ -24,7 +26,7 @@ final class RunnableDisposable extends ReferenceDisposable { } @Override - protected void onDisposed(Runnable value) { + protected void onDisposed(@NonNull Runnable value) { value.run(); } diff --git a/src/main/java/io/reactivex/disposables/SerialDisposable.java b/src/main/java/io/reactivex/disposables/SerialDisposable.java index 96beb94b55..d5063928a2 100644 --- a/src/main/java/io/reactivex/disposables/SerialDisposable.java +++ b/src/main/java/io/reactivex/disposables/SerialDisposable.java @@ -15,7 +15,8 @@ import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.internal.disposables.*; +import io.reactivex.annotations.Nullable; +import io.reactivex.internal.disposables.DisposableHelper; /** * A Disposable container that allows atomically updating/replacing the contained @@ -36,7 +37,7 @@ public SerialDisposable() { * Constructs a SerialDisposable with the given initial Disposable instance. * @param initialDisposable the initial Disposable instance to use, null allowed */ - public SerialDisposable(Disposable initialDisposable) { + public SerialDisposable(@Nullable Disposable initialDisposable) { this.resource = new AtomicReference(initialDisposable); } @@ -47,7 +48,7 @@ public SerialDisposable(Disposable initialDisposable) { * @return true if the operation succeeded, false if the container has been disposed * @see #replace(Disposable) */ - public boolean set(Disposable next) { + public boolean set(@Nullable Disposable next) { return DisposableHelper.set(resource, next); } @@ -58,7 +59,7 @@ public boolean set(Disposable next) { * @return true if the operation succeeded, false if the container has been disposed * @see #set(Disposable) */ - public boolean replace(Disposable next) { + public boolean replace(@Nullable Disposable next) { return DisposableHelper.replace(resource, next); } @@ -66,6 +67,7 @@ public boolean replace(Disposable next) { * Returns the currently contained Disposable or null if this container is empty. * @return the current Disposable, may be null */ + @Nullable public Disposable get() { Disposable d = resource.get(); if (d == DisposableHelper.DISPOSED) { diff --git a/src/main/java/io/reactivex/disposables/SubscriptionDisposable.java b/src/main/java/io/reactivex/disposables/SubscriptionDisposable.java index 54b53b18c6..ebf8934a37 100644 --- a/src/main/java/io/reactivex/disposables/SubscriptionDisposable.java +++ b/src/main/java/io/reactivex/disposables/SubscriptionDisposable.java @@ -12,6 +12,7 @@ */ package io.reactivex.disposables; +import io.reactivex.annotations.NonNull; import org.reactivestreams.Subscription; /** @@ -26,7 +27,7 @@ final class SubscriptionDisposable extends ReferenceDisposable { } @Override - protected void onDisposed(Subscription value) { + protected void onDisposed(@NonNull Subscription value) { value.cancel(); } } diff --git a/src/main/java/io/reactivex/exceptions/CompositeException.java b/src/main/java/io/reactivex/exceptions/CompositeException.java index bcd5baffbb..0b18f8ce3f 100644 --- a/src/main/java/io/reactivex/exceptions/CompositeException.java +++ b/src/main/java/io/reactivex/exceptions/CompositeException.java @@ -18,6 +18,8 @@ import java.io.*; import java.util.*; +import io.reactivex.annotations.NonNull; + /** * Represents an exception that is a composite of one or more other exceptions. A {@code CompositeException} * does not modify the structure of any exception it wraps, but at print-time it iterates through the list of @@ -47,7 +49,7 @@ public final class CompositeException extends RuntimeException { * * @throws IllegalArgumentException if exceptions is empty. */ - public CompositeException(Throwable... exceptions) { + public CompositeException(@NonNull Throwable... exceptions) { this(exceptions == null ? Collections.singletonList(new NullPointerException("exceptions was null")) : Arrays.asList(exceptions)); } @@ -59,7 +61,7 @@ public CompositeException(Throwable... exceptions) { * * @throws IllegalArgumentException if errors is empty. */ - public CompositeException(Iterable errors) { + public CompositeException(@NonNull Iterable errors) { Set deDupedExceptions = new LinkedHashSet(); List localExceptions = new ArrayList(); if (errors != null) { @@ -89,16 +91,19 @@ public CompositeException(Iterable errors) { * * @return the exceptions that make up the {@code CompositeException}, as a {@link List} of {@link Throwable}s */ + @NonNull public List getExceptions() { return exceptions; } @Override + @NonNull public String getMessage() { return message; } @Override + @NonNull public synchronized Throwable getCause() { // NOPMD if (cause == null) { // we lazily generate this causal chain if this is called diff --git a/src/main/java/io/reactivex/exceptions/Exceptions.java b/src/main/java/io/reactivex/exceptions/Exceptions.java index cad41e6e51..42c2aa360d 100644 --- a/src/main/java/io/reactivex/exceptions/Exceptions.java +++ b/src/main/java/io/reactivex/exceptions/Exceptions.java @@ -13,6 +13,7 @@ package io.reactivex.exceptions; +import io.reactivex.annotations.*; import io.reactivex.internal.util.ExceptionHelper; /** @@ -32,7 +33,8 @@ private Exceptions() { * @return because {@code propagate} itself throws an exception or error, this is a sort of phantom return * value; {@code propagate} does not actually return anything */ - public static RuntimeException propagate(Throwable t) { + @NonNull + public static RuntimeException propagate(@NonNull Throwable t) { /* * The return type of RuntimeException is a trick for code to be like this: * @@ -61,7 +63,7 @@ public static RuntimeException propagate(Throwable t) { * the {@code Throwable} to test and perhaps throw * @see RxJava: StackOverflowError is swallowed (Issue #748) */ - public static void throwIfFatal(Throwable t) { + public static void throwIfFatal(@NonNull Throwable t) { // values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495 if (t instanceof VirtualMachineError) { throw (VirtualMachineError) t; diff --git a/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java b/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java index dc61f2eea8..13488f79b4 100644 --- a/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java +++ b/src/main/java/io/reactivex/exceptions/OnErrorNotImplementedException.java @@ -13,7 +13,7 @@ package io.reactivex.exceptions; -import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.*; /** * Represents an exception used to signal to the {@code RxJavaPlugins.onError()} that a @@ -35,7 +35,7 @@ public final class OnErrorNotImplementedException extends RuntimeException { * @param e * the {@code Throwable} to signal; if null, a NullPointerException is constructed */ - public OnErrorNotImplementedException(String message, Throwable e) { + public OnErrorNotImplementedException(String message, @NonNull Throwable e) { super(message, e != null ? e : new NullPointerException()); } @@ -47,7 +47,7 @@ public OnErrorNotImplementedException(String message, Throwable e) { * @param e * the {@code Throwable} to signal; if null, a NullPointerException is constructed */ - public OnErrorNotImplementedException(Throwable e) { + public OnErrorNotImplementedException(@NonNull Throwable e) { super(e != null ? e.getMessage() : null, e != null ? e : new NullPointerException()); } } \ No newline at end of file diff --git a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java index fb74764935..41039682f8 100644 --- a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java +++ b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java @@ -13,6 +13,7 @@ package io.reactivex.flowables; +import io.reactivex.annotations.NonNull; import org.reactivestreams.Subscriber; import io.reactivex.Flowable; @@ -47,7 +48,7 @@ public abstract class ConnectableFlowable extends Flowable { * allowing the caller to synchronously disconnect a synchronous source * @see ReactiveX documentation: Connect */ - public abstract void connect(Consumer connection); + public abstract void connect(@NonNull Consumer connection); /** * Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying @@ -71,6 +72,7 @@ public final Disposable connect() { * @return a {@link Flowable} * @see ReactiveX documentation: RefCount */ + @NonNull public Flowable refCount() { return RxJavaPlugins.onAssembly(new FlowableRefCount(this)); } @@ -82,6 +84,7 @@ public Flowable refCount() { * @return an Observable that automatically connects to this ConnectableObservable * when the first Subscriber subscribes */ + @NonNull public Flowable autoConnect() { return autoConnect(1); } @@ -95,6 +98,7 @@ public Flowable autoConnect() { * @return an Observable that automatically connects to this ConnectableObservable * when the specified number of Subscribers subscribe to it */ + @NonNull public Flowable autoConnect(int numberOfSubscribers) { return autoConnect(numberOfSubscribers, Functions.emptyConsumer()); } @@ -113,7 +117,8 @@ public Flowable autoConnect(int numberOfSubscribers) { * when the specified number of Subscribers subscribe to it and calls the * specified callback with the Subscription associated with the established connection */ - public Flowable autoConnect(int numberOfSubscribers, Consumer connection) { + @NonNull + public Flowable autoConnect(int numberOfSubscribers, @NonNull Consumer connection) { if (numberOfSubscribers <= 0) { this.connect(connection); return RxJavaPlugins.onAssembly(this); diff --git a/src/main/java/io/reactivex/flowables/GroupedFlowable.java b/src/main/java/io/reactivex/flowables/GroupedFlowable.java index 4ffd71ad14..c7eb4dc121 100644 --- a/src/main/java/io/reactivex/flowables/GroupedFlowable.java +++ b/src/main/java/io/reactivex/flowables/GroupedFlowable.java @@ -13,6 +13,7 @@ package io.reactivex.flowables; import io.reactivex.Flowable; +import io.reactivex.annotations.Nullable; /** * A {@link Flowable} that has been grouped by key, the value of which can be obtained with {@link #getKey()}. @@ -30,14 +31,14 @@ * @see ReactiveX documentation: GroupBy */ public abstract class GroupedFlowable extends Flowable { - + final K key; /** * Constructs a GroupedFlowable with the given key. * @param key the key */ - protected GroupedFlowable(K key) { + protected GroupedFlowable(@Nullable K key) { this.key = key; } @@ -46,6 +47,7 @@ protected GroupedFlowable(K key) { * * @return the key that the items emitted by this {@code GroupedObservable} were grouped by */ + @Nullable public K getKey() { return key; } diff --git a/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java b/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java index 46f13bd743..f9ad177399 100644 --- a/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java +++ b/src/main/java/io/reactivex/internal/disposables/DisposableHelper.java @@ -20,6 +20,7 @@ import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.plugins.RxJavaPlugins; + /** * Utility methods for working with Disposables atomically. */ diff --git a/src/main/java/io/reactivex/observables/ConnectableObservable.java b/src/main/java/io/reactivex/observables/ConnectableObservable.java index cb66875298..2fed7634eb 100644 --- a/src/main/java/io/reactivex/observables/ConnectableObservable.java +++ b/src/main/java/io/reactivex/observables/ConnectableObservable.java @@ -13,6 +13,7 @@ package io.reactivex.observables; +import io.reactivex.annotations.NonNull; import org.reactivestreams.Subscriber; import io.reactivex.*; @@ -47,7 +48,7 @@ public abstract class ConnectableObservable extends Observable { * allowing the caller to synchronously disconnect a synchronous source * @see ReactiveX documentation: Connect */ - public abstract void connect(Consumer connection); + public abstract void connect(@NonNull Consumer connection); /** * Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying @@ -71,6 +72,7 @@ public final Disposable connect() { * @return an {@link Observable} * @see ReactiveX documentation: RefCount */ + @NonNull public Observable refCount() { return RxJavaPlugins.onAssembly(new ObservableRefCount(this)); } @@ -82,6 +84,7 @@ public Observable refCount() { * @return an Observable that automatically connects to this ConnectableObservable * when the first Subscriber subscribes */ + @NonNull public Observable autoConnect() { return autoConnect(1); } @@ -95,6 +98,7 @@ public Observable autoConnect() { * @return an Observable that automatically connects to this ConnectableObservable * when the specified number of Subscribers subscribe to it */ + @NonNull public Observable autoConnect(int numberOfSubscribers) { return autoConnect(numberOfSubscribers, Functions.emptyConsumer()); } @@ -113,7 +117,8 @@ public Observable autoConnect(int numberOfSubscribers) { * when the specified number of Subscribers subscribe to it and calls the * specified callback with the Subscription associated with the established connection */ - public Observable autoConnect(int numberOfSubscribers, Consumer connection) { + @NonNull + public Observable autoConnect(int numberOfSubscribers, @NonNull Consumer connection) { if (numberOfSubscribers <= 0) { this.connect(connection); return RxJavaPlugins.onAssembly(this); diff --git a/src/main/java/io/reactivex/observables/GroupedObservable.java b/src/main/java/io/reactivex/observables/GroupedObservable.java index 2641ad43f7..a9673715dd 100644 --- a/src/main/java/io/reactivex/observables/GroupedObservable.java +++ b/src/main/java/io/reactivex/observables/GroupedObservable.java @@ -13,6 +13,7 @@ package io.reactivex.observables; import io.reactivex.Observable; +import io.reactivex.annotations.Nullable; /** * An {@link Observable} that has been grouped by key, the value of which can be obtained with {@link #getKey()}. @@ -30,14 +31,14 @@ * @see ReactiveX documentation: GroupBy */ public abstract class GroupedObservable extends Observable { - + final K key; /** * Constructs a GroupedObservable with the given key. * @param key the key */ - protected GroupedObservable(K key) { + protected GroupedObservable(@Nullable K key) { this.key = key; } @@ -46,6 +47,7 @@ protected GroupedObservable(K key) { * * @return the key that the items emitted by this {@code GroupedObservable} were grouped by */ + @Nullable public K getKey() { return key; } diff --git a/src/main/java/io/reactivex/observers/DefaultObserver.java b/src/main/java/io/reactivex/observers/DefaultObserver.java index 096388c2e9..c0ef8475c3 100644 --- a/src/main/java/io/reactivex/observers/DefaultObserver.java +++ b/src/main/java/io/reactivex/observers/DefaultObserver.java @@ -14,6 +14,7 @@ package io.reactivex.observers; import io.reactivex.Observer; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -65,7 +66,7 @@ public abstract class DefaultObserver implements Observer { private Disposable s; @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; onStart(); diff --git a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java index b0a1c18914..e0d6902a76 100644 --- a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.CompletableObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -53,7 +54,7 @@ public abstract class DisposableCompletableObserver implements CompletableObserv final AtomicReference s = new AtomicReference(); @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java index f4d980f060..09686adc3d 100644 --- a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.MaybeObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -60,10 +61,11 @@ * @param the received value type */ public abstract class DisposableMaybeObserver implements MaybeObserver, Disposable { + final AtomicReference s = new AtomicReference(); @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/DisposableObserver.java b/src/main/java/io/reactivex/observers/DisposableObserver.java index da863b1fa5..7d22b8201b 100644 --- a/src/main/java/io/reactivex/observers/DisposableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableObserver.java @@ -16,8 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.Observer; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.*; +import io.reactivex.internal.disposables.DisposableHelper; /** * An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable. @@ -63,10 +64,11 @@ * @param the received value type */ public abstract class DisposableObserver implements Observer, Disposable { + final AtomicReference s = new AtomicReference(); @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java index dd5f0aab09..0515f458ac 100644 --- a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.SingleObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; @@ -52,10 +53,11 @@ * @param the received value type */ public abstract class DisposableSingleObserver implements SingleObserver, Disposable { + final AtomicReference s = new AtomicReference(); @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java index b892abbf62..58906dcb6c 100644 --- a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java @@ -16,9 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.CompletableObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; /** @@ -85,13 +85,13 @@ public abstract class ResourceCompletableObserver implements CompletableObserver * * @throws NullPointerException if resource is null */ - public final void add(Disposable resource) { + public final void add(@NonNull Disposable resource) { ObjectHelper.requireNonNull(resource, "resource is null"); resources.add(resource); } @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java index c12e9e2417..4a0287e01f 100644 --- a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java @@ -16,9 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.MaybeObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; /** @@ -95,13 +95,13 @@ public abstract class ResourceMaybeObserver implements MaybeObserver, Disp * * @throws NullPointerException if resource is null */ - public final void add(Disposable resource) { + public final void add(@NonNull Disposable resource) { ObjectHelper.requireNonNull(resource, "resource is null"); resources.add(resource); } @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/ResourceObserver.java b/src/main/java/io/reactivex/observers/ResourceObserver.java index ef7fd703fd..834b73674a 100644 --- a/src/main/java/io/reactivex/observers/ResourceObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceObserver.java @@ -16,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.Observer; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; @@ -92,7 +93,7 @@ public abstract class ResourceObserver implements Observer, Disposable { * * @throws NullPointerException if resource is null */ - public final void add(Disposable resource) { + public final void add(@NonNull Disposable resource) { ObjectHelper.requireNonNull(resource, "resource is null"); resources.add(resource); } diff --git a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java index 88efae5c74..4c18a139c2 100644 --- a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java @@ -16,9 +16,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.SingleObserver; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.disposables.ListCompositeDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; /** @@ -88,13 +88,13 @@ public abstract class ResourceSingleObserver implements SingleObserver, Di * * @throws NullPointerException if resource is null */ - public final void add(Disposable resource) { + public final void add(@NonNull Disposable resource) { ObjectHelper.requireNonNull(resource, "resource is null"); resources.add(resource); } @Override - public final void onSubscribe(Disposable s) { + public final void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.setOnce(this.s, s)) { onStart(); } diff --git a/src/main/java/io/reactivex/observers/SafeObserver.java b/src/main/java/io/reactivex/observers/SafeObserver.java index 76edd194d3..a370f3799b 100644 --- a/src/main/java/io/reactivex/observers/SafeObserver.java +++ b/src/main/java/io/reactivex/observers/SafeObserver.java @@ -13,6 +13,7 @@ package io.reactivex.observers; import io.reactivex.Observer; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.internal.disposables.*; @@ -36,12 +37,12 @@ public final class SafeObserver implements Observer, Disposable { * Constructs a SafeObserver by wrapping the given actual Observer. * @param actual the actual Observer to wrap, not null (not validated) */ - public SafeObserver(Observer actual) { + public SafeObserver(@NonNull Observer actual) { this.actual = actual; } @Override - public void onSubscribe(Disposable s) { + public void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; try { @@ -74,7 +75,7 @@ public boolean isDisposed() { } @Override - public void onNext(T t) { + public void onNext(@NonNull T t) { if (done) { return; } @@ -134,7 +135,7 @@ void onNextNoSubscription() { } @Override - public void onError(Throwable t) { + public void onError(@NonNull Throwable t) { if (done) { RxJavaPlugins.onError(t); return; diff --git a/src/main/java/io/reactivex/observers/SerializedObserver.java b/src/main/java/io/reactivex/observers/SerializedObserver.java index 7b429415af..ec2061ba97 100644 --- a/src/main/java/io/reactivex/observers/SerializedObserver.java +++ b/src/main/java/io/reactivex/observers/SerializedObserver.java @@ -13,6 +13,7 @@ package io.reactivex.observers; import io.reactivex.Observer; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.util.*; @@ -46,7 +47,7 @@ public final class SerializedObserver implements Observer, Disposable { * Construct a SerializedObserver by wrapping the given actual Observer. * @param actual the actual Observer, not null (not verified) */ - public SerializedObserver(Observer actual) { + public SerializedObserver(@NonNull Observer actual) { this(actual, false); } @@ -57,13 +58,13 @@ public SerializedObserver(Observer actual) { * @param actual the actual Observer, not null (not verified) * @param delayError if true, errors are emitted after regular values have been emitted */ - public SerializedObserver(Observer actual, boolean delayError) { + public SerializedObserver(@NonNull Observer actual, boolean delayError) { this.actual = actual; this.delayError = delayError; } @Override - public void onSubscribe(Disposable s) { + public void onSubscribe(@NonNull Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; @@ -84,7 +85,7 @@ public boolean isDisposed() { @Override - public void onNext(T t) { + public void onNext(@NonNull T t) { if (done) { return; } @@ -115,7 +116,7 @@ public void onNext(T t) { } @Override - public void onError(Throwable t) { + public void onError(@NonNull Throwable t) { if (done) { RxJavaPlugins.onError(t); return; diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index b308545be4..e42e2c78ad 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -16,8 +16,6 @@ import java.util.*; import java.util.concurrent.Callable; -import org.reactivestreams.*; - import io.reactivex.*; import io.reactivex.annotations.*; import io.reactivex.exceptions.Exceptions; @@ -27,6 +25,7 @@ import io.reactivex.internal.subscriptions.EmptySubscription; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; +import org.reactivestreams.*; /** * Abstract base class for Parallel publishers that take an array of Subscribers. @@ -112,6 +111,7 @@ public static ParallelFlowable from(@NonNull Publisher sourc * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public static ParallelFlowable from(@NonNull Publisher source, int parallelism, int prefetch) { ObjectHelper.requireNonNull(source, "source"); @@ -130,6 +130,7 @@ public static ParallelFlowable from(@NonNull Publisher sourc * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable map(@NonNull Function mapper) { ObjectHelper.requireNonNull(mapper, "mapper"); return RxJavaPlugins.onAssembly(new ParallelMap(this, mapper)); @@ -149,6 +150,7 @@ public final ParallelFlowable map(@NonNull Function ParallelFlowable map(@NonNull Function mapper, @NonNull ParallelFailureHandling errorHandler) { ObjectHelper.requireNonNull(mapper, "mapper"); ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); @@ -170,6 +172,7 @@ public final ParallelFlowable map(@NonNull Function ParallelFlowable map(@NonNull Function mapper, @NonNull BiFunction errorHandler) { ObjectHelper.requireNonNull(mapper, "mapper"); ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); @@ -249,6 +252,7 @@ public final ParallelFlowable filter(@NonNull Predicate predicate, * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable runOn(@NonNull Scheduler scheduler) { return runOn(scheduler, Flowable.bufferSize()); } @@ -275,6 +279,7 @@ public final ParallelFlowable runOn(@NonNull Scheduler scheduler) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable runOn(@NonNull Scheduler scheduler, int prefetch) { ObjectHelper.requireNonNull(scheduler, "scheduler"); ObjectHelper.verifyPositive(prefetch, "prefetch"); @@ -290,6 +295,7 @@ public final ParallelFlowable runOn(@NonNull Scheduler scheduler, int prefetc * @return the new Flowable instance emitting the reduced value or empty if the ParallelFlowable was empty */ @CheckReturnValue + @NonNull public final Flowable reduce(@NonNull BiFunction reducer) { ObjectHelper.requireNonNull(reducer, "reducer"); return RxJavaPlugins.onAssembly(new ParallelReduceFull(this, reducer)); @@ -307,6 +313,7 @@ public final Flowable reduce(@NonNull BiFunction reducer) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable reduce(@NonNull Callable initialSupplier, @NonNull BiFunction reducer) { ObjectHelper.requireNonNull(initialSupplier, "initialSupplier"); ObjectHelper.requireNonNull(reducer, "reducer"); @@ -356,6 +363,7 @@ public final Flowable sequential() { @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue + @NonNull public final Flowable sequential(int prefetch) { ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ParallelJoin(this, prefetch, false)); @@ -383,6 +391,7 @@ public final Flowable sequential(int prefetch) { @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue @Experimental + @NonNull public final Flowable sequentialDelayError() { return sequentialDelayError(Flowable.bufferSize()); } @@ -407,6 +416,7 @@ public final Flowable sequentialDelayError() { @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @CheckReturnValue + @NonNull public final Flowable sequentialDelayError(int prefetch) { ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ParallelJoin(this, prefetch, true)); @@ -422,6 +432,7 @@ public final Flowable sequentialDelayError(int prefetch) { * @return the new Flowable instance */ @CheckReturnValue + @NonNull public final Flowable sorted(@NonNull Comparator comparator) { return sorted(comparator, 16); } @@ -437,6 +448,7 @@ public final Flowable sorted(@NonNull Comparator comparator) { * @return the new Flowable instance */ @CheckReturnValue + @NonNull public final Flowable sorted(@NonNull Comparator comparator, int capacityHint) { ObjectHelper.requireNonNull(comparator, "comparator is null"); ObjectHelper.verifyPositive(capacityHint, "capacityHint"); @@ -456,6 +468,7 @@ public final Flowable sorted(@NonNull Comparator comparator, int c * @return the new Flowable instance */ @CheckReturnValue + @NonNull public final Flowable> toSortedList(@NonNull Comparator comparator) { return toSortedList(comparator, 16); } @@ -469,6 +482,7 @@ public final Flowable> toSortedList(@NonNull Comparator compa * @return the new Flowable instance */ @CheckReturnValue + @NonNull public final Flowable> toSortedList(@NonNull Comparator comparator, int capacityHint) { ObjectHelper.requireNonNull(comparator, "comparator is null"); ObjectHelper.verifyPositive(capacityHint, "capacityHint"); @@ -489,6 +503,7 @@ public final Flowable> toSortedList(@NonNull Comparator compa * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnNext(@NonNull Consumer onNext) { ObjectHelper.requireNonNull(onNext, "onNext is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -516,6 +531,7 @@ public final ParallelFlowable doOnNext(@NonNull Consumer onNext) { */ @CheckReturnValue @Experimental + @NonNull public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @NonNull ParallelFailureHandling errorHandler) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); @@ -535,6 +551,7 @@ public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @ */ @CheckReturnValue @Experimental + @NonNull public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @NonNull BiFunction errorHandler) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(errorHandler, "errorHandler is null"); @@ -549,6 +566,7 @@ public final ParallelFlowable doOnNext(@NonNull Consumer onNext, @ * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doAfterNext(@NonNull Consumer onAfterNext) { ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -570,6 +588,7 @@ public final ParallelFlowable doAfterNext(@NonNull Consumer onAfte * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnError(@NonNull Consumer onError) { ObjectHelper.requireNonNull(onError, "onError is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -591,6 +610,7 @@ public final ParallelFlowable doOnError(@NonNull Consumer onError) * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnComplete(@NonNull Action onComplete) { ObjectHelper.requireNonNull(onComplete, "onComplete is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -612,6 +632,7 @@ public final ParallelFlowable doOnComplete(@NonNull Action onComplete) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doAfterTerminated(@NonNull Action onAfterTerminate) { ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -633,6 +654,7 @@ public final ParallelFlowable doAfterTerminated(@NonNull Action onAfterTermin * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnSubscribe(@NonNull Consumer onSubscribe) { ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -654,6 +676,7 @@ public final ParallelFlowable doOnSubscribe(@NonNull Consumer doOnRequest(@NonNull LongConsumer onRequest) { ObjectHelper.requireNonNull(onRequest, "onRequest is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -675,6 +698,7 @@ public final ParallelFlowable doOnRequest(@NonNull LongConsumer onRequest) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable doOnCancel(@NonNull Action onCancel) { ObjectHelper.requireNonNull(onCancel, "onCancel is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, @@ -699,6 +723,7 @@ public final ParallelFlowable doOnCancel(@NonNull Action onCancel) { * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable collect(@NonNull Callable collectionSupplier, @NonNull BiConsumer collector) { ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null"); ObjectHelper.requireNonNull(collector, "collector is null"); @@ -714,6 +739,7 @@ public final ParallelFlowable collect(@NonNull Callable coll * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public static ParallelFlowable fromArray(@NonNull Publisher... publishers) { if (publishers.length == 0) { throw new IllegalArgumentException("Zero publishers not supported"); @@ -730,6 +756,7 @@ public static ParallelFlowable fromArray(@NonNull Publisher... publish * @return the value returned by the converter function */ @CheckReturnValue + @NonNull public final U to(@NonNull Function, U> converter) { try { return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); @@ -748,6 +775,7 @@ public final U to(@NonNull Function, U> converte * @return the ParallelFlowable returned by the function */ @CheckReturnValue + @NonNull public final ParallelFlowable compose(@NonNull ParallelTransformer composer) { return RxJavaPlugins.onAssembly(ObjectHelper.requireNonNull(composer, "composer is null").apply(this)); } @@ -762,6 +790,7 @@ public final ParallelFlowable compose(@NonNull ParallelTransformer * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable flatMap(@NonNull Function> mapper) { return flatMap(mapper, false, Integer.MAX_VALUE, Flowable.bufferSize()); } @@ -777,6 +806,7 @@ public final ParallelFlowable flatMap(@NonNull Function ParallelFlowable flatMap( @NonNull Function> mapper, boolean delayError) { return flatMap(mapper, delayError, Integer.MAX_VALUE, Flowable.bufferSize()); @@ -795,6 +825,7 @@ public final ParallelFlowable flatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable flatMap( @NonNull Function> mapper, boolean delayError, int maxConcurrency) { return flatMap(mapper, delayError, maxConcurrency, Flowable.bufferSize()); @@ -813,6 +844,7 @@ public final ParallelFlowable flatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable flatMap( @NonNull Function> mapper, boolean delayError, int maxConcurrency, int prefetch) { @@ -832,6 +864,7 @@ public final ParallelFlowable flatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable concatMap( @NonNull Function> mapper) { return concatMap(mapper, 2); @@ -848,6 +881,7 @@ public final ParallelFlowable concatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable concatMap( @NonNull Function> mapper, int prefetch) { @@ -868,6 +902,7 @@ public final ParallelFlowable concatMap( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable concatMapDelayError( @NonNull Function> mapper, boolean tillTheEnd) { @@ -886,6 +921,7 @@ public final ParallelFlowable concatMapDelayError( * @return the new ParallelFlowable instance */ @CheckReturnValue + @NonNull public final ParallelFlowable concatMapDelayError( @NonNull Function> mapper, int prefetch, boolean tillTheEnd) { diff --git a/src/main/java/io/reactivex/processors/AsyncProcessor.java b/src/main/java/io/reactivex/processors/AsyncProcessor.java index f24a1954c1..67e6b7f6a8 100644 --- a/src/main/java/io/reactivex/processors/AsyncProcessor.java +++ b/src/main/java/io/reactivex/processors/AsyncProcessor.java @@ -12,14 +12,13 @@ */ package io.reactivex.processors; -import io.reactivex.annotations.CheckReturnValue; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; -import org.reactivestreams.*; - +import io.reactivex.annotations.*; import io.reactivex.internal.subscriptions.DeferredScalarSubscription; import io.reactivex.plugins.RxJavaPlugins; +import org.reactivestreams.*; /** * A Subject that emits the very last value followed by a completion event or the received error to Subscribers. @@ -51,6 +50,7 @@ public final class AsyncProcessor extends FlowableProcessor { * @return the new AsyncProcessor instance */ @CheckReturnValue + @NonNull public static AsyncProcessor create() { return new AsyncProcessor(); } diff --git a/src/main/java/io/reactivex/processors/FlowableProcessor.java b/src/main/java/io/reactivex/processors/FlowableProcessor.java index 80699926a3..e5fe4ad838 100644 --- a/src/main/java/io/reactivex/processors/FlowableProcessor.java +++ b/src/main/java/io/reactivex/processors/FlowableProcessor.java @@ -13,9 +13,9 @@ package io.reactivex.processors; -import org.reactivestreams.Processor; - import io.reactivex.*; +import io.reactivex.annotations.NonNull; +import org.reactivestreams.Processor; /** * Represents a Subscriber and a Flowable (Publisher) at the same time, allowing @@ -66,6 +66,7 @@ public abstract class FlowableProcessor extends Flowable implements Proces *

The method is thread-safe. * @return the wrapped and serialized subject */ + @NonNull public final FlowableProcessor toSerialized() { if (this instanceof SerializedProcessor) { return this; diff --git a/src/main/java/io/reactivex/schedulers/Timed.java b/src/main/java/io/reactivex/schedulers/Timed.java index fefcc17620..a1bd20e4be 100644 --- a/src/main/java/io/reactivex/schedulers/Timed.java +++ b/src/main/java/io/reactivex/schedulers/Timed.java @@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit; +import io.reactivex.annotations.NonNull; import io.reactivex.internal.functions.ObjectHelper; /** @@ -34,7 +35,7 @@ public final class Timed { * @param unit the time unit, not null * @throws NullPointerException if unit is null */ - public Timed(T value, long time, TimeUnit unit) { + public Timed(@NonNull T value, long time, @NonNull TimeUnit unit) { this.value = value; this.time = time; this.unit = ObjectHelper.requireNonNull(unit, "unit is null"); @@ -44,6 +45,7 @@ public Timed(T value, long time, TimeUnit unit) { * Returns the contained value. * @return the contained value */ + @NonNull public T value() { return value; } @@ -52,6 +54,7 @@ public T value() { * Returns the time unit of the contained time. * @return the time unit of the contained time */ + @NonNull public TimeUnit unit() { return unit; } @@ -69,7 +72,7 @@ public long time() { * @param unit the time unt * @return the converted time */ - public long time(TimeUnit unit) { + public long time(@NonNull TimeUnit unit) { return unit.convert(time, this.unit); } diff --git a/src/main/java/io/reactivex/subjects/SingleSubject.java b/src/main/java/io/reactivex/subjects/SingleSubject.java index 0c3eab5582..6e8b0e2006 100644 --- a/src/main/java/io/reactivex/subjects/SingleSubject.java +++ b/src/main/java/io/reactivex/subjects/SingleSubject.java @@ -53,6 +53,7 @@ public final class SingleSubject extends Single implements SingleObserver< * @return the new SingleSubject instance */ @CheckReturnValue + @NonNull public static SingleSubject create() { return new SingleSubject(); } @@ -64,7 +65,7 @@ public static SingleSubject create() { } @Override - public void onSubscribe(Disposable d) { + public void onSubscribe(@NonNull Disposable d) { if (observers.get() == TERMINATED) { d.dispose(); } @@ -72,7 +73,7 @@ public void onSubscribe(Disposable d) { @SuppressWarnings("unchecked") @Override - public void onSuccess(T value) { + public void onSuccess(@NonNull T value) { if (value == null) { onError(new NullPointerException("Null values are not allowed in 2.x")); return; @@ -87,7 +88,7 @@ public void onSuccess(T value) { @SuppressWarnings("unchecked") @Override - public void onError(Throwable e) { + public void onError(@NonNull Throwable e) { if (e == null) { e = new NullPointerException("Null errors are not allowed in 2.x"); } @@ -102,7 +103,7 @@ public void onError(Throwable e) { } @Override - protected void subscribeActual(SingleObserver observer) { + protected void subscribeActual(@NonNull SingleObserver observer) { SingleDisposable md = new SingleDisposable(observer, this); observer.onSubscribe(md); if (add(md)) { @@ -119,7 +120,7 @@ protected void subscribeActual(SingleObserver observer) { } } - boolean add(SingleDisposable inner) { + boolean add(@NonNull SingleDisposable inner) { for (;;) { SingleDisposable[] a = observers.get(); if (a == TERMINATED) { @@ -138,7 +139,7 @@ boolean add(SingleDisposable inner) { } @SuppressWarnings("unchecked") - void remove(SingleDisposable inner) { + void remove(@NonNull SingleDisposable inner) { for (;;) { SingleDisposable[] a = observers.get(); int n = a.length; @@ -177,6 +178,7 @@ void remove(SingleDisposable inner) { * Returns the success value if this SingleSubject was terminated with a success value. * @return the success value or null */ + @Nullable public T getValue() { if (observers.get() == TERMINATED) { return value; @@ -196,6 +198,7 @@ public boolean hasValue() { * Returns the terminal error if this SingleSubject has been terminated with an error, null otherwise. * @return the terminal error or null if not terminated or not with an error */ + @Nullable public Throwable getThrowable() { if (observers.get() == TERMINATED) { return error; diff --git a/src/main/java/io/reactivex/subjects/Subject.java b/src/main/java/io/reactivex/subjects/Subject.java index f18464bebd..79bcb3934b 100644 --- a/src/main/java/io/reactivex/subjects/Subject.java +++ b/src/main/java/io/reactivex/subjects/Subject.java @@ -14,6 +14,7 @@ package io.reactivex.subjects; import io.reactivex.*; +import io.reactivex.annotations.*; /** * Represents an Observer and an Observable at the same time, allowing @@ -55,6 +56,7 @@ public abstract class Subject extends Observable implements Observer { * @return the error that caused the Subject to terminate or null if the Subject * hasn't terminated yet */ + @Nullable public abstract Throwable getThrowable(); /** @@ -63,6 +65,7 @@ public abstract class Subject extends Observable implements Observer { *

The method is thread-safe. * @return the wrapped and serialized subject */ + @NonNull public final Subject toSerialized() { if (this instanceof SerializedSubject) { return this; From 83d1a4f4eb66fbf6e2afd6045f96e31ced6aedfc Mon Sep 17 00:00:00 2001 From: Bloder Date: Sun, 2 Apr 2017 05:46:55 -0300 Subject: [PATCH 007/524] Remove @NonNull annotations in BiConsumer (#5257) --- src/main/java/io/reactivex/functions/BiConsumer.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/functions/BiConsumer.java b/src/main/java/io/reactivex/functions/BiConsumer.java index bd382a1956..6b147ae0b0 100644 --- a/src/main/java/io/reactivex/functions/BiConsumer.java +++ b/src/main/java/io/reactivex/functions/BiConsumer.java @@ -13,8 +13,6 @@ package io.reactivex.functions; -import io.reactivex.annotations.NonNull; - /** * A functional interface (callback) that accepts two values (of possibly different types). * @param the first value type @@ -28,5 +26,5 @@ public interface BiConsumer { * @param t2 the second value * @throws Exception on error */ - void accept(@NonNull T1 t1, @NonNull T2 t2) throws Exception; + void accept(T1 t1, T2 t2) throws Exception; } From a0bfaa220b6cac0aa3c4ea4a7963da19e7df880e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sun, 2 Apr 2017 16:53:13 +0200 Subject: [PATCH 008/524] 2.x: fix whitespaces --- src/main/java/io/reactivex/flowables/GroupedFlowable.java | 2 +- .../java/io/reactivex/observables/GroupedObservable.java | 2 +- .../io/reactivex/observers/DisposableMaybeObserver.java | 2 +- .../java/io/reactivex/observers/DisposableObserver.java | 2 +- .../io/reactivex/observers/DisposableSingleObserver.java | 2 +- .../io/reactivex/internal/functions/ObjectHelperTest.java | 8 ++++---- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/reactivex/flowables/GroupedFlowable.java b/src/main/java/io/reactivex/flowables/GroupedFlowable.java index c7eb4dc121..42a51aa03e 100644 --- a/src/main/java/io/reactivex/flowables/GroupedFlowable.java +++ b/src/main/java/io/reactivex/flowables/GroupedFlowable.java @@ -31,7 +31,7 @@ * @see ReactiveX documentation: GroupBy */ public abstract class GroupedFlowable extends Flowable { - + final K key; /** diff --git a/src/main/java/io/reactivex/observables/GroupedObservable.java b/src/main/java/io/reactivex/observables/GroupedObservable.java index a9673715dd..75518a8c6d 100644 --- a/src/main/java/io/reactivex/observables/GroupedObservable.java +++ b/src/main/java/io/reactivex/observables/GroupedObservable.java @@ -31,7 +31,7 @@ * @see ReactiveX documentation: GroupBy */ public abstract class GroupedObservable extends Observable { - + final K key; /** diff --git a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java index 09686adc3d..9adcd89767 100644 --- a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java @@ -61,7 +61,7 @@ * @param the received value type */ public abstract class DisposableMaybeObserver implements MaybeObserver, Disposable { - + final AtomicReference s = new AtomicReference(); @Override diff --git a/src/main/java/io/reactivex/observers/DisposableObserver.java b/src/main/java/io/reactivex/observers/DisposableObserver.java index 7d22b8201b..79fd4db318 100644 --- a/src/main/java/io/reactivex/observers/DisposableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableObserver.java @@ -64,7 +64,7 @@ * @param the received value type */ public abstract class DisposableObserver implements Observer, Disposable { - + final AtomicReference s = new AtomicReference(); @Override diff --git a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java index 0515f458ac..e086f466c2 100644 --- a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java @@ -53,7 +53,7 @@ * @param the received value type */ public abstract class DisposableSingleObserver implements SingleObserver, Disposable { - + final AtomicReference s = new AtomicReference(); @Override diff --git a/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java b/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java index 91912323d9..aefbfea49c 100644 --- a/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java +++ b/src/test/java/io/reactivex/internal/functions/ObjectHelperTest.java @@ -33,22 +33,22 @@ public void hashCodeOf() { } @Test - public void verifyPositiveInt() throws Exception{ + public void verifyPositiveInt() throws Exception { assertEquals(1, ObjectHelper.verifyPositive(1, "param")); } @Test - public void verifyPositiveLong() throws Exception{ + public void verifyPositiveLong() throws Exception { assertEquals(1L, ObjectHelper.verifyPositive(1L, "param")); } @Test(expected = IllegalArgumentException.class) - public void verifyPositiveIntFail() throws Exception{ + public void verifyPositiveIntFail() throws Exception { assertEquals(-1, ObjectHelper.verifyPositive(-1, "param")); } @Test(expected = IllegalArgumentException.class) - public void verifyPositiveLongFail() throws Exception{ + public void verifyPositiveLongFail() throws Exception { assertEquals(-1L, ObjectHelper.verifyPositive(-1L, "param")); } From 7bfa6c0ccf12de0234b5b56ea6fcbe702a5a7618 Mon Sep 17 00:00:00 2001 From: Mahach Imangazaliev Date: Mon, 3 Apr 2017 10:25:18 +0300 Subject: [PATCH 009/524] Fixed broken links (#5262) Fixed broken links for @Beta and @Experimental annotations --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9b2cbf1571..4feb107412 100644 --- a/README.md +++ b/README.md @@ -276,5 +276,5 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -[beta source link]: https://github.com/ReactiveX/RxJava/blob/master/src/main/java/rx/annotations/Beta.java -[experimental source link]: https://github.com/ReactiveX/RxJava/blob/master/src/main/java/rx/annotations/Experimental.java +[beta source link]: https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/annotations/Beta.java +[experimental source link]: https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/annotations/Experimental.java From 879d60f797aec6dfe17b5bc344451b5e53f9b09f Mon Sep 17 00:00:00 2001 From: Marvin Ramin Date: Thu, 6 Apr 2017 20:21:20 +0200 Subject: [PATCH 010/524] remove commented out code from IoScheduler (#5268) --- .../java/io/reactivex/internal/schedulers/IoScheduler.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index cc22f0d4b1..f6913020a5 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -210,12 +210,6 @@ public void dispose() { tasks.dispose(); // releasing the pool should be the last action - // should prevent pool reuse in case there is a blocking - // action not responding to cancellation -// threadWorker.scheduleDirect(() -> { -// pool.release(threadWorker); -// }, 0, TimeUnit.MILLISECONDS); - pool.release(threadWorker); } } From 80d9b909e9df856b34f60438235e74f0daa49a96 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Fri, 7 Apr 2017 21:25:58 +0200 Subject: [PATCH 011/524] 2.x: enable link to external JDK, fix Schedulers style (#5271) --- build.gradle | 1 + src/main/java/io/reactivex/schedulers/Schedulers.java | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/build.gradle b/build.gradle index 218352b443..acfc198784 100644 --- a/build.gradle +++ b/build.gradle @@ -47,6 +47,7 @@ javadoc { options.addStringOption('top').value = '' options.addStringOption('doctitle').value = '' options.addStringOption('header').value = '' + options.links("http://docs.oracle.com/javase/7/docs/api/") if (JavaVersion.current().isJava7()) { // "./gradle/stylesheet.css" only supports Java 7 options.addStringOption('stylesheetfile', rootProject.file('./gradle/stylesheet.css').toString()) diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index 902d4c5902..93674ac272 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -108,7 +108,7 @@ private Schedulers() { *

* You can control certain properties of this standard scheduler via system properties that have to be set * before the {@link Schedulers} class is referenced in your code. - *
Supported system properties ({@code System.getProperty()}): + *

Supported system properties ({@code System.getProperty()}): *