class BugExample { public static void main(String... args) { Observable obs = create((Observer o) -> { o.onNext("one"); o.onNext("two"); o.onCompleted(); }); obs.subscribe(new Observer() { @Override public void onCompleted() { System.out.println("Completed"); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onNext(String v) { System.out.println("Value: " + v); } }); Observable obs2 = create(new OnSubscribeFunc() { @Override public void onSubscribe(Observer o) { o.onNext("one"); o.onNext("two"); o.onCompleted(); } }); obs2.subscribe(new Observer() { @Override public void onCompleted() { System.out.println("Completed"); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onNext(String v) { System.out.println("Value: " + v); } }); } public static class Observable { private final OnSubscribeFunc f; public Observable(OnSubscribeFunc f) { this.f = f; } public void subscribe(Observer o) { f.onSubscribe(o); } } public static Observable create(OnSubscribeFunc func) { return new Observable(func); } public static interface OnSubscribeFunc { public void onSubscribe(Observer t1); } public interface Observer { public void onCompleted(); public void onError(Throwable e); public void onNext(T args); } }