Что такое IAsyncEnumerable<T> и когда его следует использовать?
IAsyncEnumerable<T> — это интерфейс для асинхронных потоков: последовательность, элементы которой поступают со временем и каждый из которых может потребовать await. Вот что это на самом деле, как await foreach и yield приводят его в движение, и правило, когда выбирать его вместо Task<List<T>>.
IAsyncEnumerable<T> — это интерфейс для асинхронного потока: последовательность, которую вы извлекаете по одному элементу за раз, где производство каждого элемента может потребовать ожидания чего-либо (чтения из сети, строки базы данных, фрагмента файла). Это асинхронный собрат IEnumerable<T>. Вы производите его методом-итератором, который сочетает yield return и await, а потребляете с помощью await foreach. Обращайтесь к нему, когда у вас много элементов, поступающих со временем, и вы не хотите буферизовать их все в памяти, прежде чем обработать первый. Если вы производите лишь один результат, или вся коллекция уже в памяти, он вам не нужен. В этой публикации (актуальной для .NET 11, C# 14) объясняются механика, причина, по которой очевидные альтернативы не подходят, и правило выбора.
Пробел, который оставляют Task<T> и IEnumerable<T>
Выстройте четыре формы в ряд, и недостающая ячейка станет очевидной:
| одно значение | много значений | |
|---|---|---|
| синхронный | T | IEnumerable<T> |
| асинхронный | Task<T> | IAsyncEnumerable<T> |
Task<T> даёт вам одно значение, позже. IEnumerable<T> даёт много значений, но получение каждого из них синхронно: MoveNext() возвращает bool, а не нечто, что вы можете ожидать. Долгие годы нижняя правая ячейка не имела первоклассного типа, и его подделывали двумя плохими обходными путями.
Первый — Task<IEnumerable<T>> (или Task<List<T>>). Он ожидает один раз, а затем вручает вам всю коллекцию. Он работает, но сводит на нет смысл потоковой передачи: для вашего кода ничего не видно, пока не получено всё. Запрос, возвращающий пять миллионов строк, выделяет список на пять миллионов, прежде чем тело вашего цикла выполнится хоть раз.
Второй — IEnumerable<Task<T>>. Он хуже. Это синхронная последовательность задач, что означает, что итератор определяет весь объём работы заранее, и у вас нет естественного способа применить обратное давление или прекратить производство задач, когда потребитель теряет интерес. Вы также не можете выполнить await внутри MoveNext, который производит следующую задачу, поэтому любая задержка на элемент блокирует поток.
IAsyncEnumerable<T>, добавленный в C# 8 и .NET Core 3.0, заполняет ячейку как надо. Каждый шаг итерации сам по себе ожидаем, поэтому производитель может ожидать между элементами, а потребитель извлекает следующий элемент только тогда, когда готов к нему.
Как на самом деле выглядит интерфейс
Здесь нет никакой магии. Контракт невелик:
// System.Collections.Generic
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator(
CancellationToken cancellationToken = default);
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
ValueTask DisposeAsync();
}
Две детали несут на себе весь дизайн.
MoveNextAsync возвращает ValueTask<bool>, а не Task<bool>. Этот выбор сделан намеренно. Вы вызываете MoveNextAsync один раз на элемент, поэтому поток из 100 000 элементов означает 100 000 вызовов. Если бы каждый из них выделял объект Task в куче, асинхронные потоки были бы катастрофой выделений. ValueTask<bool> ничего не выделяет, когда результат уже доступен синхронно (буферизованная строка, например), что является обычным случаем для быстрого производителя. Вы платите за кучу только тогда, когда элементу действительно приходится ждать.
IAsyncEnumerator<T> реализует IAsyncDisposable, а не IDisposable. Очистка асинхронна, потому что закрытие нижележащего ресурса (сокета, DbDataReader) само может потребовать ввода-вывода. Вот почему потребляющий цикл нуждается в await foreach, а не в простом foreach: освобождение в конце итерации должно быть ожидаемым.
Вы почти никогда не вызываете эти члены вручную. Компилятор делает это за вас на обоих концах.
Производство потока: yield return встречает await
Асинхронный метод-итератор — это тот, который возвращает IAsyncEnumerable<T> и содержит как await, так и yield return. Компилятор переписывает его в конечный автомат, который знает, как приостанавливаться на каждом await и возобновляться на следующем MoveNextAsync:
// .NET 11, C# 14
public static async IAsyncEnumerable<string> ReadLinesAsync(
string path,
[EnumeratorCancellation] CancellationToken ct = default)
{
using var reader = new StreamReader(path);
while (await reader.ReadLineAsync(ct) is { } line)
{
yield return line;
}
}
Прочтите, что это вам даёт. Каждая строка читается асинхронно, а затем сразу же выдаётся. Вызывающий может обрабатывать первую строку, пока вторая ещё читается с диска. Память никогда не удерживает больше одной строки плюс внутренний буфер ридера, независимо от того, в файле 10 строк или 10 гигабайт. using на ридере соблюдается через сгенерированный DisposeAsync, поэтому дескриптор файла закрывается, когда итерация заканчивается, в том числе когда потребитель выходит досрочно или исключение разматывает цикл.
Атрибут [EnumeratorCancellation] на параметре токена — та часть, которую забывают. Он сообщает компилятору, что этот параметр должен получить токен, который потребитель передаёт через WithCancellation, направляя внешнюю отмену в тело итератора. Без него параметр — это всего лишь обычный аргумент, по умолчанию принимающий CancellationToken.None и игнорирующий то, что предоставил потребитель. Подробнее об этом ниже, потому что это самая распространённая ошибка корректности с асинхронными потоками.
Потребление потока: await foreach
Сторона потребителя длиннее обычного цикла на одно ключевое слово:
// .NET 11, C# 14
await foreach (var line in ReadLinesAsync("huge.log", ct))
{
if (line.Contains("ERROR"))
await alertSink.WriteAsync(line, ct);
}
Компилятор разворачивает это в вызовы GetAsyncEnumerator, цикл из await MoveNextAsync(), читающий Current на каждом витке, и await DisposeAsync() в блоке finally. Цикл полностью последователен: элемент N+1 не запрашивается, пока ваше тело не завершит работу с элементом N. Эта последовательная, управляемая спросом форма — это возможность, а не ограничение. Именно она ограничивает память и даёт вам естественное обратное давление: медленный потребитель автоматически замедляет производителя, потому что следующий await производителя не возобновляется до следующего вызова MoveNextAsync.
Если порядок итерации не важен и вам нужна параллельность, await foreach — неподходящий инструмент. Используйте Parallel.ForEachAsync, который может потреблять IAsyncEnumerable<T> и выполнять тело для нескольких элементов одновременно с ограничением степени параллелизма. await foreach — для упорядоченной обработки, по одному за раз.
Отмена: пара WithCancellation плюс [EnumeratorCancellation]
Голый await foreach (var x in stream) не даёт вам места, куда передать токен, потому что в синтаксисе языка нет для этого слота. Две части, которые замыкают круг, — это WithCancellation у потребителя и [EnumeratorCancellation] у производителя:
// Producer: token parameter is tagged
public static async IAsyncEnumerable<int> ProduceAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
for (var i = 0; ; i++)
{
await Task.Delay(100, ct);
yield return i;
}
}
// Consumer: token is forwarded into GetAsyncEnumerator
await foreach (var n in ProduceAsync().WithCancellation(ct))
{
Console.WriteLine(n);
}
WithCancellation не оборачивает последовательность в ещё один итератор и не добавляет накладных расходов. Он лишь запоминает токен, чтобы, когда компилятор вызывает GetAsyncEnumerator(token), токен втекал внутрь, а [EnumeratorCancellation] направлял его в параметр производителя. Отмените токен, и ожидающий await Task.Delay бросит OperationCanceledException, которое распространяется наружу через ваш await foreach.
Пропуск токена — это то, как вы получаете зависшие фоновые задания и застрявшие запросы в продакшене: поток поверх сети или базы данных удерживает соединение на весь цикл, и без токена нет способа прервать его, когда вызывающий исчезает. Считайте WithCancellation(ct) обязательным для любого потока, опирающегося на ввод-вывод.
ConfigureAwait работает и на цикле
await foreach ожидает внутренне, поэтому он подхватывает захват контекста синхронизации так же, как обычный await. В коде библиотеки, который не должен возвращаться в захваченный контекст, примените ConfigureAwait(false) ко всему циклу с помощью ConfigureAwait:
await foreach (var item in stream.ConfigureAwait(false))
{
Process(item);
}
Это настраивает как await у MoveNextAsync, так и финальный await у DisposeAsync. В современном приложении ASP.NET Core нет контекста синхронизации для захвата, поэтому там это пустая операция, но это по-прежнему важно для кода библиотек, консольных хостов и всего, что может выполняться в контексте пользовательского интерфейса или устаревшем контексте. Компромиссы те же, что и везде в асинхронном коде, рассмотрены в важен ли ещё ConfigureAwait в .NET 11.
LINQ над асинхронными потоками теперь встроен
Давней шероховатостью было то, что у IAsyncEnumerable<T> не было LINQ. Чтобы написать stream.Where(...).Select(...), вы подключали поддерживаемый сообществом пакет NuGet System.Linq.Async. Начиная с .NET 10 это изменилось: среда выполнения поставляет System.Linq.AsyncEnumerable в составе BCL, поэтому стандартные операторы работают над любым IAsyncEnumerable<T> без ссылки на пакет, и .NET 11 наследует это.
// .NET 11: Where/Select/Take resolve from the BCL, no NuGet package
var firstTenErrors = ReadLinesAsync("huge.log", ct)
.Where(l => l.Contains("ERROR"))
.Take(10);
await foreach (var line in firstTenErrors.WithCancellation(ct))
Console.WriteLine(line);
Если вы переносите более старый проект, удалите явную ссылку на System.Linq.Async при переходе на .NET 10 или новее; её сохранение вызывает ошибки неоднозначной перегрузки против теперь встроенных методов. Изменение имён, которое стоит знать: старые операторы SelectAwait/WhereAwait, принимавшие асинхронные лямбды, исчезли, и вы вместо этого передаёте асинхронный делегат в обычные Select/Where. Код, нацеленный на несколько более старых сред выполнения, должен ссылаться на пакет System.Linq.AsyncEnumerable вместо System.Linq.Async.
Когда к нему обращаться
Используйте IAsyncEnumerable<T>, когда выполняются все три условия:
- Элементов много, или их число неизвестно или неограниченно.
- Производство каждого элемента включает асинхронный ввод-вывод (база данных, сеть, файл, очередь сообщений).
- Вы хотите начать обработку до того, как поступит последний элемент, или не можете позволить себе держать их все в памяти одновременно.
Конкретные подходящие случаи: потоковая выдача строк из базы данных для экспорта, как рассмотрено в использовании IAsyncEnumerable с EF Core 11; чтение API с постраничной разбивкой страница за страницей и выдача каждого элемента по мере поступления страниц; слежение за журналом или потоком сообщений, который никогда не заканчивается; перекачивание данных в Channel или PipeWriter. В ASP.NET Core возврат IAsyncEnumerable<T> из minimal API или действия контроллера передаёт JSON-массив клиенту элемент за элементом вместо буферизации всего ответа.
Когда не следует
Асинхронные потоки не бесплатны, и они не всегда подходящая форма:
- Данные уже в памяти. Итерируете
List<T>или массив? Используйтеforeach. Обёртывание коллекции в памяти в асинхронный поток добавляет накладные расходы конечного автомата и ничего не покупает, потому что ни один элемент на самом деле не ожидает. - Есть ровно один результат. Метод, возвращающий одну запись, должен возвращать
Task<T>. Поток из одного — это просто церемония. - Набор мал и ограничен, и вам нужен произвольный доступ,
Countили несколько проходов.Task<List<T>>(черезToListAsync) проще и позволяет индексировать, считать и повторно перечислять. Потоковая передача даёт вам последовательность с однонаправленным, однопроходным доступом; если вам нужно больше, материализуйте её. - Вам нужна настоящая параллельность по элементам. Один
await foreachпо своей природе последователен. Для веерного распараллеливания используйтеParallel.ForEachAsyncили собирайте задачи иTask.WhenAll.
Полезное эмпирическое правило: если вы ловите себя на том, что сразу вызываете ToListAsync() на потоке, вам нужен был не поток, а список. И если у вас возникает соблазн обернуть список в памяти как IAsyncEnumerable<T> лишь для того, чтобы удовлетворить сигнатуру метода, пересмотрите сигнатуру.
Замечание об освобождении и досрочном выходе
Поскольку перечислитель — это IAsyncDisposable, await foreach гарантирует, что DisposeAsync выполнится, когда цикл заканчивается по любой причине: нормальное завершение, break или исключение, прорывающееся сквозь тело. Именно это делает using внутри асинхронного итератора безопасным. Тонкое следствие в том, что досрочный выход не обязательно мгновенно останавливает нижележащий источник. База данных могла уже выгрузить строки на стороне сервера; буферизованный сетевой ридер мог предвыбрать следующий фрагмент. Освобождение посылает сигнал отмены, но небольшая часть уже выполняемой работы всё же может завершиться. Это почти никогда не проблема, но это объясняет случайный момент «почему этот запрос всё ещё выполняется после того, как мой цикл вышел?» в профилировщике.
Асинхронные потоки превратили неудобную нижнюю правую ячейку матрицы значение/коллекция в первоклассную возможность языка. Ментальная модель — это вся игра: это IEnumerable<T>, где каждый шаг может выполнить await, приводимый в движение await foreach, и его стоит использовать именно тогда, когда элементы поступают со временем и вы предпочли бы обрабатывать их по мере поступления, а не ждать их всех.
Связанное
- How to use IAsyncEnumerable
with EF Core 11 применяет всё это к потоковой выдаче строк базы данных. - IEnumerable vs IAsyncEnumerable vs IQueryable in C# — это сравнительное руководство по выбору для трёх интерфейсов последовательностей.
- How to stream a file from an ASP.NET Core endpoint without buffering — это HTTP-ответный аналог производства потока.
- How to cancel a long-running Task in C# without deadlocking глубже разбирает токены отмены, от которых зависят асинхронные потоки.
- Streaming tasks with .NET 9 Task.WhenEach — это другой основной способ потреблять результаты по мере их завершения.
Comments
Sign in with GitHub to comment. Reactions and replies thread back to the comments repo.