    /**
     * Creates an Observable with a Function to execute when it is subscribed to.
     * <p>
     * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
     * unless you specifically have a need for inheritance.
     *
     * @param f
     *            {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // this is a new Subscriber, so call onStart on it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // reassign `subscriber` so we return the protected (wrapped) version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same as unsafeSubscribe, but unsafeSubscribe is not called here
        // because it would add significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe ["
                        + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the onError handling?
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value?
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }