Массовые асинхронные вызовы с ограничением на количество параллельных без семафора
1,00
р.
р.
Вполне простая задача - есть N упорядоченных запросов, которые надо выполнить асинхронно с ограничением на количество одновременно выполняемых, и потом достать их из них так же N упорядоченных ответов. В реальности это может быть работа с сетью или базой, когда запросов много, но не хочется устроить апокалипсис своему компу, сети или серверу. Поэтому вводится ограничение на количество одновременно активных асинхронных задач. Лобовое решение выглядит очевидно. public async Task RunSemaphoreAsync(IEnumerable items, Func> func, int degree) { using SemaphoreSlim semaphore = new(degree) return await Task.WhenAll(items.Select(async item => { await semaphore.WaitAsync() try { return await func(item) } finally { semaphore.Release() } })) } Пример использования: Пусть это будет рабочая задача, которая что-то принимает, что-то делает и что-то отдает назад. public async Task RunJobAsync(int n) { await Task.Yield() return n + 1 } И вот такой запуск для примера. IEnumerable numbers = Enumerable.Range(0, 100) int[] result = await RunSemaphoreAsync(numbers, RunJobAsync, Environment.ProcessorCount * 2) Console.WriteLine(string.Join(",", result)) Всё работает работает как часики, быстро и ожидаемо.
С тем что выше все отлично, но мне однажды, когда я в очередной раз читал про всякие реализации Producer/Consumer паттерна, пришла идея использовать воркеры вместо семафора. Собственно, почему нет. Получился вот такой метод. public async Task RunWorkersAsync(IEnumerable items, Func> func, int degree) { List> tasks = new() using (var source = items.GetEnumerator()) { Task[] jobs = new Task[degree] for (int i = 0 i < degree i++) { jobs[i] = ((Func)(async () => { while (true) { Task task lock (source) { if (source.MoveNext()) { task = func(source.Current) tasks.Add(task) } else break } await task } }))() } await Task.WhenAll(jobs) } return tasks.Select(t => t.Result).ToArray() } Работает точно так же красиво как и первый кандидат. Тогда что же лучше?
И решил я померять оверхед. Я не мастер писать бенчмарки, но кого когда это останавливало? :) class Program { static void Main(string[] args) { var result = BenchmarkRunner.Run() Console.ReadKey() } } [MemoryDiagnoser] public class MyBenchmarks { private readonly List numbers = Enumerable.Range(0, 2000).ToList() private readonly int degree = Environment.ProcessorCount * 2 [Benchmark] public Task SemaphoreTest() { return RunSemaphoreAsync(numbers, RunJobAsync, degree) } [Benchmark] public Task WorkersTest() { return RunWorkersAsync(numbers, RunJobAsync, degree) } public async Task RunJobAsync(int n) { await Task.Yield() return n + 1 } public async Task RunSemaphoreAsync(IEnumerable items, Func> func, int degree) { using SemaphoreSlim semaphore = new(degree) return await Task.WhenAll(items.Select(async item => { await semaphore.WaitAsync() try { return await func(item) } finally { semaphore.Release() } })) } public async Task RunWorkersAsync(IEnumerable items, Func> func, int degree) { List> tasks = new() using (var source = items.GetEnumerator()) { Task[] jobs = new Task[degree] for (int i = 0 i < degree i++) { jobs[i] = ((Func)(async () => { while (true) { Task task lock (source) { if (source.MoveNext()) { task = func(source.Current) tasks.Add(task) } else break } await task } }))() } await Task.WhenAll(jobs) } return tasks.Select(t => t.Result).ToArray() } } И тут получаю интересный результат BenchmarkDotNet=v0.13.0, OS=Windows 10.0.19043.1081 (21H1/May2021Update) Intel Core i7-4700HQ CPU 2.40GHz (Haswell), 1 CPU, 8 logical and 4 physical cores .NET SDK=5.0.301 [Host] : .NET 5.0.7 (5.0.721.25508), X64 RyuJIT DefaultJob : .NET 5.0.7 (5.0.721.25508), X64 RyuJIT
Method Mean Error StdDev Gen 0 Gen 1 Gen 2 Allocated
SemaphoreTest 1,780.2 us 4.23 us 3.95 us 140.6250 41.0156 - 519 KB WorkersTest 943.2 us 18.37 us 26.92 us 74.2188 19.5313 - 262 KB
Глаза отказываются в это верить. Почему метод с семафором просто вот так вот слился, а быть может кривой тест или кривая реализация? Рассудите пожалуйста. P.S. Я же не сразу побежал на SO, а первым делом поискал жалобы на медленный SemaphoreSlim...и не нашел, но нашел вот что: https://github.com/dotnet/runtime/pull/55262. Другими словами, светофорчик немного полечат в .NET 6.
Ответ Код для RunWorkersAsync и для RunSemaphoreAsync по разному распределяет работу между потоками. В RunWorkersAsync в первоначальном потоке отрабатывает только первая операция цикла while каждого worker-a, после этого перебор происходит в несколько потоков, т.к. все после вызова await отрабатывает на потоках из пула. Больше никакой завязки на какой-то общий поток в коде нет. В RunSemaphoreAsync же в первоначальном потоке для каждого элемента отрабатывает items.Select(async item => { semaphore.WaitAsync() // await и продолжение - на потоке из пула Соответственно, в этом варианте основной поток становится узким местом. Обработчики не могут разгребать работу быстрее, чем этот поток ее создает. Это можно решить принудительным yield прямо в начале цикла: public async Task RunSemaphoreAsync(IEnumerable items, Func> func, int degree) { using SemaphoreSlim semaphore = new(degree) return await Task.WhenAll(items.Select(async item => { await Task.Yield() await semaphore.WaitAsync() try { return await func(item) } finally { semaphore.Release() } })) } Результаты станут чуть лучше, но сам боттлнек в виде одного потока все равно не исчезнет.