基于这个讨论刷新基于时间的缓冲区的问题: Force flush to Observable.Buffer c# ,我很难弄清楚如何翻译 this那里给出了我按计数而不是按时间缓冲的情况的答案:
var subject = new Subject<Unit>();
var closing = Observable
.Timer(new TimeSpan(0, 0, 1, 30))
.Select(x => Unit.Default);
var query =
mFluxObservable
.Buffer(() => Observable
.Amb(subject, closing)
.Take(1));
我开始使用相同的 Amb
逻辑,使用“项目计数器”而不是计时器,但发现自己陷入了试图弄清楚如何重置 that.
你能轻轻地插入我找出如何实现我缺少的功能的方向吗?
var flusher = new Subject<Unit>();
var source = Observable.Interval(TimeSpan.FromSeconds(0.1));
var output = source.BufferExceptOnFlush(100, flusher);
我的资源很“热门”,如果有帮助...
PS:我可以使用 Observable.Create
和某种内部计数器来解决问题,但不是没有锁定...
请您参考如下方法:
我认为您可以通过在关闭可观察对象中使用源并将其与刷新可观察对象合并来实现。以下对我有用:
var source = new Subject<Unit>();
var flush = new Subject<Unit>();
// close buffer every 3 values or when a flush value arrives
var closing = source.Buffer(3)
.Select(x => Unit.Default)
.Merge(flush);
var query = source.Buffer(() => closing)
.Subscribe(Console.WriteLine);
// some test values
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
source.OnNext(Unit.Default);
// flush buffer
flush.OnNext(Unit.Default);