// Bounded buffer shared by the producer (StartProducerConsumer) and the consumer (ConsumerAction).
private BufferBlock<SomeType> _locationsToUpdate = new BufferBlock<SomeType>(new DataflowBlockOptions {
    // Maximum number of elements the block may hold at any given point; once it is
    // full, SendAsync waits asynchronously and Post declines the item.
    BoundedCapacity = 12000
});
/// <summary>
/// Starts the consumer, pushes every item of the source sequence into the bounded
/// buffer, then signals completion and waits for the consumer to drain the buffer.
/// </summary>
/// <param name="token">
/// Cancels the producer side: it prevents the consumer from being started when
/// already cancelled, and it aborts a pending <c>SendAsync</c> call.
/// </param>
/// <returns>A task that completes once the consumer has processed all buffered items.</returns>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="token"/> is cancelled while producing.
/// </exception>
public async Task StartProducerConsumer(CancellationToken token) {
    // Start a consumer task. The token passed to Task.Run only stops the task from
    // being started; it does not interrupt a consumer that is already running.
    var consumerTask = Task.Run(ConsumerAction, token);
    try
    {
        await foreach (var location in _somethingIAsyncEnumerable)
        {
            // If the buffer is full, this waits (asynchronously) until the consumer frees
            // space. Passing the token keeps a cancelled producer from waiting forever on a
            // full buffer (for example when the consumer was never started).
            await _locationsToUpdate.SendAsync(location, token);
            // If you want to drop updates when the buffer is full, use .Post instead:
            // _locationsToUpdate.Post(location)
        }
    }
    finally
    {
        // Tell the buffer that no more items will be sent. This ends the while loop in the
        // consumer once the buffer is drained. It lives in a finally block so the consumer
        // is released even when the producer fails or is cancelled; on that path the
        // exception propagates and the consumer finishes draining in the background.
        _locationsToUpdate.Complete();
    }
    // Wait for the consumer to finish and surface any exception it threw.
    await consumerTask;
}
/// <summary>
/// Consumer loop: waits for items to become available in the buffer and drains
/// them in batches until the buffer is completed and empty.
/// </summary>
/// <returns>A task that completes when the buffer has been completed and fully drained.</returns>
private async Task ConsumerAction() {
    // OutputAvailableAsync waits asynchronously while the buffer is empty and returns
    // false once .Complete() has been called and nothing is left, which ends the loop.
    while (await _locationsToUpdate.OutputAvailableAsync())
    {
        if (!_locationsToUpdate.TryReceiveAll(out var collection))
        {
            _logger.LogDebug("There are no items to update.");
            // Go back to waiting instead of returning: exiting here while the buffer is
            // still open would leave the producer blocked forever once the buffer fills.
            continue;
        }
        // do something with collection :D
    }
}