Skip to main content
 首页 » 编程设计

c#之强制刷新计数类型 Observable.Buffer c#

2024年06月20日12sharpest

基于这个讨论刷新基于时间的缓冲区的问题: Force flush to Observable.Buffer c# ,我很难弄清楚如何翻译 this那里给出了我按计数而不是按时间缓冲的情况的答案:

var subject = new Subject<Unit>(); 
var closing = Observable 
    .Timer(new TimeSpan(0, 0, 1, 30)) 
    .Select(x => Unit.Default); 
 
var query = 
    mFluxObservable 
        .Buffer(() => Observable 
            .Amb(subject, closing) 
            .Take(1)); 

我开始使用相同的 Amb 逻辑,使用“项目计数器”而不是计时器,但发现自己陷入了试图弄清楚如何重置 that.

你能轻轻地插入我找出如何实现我缺少的功能的方向吗?

var flusher = new Subject<Unit>(); 
var source = Observable.Interval(TimeSpan.FromSeconds(0.1)); 
var output = source.BufferExceptOnFlush(100, flusher); 

我的资源很“热门”,如果有帮助...

PS:我可以使用 Observable.Create 和某种内部计数器来解决问题,但不是没有锁定...

请您参考如下方法:

我认为您可以通过在关闭可观察对象中使用源并将其与刷新可观察对象合并来实现。以下对我有用:

 var source = new Subject<Unit>(); 
 var flush = new Subject<Unit>(); 
 
 // close buffer every 3 values or when a flush value arrives 
 var closing = source.Buffer(3)  
            .Select(x => Unit.Default) 
            .Merge(flush); 
 
 var query = source.Buffer(() => closing) 
         .Subscribe(Console.WriteLine); 
 
// some test values 
source.OnNext(Unit.Default); 
source.OnNext(Unit.Default); 
source.OnNext(Unit.Default); 
source.OnNext(Unit.Default); 
 
// flush buffer 
flush.OnNext(Unit.Default);