java-tutorials/rxjava-observables/src/main/java/com/baeldung/rxjava/MultipleSubscribersColdObs....

60 lines
1.7 KiB
Java

package com.baeldung.rxjava;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subscriptions.Subscriptions;
/**
 * Small runnable demo of attaching two subscribers to the same RxJava 1.x
 * {@link Observable}.
 *
 * <p>Two variants are provided:
 * <ul>
 *   <li>{@link #defaultBehaviour()} subscribes twice directly to the observable
 *       built by {@link #getObservable()}.</li>
 *   <li>{@link #subscribeBeforeConnect()} first wraps that observable with
 *       {@code publish()} and only calls {@code connect()} after both
 *       subscribers have been registered.</li>
 * </ul>
 *
 * <p>All output goes through the class logger, so the order of the
 * "Getting", "printing" and "Clear resources" lines shows what ran when.
 */
public class MultipleSubscribersColdObs {

    private static final Logger LOGGER = LoggerFactory.getLogger(MultipleSubscribersColdObs.class);

    /**
     * Entry point; runs the direct-subscription variant.
     *
     * @param args ignored
     * @throws InterruptedException declared so that the publish/connect variant,
     *         which sleeps, can be enabled without changing the signature
     */
    public static void main(String[] args) throws InterruptedException {
        defaultBehaviour();
        // subscribeBeforeConnect();
    }

    /**
     * Subscribes two independent consumers to the same observable and then
     * releases both subscriptions.
     *
     * <p>Per RxJava's documented semantics for {@code Observable.create}, the
     * emitter body is invoked once for every {@code subscribe} call, so each
     * subscriber triggers its own "Getting ..." log lines.
     */
    private static void defaultBehaviour() {
        Observable<Integer> obs = getObservable();

        LOGGER.info("Subscribing");
        Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing {}", i));
        Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing {}", i));

        // Unsubscribing runs the cleanup action registered by getObservable().
        s1.unsubscribe();
        s2.unsubscribe();
    }

    /**
     * Registers two consumers on a published (connectable) observable, waits one
     * second, then connects and immediately releases the connection.
     *
     * <p>Per RxJava's {@code ConnectableObservable} contract, nothing is emitted
     * until {@code connect()} is called; both subscribers then share that single
     * subscription to the source.
     *
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    private static void subscribeBeforeConnect() throws InterruptedException {
        ConnectableObservable<Integer> obs = getObservable().publish();

        LOGGER.info("Subscribing");
        obs.subscribe(i -> LOGGER.info("subscriber #1 is printing {}", i));
        obs.subscribe(i -> LOGGER.info("subscriber #2 is printing {}", i));

        // Pause between subscribing and connecting — presumably so the gap is
        // visible in the log timestamps.
        Thread.sleep(1000);

        LOGGER.info("Connecting");
        Subscription s = obs.connect();
        s.unsubscribe();
    }

    /**
     * Builds the observable used by both demos: it emits the values 1 and 2
     * (each obtained through {@link #gettingValue(int)}) and then registers a
     * cleanup action that logs "Clear resources" when the subscriber is
     * unsubscribed.
     *
     * <p>The stream never signals completion or error; it is only ended by
     * unsubscription.
     *
     * @return an observable of the two integers
     */
    private static Observable<Integer> getObservable() {
        return Observable.create(subscriber -> {
            subscriber.onNext(gettingValue(1));
            subscriber.onNext(gettingValue(2));

            // Registered after the emissions, exactly as in the original flow.
            subscriber.add(Subscriptions.create(() -> LOGGER.info("Clear resources")));
        });
    }

    /**
     * Logs that a value is being produced and returns it unchanged.
     *
     * @param i the value to produce
     * @return {@code i}, boxed
     */
    private static Integer gettingValue(int i) {
        LOGGER.info("Getting {}", i);
        return i;
    }
}