javascript - RxJS bufferWithCount() not pausing for timeout -
i trying control inflow slow subscriber. tried below in nodejs
var xmlnodestream = rx.observable.from([1,2,3,4,5,6,7,8,9,10,11]); var commjson = xmlnodestream.bufferwithcount(2).publish(); var fastsubscriber = commjson.subscribe( function (x) { console.log('----------\nfastsub: onnext: %s', x); }, function (e) { console.log('fastsub: onerror: %s', e); }, function () { console.log('fastsub: oncompleted'); }); var slowsubscriber = commjson.subscribe(function (x) { settimeout(function () { console.log("============\nslowsub called: ", x); }, 5000); }); commjson.connect();
when run above code, expect slow subscriber pause 5 seconds everytime before next data-batch received.
but not happening. after initial 5 second delay, data flooded slowsubscriber
in batches of 2.
what right way control inflow slow subscibers can take time (and preferably fast ones can wait slow ones complete) ?
it isn't pausing because settimeout
not block execution schedules work done @ later time, i.e. after 2 seconds, more data comes in , gets scheduled 2 seconds + tiny delta now. result being fast , slow subscriber finish @ same time, results of slow subscriber won't visualized until 2 seconds later.
if slow subscriber in actual use case non-blocking have 2 options controlling flow of events, either need control flow source of messages, ever may be. or need use 1 of pressure operators controlled()
var xmlnodestream = rx.observable.from([1,2,3,4,5,6,7,8,9,10,11]); var controller = xmlnodestream.bufferwithcount(2).controlled(); var commjson = controller.publish().refcount(); var fastsubscriber = commjson.subscribe( function (x) { console.log('----------\nfastsub: onnext: %s', x); }, function (e) { console.log('fastsub: onerror: %s', e); }, function () { console.log('fastsub: oncompleted'); }); var slowsubscriber = commjson.subscribe(function (x) { settimeout(function () { console.log("============\nslowsub called: ", x); controller.request(1); }, 5000); }); commjson.request(1);
Comments
Post a Comment