diff --git a/packages/monix-reactive/rootdoc.md b/packages/monix-reactive/rootdoc.md
index e483aef..2ad0c59 100644
--- a/packages/monix-reactive/rootdoc.md
+++ b/packages/monix-reactive/rootdoc.md
@@ -9,8 +9,8 @@ Create observables:
| | |
|-------------------|--------------------------------------------------------------------------------------- |
-| {@link Observable.empty} | Creates an observable that doesn't emit anything, but immediately calls onComplete instead. |
-| {@link Observable.pure} | Returns an Observable that on execution emits the given strict value. |
+| {@link empty} | Creates an observable that doesn't emit anything, but immediately calls onComplete instead. |
+| {@link pure} | Returns an Observable that on execution emits the given strict value. |
## Usage
@@ -52,7 +52,7 @@ Usage sample:
```typescript
import * as Mx from "monix"
-const stream = Mx.Observable.empty()
+const stream = Mx.empty()
```
### Modules: UMD and ES 2015
diff --git a/packages/monix-reactive/src/internal/builders/eval.js.flow b/packages/monix-reactive/src/builders.js.flow
similarity index 51%
rename from packages/monix-reactive/src/internal/builders/eval.js.flow
rename to packages/monix-reactive/src/builders.js.flow
index 3c1ec0a..56bea96 100644
--- a/packages/monix-reactive/src/internal/builders/eval.js.flow
+++ b/packages/monix-reactive/src/builders.js.flow
@@ -15,16 +15,21 @@
* limitations under the License.
*/
-import { ObservableInstance } from "../instance"
+import { Observable } from "./observable"
+import { Scheduler } from "funfix"
-declare export class EvalAlwaysObservable<+A> extends ObservableInstance {
- constructor(_fn: () => A): EvalAlwaysObservable;
+declare export function empty(): Observable;
- unsafeSubscribeFn(subscriber: Subscriber): Cancelable;
-}
+declare export function pure(value: A): Observable;
-declare export class EvalOnceObservable<+A> extends ObservableInstance {
- constructor(_fn: () => A): EvalOnceObservable;
+declare export function never(): Observable;
- unsafeSubscribeFn(subscriber: Subscriber): Cancelable;
-}
+declare export function evalAlways(fn: () => A): Observable;
+
+declare export function evalOnce(fn: () => A): Observable;
+
+declare export function fromArray(arr: Array, scheduler?: Scheduler): Observable;
+
+declare export function items(...items: Array): Observable;
+
+declare export function loop(scheduler?: Scheduler): Observable;
diff --git a/packages/monix-reactive/src/builders.ts b/packages/monix-reactive/src/builders.ts
new file mode 100644
index 0000000..3088278
--- /dev/null
+++ b/packages/monix-reactive/src/builders.ts
@@ -0,0 +1,86 @@
+/*!
+ * Copyright (c) 2018 by The Monix.js Project Developers.
+ * Some rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Observable } from "./observable"
+import { Scheduler } from "funfix"
+import { EmptyObservable } from "./internal/builders/empty"
+import { NeverObservable } from "./internal/builders/never"
+import { PureObservable } from "./internal/builders/pure"
+import { EvalAlwaysObservable, EvalOnceObservable } from "./internal/builders/eval"
+import { ArrayObservable } from "./internal/builders/array"
+import { LoopObservable } from "./internal/builders/loop"
+
+/**
+ * Create empty observable
+ */
+export function empty(): Observable {
+ return EmptyObservable
+}
+
+/**
+ * Create an observable which issues single given value and completes
+ */
+export function pure(value: A): Observable {
+ return new PureObservable(value)
+}
+
+/**
+ * Creates an observable that never issues any elements, completes or fails
+ */
+export function never(): Observable {
+ return NeverObservable
+}
+
+/**
+ * Creates an observable that issues single element from evaluating given expression (function)
+ * @param fn expression to evauate and retrieve element value
+ */
+export function evalAlways(fn: () => A): Observable {
+ return new EvalAlwaysObservable(fn)
+}
+
+/**
+ * Creates an observable that issues single element from evaluating given expression (function)
+ * After first evaluation it memoize result value (or error) and uses it for other subscribers
+ * @param fn expression to evaluate and retrieve element value
+ */
+export function evalOnce(fn: () => A): Observable {
+ return new EvalOnceObservable(fn)
+}
+
+/**
+ * Creates an observable that issues all elements of given array with backpressure
+ * @param arr array containing elements
+ * @param scheduler optional scheduler
+ */
+export function fromArray(arr: Array, scheduler?: Scheduler): Observable {
+ return new ArrayObservable(arr, scheduler || Scheduler.global.get())
+}
+
+/**
+ * Creates an observable that issues all arguments
+ */
+export function items(...items: Array): Observable {
+ return new ArrayObservable(items, Scheduler.global.get())
+}
+
+/**
+ * Creates an observable that loops indefinitely until stopped, issues integers starting with 0 (zero)
+ */
+export function loop(scheduler?: Scheduler): Observable {
+ return new LoopObservable(scheduler || Scheduler.global.get())
+}
diff --git a/packages/monix-reactive/src/index.ts b/packages/monix-reactive/src/index.ts
index a2fa1ba..5eff763 100644
--- a/packages/monix-reactive/src/index.ts
+++ b/packages/monix-reactive/src/index.ts
@@ -15,7 +15,8 @@
* limitations under the License.
*/
-export * from "./observable"
-export * from "./operators"
export * from "./ack"
export * from "./observer"
+export * from "./observable"
+export * from "./builders"
+export * from "./operators"
diff --git a/packages/monix-reactive/src/internal/ack-utils.ts b/packages/monix-reactive/src/internal/ack-utils.ts
index cce08fe..5497e97 100644
--- a/packages/monix-reactive/src/internal/ack-utils.ts
+++ b/packages/monix-reactive/src/internal/ack-utils.ts
@@ -21,6 +21,7 @@ import { Ack, SyncAck, AsyncAck, Stop, Continue } from "../ack"
/**
* Executes callback synchronously for given Ack
* @private
+ * @hidden
*/
export function syncOn(ack: Ack, callback: (t: Try) => void): Ack {
if (ack === Continue || ack === Stop) {
@@ -35,6 +36,7 @@ export function syncOn(ack: Ack, callback: (t: Try) => void): Ack {
/**
* Executes callback only for sync or async Continue ack
* @private
+ * @hidden
*/
export function syncOnContinue(ack: Ack, callback: () => void): Ack {
if (ack === Continue) {
@@ -53,6 +55,7 @@ export function syncOnContinue(ack: Ack, callback: () => void): Ack {
/**
* Executes callback only for Stop or failed async Ack
* @private
+ * @hidden
*/
export function syncOnStopOrFailure(ack: Ack, callback: () => void): Ack {
if (ack === Stop) {
@@ -71,6 +74,7 @@ export function syncOnStopOrFailure(ack: Ack, callback: () => void): Ack {
/**
* Tries to flatten ack
* @private
+ * @hidden
*/
export function syncTryFlatten(ack: Ack, scheduler: Scheduler): Ack {
if (ack === Continue || ack === Stop) {
diff --git a/packages/monix-reactive/src/internal/builders/array.ts b/packages/monix-reactive/src/internal/builders/array.ts
index e0bcd33..93c6619 100644
--- a/packages/monix-reactive/src/internal/builders/array.ts
+++ b/packages/monix-reactive/src/internal/builders/array.ts
@@ -15,15 +15,17 @@
* limitations under the License.
*/
-import { ObservableInstance } from "../instance"
+import { Observable } from "../../observable"
import { Continue, Stop, AsyncAck } from "../../ack"
import { Subscriber } from "../../observer"
import { Cancelable, Scheduler, BoolCancelable, IBoolCancelable } from "funfix"
/**
* An Observale that issues all elements of an array, with backpressure
+ * @private
+ * @hidden
*/
-export class ArrayObservable extends ObservableInstance {
+export class ArrayObservable extends Observable {
constructor(private readonly _arr: Array,
private readonly _scheduler: Scheduler) {
diff --git a/packages/monix-reactive/src/internal/builders/empty.js.flow b/packages/monix-reactive/src/internal/builders/empty.js.flow
deleted file mode 100644
index db33505..0000000
--- a/packages/monix-reactive/src/internal/builders/empty.js.flow
+++ /dev/null
@@ -1,23 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { ObservableInstance } from "../instance"
-
-/**
- * Empty observable object: bottom type, type parameter not used (phantom type)
- */
-declare export var EmptyObservable: ObservableInstance
diff --git a/packages/monix-reactive/src/internal/builders/empty.ts b/packages/monix-reactive/src/internal/builders/empty.ts
index 3e6d2b0..e652e7a 100644
--- a/packages/monix-reactive/src/internal/builders/empty.ts
+++ b/packages/monix-reactive/src/internal/builders/empty.ts
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-import { ObservableInstance } from "../instance"
+import { Observable } from "../../observable"
import { Subscriber } from "../../observer"
import { Cancelable } from "funfix"
/**
* Completes immediately on subscribe without issueing any items
+ * @private
+ * @hidden
*/
-class EmptyObservableImpl extends ObservableInstance {
+class EmptyObservableImpl extends Observable {
unsafeSubscribeFn(subscriber: Subscriber): Cancelable {
subscriber.onComplete()
@@ -34,5 +36,7 @@ class EmptyObservableImpl extends ObservableInstance {
* {@link EmptyObservable} completes immediately on subscribe without issueing any values.
* EmptyObservable object uses [Bottom Type](https://en.wikipedia.org/wiki/Bottom_type)
* for elements to match all other types
+ * @private
+ * @hidden
*/
-export const EmptyObservable: ObservableInstance = new EmptyObservableImpl()
+export const EmptyObservable: Observable = new EmptyObservableImpl()
diff --git a/packages/monix-reactive/src/internal/builders/eval.ts b/packages/monix-reactive/src/internal/builders/eval.ts
index 5b3c5e1..7aeaa9b 100644
--- a/packages/monix-reactive/src/internal/builders/eval.ts
+++ b/packages/monix-reactive/src/internal/builders/eval.ts
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-import { ObservableInstance } from "../instance"
+import { Observable } from "../../observable"
import { Subscriber } from "../../observer"
import { Cancelable, Throwable } from "funfix"
/**
* An observable that evaluates the given function argument and emits its result.
+ * @private
+ * @hidden
*/
-export class EvalAlwaysObservable extends ObservableInstance {
+export class EvalAlwaysObservable extends Observable {
constructor(private readonly _fn: () => A) {
super()
}
@@ -48,8 +50,10 @@ export class EvalAlwaysObservable extends ObservableInstance {
/**
* An observable that evaluates once the given function argument and emits its result.
+ * @private
+ * @hidden
*/
-export class EvalOnceObservable extends ObservableInstance {
+export class EvalOnceObservable extends Observable {
private _result!: A
private _errorThrown: Throwable | null = null
private _hasResult: boolean = false
diff --git a/packages/monix-reactive/src/internal/builders/loop.ts b/packages/monix-reactive/src/internal/builders/loop.ts
index 4c47567..a17f1b0 100644
--- a/packages/monix-reactive/src/internal/builders/loop.ts
+++ b/packages/monix-reactive/src/internal/builders/loop.ts
@@ -15,15 +15,17 @@
* limitations under the License.
*/
-import { ObservableInstance } from "../instance"
+import { Observable } from "../../observable"
import { Continue, Stop, AsyncAck } from "../../ack"
import { Subscriber } from "../../observer"
import { Cancelable, Scheduler, BoolCancelable, IBoolCancelable } from "funfix"
/**
* Loops indefinitely until stopped, issues integers starting with 0 (zero)
+ * @private
+ * @hidden
*/
-export class LoopObservable extends ObservableInstance {
+export class LoopObservable extends Observable {
constructor(private readonly _scheduler: Scheduler) {
super()
}
diff --git a/packages/monix-reactive/src/internal/builders/never.js.flow b/packages/monix-reactive/src/internal/builders/never.js.flow
deleted file mode 100644
index 0db2c98..0000000
--- a/packages/monix-reactive/src/internal/builders/never.js.flow
+++ /dev/null
@@ -1,23 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { ObservableInstance } from "../instance"
-
-/**
- * Never observable object: bottom type, type parameter not used (phantom type)
- */
-declare export var NeverObservable: ObservableInstance
diff --git a/packages/monix-reactive/src/internal/builders/never.ts b/packages/monix-reactive/src/internal/builders/never.ts
index 2fb6005..15bd153 100644
--- a/packages/monix-reactive/src/internal/builders/never.ts
+++ b/packages/monix-reactive/src/internal/builders/never.ts
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-import { ObservableInstance } from "../instance"
+import { Observable } from "../../observable"
import { Subscriber } from "../../observer"
import { Cancelable } from "funfix"
/**
* Never issues elements, complets or fails
+ * @private
+ * @hidden
*/
-class NeverObservableImpl extends ObservableInstance {
+class NeverObservableImpl extends Observable {
unsafeSubscribeFn(subscriber: Subscriber): Cancelable {
return Cancelable.empty()
}
@@ -32,5 +34,7 @@ class NeverObservableImpl extends ObservableInstance {
* {@link NeverObservable} never issues any elements, complets of rails
* NeverObservable object uses [Bottom Type](https://en.wikipedia.org/wiki/Bottom_type)
* for elements to match all other types
+ * @private
+ * @hidden
*/
-export const NeverObservable: ObservableInstance = new NeverObservableImpl()
+export const NeverObservable: Observable = new NeverObservableImpl()
diff --git a/packages/monix-reactive/src/internal/builders/pure.js.flow b/packages/monix-reactive/src/internal/builders/pure.js.flow
deleted file mode 100644
index 2026a4e..0000000
--- a/packages/monix-reactive/src/internal/builders/pure.js.flow
+++ /dev/null
@@ -1,24 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { ObservableInstance } from "../instance"
-
-declare export class PureObservable<+A> extends ObservableInstance {
- constructor(_value: A): PureObservable;
-
- unsafeSubscribeFn(subscriber: Subscriber): Cancelable;
-}
diff --git a/packages/monix-reactive/src/internal/builders/pure.ts b/packages/monix-reactive/src/internal/builders/pure.ts
index e7e0f2b..5738ea9 100644
--- a/packages/monix-reactive/src/internal/builders/pure.ts
+++ b/packages/monix-reactive/src/internal/builders/pure.ts
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-import { ObservableInstance } from "../instance"
+import { Observable } from "../../observable"
import { Subscriber } from "../../observer"
import { Cancelable } from "funfix"
@@ -23,8 +23,10 @@ import { Cancelable } from "funfix"
* An {@link Observable} that issues single element
*
* Source: [Monix NowObservable](https://github.com/monix/monix/blob/master/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/NowObservable.scala)
+ * @private
+ * @hidden
*/
-export class PureObservable extends ObservableInstance {
+export class PureObservable extends Observable {
constructor(private readonly _value: A) {
super()
diff --git a/packages/monix-reactive/src/internal/instance.ts b/packages/monix-reactive/src/internal/instance.ts
deleted file mode 100644
index 43ce904..0000000
--- a/packages/monix-reactive/src/internal/instance.ts
+++ /dev/null
@@ -1,43 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Ack } from "../ack"
-import { Subscriber, Operator } from "../observer"
-import { Cancelable, Scheduler, Throwable } from "funfix"
-import { SafeSubscriber } from "./subscribers/safe"
-import { SubscriberWrap } from "./subscribers/wrap"
-
-/**
- * {@link ObservableInstance} exposes observable instance operations, used internally by implementations
- *
- * It's exists mostly to workarround circular reference between {@link Observable}
- * and {@link Observer} implementations. Methods implemented in {@link ObservableMixin} which
- * is applied _lazily_ in package root modules
- */
-export abstract class ObservableInstance {
- abstract unsafeSubscribeFn(subscriber: Subscriber): Cancelable
-
- subscribeWith(out: Subscriber): Cancelable {
- return this.unsafeSubscribeFn(new SafeSubscriber(out))
- }
-
- subscribe(nextFn?: (elem: A) => Ack, errorFn?: (e: Throwable) => void, completeFn?: () => void, scheduler?: Scheduler): Cancelable {
- return this.subscribeWith(new SubscriberWrap(nextFn, errorFn, completeFn, scheduler))
- }
-
- pipe!: (operator: Operator) => ObservableInstance
-}
diff --git a/packages/monix-reactive/src/internal/mixin.ts b/packages/monix-reactive/src/internal/mixin.ts
deleted file mode 100644
index a62adbf..0000000
--- a/packages/monix-reactive/src/internal/mixin.ts
+++ /dev/null
@@ -1,38 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Ack } from "../ack"
-import { Subscriber, Operator } from "../observer"
-import { Cancelable, Scheduler, Throwable } from "funfix"
-import { ObservableInstance } from "./instance"
-
-/**
- * Observable operators mixin
- *
- * Is applied to ObservableInstance in order to avoid circular references be
- */
-export abstract class OperatorsMixin {
- abstract unsafeSubscribeFn(subscriber: Subscriber): Cancelable
-
- abstract subscribeWith(out: Subscriber): Cancelable
-
- abstract subscribe(nextFn?: (elem: A) => Ack, errorFn?: (e: Throwable) => void, completeFn?: () => void, scheduler?: Scheduler): Cancelable
-
- pipe(operator: Operator): ObservableInstance {
- throw new Error("not implemented")
- }
-}
diff --git a/packages/monix-reactive/src/internal/subscribers/safe.js.flow b/packages/monix-reactive/src/internal/subscribers/safe.js.flow
deleted file mode 100644
index 4bd14b5..0000000
--- a/packages/monix-reactive/src/internal/subscribers/safe.js.flow
+++ /dev/null
@@ -1,28 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Subscriber } from "../../observer"
-
-declare export class SafeSubscriber<+A> extends Subscriber {
- scheduler: Scheduler;
-
- constructor(subscriber: Subscriber, scheduler?: Scheduler): SafeSubscriber;
-
- onNext(elem: A): Ack;
- onComplete(): void;
- onError(e: Throwable): void;
-}
diff --git a/packages/monix-reactive/src/internal/subscribers/safe.ts b/packages/monix-reactive/src/internal/subscribers/safe.ts
index 462ba8f..2b9ef8d 100644
--- a/packages/monix-reactive/src/internal/subscribers/safe.ts
+++ b/packages/monix-reactive/src/internal/subscribers/safe.ts
@@ -30,6 +30,8 @@ import * as AckUtils from "../ack-utils"
* `onNext` events, ensuring that the grammar is respected
* - if downstream signals a `Stop`, the observer no longer accepts any events,
* ensuring that the grammar is respected
+ * @private
+ * @hidden
*/
export class SafeSubscriber implements Subscriber {
private _isDone: boolean = false
diff --git a/packages/monix-reactive/src/internal/subscribers/wrap.js.flow b/packages/monix-reactive/src/internal/subscribers/wrap.js.flow
deleted file mode 100644
index 9a65f2b..0000000
--- a/packages/monix-reactive/src/internal/subscribers/wrap.js.flow
+++ /dev/null
@@ -1,31 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Subscriber } from "../../observer"
-
-declare export class WrapSubscriber<+A> extends Subscriber {
- scheduler: Scheduler;
-
- constructor(nextFn?: (elem: A) => Ack,
- errorFn?: (e: Throwable) => void,
- completeFn?: () => void,
- scheduler?: Scheduler): WrapSubscriber;
-
- onNext(elem: A): Ack;
- onComplete(): void;
- onError(e: Throwable): void;
-}
diff --git a/packages/monix-reactive/src/internal/subscribers/wrap.ts b/packages/monix-reactive/src/internal/subscribers/wrap.ts
index 3cf3982..decb01b 100644
--- a/packages/monix-reactive/src/internal/subscribers/wrap.ts
+++ b/packages/monix-reactive/src/internal/subscribers/wrap.ts
@@ -19,6 +19,11 @@ import { Ack, Continue } from "../../ack"
import { Subscriber } from "../../observer"
import { Scheduler, Throwable } from "funfix"
+/**
+ * Wraps functions to Subscriber interface
+ * @private
+ * @hidden
+ */
export class SubscriberWrap implements Subscriber {
readonly scheduler: Scheduler
diff --git a/packages/monix-reactive/src/observable.ts b/packages/monix-reactive/src/observable.ts
index d2318bf..e5740b1 100644
--- a/packages/monix-reactive/src/observable.ts
+++ b/packages/monix-reactive/src/observable.ts
@@ -15,83 +15,64 @@
* limitations under the License.
*/
-import { applyMixins, Scheduler } from "funfix"
-import { OperatorsMixin } from "./internal/mixin"
-import { ObservableInstance } from "./internal/instance"
-import { EmptyObservable } from "./internal/builders/empty"
-import { NeverObservable } from "./internal/builders/never"
-import { PureObservable } from "./internal/builders/pure"
-import { EvalAlwaysObservable, EvalOnceObservable } from "./internal/builders/eval"
-import { ArrayObservable } from "./internal/builders/array"
-import { LoopObservable } from "./internal/builders/loop"
+import { applyMixins, Scheduler, Cancelable, Throwable } from "funfix"
+import { Ack } from "./ack"
+import { Subscriber, Operator } from "./observer"
+import { SafeSubscriber } from "./internal/subscribers/safe"
+import { SubscriberWrap } from "./internal/subscribers/wrap"
/**
- * apply mixins
+ * Fluent interface to transform and subscribe to streams
*/
-applyMixins(ObservableInstance, [OperatorsMixin])
-
-/**
- * Observable object contains builder methods that help you create new {@link Observable} instances
- */
-export abstract class Observable extends ObservableInstance {
- /**
- * Create empty observable
- */
- static empty(): Observable {
- return EmptyObservable
- }
-
+export abstract class Observable {
/**
- * Create an observable which issues single given value and completes
+ * Subscribe to stream events (unsafe)
+ * @param out stream subscriber
+ * @hidden
*/
- static pure(value: A): Observable {
- return new PureObservable(value)
- }
+ abstract unsafeSubscribeFn(out: Subscriber): Cancelable
/**
- * Creates an observable that never issues any elements, completes or fails
+ * Subscribe to stream events
+ * @param out stream subscriber
*/
- static never(): Observable {
- return NeverObservable
+ subscribeWith(out: Subscriber): Cancelable {
+ return this.unsafeSubscribeFn(new SafeSubscriber(out))
}
/**
- * Creates an observable that issues single element from evaluating given expression (function)
- * @param fn expression to evauate and retrieve element value
+ * Subscribe to stream events
+ *
+ * @param nextFn callback for `onNext` event
+ * @param errorFn callback for `onError` event
+ * @param completeFn callback for `onComplete` event
+ * @param scheduler custom scheduler (optional)
*/
- static eval(fn: () => A): Observable {
- return new EvalAlwaysObservable(fn)
+ subscribe(nextFn?: (elem: A) => Ack, errorFn?: (e: Throwable) => void, completeFn?: () => void, scheduler?: Scheduler): Cancelable {
+ return this.subscribeWith(new SubscriberWrap(nextFn, errorFn, completeFn, scheduler))
}
/**
- * Creates an observable that issues single element from evaluating given expression (function)
- * After first evaluation it memoize result value (or error) and uses it for other subscribers
- * @param fn expression to evaluate and retrieve element value
+ * Apply a stream transformation defined by given operator
+ * @param operator stream transformation builder
*/
- static evalOnce(fn: () => A): Observable {
- return new EvalOnceObservable(fn)
- }
-
- /**
- * Creates an observable that issues all elements of given array with backpressure
- * @param arr array containing elements
- * @param scheduler optional scheduler
- */
- static fromArray(arr: Array, scheduler?: Scheduler): Observable {
- return new ArrayObservable(arr, scheduler || Scheduler.global.get())
+ pipe(operator: Operator): Observable {
+ return new LiftByOperatorObservable(this, operator)
}
+}
- /**
- * Creates an observable that issues all arguments
- */
- static items(...items: Array): Observable {
- return new ArrayObservable(items, Scheduler.global.get())
+/**
+ * @private
+ * @hidden
+ */
+class LiftByOperatorObservable extends Observable {
+ constructor(private readonly _self: Observable,
+ private readonly _operator: Operator) {
+ super()
}
- /**
- * Creates an observable that loops indefinitely until stopped, issues integers starting with 0 (zero)
- */
- static loop(scheduler?: Scheduler): Observable {
- return new LoopObservable(scheduler || Scheduler.global.get())
+ unsafeSubscribeFn(subscriber: Subscriber): Cancelable {
+ const sb = this._operator(subscriber)
+ return this._self.unsafeSubscribeFn(sb)
}
}
diff --git a/packages/monix-reactive/src/observer.ts b/packages/monix-reactive/src/observer.ts
index afb009c..7908adc 100644
--- a/packages/monix-reactive/src/observer.ts
+++ b/packages/monix-reactive/src/observer.ts
@@ -23,18 +23,29 @@ import { Ack, SyncAck } from "./ack"
* get subscribed to an Observable for receiving events.
*
* The events received must follow the Rx grammar, which is:
- * onNext * (onComplete | onError)?
+ * `onNext * (onComplete | onError)?`
*
* That means an Observer can receive zero or multiple events, the stream
- * ending either in one or zero `onComplete` or `onError` (just one, not both),
- * and after onComplete or onError, a well behaved `Observable`
- * implementation shouldn't send any more onNext events.
+ * ending either in one or zero {@link Observer.onComplete} or {@link Observer.onError} (just one, not both),
+ * and after `onComplete` or `onError`, a well behaved `Observable`
+ * implementation shouldn't send any more {@link Observer.onNext} events.
*/
export interface Observer {
+ /**
+ * Observe new element
+ * @param elem stream element
+ */
onNext(elem: T): Ack
+ /**
+ * Signals stream completion
+ */
onComplete(): void
+ /**
+ * Signals stream error
+ * @param e error object
+ */
onError(e: Throwable): void
}
@@ -42,6 +53,7 @@ export interface Observer {
* Observer subtype which accepts only SyncAck from onNext element
*
* Events gramar is same as for Observable
+ * @hidden
*/
export interface SyncObserver extends Observer {
onNext(elem: T): SyncAck
@@ -58,12 +70,13 @@ export interface Subscriber extends Observer {
/**
* `SyncSubscriber` si an `SyncObserver` with an attached `Scheduler`
+ * @hidden
*/
export interface SyncSubscriber extends SyncObserver {
readonly scheduler: Scheduler
}
/**
- * `Operator` type alias defines a transformation from one Subscriber to another
+ * `Operator` type alias defines a transformation from one {@link Subscriber} to another
*/
export type Operator = (s: Subscriber) => Subscriber
diff --git a/packages/monix-reactive/test/flow/internal/builders/array.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/array.test.js.flow
deleted file mode 100644
index e43df88..0000000
--- a/packages/monix-reactive/test/flow/internal/builders/array.test.js.flow
+++ /dev/null
@@ -1,24 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Observable } from "../../../../src"
-
-const o1: Observable = Observable.fromArray([1,2,3])
-const o2: Observable = Observable.fromArray(["one", "two", "three"])
-
-const o3: Observable = Observable.items(1,2,3)
-const o4: Observable = Observable.items("one", "two", "three")
diff --git a/packages/monix-reactive/test/flow/internal/builders/empty.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/empty.test.js.flow
deleted file mode 100644
index 9731777..0000000
--- a/packages/monix-reactive/test/flow/internal/builders/empty.test.js.flow
+++ /dev/null
@@ -1,23 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Observable } from "../../../../src"
-
-const o: Observable = Observable.empty()
-const o: Observable = Observable.empty()
-const o: Observable = Observable.empty()
-const o: Observable<10> = Observable.empty()
diff --git a/packages/monix-reactive/test/flow/internal/builders/eval.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/eval.test.js.flow
deleted file mode 100644
index 7b10e73..0000000
--- a/packages/monix-reactive/test/flow/internal/builders/eval.test.js.flow
+++ /dev/null
@@ -1,26 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Observable } from "../../../../src"
-
-const o1: Observable = Observable.eval(() => 101);
-const o2: Observable = Observable.eval(() => "hello");
-const o3: Observable<10> = Observable.eval(() => 10);
-
-const o4: Observable = Observable.evalOnce(() => 101);
-const o5: Observable = Observable.evalOnce(() => "hello");
-const o6: Observable<10> = Observable.evalOnce(() => 10);
diff --git a/packages/monix-reactive/test/flow/internal/builders/never.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/never.test.js.flow
deleted file mode 100644
index abc3dc2..0000000
--- a/packages/monix-reactive/test/flow/internal/builders/never.test.js.flow
+++ /dev/null
@@ -1,23 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Observable } from "../../../../src"
-
-const o: Observable = Observable.never()
-const o: Observable = Observable.never()
-const o: Observable = Observable.never()
-const o: Observable<10> = Observable.never()
diff --git a/packages/monix-reactive/test/flow/internal/builders/pure.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/pure.test.js.flow
deleted file mode 100644
index 7b27099..0000000
--- a/packages/monix-reactive/test/flow/internal/builders/pure.test.js.flow
+++ /dev/null
@@ -1,21 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { Observable, Observable } from "../../../../src"
-
-const o: Observable = Observable.pure(10)
-const o: Observable = Observable.pure("hello")
diff --git a/packages/monix-reactive/test/flow/observable.js.flow b/packages/monix-reactive/test/flow/observable.js.flow
deleted file mode 100644
index 714a067..0000000
--- a/packages/monix-reactive/test/flow/observable.js.flow
+++ /dev/null
@@ -1,34 +0,0 @@
-/*!
- * Copyright (c) 2018 by The Monix.js Project Developers.
- * Some rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import { id } from "funfix"
-import { Observable } from "../../src"
-import * as assert from "./asserts"
-import { EmptyObservable } from "../../src/internal/builders/empty"
-
-const o1: Observable = Observable.empty();
-const o2: Observable = Observable.empty();
-
-const o3: Observable = Observable.pure("Hello");
-const o4: Observable = Observable.never();
-const o4: Observable = Observable.never();
-const o4: Observable = Observable.never();
-const o5: Observable = Observable.eval(() => "hello");
-const o6: Observable = Observable.evalOnce(() => "ehlo");
-const o7: Observable = Observable.fromArray([1, 2, 3]);
-const o8: Observable = Observable.items(1, 2, 3);
-const o9: Observable = Observable.loop();
diff --git a/packages/monix-reactive/src/internal/builders/array.js.flow b/packages/monix-reactive/test/flow/observable.test.js.flow
similarity index 52%
rename from packages/monix-reactive/src/internal/builders/array.js.flow
rename to packages/monix-reactive/test/flow/observable.test.js.flow
index b3713c6..b732e0f 100644
--- a/packages/monix-reactive/src/internal/builders/array.js.flow
+++ b/packages/monix-reactive/test/flow/observable.test.js.flow
@@ -15,11 +15,17 @@
* limitations under the License.
*/
-import { ObservableInstance } from "../instance"
+import { empty, never, evalAlways, evalOnce, fromArray, items, loop } from "../../src"
-declare export class ArrayObservable<+A> extends ObservableInstance {
- constructor(arr: Array,
- scheduler: Scheduler): ArrayObservable;
+const o1: Observable = empty();
+const o2: Observable = empty();
- unsafeSubscribeFn(subscriber: Subscriber): Cancelable;
-}
+const o3: Observable = pure("Hello");
+const o4: Observable = never();
+const o4: Observable = never();
+const o4: Observable = never();
+const o5: Observable = evalAlways(() => "hello");
+const o6: Observable = evalOnce(() => "ehlo");
+const o7: Observable = fromArray([1, 2, 3]);
+const o8: Observable = items(1, 2, 3);
+const o9: Observable = loop();
diff --git a/packages/monix-reactive/test/ts/internal/builders/array.test.ts b/packages/monix-reactive/test/ts/internal/builders/array.test.ts
index 4710b21..97fa9ef 100644
--- a/packages/monix-reactive/test/ts/internal/builders/array.test.ts
+++ b/packages/monix-reactive/test/ts/internal/builders/array.test.ts
@@ -16,13 +16,13 @@
*/
import * as assert from "../../asserts"
-import { Observable, Ack, Continue, Stop } from "../../../../src"
+import { fromArray, items, Ack, Continue, Stop } from "../../../../src"
import { TestScheduler, Throwable, Future } from "funfix"
describe("ArrayObservable", () => {
it("should complete immediately for empty array", () => {
let wasCompleted = false
- Observable.fromArray([]).unsafeSubscribeFn({
+ fromArray([]).unsafeSubscribeFn({
scheduler: new TestScheduler(),
onNext: (elem: any): Ack => {
throw new Error("Illegal state")
@@ -40,7 +40,7 @@ describe("ArrayObservable", () => {
it("can be also created using Observable.items() function", () => {
let wasCompleted = false
- Observable.items().unsafeSubscribeFn({
+ items().unsafeSubscribeFn({
scheduler: new TestScheduler(),
onNext: (elem: any): Ack => {
throw new Error("Illegal state")
@@ -59,7 +59,7 @@ describe("ArrayObservable", () => {
it("should issue all input array values and complete", () => {
let wasCompleted = false
let issued = []
- Observable.fromArray([1, 2, 3]).unsafeSubscribeFn({
+ fromArray([1, 2, 3]).unsafeSubscribeFn({
scheduler: new TestScheduler(),
onNext: (elem: number): Ack => {
issued.push(elem)
@@ -80,7 +80,7 @@ describe("ArrayObservable", () => {
it("should not complete if got Stop ack", () => {
let wasCompleted = false
let issued = []
- Observable.fromArray([1]).unsafeSubscribeFn({
+ fromArray([1]).unsafeSubscribeFn({
scheduler: new TestScheduler(),
onNext: (elem: number): Ack => {
issued.push(elem)
@@ -101,7 +101,7 @@ describe("ArrayObservable", () => {
it("should issue elements until got Stop ack", () => {
let wasCompleted = false
let issued = []
- Observable.fromArray([1, 2, 3]).unsafeSubscribeFn({
+ fromArray([1, 2, 3]).unsafeSubscribeFn({
scheduler: new TestScheduler(),
onNext: (elem: number): Ack => {
issued.push(elem)
@@ -124,7 +124,7 @@ describe("ArrayObservable", () => {
let wasCompleted = false
let gotError = false
- Observable.fromArray([1, 2, 3]).unsafeSubscribeFn({
+ fromArray([1, 2, 3]).unsafeSubscribeFn({
scheduler: new TestScheduler(),
onNext: (elem: number): Ack => {
return Future.raise(new Error("something went wrong"), scheduler)
diff --git a/packages/monix-reactive/test/ts/internal/builders/empty.test.ts b/packages/monix-reactive/test/ts/internal/builders/empty.test.ts
index 1b5cd79..ecd64a9 100644
--- a/packages/monix-reactive/test/ts/internal/builders/empty.test.ts
+++ b/packages/monix-reactive/test/ts/internal/builders/empty.test.ts
@@ -16,13 +16,13 @@
*/
import * as assert from "../../asserts"
-import { Observable, Ack } from "../../../../src"
+import { empty, Ack } from "../../../../src"
import { TestScheduler, Throwable } from "funfix"
describe("EmptyObservable", () => {
it("should complete immediately", () => {
let wasCompleted = false
- Observable.empty().unsafeSubscribeFn({
+ empty().unsafeSubscribeFn({
scheduler: new TestScheduler(),
onNext: (elem: any): Ack => {
throw new Error("Illegal state")
diff --git a/packages/monix-reactive/test/ts/internal/builders/eval.test.ts b/packages/monix-reactive/test/ts/internal/builders/eval.test.ts
index 3e0e16b..7a2e447 100644
--- a/packages/monix-reactive/test/ts/internal/builders/eval.test.ts
+++ b/packages/monix-reactive/test/ts/internal/builders/eval.test.ts
@@ -16,14 +16,14 @@
*/
import * as assert from "../../asserts"
-import { Observable, Ack, Continue } from "../../../../src"
+import { evalAlways, evalOnce, Ack, Continue } from "../../../../src"
import { TestScheduler, Throwable } from "funfix"
import { SubscriberWrap } from "../../../../src/internal/subscribers/wrap"
describe("EvalAlwaysObservable", () => {
it("should not eval it's value source if not subscribed", () => {
let executed = false
- Observable.eval(() => {
+ evalAlways(() => {
executed = true
return 0
})
@@ -35,7 +35,7 @@ describe("EvalAlwaysObservable", () => {
let executedCnt = 0
let issuedCnt = 0
let completedCnt = 0
- const o = Observable.eval(() => {
+ const o = evalAlways(() => {
executedCnt += 1
return executedCnt
})
@@ -56,7 +56,7 @@ describe("EvalAlwaysObservable", () => {
let issuedCnt = 0
let completedCnt = 0
let failedCnt = 0
- const o = Observable.eval(() => {
+ const o = evalAlways(() => {
throw new Error("something went wrong")
})
@@ -70,7 +70,7 @@ describe("EvalAlwaysObservable", () => {
describe("EvalOnceObservable", () => {
it("should not eval it's value source if not subscribed", () => {
let executed = false
- Observable.evalOnce(() => {
+ evalOnce(() => {
executed = true
return 0
})
@@ -82,7 +82,7 @@ describe("EvalOnceObservable", () => {
let executedCnt = 0
let issuedCnt = 0
let completedCnt = 0
- const o = Observable.evalOnce(() => {
+ const o = evalOnce(() => {
executedCnt += 1
return executedCnt
})
@@ -104,7 +104,7 @@ describe("EvalOnceObservable", () => {
let issuedCnt = 0
let completedCnt = 0
let failedCnt = 0
- const o = Observable.evalOnce(() => {
+ const o = evalOnce(() => {
throw new Error("something went wrong")
})
o.subscribe(_ => { issuedCnt += 1; return Continue }, e => { failedCnt += 1 }, () => { completedCnt += 1 })
@@ -116,7 +116,7 @@ describe("EvalOnceObservable", () => {
it("reports downstream failures", () => {
const scheduler = new TestScheduler()
- const o = Observable.evalOnce(() => "hello")
+ const o = evalOnce(() => "hello")
assert.equal(scheduler.triggeredFailures().length, 0)
o.unsafeSubscribeFn(new SubscriberWrap(
@@ -139,7 +139,7 @@ describe("EvalOnceObservable", () => {
// one more failure reported (total: 2)
assert.equal(scheduler.triggeredFailures().length, 2)
- Observable.evalOnce(() => { throw new Error("something went wrong") }).unsafeSubscribeFn(new SubscriberWrap(
+ evalOnce(() => { throw new Error("something went wrong") }).unsafeSubscribeFn(new SubscriberWrap(
_ => Continue,
e => { throw new Error("Faield to process onComplete") },
() => { },
diff --git a/packages/monix-reactive/test/ts/internal/builders/loop.test.ts b/packages/monix-reactive/test/ts/internal/builders/loop.test.ts
index a9d4ba4..88d6f77 100644
--- a/packages/monix-reactive/test/ts/internal/builders/loop.test.ts
+++ b/packages/monix-reactive/test/ts/internal/builders/loop.test.ts
@@ -16,13 +16,13 @@
*/
import * as assert from "../../asserts"
-import { Observable, Ack, Continue, Stop } from "../../../../src"
+import { loop, Observable, Ack, Continue, Stop } from "../../../../src"
import { TestScheduler, Throwable, Scheduler, Future } from "funfix"
import { LoopObservable } from "../../../../src/internal/builders/loop"
describe("LoopObservable", () => {
it("can be created using Observable.loop()", () => {
- const o: Observable = Observable.loop()
+ const o: Observable = loop()
})
it("should start with 0, increment by 1 and Stop", () => {
diff --git a/packages/monix-reactive/test/ts/internal/builders/never.test.ts b/packages/monix-reactive/test/ts/internal/builders/never.test.ts
index 9029742..1c70f6f 100644
--- a/packages/monix-reactive/test/ts/internal/builders/never.test.ts
+++ b/packages/monix-reactive/test/ts/internal/builders/never.test.ts
@@ -16,13 +16,13 @@
*/
import * as assert from "../../asserts"
-import { Observable, Ack } from "../../../../src"
+import { never, Observable, Ack } from "../../../../src"
import { TestScheduler, Throwable } from "funfix"
describe("NeverObservable", () => {
it("should never complete", () => {
const s = new TestScheduler()
- Observable.never().unsafeSubscribeFn({
+ never().unsafeSubscribeFn({
scheduler: s,
onNext: (elem: any): Ack => {
throw new Error("Illegal state: onNext should never be called")
diff --git a/packages/monix-reactive/test/ts/internal/builders/pure.test.ts b/packages/monix-reactive/test/ts/internal/builders/pure.test.ts
index ed0d3ec..3cbd623 100644
--- a/packages/monix-reactive/test/ts/internal/builders/pure.test.ts
+++ b/packages/monix-reactive/test/ts/internal/builders/pure.test.ts
@@ -16,14 +16,14 @@
*/
import * as assert from "../../asserts"
-import { Observable, Ack, Continue, Stop } from "../../../../src"
+import { pure, Ack, Continue, Stop } from "../../../../src"
import { TestScheduler, Throwable } from "funfix"
describe("PureObservable", () => {
it("should issue 1 element and complete", () => {
let elementsCnt = 0
let wasCompleted = false
- Observable.pure("hello").unsafeSubscribeFn({
+ pure("hello").unsafeSubscribeFn({
scheduler: new TestScheduler(),
onNext: (elem: string): Ack => {
assert.equal(elem, "hello")
@@ -44,7 +44,7 @@ describe("PureObservable", () => {
it("should not call onComplete if stopped", () => {
let wasCompleted = false
- Observable.pure("hello").subscribe(
+ pure("hello").subscribe(
(element) => {
return Stop
},
diff --git a/packages/monix-reactive/test/ts/observable.test.ts b/packages/monix-reactive/test/ts/observable.test.ts
index a395394..3fa4fb8 100644
--- a/packages/monix-reactive/test/ts/observable.test.ts
+++ b/packages/monix-reactive/test/ts/observable.test.ts
@@ -16,25 +16,25 @@
*/
import { id } from "funfix"
-import { Observable } from "../../src"
+import * as Mx from "../../src"
import * as assert from "./asserts"
import { EmptyObservable } from "../../src/internal/builders/empty"
-describe("Observable", () => {
- describe(".empty()", () => {
+describe("Builders", () => {
+ describe("empty()", () => {
it("creates new observable instance", () => {
- const o1: Observable = Observable.empty()
- const o2: Observable = Observable.empty()
+ const o1: Mx.Observable = Mx.empty()
+ const o2: Mx.Observable = Mx.empty()
})
it("returns singleton observable instance", () => {
- assert.equal(Observable.empty(), EmptyObservable)
+ assert.equal(Mx.empty(), EmptyObservable)
})
})
describe(".pure()", () => {
it("creates new observable instance", () => {
- const o: Observable = Observable.pure("Hello")
+ const o: Mx.Observable = Mx.pure("Hello")
})
})
})
diff --git a/packages/monix-reactive/test/flow/internal/builders/loop.test.js.flow b/packages/monix-reactive/test/ts/operators.test.ts
similarity index 75%
rename from packages/monix-reactive/test/flow/internal/builders/loop.test.js.flow
rename to packages/monix-reactive/test/ts/operators.test.ts
index 32a3e65..dbca7ea 100644
--- a/packages/monix-reactive/test/flow/internal/builders/loop.test.js.flow
+++ b/packages/monix-reactive/test/ts/operators.test.ts
@@ -15,6 +15,12 @@
* limitations under the License.
*/
-import { Observable } from "../../../../src"
+import * as Mx from "../../src"
-const o: Observable = Observable.loop();
+describe("Operators", () => {
+ describe("id", () => {
+ it("returns original subscriber", () => {
+ const o: Mx.Observable = Mx.pure(10).pipe(Mx.id)
+ })
+ })
+})