.NET has introduced a library called System.Threading.Tasks.Dataflow,
which is now included with .NET Core. The library contains BufferBlock<T>,
which is a great data structure for implementing an extremely simple producer-consumer pattern.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// Bounded buffer shared by the producer loop and the consumer task.
private readonly BufferBlock<SomeType> _locationsToUpdate = new BufferBlock<SomeType>(new DataflowBlockOptions
{
    // Maximum number of elements that can be held in memory at any given point.
    BoundedCapacity = 12000
});

/// <summary>
/// Starts the consumer task, pushes every item of the source sequence into the
/// bounded buffer, then completes the buffer and waits for the consumer to finish.
/// </summary>
/// <param name="token">
/// Cancels the enumeration of the source and any pending <c>SendAsync</c> call.
/// </param>
/// <returns>A task that completes once the consumer has finished.</returns>
public async Task StartProducerConsumer(CancellationToken token)
{
    // Start a consumer task.
    var consumerTask = Task.Run(ConsumerAction, token);

    try
    {
        await foreach (var location in _somethingIAsyncEnumerable.WithCancellation(token))
        {
            // If the buffer is full, this waits (asynchronously) until the consumer frees space.
            // Passing the token lets a producer that is stuck on a full buffer be cancelled.
            await _locationsToUpdate.SendAsync(location, token);

            // If you want to ignore updates when the buffer is full, use the .Post method instead:
            // _locationsToUpdate.Post(location);
        }
    }
    finally
    {
        // Tell the buffer that no more items will be sent; this is what ends the
        // while loop in ConsumerAction. Doing it in finally means the consumer is
        // released even when the producer loop throws or is cancelled.
        _locationsToUpdate.Complete();

        // Ensure the consumer task is awaited so it finishes and its exceptions are observed.
        await consumerTask;
    }
}

/// <summary>
/// Drains the buffer in batches until it has been completed and emptied.
/// </summary>
private async Task ConsumerAction()
{
    // OutputAvailableAsync waits asynchronously while the buffer is empty and
    // returns false once Complete() has been called and nothing is left.
    while (await _locationsToUpdate.OutputAvailableAsync())
    {
        if (!_locationsToUpdate.TryReceiveAll(out var collection))
        {
            _logger.LogDebug("There are no items to update.");

            // Go back to waiting instead of returning: leaving the loop here would
            // stop consumption for good while the producer may still be sending.
            continue;
        }

        // do something with collection :D
    }
}