Thursday, 5 September 2013

Why does Dart StreamController.addStream not work as expected when joining streams

Given the following Dart code snippet:

    Stream stream1 = new Stream.periodic(new Duration(seconds: 1), (n) => n)
        .take(10)
        .asBroadcastStream();
    stream1.listen((n) => print("stream1 : $n"),
        onError: (err) => print("stream1 : $err"),
        onDone: () => print("stream1 : done"),
        cancelOnError: false);

    Stream stream2 = stream1.where((n) => n % 2 == 0).take(2);
    stream2.listen((n) => print("stream2 : $n"),
        onError: (err) => print("stream2 : $err"),
        onDone: () => print("stream2 : done"),
        cancelOnError: false);

    Stream stream3 = stream1.where((n) => n % 2 != 0).take(2);
    stream3.listen((n) => print("stream3 : $n"),
        onError: (err) => print("stream3 : $err"),
        onDone: () => print("stream3 : done"),
        cancelOnError: false);

    StreamController controller = new StreamController.broadcast();
    controller.addStream(stream2)
        .then((_) => controller.addStream(stream3));
    controller.stream.listen((n) => print("composite stream : $n"),
        onError: (err) => print("composite stream : $err"),
        onDone: () => print("composite stream : done"),
        cancelOnError: false);

I get the following output:
stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream1 : 3
stream1 : 4
stream1 : 5
stream1 : 6
stream1 : 7
stream1 : 8
stream1 : 9
stream1 : done
stream2 : done
stream3 : done
There are a couple of things I don't understand about this output:

1. Why is there only one entry each for stream2 and stream3 when there
   should be two of each? Does the composite stream I created with the
   StreamController consume one of the events from stream2 and stream3?
   This behaviour seems odd to me; am I missing something?
2. Why do stream2 and stream3 only complete when stream1 completes? This is
   not the behaviour I'd expect when both are bounded, and it contradicts
   the behaviour of .take(10) on stream1. If I remove the .take(10) on
   stream1, then stream2 and stream3 never complete at all.

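
One way to narrow down the first question is to take the timer and the StreamController.addStream call out of the picture and check only whether two listeners on the same where(...).take(2) stream each get their own two events. The following is a minimal sketch, not a diagnosis: it assumes a recent, null-safe Dart SDK, it swaps the periodic stream for a synchronous broadcast controller, and the names takeWithTwoListeners, source, evens, first and second are made up for illustration. The result may differ between SDK versions, so it should be compared against a run on the SDK in question.

```dart
import 'dart:async';

/// Feeds 0..9 through a broadcast stream and returns what two separate
/// listeners on the same `where(...).take(2)` stream each receive.
/// (Hypothetical helper, not part of the original snippet.)
List<List<int>> takeWithTwoListeners() {
  // Assumption: sync: true delivers each event during add(), so the
  // experiment needs no timers and finishes before the function returns.
  final source = StreamController<int>.broadcast(sync: true);
  final evens = source.stream.where((n) => n % 2 == 0).take(2);

  final first = <int>[];
  final second = <int>[];
  evens.listen(first.add);  // plays the role of the direct stream2 listener
  evens.listen(second.add); // plays the role of controller.addStream(stream2)

  for (var i = 0; i < 10; i++) {
    source.add(i);
  }
  source.close();
  return [first, second];
}

void main() {
  final results = takeWithTwoListeners();
  print('first listener  : ${results[0]}');
  print('second listener : ${results[1]}');
}
```

If each list holds two events, the listeners count independently; if they hold one each, the take(2) budget is being shared between them, which would match the output above.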
If I modify the controller to also add the source stream1 (see snippet and
output below), then stream2 and stream3 do complete at their natural
positions, once their 2 elements are up, but I then also get an exception
because something tried to listen to one of the streams twice.

    StreamController controller = new StreamController.broadcast();
    controller.addStream(stream1)
        .then((_) => controller.addStream(stream2))
        .then((_) => controller.addStream(stream3));
    controller.stream.listen((n) => print("composite stream : $n"),
        onError: (err) => print("composite stream : $err"),
        onDone: () => print("composite stream : done"),
        cancelOnError: false);

stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream2 : 2
stream2 : done
composite stream : 2
stream1 : 3
stream3 : 3
stream3 : done
composite stream : 3
stream1 : 4
composite stream : 4
stream1 : 5
composite stream : 5
stream1 : 6
composite stream : 6
stream1 : 7
composite stream : 7
stream1 : 8
composite stream : 8
stream1 : 9
stream1 : done
composite stream : 9
Uncaught Error: Bad state: Stream has already been listened to.
Stack Trace:
#0 _StreamController._subscribe
(dart:async/stream_controller.dart:151:7)
#1 _ControllerStream._createSubscription
(dart:async/stream_controller.dart:259:157)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:260:58)
#3 _ForwardingStreamSubscription._ForwardingStreamSubscription
(dart:async/stream_pipe.dart:53:43)
#4 _ForwardingStream._createSubscription
(dart:async/stream_pipe.dart:35:16)
#5 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#6 _AsBroadcastStream.listen (dart:async/stream_impl.dart:466:37)
#7 _ForwardingStreamSubscription._ForwardingStreamSubscription
(dart:async/stream_pipe.dart:53:43)
#8 _ForwardingStream._createSubscription
(dart:async/stream_pipe.dart:35:16)
#9 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#10 _ForwardingStreamSubscription._ForwardingStreamSubscription
(dart:async/stream_pipe.dart:53:43)
#11 _ForwardingStream._createSubscription
(dart:async/stream_pipe.dart:35:16)
#12 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#13 _AddStreamState._AddStreamState
(dart:async/stream_controller.dart:300:133)
#14 _BroadcastStreamController.addStream
(dart:async/broadcast_stream_controller.dart:140:27)
#15 main.<anonymous closure>
(file:///C:/SourceCode/personal/SteamTest/lib/streamtest.dart:38:38)
#16 _ThenFuture._zonedSendValue (dart:async/future_impl.dart:371:24)
#17 _TransformFuture._sendValue.<anonymous closure>
(dart:async/future_impl.dart:348:48)
#18 _ZoneBase._runInZone (dart:async/zone.dart:82:17)
#19 _ZoneBase._runUnguarded (dart:async/zone.dart:102:22)
#20 _ZoneBase.executeCallback (dart:async/zone.dart:58:23)
#21 _TransformFuture._sendValue (dart:async/future_impl.dart:348:26)
#22 _FutureImpl._setValueUnchecked (dart:async/future_impl.dart:184:26)
#23 _FutureImpl._asyncSetValue.<anonymous closure>
(dart:async/future_impl.dart:218:25)
#24 _asyncRunCallback (dart:async/event_loop.dart:9:15)
#25 _createTimer.<anonymous closure>
(dart:async-patch/timer_patch.dart:8:13)
#26 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#27 _Timer._createTimerHandler.<anonymous closure>
(timer_impl.dart:111:23)
#28 _ReceivePortImpl._handleMessage
(dart:isolate-patch/isolate_patch.dart:81:92)
Unhandled exception:
Bad state: Stream has already been listened to.
#0 _DefaultZone.handleUncaughtError.<anonymous closure>
(dart:async/zone.dart:146:7)
#1 _asyncRunCallback (dart:async/event_loop.dart:9:15)
#2 _asyncRunCallback (dart:async/event_loop.dart:13:7)
#3 _createTimer.<anonymous closure>
(dart:async-patch/timer_patch.dart:8:13)
#4 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#5 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#6 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#7 _Timer._createTimerHandler.<anonymous closure>
(timer_impl.dart:111:23)
#8 _ReceivePortImpl._handleMessage
(dart:isolate-patch/isolate_patch.dart:81:92)
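
For reference, the "Stream has already been listened to" message is the StateError that a single-subscription stream raises when listen is called on it a second time. The sketch below reproduces just that error in isolation. It assumes a recent, null-safe Dart SDK and uses a plain StreamController as a stand-in for the periodic source; secondListenError is a hypothetical helper name. It does not show which stream in the trace above was listened to twice.

```dart
import 'dart:async';

/// Listens twice to a single-subscription stream and returns the message of
/// the error thrown by the second listen, or null if nothing was thrown.
/// (Hypothetical helper for illustration only.)
String? secondListenError() {
  // A StreamController created without .broadcast() is single-subscription.
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {}); // second listener on the same stream
  } on StateError catch (e) {
    return e.message;
  }
  return null;
}

void main() {
  print(secondListenError());
}
```
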
Can someone help me make sense of what is going on here?
Thanks,
