diff --git a/packages/monix-reactive/rootdoc.md b/packages/monix-reactive/rootdoc.md index e483aef..2ad0c59 100644 --- a/packages/monix-reactive/rootdoc.md +++ b/packages/monix-reactive/rootdoc.md @@ -9,8 +9,8 @@ Create observables: | | | |-------------------|--------------------------------------------------------------------------------------- | -| {@link Observable.empty} | Creates an observable that doesn't emit anything, but immediately calls onComplete instead. | -| {@link Observable.pure} | Returns an Observable that on execution emits the given strict value. | +| {@link empty} | Creates an observable that doesn't emit anything, but immediately calls onComplete instead. | +| {@link pure} | Returns an Observable that on execution emits the given strict value. | ## Usage @@ -52,7 +52,7 @@ Usage sample: ```typescript import * as Mx from "monix" -const stream = Mx.Observable.empty() +const stream = Mx.empty() ``` ### Modules: UMD and ES 2015 diff --git a/packages/monix-reactive/src/internal/builders/eval.js.flow b/packages/monix-reactive/src/builders.js.flow similarity index 51% rename from packages/monix-reactive/src/internal/builders/eval.js.flow rename to packages/monix-reactive/src/builders.js.flow index 3c1ec0a..56bea96 100644 --- a/packages/monix-reactive/src/internal/builders/eval.js.flow +++ b/packages/monix-reactive/src/builders.js.flow @@ -15,16 +15,21 @@ * limitations under the License. */ -import { ObservableInstance } from "../instance" +import { Observable } from "./observable" +import { Scheduler } from "funfix" -declare export class EvalAlwaysObservable<+A> extends ObservableInstance { - constructor(_fn: () => A): EvalAlwaysObservable; +declare export function empty(): Observable; - unsafeSubscribeFn(subscriber: Subscriber): Cancelable; -} +declare export function pure(value: A): Observable; -declare export class EvalOnceObservable<+A> extends ObservableInstance { - constructor(_fn: () => A): EvalOnceObservable; +declare export function never(): Observable; - unsafeSubscribeFn(subscriber: Subscriber): Cancelable; -} +declare export function evalAlways(fn: () => A): Observable; + +declare export function evalOnce(fn: () => A): Observable; + +declare export function fromArray(arr: Array, scheduler?: Scheduler): Observable; + +declare export function items(...items: Array): Observable; + +declare export function loop(scheduler?: Scheduler): Observable; diff --git a/packages/monix-reactive/src/builders.ts b/packages/monix-reactive/src/builders.ts new file mode 100644 index 0000000..3088278 --- /dev/null +++ b/packages/monix-reactive/src/builders.ts @@ -0,0 +1,86 @@ +/*! + * Copyright (c) 2018 by The Monix.js Project Developers. + * Some rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Observable } from "./observable" +import { Scheduler } from "funfix" +import { EmptyObservable } from "./internal/builders/empty" +import { NeverObservable } from "./internal/builders/never" +import { PureObservable } from "./internal/builders/pure" +import { EvalAlwaysObservable, EvalOnceObservable } from "./internal/builders/eval" +import { ArrayObservable } from "./internal/builders/array" +import { LoopObservable } from "./internal/builders/loop" + +/** + * Create empty observable + */ +export function empty(): Observable { + return EmptyObservable +} + +/** + * Create an observable which issues single given value and completes + */ +export function pure(value: A): Observable { + return new PureObservable(value) +} + +/** + * Creates an observable that never issues any elements, completes or fails + */ +export function never(): Observable { + return NeverObservable +} + +/** + * Creates an observable that issues single element from evaluating given expression (function) + * @param fn expression to evauate and retrieve element value + */ +export function evalAlways(fn: () => A): Observable { + return new EvalAlwaysObservable(fn) +} + +/** + * Creates an observable that issues single element from evaluating given expression (function) + * After first evaluation it memoize result value (or error) and uses it for other subscribers + * @param fn expression to evaluate and retrieve element value + */ +export function evalOnce(fn: () => A): Observable { + return new EvalOnceObservable(fn) +} + +/** + * Creates an observable that issues all elements of given array with backpressure + * @param arr array containing elements + * @param scheduler optional scheduler + */ +export function fromArray(arr: Array, scheduler?: Scheduler): Observable { + return new ArrayObservable(arr, scheduler || Scheduler.global.get()) +} + +/** + * Creates an observable that issues all arguments + */ +export function items(...items: Array): Observable { + return new ArrayObservable(items, Scheduler.global.get()) +} + +/** + * Creates an observable that loops indefinitely until stopped, issues integers starting with 0 (zero) + */ +export function loop(scheduler?: Scheduler): Observable { + return new LoopObservable(scheduler || Scheduler.global.get()) +} diff --git a/packages/monix-reactive/src/index.ts b/packages/monix-reactive/src/index.ts index a2fa1ba..5eff763 100644 --- a/packages/monix-reactive/src/index.ts +++ b/packages/monix-reactive/src/index.ts @@ -15,7 +15,8 @@ * limitations under the License. */ -export * from "./observable" -export * from "./operators" export * from "./ack" export * from "./observer" +export * from "./observable" +export * from "./builders" +export * from "./operators" diff --git a/packages/monix-reactive/src/internal/ack-utils.ts b/packages/monix-reactive/src/internal/ack-utils.ts index cce08fe..5497e97 100644 --- a/packages/monix-reactive/src/internal/ack-utils.ts +++ b/packages/monix-reactive/src/internal/ack-utils.ts @@ -21,6 +21,7 @@ import { Ack, SyncAck, AsyncAck, Stop, Continue } from "../ack" /** * Executes callback synchronously for given Ack * @private + * @hidden */ export function syncOn(ack: Ack, callback: (t: Try) => void): Ack { if (ack === Continue || ack === Stop) { @@ -35,6 +36,7 @@ export function syncOn(ack: Ack, callback: (t: Try) => void): Ack { /** * Executes callback only for sync or async Continue ack * @private + * @hidden */ export function syncOnContinue(ack: Ack, callback: () => void): Ack { if (ack === Continue) { @@ -53,6 +55,7 @@ export function syncOnContinue(ack: Ack, callback: () => void): Ack { /** * Executes callback only for Stop or failed async Ack * @private + * @hidden */ export function syncOnStopOrFailure(ack: Ack, callback: () => void): Ack { if (ack === Stop) { @@ -71,6 +74,7 @@ export function syncOnStopOrFailure(ack: Ack, callback: () => void): Ack { /** * Tries to flatten ack * @private + * @hidden */ export function syncTryFlatten(ack: Ack, scheduler: Scheduler): Ack { if (ack === Continue || ack === Stop) { diff --git a/packages/monix-reactive/src/internal/builders/array.ts b/packages/monix-reactive/src/internal/builders/array.ts index e0bcd33..93c6619 100644 --- a/packages/monix-reactive/src/internal/builders/array.ts +++ b/packages/monix-reactive/src/internal/builders/array.ts @@ -15,15 +15,17 @@ * limitations under the License. */ -import { ObservableInstance } from "../instance" +import { Observable } from "../../observable" import { Continue, Stop, AsyncAck } from "../../ack" import { Subscriber } from "../../observer" import { Cancelable, Scheduler, BoolCancelable, IBoolCancelable } from "funfix" /** * An Observale that issues all elements of an array, with backpressure + * @private + * @hidden */ -export class ArrayObservable extends ObservableInstance { +export class ArrayObservable extends Observable { constructor(private readonly _arr: Array, private readonly _scheduler: Scheduler) { diff --git a/packages/monix-reactive/src/internal/builders/empty.js.flow b/packages/monix-reactive/src/internal/builders/empty.js.flow deleted file mode 100644 index db33505..0000000 --- a/packages/monix-reactive/src/internal/builders/empty.js.flow +++ /dev/null @@ -1,23 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { ObservableInstance } from "../instance" - -/** - * Empty observable object: bottom type, type parameter not used (phantom type) - */ -declare export var EmptyObservable: ObservableInstance diff --git a/packages/monix-reactive/src/internal/builders/empty.ts b/packages/monix-reactive/src/internal/builders/empty.ts index 3e6d2b0..e652e7a 100644 --- a/packages/monix-reactive/src/internal/builders/empty.ts +++ b/packages/monix-reactive/src/internal/builders/empty.ts @@ -15,14 +15,16 @@ * limitations under the License. */ -import { ObservableInstance } from "../instance" +import { Observable } from "../../observable" import { Subscriber } from "../../observer" import { Cancelable } from "funfix" /** * Completes immediately on subscribe without issueing any items + * @private + * @hidden */ -class EmptyObservableImpl extends ObservableInstance { +class EmptyObservableImpl extends Observable { unsafeSubscribeFn(subscriber: Subscriber): Cancelable { subscriber.onComplete() @@ -34,5 +36,7 @@ class EmptyObservableImpl extends ObservableInstance { * {@link EmptyObservable} completes immediately on subscribe without issueing any values. * EmptyObservable object uses [Bottom Type](https://en.wikipedia.org/wiki/Bottom_type) * for elements to match all other types + * @private + * @hidden */ -export const EmptyObservable: ObservableInstance = new EmptyObservableImpl() +export const EmptyObservable: Observable = new EmptyObservableImpl() diff --git a/packages/monix-reactive/src/internal/builders/eval.ts b/packages/monix-reactive/src/internal/builders/eval.ts index 5b3c5e1..7aeaa9b 100644 --- a/packages/monix-reactive/src/internal/builders/eval.ts +++ b/packages/monix-reactive/src/internal/builders/eval.ts @@ -15,14 +15,16 @@ * limitations under the License. */ -import { ObservableInstance } from "../instance" +import { Observable } from "../../observable" import { Subscriber } from "../../observer" import { Cancelable, Throwable } from "funfix" /** * An observable that evaluates the given function argument and emits its result. + * @private + * @hidden */ -export class EvalAlwaysObservable extends ObservableInstance { +export class EvalAlwaysObservable extends Observable { constructor(private readonly _fn: () => A) { super() } @@ -48,8 +50,10 @@ export class EvalAlwaysObservable extends ObservableInstance { /** * An observable that evaluates once the given function argument and emits its result. + * @private + * @hidden */ -export class EvalOnceObservable extends ObservableInstance { +export class EvalOnceObservable extends Observable { private _result!: A private _errorThrown: Throwable | null = null private _hasResult: boolean = false diff --git a/packages/monix-reactive/src/internal/builders/loop.ts b/packages/monix-reactive/src/internal/builders/loop.ts index 4c47567..a17f1b0 100644 --- a/packages/monix-reactive/src/internal/builders/loop.ts +++ b/packages/monix-reactive/src/internal/builders/loop.ts @@ -15,15 +15,17 @@ * limitations under the License. */ -import { ObservableInstance } from "../instance" +import { Observable } from "../../observable" import { Continue, Stop, AsyncAck } from "../../ack" import { Subscriber } from "../../observer" import { Cancelable, Scheduler, BoolCancelable, IBoolCancelable } from "funfix" /** * Loops indefinitely until stopped, issues integers starting with 0 (zero) + * @private + * @hidden */ -export class LoopObservable extends ObservableInstance { +export class LoopObservable extends Observable { constructor(private readonly _scheduler: Scheduler) { super() } diff --git a/packages/monix-reactive/src/internal/builders/never.js.flow b/packages/monix-reactive/src/internal/builders/never.js.flow deleted file mode 100644 index 0db2c98..0000000 --- a/packages/monix-reactive/src/internal/builders/never.js.flow +++ /dev/null @@ -1,23 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { ObservableInstance } from "../instance" - -/** - * Never observable object: bottom type, type parameter not used (phantom type) - */ -declare export var NeverObservable: ObservableInstance diff --git a/packages/monix-reactive/src/internal/builders/never.ts b/packages/monix-reactive/src/internal/builders/never.ts index 2fb6005..15bd153 100644 --- a/packages/monix-reactive/src/internal/builders/never.ts +++ b/packages/monix-reactive/src/internal/builders/never.ts @@ -15,14 +15,16 @@ * limitations under the License. */ -import { ObservableInstance } from "../instance" +import { Observable } from "../../observable" import { Subscriber } from "../../observer" import { Cancelable } from "funfix" /** * Never issues elements, complets or fails + * @private + * @hidden */ -class NeverObservableImpl extends ObservableInstance { +class NeverObservableImpl extends Observable { unsafeSubscribeFn(subscriber: Subscriber): Cancelable { return Cancelable.empty() } @@ -32,5 +34,7 @@ class NeverObservableImpl extends ObservableInstance { * {@link NeverObservable} never issues any elements, complets of rails * NeverObservable object uses [Bottom Type](https://en.wikipedia.org/wiki/Bottom_type) * for elements to match all other types + * @private + * @hidden */ -export const NeverObservable: ObservableInstance = new NeverObservableImpl() +export const NeverObservable: Observable = new NeverObservableImpl() diff --git a/packages/monix-reactive/src/internal/builders/pure.js.flow b/packages/monix-reactive/src/internal/builders/pure.js.flow deleted file mode 100644 index 2026a4e..0000000 --- a/packages/monix-reactive/src/internal/builders/pure.js.flow +++ /dev/null @@ -1,24 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { ObservableInstance } from "../instance" - -declare export class PureObservable<+A> extends ObservableInstance { - constructor(_value: A): PureObservable; - - unsafeSubscribeFn(subscriber: Subscriber): Cancelable; -} diff --git a/packages/monix-reactive/src/internal/builders/pure.ts b/packages/monix-reactive/src/internal/builders/pure.ts index e7e0f2b..5738ea9 100644 --- a/packages/monix-reactive/src/internal/builders/pure.ts +++ b/packages/monix-reactive/src/internal/builders/pure.ts @@ -15,7 +15,7 @@ * limitations under the License. */ -import { ObservableInstance } from "../instance" +import { Observable } from "../../observable" import { Subscriber } from "../../observer" import { Cancelable } from "funfix" @@ -23,8 +23,10 @@ import { Cancelable } from "funfix" * An {@link Observable} that issues single element * * Source: [Monix NowObservable](https://github.com/monix/monix/blob/master/monix-reactive/shared/src/main/scala/monix/reactive/internal/builders/NowObservable.scala) + * @private + * @hidden */ -export class PureObservable extends ObservableInstance { +export class PureObservable extends Observable { constructor(private readonly _value: A) { super() diff --git a/packages/monix-reactive/src/internal/instance.ts b/packages/monix-reactive/src/internal/instance.ts deleted file mode 100644 index 43ce904..0000000 --- a/packages/monix-reactive/src/internal/instance.ts +++ /dev/null @@ -1,43 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Ack } from "../ack" -import { Subscriber, Operator } from "../observer" -import { Cancelable, Scheduler, Throwable } from "funfix" -import { SafeSubscriber } from "./subscribers/safe" -import { SubscriberWrap } from "./subscribers/wrap" - -/** - * {@link ObservableInstance} exposes observable instance operations, used internally by implementations - * - * It's exists mostly to workarround circular reference between {@link Observable} - * and {@link Observer} implementations. Methods implemented in {@link ObservableMixin} which - * is applied _lazily_ in package root modules - */ -export abstract class ObservableInstance { - abstract unsafeSubscribeFn(subscriber: Subscriber): Cancelable - - subscribeWith(out: Subscriber): Cancelable { - return this.unsafeSubscribeFn(new SafeSubscriber(out)) - } - - subscribe(nextFn?: (elem: A) => Ack, errorFn?: (e: Throwable) => void, completeFn?: () => void, scheduler?: Scheduler): Cancelable { - return this.subscribeWith(new SubscriberWrap(nextFn, errorFn, completeFn, scheduler)) - } - - pipe!: (operator: Operator) => ObservableInstance -} diff --git a/packages/monix-reactive/src/internal/mixin.ts b/packages/monix-reactive/src/internal/mixin.ts deleted file mode 100644 index a62adbf..0000000 --- a/packages/monix-reactive/src/internal/mixin.ts +++ /dev/null @@ -1,38 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Ack } from "../ack" -import { Subscriber, Operator } from "../observer" -import { Cancelable, Scheduler, Throwable } from "funfix" -import { ObservableInstance } from "./instance" - -/** - * Observable operators mixin - * - * Is applied to ObservableInstance in order to avoid circular references be - */ -export abstract class OperatorsMixin { - abstract unsafeSubscribeFn(subscriber: Subscriber): Cancelable - - abstract subscribeWith(out: Subscriber): Cancelable - - abstract subscribe(nextFn?: (elem: A) => Ack, errorFn?: (e: Throwable) => void, completeFn?: () => void, scheduler?: Scheduler): Cancelable - - pipe(operator: Operator): ObservableInstance { - throw new Error("not implemented") - } -} diff --git a/packages/monix-reactive/src/internal/subscribers/safe.js.flow b/packages/monix-reactive/src/internal/subscribers/safe.js.flow deleted file mode 100644 index 4bd14b5..0000000 --- a/packages/monix-reactive/src/internal/subscribers/safe.js.flow +++ /dev/null @@ -1,28 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Subscriber } from "../../observer" - -declare export class SafeSubscriber<+A> extends Subscriber { - scheduler: Scheduler; - - constructor(subscriber: Subscriber, scheduler?: Scheduler): SafeSubscriber; - - onNext(elem: A): Ack; - onComplete(): void; - onError(e: Throwable): void; -} diff --git a/packages/monix-reactive/src/internal/subscribers/safe.ts b/packages/monix-reactive/src/internal/subscribers/safe.ts index 462ba8f..2b9ef8d 100644 --- a/packages/monix-reactive/src/internal/subscribers/safe.ts +++ b/packages/monix-reactive/src/internal/subscribers/safe.ts @@ -30,6 +30,8 @@ import * as AckUtils from "../ack-utils" * `onNext` events, ensuring that the grammar is respected * - if downstream signals a `Stop`, the observer no longer accepts any events, * ensuring that the grammar is respected + * @private + * @hidden */ export class SafeSubscriber implements Subscriber { private _isDone: boolean = false diff --git a/packages/monix-reactive/src/internal/subscribers/wrap.js.flow b/packages/monix-reactive/src/internal/subscribers/wrap.js.flow deleted file mode 100644 index 9a65f2b..0000000 --- a/packages/monix-reactive/src/internal/subscribers/wrap.js.flow +++ /dev/null @@ -1,31 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Subscriber } from "../../observer" - -declare export class WrapSubscriber<+A> extends Subscriber { - scheduler: Scheduler; - - constructor(nextFn?: (elem: A) => Ack, - errorFn?: (e: Throwable) => void, - completeFn?: () => void, - scheduler?: Scheduler): WrapSubscriber; - - onNext(elem: A): Ack; - onComplete(): void; - onError(e: Throwable): void; -} diff --git a/packages/monix-reactive/src/internal/subscribers/wrap.ts b/packages/monix-reactive/src/internal/subscribers/wrap.ts index 3cf3982..decb01b 100644 --- a/packages/monix-reactive/src/internal/subscribers/wrap.ts +++ b/packages/monix-reactive/src/internal/subscribers/wrap.ts @@ -19,6 +19,11 @@ import { Ack, Continue } from "../../ack" import { Subscriber } from "../../observer" import { Scheduler, Throwable } from "funfix" +/** + * Wraps functions to Subscriber interface + * @private + * @hidden + */ export class SubscriberWrap implements Subscriber { readonly scheduler: Scheduler diff --git a/packages/monix-reactive/src/observable.ts b/packages/monix-reactive/src/observable.ts index d2318bf..e5740b1 100644 --- a/packages/monix-reactive/src/observable.ts +++ b/packages/monix-reactive/src/observable.ts @@ -15,83 +15,64 @@ * limitations under the License. */ -import { applyMixins, Scheduler } from "funfix" -import { OperatorsMixin } from "./internal/mixin" -import { ObservableInstance } from "./internal/instance" -import { EmptyObservable } from "./internal/builders/empty" -import { NeverObservable } from "./internal/builders/never" -import { PureObservable } from "./internal/builders/pure" -import { EvalAlwaysObservable, EvalOnceObservable } from "./internal/builders/eval" -import { ArrayObservable } from "./internal/builders/array" -import { LoopObservable } from "./internal/builders/loop" +import { applyMixins, Scheduler, Cancelable, Throwable } from "funfix" +import { Ack } from "./ack" +import { Subscriber, Operator } from "./observer" +import { SafeSubscriber } from "./internal/subscribers/safe" +import { SubscriberWrap } from "./internal/subscribers/wrap" /** - * apply mixins + * Fluent interface to transform and subscribe to streams */ -applyMixins(ObservableInstance, [OperatorsMixin]) - -/** - * Observable object contains builder methods that help you create new {@link Observable} instances - */ -export abstract class Observable extends ObservableInstance { - /** - * Create empty observable - */ - static empty(): Observable { - return EmptyObservable - } - +export abstract class Observable { /** - * Create an observable which issues single given value and completes + * Subscribe to stream events (unsafe) + * @param out stream subscriber + * @hidden */ - static pure(value: A): Observable { - return new PureObservable(value) - } + abstract unsafeSubscribeFn(out: Subscriber): Cancelable /** - * Creates an observable that never issues any elements, completes or fails + * Subscribe to stream events + * @param out stream subscriber */ - static never(): Observable { - return NeverObservable + subscribeWith(out: Subscriber): Cancelable { + return this.unsafeSubscribeFn(new SafeSubscriber(out)) } /** - * Creates an observable that issues single element from evaluating given expression (function) - * @param fn expression to evauate and retrieve element value + * Subscribe to stream events + * + * @param nextFn callback for `onNext` event + * @param errorFn callback for `onError` event + * @param completeFn callback for `onComplete` event + * @param scheduler custom scheduler (optional) */ - static eval(fn: () => A): Observable { - return new EvalAlwaysObservable(fn) + subscribe(nextFn?: (elem: A) => Ack, errorFn?: (e: Throwable) => void, completeFn?: () => void, scheduler?: Scheduler): Cancelable { + return this.subscribeWith(new SubscriberWrap(nextFn, errorFn, completeFn, scheduler)) } /** - * Creates an observable that issues single element from evaluating given expression (function) - * After first evaluation it memoize result value (or error) and uses it for other subscribers - * @param fn expression to evaluate and retrieve element value + * Apply a stream transformation defined by given operator + * @param operator stream transformation builder */ - static evalOnce(fn: () => A): Observable { - return new EvalOnceObservable(fn) - } - - /** - * Creates an observable that issues all elements of given array with backpressure - * @param arr array containing elements - * @param scheduler optional scheduler - */ - static fromArray(arr: Array, scheduler?: Scheduler): Observable { - return new ArrayObservable(arr, scheduler || Scheduler.global.get()) + pipe(operator: Operator): Observable { + return new LiftByOperatorObservable(this, operator) } +} - /** - * Creates an observable that issues all arguments - */ - static items(...items: Array): Observable { - return new ArrayObservable(items, Scheduler.global.get()) +/** + * @private + * @hidden + */ +class LiftByOperatorObservable extends Observable { + constructor(private readonly _self: Observable, + private readonly _operator: Operator) { + super() } - /** - * Creates an observable that loops indefinitely until stopped, issues integers starting with 0 (zero) - */ - static loop(scheduler?: Scheduler): Observable { - return new LoopObservable(scheduler || Scheduler.global.get()) + unsafeSubscribeFn(subscriber: Subscriber): Cancelable { + const sb = this._operator(subscriber) + return this._self.unsafeSubscribeFn(sb) } } diff --git a/packages/monix-reactive/src/observer.ts b/packages/monix-reactive/src/observer.ts index afb009c..7908adc 100644 --- a/packages/monix-reactive/src/observer.ts +++ b/packages/monix-reactive/src/observer.ts @@ -23,18 +23,29 @@ import { Ack, SyncAck } from "./ack" * get subscribed to an Observable for receiving events. * * The events received must follow the Rx grammar, which is: - * onNext * (onComplete | onError)? + * `onNext * (onComplete | onError)?` * * That means an Observer can receive zero or multiple events, the stream - * ending either in one or zero `onComplete` or `onError` (just one, not both), - * and after onComplete or onError, a well behaved `Observable` - * implementation shouldn't send any more onNext events. + * ending either in one or zero {@link Observer.onComplete} or {@link Observer.onError} (just one, not both), + * and after `onComplete` or `onError`, a well behaved `Observable` + * implementation shouldn't send any more {@link Observer.onNext} events. */ export interface Observer { + /** + * Observe new element + * @param elem stream element + */ onNext(elem: T): Ack + /** + * Signals stream completion + */ onComplete(): void + /** + * Signals stream error + * @param e error object + */ onError(e: Throwable): void } @@ -42,6 +53,7 @@ export interface Observer { * Observer subtype which accepts only SyncAck from onNext element * * Events gramar is same as for Observable + * @hidden */ export interface SyncObserver extends Observer { onNext(elem: T): SyncAck @@ -58,12 +70,13 @@ export interface Subscriber extends Observer { /** * `SyncSubscriber` si an `SyncObserver` with an attached `Scheduler` + * @hidden */ export interface SyncSubscriber extends SyncObserver { readonly scheduler: Scheduler } /** - * `Operator` type alias defines a transformation from one Subscriber to another + * `Operator` type alias defines a transformation from one {@link Subscriber} to another */ export type Operator = (s: Subscriber) => Subscriber diff --git a/packages/monix-reactive/test/flow/internal/builders/array.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/array.test.js.flow deleted file mode 100644 index e43df88..0000000 --- a/packages/monix-reactive/test/flow/internal/builders/array.test.js.flow +++ /dev/null @@ -1,24 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Observable } from "../../../../src" - -const o1: Observable = Observable.fromArray([1,2,3]) -const o2: Observable = Observable.fromArray(["one", "two", "three"]) - -const o3: Observable = Observable.items(1,2,3) -const o4: Observable = Observable.items("one", "two", "three") diff --git a/packages/monix-reactive/test/flow/internal/builders/empty.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/empty.test.js.flow deleted file mode 100644 index 9731777..0000000 --- a/packages/monix-reactive/test/flow/internal/builders/empty.test.js.flow +++ /dev/null @@ -1,23 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Observable } from "../../../../src" - -const o: Observable = Observable.empty() -const o: Observable = Observable.empty() -const o: Observable = Observable.empty() -const o: Observable<10> = Observable.empty() diff --git a/packages/monix-reactive/test/flow/internal/builders/eval.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/eval.test.js.flow deleted file mode 100644 index 7b10e73..0000000 --- a/packages/monix-reactive/test/flow/internal/builders/eval.test.js.flow +++ /dev/null @@ -1,26 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Observable } from "../../../../src" - -const o1: Observable = Observable.eval(() => 101); -const o2: Observable = Observable.eval(() => "hello"); -const o3: Observable<10> = Observable.eval(() => 10); - -const o4: Observable = Observable.evalOnce(() => 101); -const o5: Observable = Observable.evalOnce(() => "hello"); -const o6: Observable<10> = Observable.evalOnce(() => 10); diff --git a/packages/monix-reactive/test/flow/internal/builders/never.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/never.test.js.flow deleted file mode 100644 index abc3dc2..0000000 --- a/packages/monix-reactive/test/flow/internal/builders/never.test.js.flow +++ /dev/null @@ -1,23 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Observable } from "../../../../src" - -const o: Observable = Observable.never() -const o: Observable = Observable.never() -const o: Observable = Observable.never() -const o: Observable<10> = Observable.never() diff --git a/packages/monix-reactive/test/flow/internal/builders/pure.test.js.flow b/packages/monix-reactive/test/flow/internal/builders/pure.test.js.flow deleted file mode 100644 index 7b27099..0000000 --- a/packages/monix-reactive/test/flow/internal/builders/pure.test.js.flow +++ /dev/null @@ -1,21 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { Observable, Observable } from "../../../../src" - -const o: Observable = Observable.pure(10) -const o: Observable = Observable.pure("hello") diff --git a/packages/monix-reactive/test/flow/observable.js.flow b/packages/monix-reactive/test/flow/observable.js.flow deleted file mode 100644 index 714a067..0000000 --- a/packages/monix-reactive/test/flow/observable.js.flow +++ /dev/null @@ -1,34 +0,0 @@ -/*! - * Copyright (c) 2018 by The Monix.js Project Developers. - * Some rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { id } from "funfix" -import { Observable } from "../../src" -import * as assert from "./asserts" -import { EmptyObservable } from "../../src/internal/builders/empty" - -const o1: Observable = Observable.empty(); -const o2: Observable = Observable.empty(); - -const o3: Observable = Observable.pure("Hello"); -const o4: Observable = Observable.never(); -const o4: Observable = Observable.never(); -const o4: Observable = Observable.never(); -const o5: Observable = Observable.eval(() => "hello"); -const o6: Observable = Observable.evalOnce(() => "ehlo"); -const o7: Observable = Observable.fromArray([1, 2, 3]); -const o8: Observable = Observable.items(1, 2, 3); -const o9: Observable = Observable.loop(); diff --git a/packages/monix-reactive/src/internal/builders/array.js.flow b/packages/monix-reactive/test/flow/observable.test.js.flow similarity index 52% rename from packages/monix-reactive/src/internal/builders/array.js.flow rename to packages/monix-reactive/test/flow/observable.test.js.flow index b3713c6..b732e0f 100644 --- a/packages/monix-reactive/src/internal/builders/array.js.flow +++ b/packages/monix-reactive/test/flow/observable.test.js.flow @@ -15,11 +15,17 @@ * limitations under the License. */ -import { ObservableInstance } from "../instance" +import { empty, never, evalAlways, evalOnce, fromArray, items, loop } from "../../src" -declare export class ArrayObservable<+A> extends ObservableInstance { - constructor(arr: Array, - scheduler: Scheduler): ArrayObservable; +const o1: Observable = empty(); +const o2: Observable = empty(); - unsafeSubscribeFn(subscriber: Subscriber): Cancelable; -} +const o3: Observable = pure("Hello"); +const o4: Observable = never(); +const o4: Observable = never(); +const o4: Observable = never(); +const o5: Observable = evalAlways(() => "hello"); +const o6: Observable = evalOnce(() => "ehlo"); +const o7: Observable = fromArray([1, 2, 3]); +const o8: Observable = items(1, 2, 3); +const o9: Observable = loop(); diff --git a/packages/monix-reactive/test/ts/internal/builders/array.test.ts b/packages/monix-reactive/test/ts/internal/builders/array.test.ts index 4710b21..97fa9ef 100644 --- a/packages/monix-reactive/test/ts/internal/builders/array.test.ts +++ b/packages/monix-reactive/test/ts/internal/builders/array.test.ts @@ -16,13 +16,13 @@ */ import * as assert from "../../asserts" -import { Observable, Ack, Continue, Stop } from "../../../../src" +import { fromArray, items, Ack, Continue, Stop } from "../../../../src" import { TestScheduler, Throwable, Future } from "funfix" describe("ArrayObservable", () => { it("should complete immediately for empty array", () => { let wasCompleted = false - Observable.fromArray([]).unsafeSubscribeFn({ + fromArray([]).unsafeSubscribeFn({ scheduler: new TestScheduler(), onNext: (elem: any): Ack => { throw new Error("Illegal state") @@ -40,7 +40,7 @@ describe("ArrayObservable", () => { it("can be also created using Observable.items() function", () => { let wasCompleted = false - Observable.items().unsafeSubscribeFn({ + items().unsafeSubscribeFn({ scheduler: new TestScheduler(), onNext: (elem: any): Ack => { throw new Error("Illegal state") @@ -59,7 +59,7 @@ describe("ArrayObservable", () => { it("should issue all input array values and complete", () => { let wasCompleted = false let issued = [] - Observable.fromArray([1, 2, 3]).unsafeSubscribeFn({ + fromArray([1, 2, 3]).unsafeSubscribeFn({ scheduler: new TestScheduler(), onNext: (elem: number): Ack => { issued.push(elem) @@ -80,7 +80,7 @@ describe("ArrayObservable", () => { it("should not complete if got Stop ack", () => { let wasCompleted = false let issued = [] - Observable.fromArray([1]).unsafeSubscribeFn({ + fromArray([1]).unsafeSubscribeFn({ scheduler: new TestScheduler(), onNext: (elem: number): Ack => { issued.push(elem) @@ -101,7 +101,7 @@ describe("ArrayObservable", () => { it("should issue elements until got Stop ack", () => { let wasCompleted = false let issued = [] - Observable.fromArray([1, 2, 3]).unsafeSubscribeFn({ + fromArray([1, 2, 3]).unsafeSubscribeFn({ scheduler: new TestScheduler(), onNext: (elem: number): Ack => { issued.push(elem) @@ -124,7 +124,7 @@ describe("ArrayObservable", () => { let wasCompleted = false let gotError = false - Observable.fromArray([1, 2, 3]).unsafeSubscribeFn({ + fromArray([1, 2, 3]).unsafeSubscribeFn({ scheduler: new TestScheduler(), onNext: (elem: number): Ack => { return Future.raise(new Error("something went wrong"), scheduler) diff --git a/packages/monix-reactive/test/ts/internal/builders/empty.test.ts b/packages/monix-reactive/test/ts/internal/builders/empty.test.ts index 1b5cd79..ecd64a9 100644 --- a/packages/monix-reactive/test/ts/internal/builders/empty.test.ts +++ b/packages/monix-reactive/test/ts/internal/builders/empty.test.ts @@ -16,13 +16,13 @@ */ import * as assert from "../../asserts" -import { Observable, Ack } from "../../../../src" +import { empty, Ack } from "../../../../src" import { TestScheduler, Throwable } from "funfix" describe("EmptyObservable", () => { it("should complete immediately", () => { let wasCompleted = false - Observable.empty().unsafeSubscribeFn({ + empty().unsafeSubscribeFn({ scheduler: new TestScheduler(), onNext: (elem: any): Ack => { throw new Error("Illegal state") diff --git a/packages/monix-reactive/test/ts/internal/builders/eval.test.ts b/packages/monix-reactive/test/ts/internal/builders/eval.test.ts index 3e0e16b..7a2e447 100644 --- a/packages/monix-reactive/test/ts/internal/builders/eval.test.ts +++ b/packages/monix-reactive/test/ts/internal/builders/eval.test.ts @@ -16,14 +16,14 @@ */ import * as assert from "../../asserts" -import { Observable, Ack, Continue } from "../../../../src" +import { evalAlways, evalOnce, Ack, Continue } from "../../../../src" import { TestScheduler, Throwable } from "funfix" import { SubscriberWrap } from "../../../../src/internal/subscribers/wrap" describe("EvalAlwaysObservable", () => { it("should not eval it's value source if not subscribed", () => { let executed = false - Observable.eval(() => { + evalAlways(() => { executed = true return 0 }) @@ -35,7 +35,7 @@ describe("EvalAlwaysObservable", () => { let executedCnt = 0 let issuedCnt = 0 let completedCnt = 0 - const o = Observable.eval(() => { + const o = evalAlways(() => { executedCnt += 1 return executedCnt }) @@ -56,7 +56,7 @@ describe("EvalAlwaysObservable", () => { let issuedCnt = 0 let completedCnt = 0 let failedCnt = 0 - const o = Observable.eval(() => { + const o = evalAlways(() => { throw new Error("something went wrong") }) @@ -70,7 +70,7 @@ describe("EvalAlwaysObservable", () => { describe("EvalOnceObservable", () => { it("should not eval it's value source if not subscribed", () => { let executed = false - Observable.evalOnce(() => { + evalOnce(() => { executed = true return 0 }) @@ -82,7 +82,7 @@ describe("EvalOnceObservable", () => { let executedCnt = 0 let issuedCnt = 0 let completedCnt = 0 - const o = Observable.evalOnce(() => { + const o = evalOnce(() => { executedCnt += 1 return executedCnt }) @@ -104,7 +104,7 @@ describe("EvalOnceObservable", () => { let issuedCnt = 0 let completedCnt = 0 let failedCnt = 0 - const o = Observable.evalOnce(() => { + const o = evalOnce(() => { throw new Error("something went wrong") }) o.subscribe(_ => { issuedCnt += 1; return Continue }, e => { failedCnt += 1 }, () => { completedCnt += 1 }) @@ -116,7 +116,7 @@ describe("EvalOnceObservable", () => { it("reports downstream failures", () => { const scheduler = new TestScheduler() - const o = Observable.evalOnce(() => "hello") + const o = evalOnce(() => "hello") assert.equal(scheduler.triggeredFailures().length, 0) o.unsafeSubscribeFn(new SubscriberWrap( @@ -139,7 +139,7 @@ describe("EvalOnceObservable", () => { // one more failure reported (total: 2) assert.equal(scheduler.triggeredFailures().length, 2) - Observable.evalOnce(() => { throw new Error("something went wrong") }).unsafeSubscribeFn(new SubscriberWrap( + evalOnce(() => { throw new Error("something went wrong") }).unsafeSubscribeFn(new SubscriberWrap( _ => Continue, e => { throw new Error("Faield to process onComplete") }, () => { }, diff --git a/packages/monix-reactive/test/ts/internal/builders/loop.test.ts b/packages/monix-reactive/test/ts/internal/builders/loop.test.ts index a9d4ba4..88d6f77 100644 --- a/packages/monix-reactive/test/ts/internal/builders/loop.test.ts +++ b/packages/monix-reactive/test/ts/internal/builders/loop.test.ts @@ -16,13 +16,13 @@ */ import * as assert from "../../asserts" -import { Observable, Ack, Continue, Stop } from "../../../../src" +import { loop, Observable, Ack, Continue, Stop } from "../../../../src" import { TestScheduler, Throwable, Scheduler, Future } from "funfix" import { LoopObservable } from "../../../../src/internal/builders/loop" describe("LoopObservable", () => { it("can be created using Observable.loop()", () => { - const o: Observable = Observable.loop() + const o: Observable = loop() }) it("should start with 0, increment by 1 and Stop", () => { diff --git a/packages/monix-reactive/test/ts/internal/builders/never.test.ts b/packages/monix-reactive/test/ts/internal/builders/never.test.ts index 9029742..1c70f6f 100644 --- a/packages/monix-reactive/test/ts/internal/builders/never.test.ts +++ b/packages/monix-reactive/test/ts/internal/builders/never.test.ts @@ -16,13 +16,13 @@ */ import * as assert from "../../asserts" -import { Observable, Ack } from "../../../../src" +import { never, Observable, Ack } from "../../../../src" import { TestScheduler, Throwable } from "funfix" describe("NeverObservable", () => { it("should never complete", () => { const s = new TestScheduler() - Observable.never().unsafeSubscribeFn({ + never().unsafeSubscribeFn({ scheduler: s, onNext: (elem: any): Ack => { throw new Error("Illegal state: onNext should never be called") diff --git a/packages/monix-reactive/test/ts/internal/builders/pure.test.ts b/packages/monix-reactive/test/ts/internal/builders/pure.test.ts index ed0d3ec..3cbd623 100644 --- a/packages/monix-reactive/test/ts/internal/builders/pure.test.ts +++ b/packages/monix-reactive/test/ts/internal/builders/pure.test.ts @@ -16,14 +16,14 @@ */ import * as assert from "../../asserts" -import { Observable, Ack, Continue, Stop } from "../../../../src" +import { pure, Ack, Continue, Stop } from "../../../../src" import { TestScheduler, Throwable } from "funfix" describe("PureObservable", () => { it("should issue 1 element and complete", () => { let elementsCnt = 0 let wasCompleted = false - Observable.pure("hello").unsafeSubscribeFn({ + pure("hello").unsafeSubscribeFn({ scheduler: new TestScheduler(), onNext: (elem: string): Ack => { assert.equal(elem, "hello") @@ -44,7 +44,7 @@ describe("PureObservable", () => { it("should not call onComplete if stopped", () => { let wasCompleted = false - Observable.pure("hello").subscribe( + pure("hello").subscribe( (element) => { return Stop }, diff --git a/packages/monix-reactive/test/ts/observable.test.ts b/packages/monix-reactive/test/ts/observable.test.ts index a395394..3fa4fb8 100644 --- a/packages/monix-reactive/test/ts/observable.test.ts +++ b/packages/monix-reactive/test/ts/observable.test.ts @@ -16,25 +16,25 @@ */ import { id } from "funfix" -import { Observable } from "../../src" +import * as Mx from "../../src" import * as assert from "./asserts" import { EmptyObservable } from "../../src/internal/builders/empty" -describe("Observable", () => { - describe(".empty()", () => { +describe("Builders", () => { + describe("empty()", () => { it("creates new observable instance", () => { - const o1: Observable = Observable.empty() - const o2: Observable = Observable.empty() + const o1: Mx.Observable = Mx.empty() + const o2: Mx.Observable = Mx.empty() }) it("returns singleton observable instance", () => { - assert.equal(Observable.empty(), EmptyObservable) + assert.equal(Mx.empty(), EmptyObservable) }) }) describe(".pure()", () => { it("creates new observable instance", () => { - const o: Observable = Observable.pure("Hello") + const o: Mx.Observable = Mx.pure("Hello") }) }) }) diff --git a/packages/monix-reactive/test/flow/internal/builders/loop.test.js.flow b/packages/monix-reactive/test/ts/operators.test.ts similarity index 75% rename from packages/monix-reactive/test/flow/internal/builders/loop.test.js.flow rename to packages/monix-reactive/test/ts/operators.test.ts index 32a3e65..dbca7ea 100644 --- a/packages/monix-reactive/test/flow/internal/builders/loop.test.js.flow +++ b/packages/monix-reactive/test/ts/operators.test.ts @@ -15,6 +15,12 @@ * limitations under the License. */ -import { Observable } from "../../../../src" +import * as Mx from "../../src" -const o: Observable = Observable.loop(); +describe("Operators", () => { + describe("id", () => { + it("returns original subscriber", () => { + const o: Mx.Observable = Mx.pure(10).pipe(Mx.id) + }) + }) +})