update
This commit is contained in:
185
node_modules/rxjs/src/internal/Subject.ts
generated
vendored
Normal file
185
node_modules/rxjs/src/internal/Subject.ts
generated
vendored
Normal file
@@ -0,0 +1,185 @@
|
||||
import { Operator } from './Operator';
|
||||
import { Observable } from './Observable';
|
||||
import { Subscriber } from './Subscriber';
|
||||
import { Subscription, EMPTY_SUBSCRIPTION } from './Subscription';
|
||||
import { Observer, SubscriptionLike, TeardownLogic } from './types';
|
||||
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
|
||||
import { arrRemove } from './util/arrRemove';
|
||||
import { errorContext } from './util/errorContext';
|
||||
|
||||
/**
|
||||
* A Subject is a special type of Observable that allows values to be
|
||||
* multicasted to many Observers. Subjects are like EventEmitters.
|
||||
*
|
||||
* Every Subject is an Observable and an Observer. You can subscribe to a
|
||||
* Subject, and you can call next to feed values as well as error and complete.
|
||||
*/
|
||||
export class Subject<T> extends Observable<T> implements SubscriptionLike {
|
||||
closed = false;
|
||||
|
||||
private currentObservers: Observer<T>[] | null = null;
|
||||
|
||||
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
|
||||
observers: Observer<T>[] = [];
|
||||
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
|
||||
isStopped = false;
|
||||
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
|
||||
hasError = false;
|
||||
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
|
||||
thrownError: any = null;
|
||||
|
||||
/**
|
||||
* Creates a "subject" by basically gluing an observer to an observable.
|
||||
*
|
||||
* @deprecated Recommended you do not use. Will be removed at some point in the future. Plans for replacement still under discussion.
|
||||
*/
|
||||
static create: (...args: any[]) => any = <T>(destination: Observer<T>, source: Observable<T>): AnonymousSubject<T> => {
|
||||
return new AnonymousSubject<T>(destination, source);
|
||||
};
|
||||
|
||||
constructor() {
|
||||
// NOTE: This must be here to obscure Observable's constructor.
|
||||
super();
|
||||
}
|
||||
|
||||
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
|
||||
lift<R>(operator: Operator<T, R>): Observable<R> {
|
||||
const subject = new AnonymousSubject(this, this);
|
||||
subject.operator = operator as any;
|
||||
return subject as any;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
protected _throwIfClosed() {
|
||||
if (this.closed) {
|
||||
throw new ObjectUnsubscribedError();
|
||||
}
|
||||
}
|
||||
|
||||
next(value: T) {
|
||||
errorContext(() => {
|
||||
this._throwIfClosed();
|
||||
if (!this.isStopped) {
|
||||
if (!this.currentObservers) {
|
||||
this.currentObservers = Array.from(this.observers);
|
||||
}
|
||||
for (const observer of this.currentObservers) {
|
||||
observer.next(value);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
error(err: any) {
|
||||
errorContext(() => {
|
||||
this._throwIfClosed();
|
||||
if (!this.isStopped) {
|
||||
this.hasError = this.isStopped = true;
|
||||
this.thrownError = err;
|
||||
const { observers } = this;
|
||||
while (observers.length) {
|
||||
observers.shift()!.error(err);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
complete() {
|
||||
errorContext(() => {
|
||||
this._throwIfClosed();
|
||||
if (!this.isStopped) {
|
||||
this.isStopped = true;
|
||||
const { observers } = this;
|
||||
while (observers.length) {
|
||||
observers.shift()!.complete();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
unsubscribe() {
|
||||
this.isStopped = this.closed = true;
|
||||
this.observers = this.currentObservers = null!;
|
||||
}
|
||||
|
||||
get observed() {
|
||||
return this.observers?.length > 0;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
protected _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
|
||||
this._throwIfClosed();
|
||||
return super._trySubscribe(subscriber);
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
protected _subscribe(subscriber: Subscriber<T>): Subscription {
|
||||
this._throwIfClosed();
|
||||
this._checkFinalizedStatuses(subscriber);
|
||||
return this._innerSubscribe(subscriber);
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
protected _innerSubscribe(subscriber: Subscriber<any>) {
|
||||
const { hasError, isStopped, observers } = this;
|
||||
if (hasError || isStopped) {
|
||||
return EMPTY_SUBSCRIPTION;
|
||||
}
|
||||
this.currentObservers = null;
|
||||
observers.push(subscriber);
|
||||
return new Subscription(() => {
|
||||
this.currentObservers = null;
|
||||
arrRemove(observers, subscriber);
|
||||
});
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
protected _checkFinalizedStatuses(subscriber: Subscriber<any>) {
|
||||
const { hasError, thrownError, isStopped } = this;
|
||||
if (hasError) {
|
||||
subscriber.error(thrownError);
|
||||
} else if (isStopped) {
|
||||
subscriber.complete();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new Observable with this Subject as the source. You can do this
|
||||
* to create custom Observer-side logic of the Subject and conceal it from
|
||||
* code that uses the Observable.
|
||||
* @return Observable that this Subject casts to.
|
||||
*/
|
||||
asObservable(): Observable<T> {
|
||||
const observable: any = new Observable<T>();
|
||||
observable.source = this;
|
||||
return observable;
|
||||
}
|
||||
}
|
||||
|
||||
export class AnonymousSubject<T> extends Subject<T> {
|
||||
constructor(
|
||||
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */
|
||||
public destination?: Observer<T>,
|
||||
source?: Observable<T>
|
||||
) {
|
||||
super();
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
next(value: T) {
|
||||
this.destination?.next?.(value);
|
||||
}
|
||||
|
||||
error(err: any) {
|
||||
this.destination?.error?.(err);
|
||||
}
|
||||
|
||||
complete() {
|
||||
this.destination?.complete?.();
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
protected _subscribe(subscriber: Subscriber<T>): Subscription {
|
||||
return this.source?.subscribe(subscriber) ?? EMPTY_SUBSCRIPTION;
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user