Start Debugging

Was ist IAsyncEnumerable<T> und wann sollten Sie es verwenden?

IAsyncEnumerable<T> ist die Schnittstelle für asynchrone Streams: eine Sequenz, deren Elemente über die Zeit eintreffen und bei der jedes Element ein await erfordern kann. Hier erfahren Sie, was es wirklich ist, wie await foreach und yield es antreiben, und die Regel, wann Sie es statt Task<List<T>> wählen sollten.

IAsyncEnumerable<T> ist die Schnittstelle für einen asynchronen Stream: eine Sequenz, die Sie ein Element nach dem anderen abrufen, wobei das Erzeugen jedes Elements ein Warten auf etwas erfordern kann (ein Netzwerklesevorgang, eine Datenbankzeile, ein Dateifragment). Es ist das asynchrone Gegenstück zu IEnumerable<T>. Sie erzeugen es mit einer Iteratormethode, die yield return und await kombiniert, und konsumieren es mit await foreach. Greifen Sie dazu, wenn Sie viele Elemente haben, die über die Zeit eintreffen, und Sie nicht alle im Speicher puffern möchten, bevor Sie das erste verarbeiten. Wenn Sie immer nur ein einzelnes Ergebnis erzeugen oder die gesamte Sammlung bereits im Speicher liegt, brauchen Sie es nicht. Dieser Beitrag (aktuell für .NET 11, C# 14) erklärt die Mechanik, den Grund, warum die naheliegenden Alternativen scheitern, und die Entscheidungsregel.

Die Lücke, die Task<T> und IEnumerable<T> offen lassen

Stellen Sie die vier Formen nebeneinander, und die fehlende Zelle wird offensichtlich:

einzelner Wertviele Werte
synchronTIEnumerable<T>
asynchronTask<T>IAsyncEnumerable<T>

Task<T> liefert Ihnen einen Wert, später. IEnumerable<T> liefert viele Werte, aber das Abrufen jedes einzelnen ist synchron: MoveNext() gibt einen bool zurück, nicht etwas, worauf Sie warten können. Jahrelang hatte die untere rechte Zelle keinen erstklassigen Typ, und man behalf sich mit zwei schlechten Notlösungen.

Die erste ist Task<IEnumerable<T>> (oder Task<List<T>>). Diese wartet einmal und übergibt Ihnen dann die gesamte Sammlung. Sie funktioniert, aber sie vereitelt den Zweck des Streamings: nichts ist für Ihren Code sichtbar, bis alles abgerufen wurde. Eine Abfrage, die fünf Millionen Zeilen zurückgibt, allokiert eine Liste mit fünf Millionen, bevor Ihr Schleifenrumpf ein einziges Mal läuft.

Die zweite ist IEnumerable<Task<T>>. Diese ist schlimmer. Sie ist eine synchrone Sequenz von Tasks, was bedeutet, dass der Iterator die gesamte Arbeitsmenge im Voraus festlegt, und Sie haben keine natürliche Möglichkeit, Gegendruck anzuwenden oder die Erzeugung von Tasks zu stoppen, sobald ein Konsument das Interesse verliert. Sie können auch kein await innerhalb des MoveNext ausführen, das den nächsten Task erzeugt, also blockiert jede Latenz pro Element die Thread.

IAsyncEnumerable<T>, hinzugefügt in C# 8 und .NET Core 3.0, füllt die Zelle korrekt. Jeder Schritt der Iteration ist selbst awaitbar, also kann der Erzeuger zwischen Elementen warten, und der Konsument ruft das nächste Element nur ab, wenn er dafür bereit ist.

Wie die Schnittstelle tatsächlich aussieht

Hier gibt es keine Magie. Der Vertrag ist klein:

// System.Collections.Generic
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(
        CancellationToken cancellationToken = default);
}

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
    ValueTask DisposeAsync();
}

Zwei Details tragen das gesamte Design.

MoveNextAsync gibt ValueTask<bool> statt Task<bool> zurück. Diese Wahl ist bewusst getroffen. Sie rufen MoveNextAsync einmal pro Element auf, also bedeutet ein Stream mit 100.000 Elementen 100.000 Aufrufe. Würde jeder ein Task-Objekt auf dem Heap allokieren, wären asynchrone Streams eine Allokationskatastrophe. ValueTask<bool> allokiert nichts, wenn das Ergebnis bereits synchron verfügbar ist (eine gepufferte Zeile zum Beispiel), was der häufige Fall bei einem schnellen Erzeuger ist. Sie zahlen die Heap-Kosten nur, wenn ein Element wirklich warten muss.

IAsyncEnumerator<T> implementiert IAsyncDisposable, nicht IDisposable. Das Aufräumen ist asynchron, weil das Schließen der zugrunde liegenden Ressource (ein Socket, ein DbDataReader) selbst E/A erfordern kann. Deshalb braucht die konsumierende Schleife await foreach und kein einfaches foreach: das Freigeben am Ende der Iteration muss abgewartet werden.

Sie rufen diese Member fast nie von Hand auf. Der Compiler erledigt das für Sie an beiden Enden.

Einen Stream erzeugen: yield return trifft await

Eine asynchrone Iteratormethode ist eine, die IAsyncEnumerable<T> zurückgibt und sowohl await als auch yield return enthält. Der Compiler schreibt sie in einen Zustandsautomaten um, der weiß, wie er bei jedem await aussetzt und beim nächsten MoveNextAsync fortsetzt:

// .NET 11, C# 14
public static async IAsyncEnumerable<string> ReadLinesAsync(
    string path,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    using var reader = new StreamReader(path);
    while (await reader.ReadLineAsync(ct) is { } line)
    {
        yield return line;
    }
}

Lesen Sie, was Ihnen das gibt. Jede Zeile wird asynchron gelesen und dann sofort ausgegeben. Der Aufrufer kann Zeile eins verarbeiten, während Zeile zwei noch von der Festplatte gelesen wird. Der Speicher hält nie mehr als eine einzelne Zeile plus den internen Puffer des Readers, unabhängig davon, ob die Datei 10 Zeilen oder 10 Gigabyte hat. Das using auf dem Reader wird durch das generierte DisposeAsync eingehalten, also schließt sich der Datei-Handle, wenn die Iteration endet, auch wenn der Konsument vorzeitig aussteigt oder eine Ausnahme die Schleife abwickelt.

Das Attribut [EnumeratorCancellation] auf dem Token-Parameter ist der Teil, den man vergisst. Es teilt dem Compiler mit, dass dieser Parameter das Token erhalten soll, das ein Konsument über WithCancellation übergibt, und leitet externe Abbrüche in den Iteratorrumpf weiter. Ohne es ist der Parameter nur ein gewöhnliches Argument, das standardmäßig CancellationToken.None annimmt und ignoriert, was auch immer der Konsument geliefert hat. Mehr dazu unten, denn es ist der häufigste Korrektheitsfehler bei asynchronen Streams.

Einen Stream konsumieren: await foreach

Die Konsumentenseite ist ein Schlüsselwort länger als eine normale Schleife:

// .NET 11, C# 14
await foreach (var line in ReadLinesAsync("huge.log", ct))
{
    if (line.Contains("ERROR"))
        await alertSink.WriteAsync(line, ct);
}

Der Compiler erweitert dies zu Aufrufen von GetAsyncEnumerator, einer Schleife aus await MoveNextAsync(), die jede Runde Current liest, und einem await DisposeAsync() in einem finally-Block. Die Schleife ist vollständig sequenziell: Element N+1 wird erst angefordert, wenn Ihr Rumpf mit Element N fertig ist. Diese sequenzielle, bedarfsgesteuerte Form ist die Funktion, keine Einschränkung. Sie ist es, die den Speicher begrenzt und Ihnen natürlichen Gegendruck gibt: ein langsamer Konsument verlangsamt automatisch den Erzeuger, weil das nächste await des Erzeugers erst beim nächsten Aufruf von MoveNextAsync fortgesetzt wird.

Wenn die Iterationsreihenfolge keine Rolle spielt und Sie Nebenläufigkeit wollen, ist await foreach das falsche Werkzeug. Verwenden Sie Parallel.ForEachAsync, das ein IAsyncEnumerable<T> konsumieren und den Rumpf für mehrere Elemente gleichzeitig mit einer Obergrenze für den Parallelitätsgrad ausführen kann. await foreach ist für geordnete Verarbeitung, eines nach dem anderen.

Abbruch: das Paar WithCancellation plus [EnumeratorCancellation]

Ein nacktes await foreach (var x in stream) gibt Ihnen keinen Ort, um ein Token zu übergeben, weil die Sprachsyntax keinen Platz dafür hat. Die zwei Teile, die den Kreis schließen, sind WithCancellation beim Konsumenten und [EnumeratorCancellation] beim Erzeuger:

// Producer: token parameter is tagged
public static async IAsyncEnumerable<int> ProduceAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (var i = 0; ; i++)
    {
        await Task.Delay(100, ct);
        yield return i;
    }
}

// Consumer: token is forwarded into GetAsyncEnumerator
await foreach (var n in ProduceAsync().WithCancellation(ct))
{
    Console.WriteLine(n);
}

WithCancellation umhüllt die Sequenz nicht mit einem weiteren Iterator und fügt keinen Mehraufwand hinzu. Es vermerkt lediglich das Token, sodass, wenn der Compiler GetAsyncEnumerator(token) aufruft, das Token hineinfließt, und [EnumeratorCancellation] es zum Parameter des Erzeugers leitet. Brechen Sie das Token ab, und das ausstehende await Task.Delay wirft OperationCanceledException, die sich durch Ihr await foreach nach außen fortpflanzt.

Das Token wegzulassen ist der Weg zu hängenden Hintergrund-Jobs und steckengebliebenen Anfragen in der Produktion: ein Stream über ein Netzwerk oder eine Datenbank hält eine Verbindung für die gesamte Schleife, und ohne ein Token gibt es keine Möglichkeit, sie abzubrechen, wenn der Aufrufer verschwindet. Behandeln Sie WithCancellation(ct) als verpflichtend bei jedem von E/A gestützten Stream.

ConfigureAwait funktioniert auch auf der Schleife

await foreach wartet intern, also greift es das Einfangen des Synchronisationskontexts genauso auf wie ein normales await. In Bibliothekscode, der nicht zu einem eingefangenen Kontext zurückkehren sollte, wenden Sie ConfigureAwait(false) auf die gesamte Schleife mit ConfigureAwait an:

await foreach (var item in stream.ConfigureAwait(false))
{
    Process(item);
}

Dies konfiguriert sowohl die MoveNextAsync-Awaits als auch das abschließende DisposeAsync-Await. In einer modernen ASP.NET-Core-Anwendung gibt es keinen Synchronisationskontext zum Einfangen, also ist es dort ein No-Op, aber es ist weiterhin wichtig für Bibliothekscode, Konsolen-Hosts und alles, was unter einem UI- oder Legacy-Kontext laufen könnte. Die Abwägungen sind dieselben wie überall sonst im asynchronen Code, behandelt in ob ConfigureAwait in .NET 11 noch wichtig ist.

LINQ über asynchrone Streams ist jetzt integriert

Eine langjährige raue Kante war, dass IAsyncEnumerable<T> kein LINQ hatte. Um stream.Where(...).Select(...) zu schreiben, zog man das von der Community gepflegte NuGet-Paket System.Linq.Async heran. Mit .NET 10 änderte sich das: die Laufzeit liefert System.Linq.AsyncEnumerable in der BCL aus, also funktionieren die Standardoperatoren über jedem IAsyncEnumerable<T> ohne Paketverweis, und .NET 11 erbt dies.

// .NET 11: Where/Select/Take resolve from the BCL, no NuGet package
var firstTenErrors = ReadLinesAsync("huge.log", ct)
    .Where(l => l.Contains("ERROR"))
    .Take(10);

await foreach (var line in firstTenErrors.WithCancellation(ct))
    Console.WriteLine(line);

Wenn Sie ein älteres Projekt migrieren, entfernen Sie den expliziten Verweis auf System.Linq.Async, wenn Sie zu .NET 10 oder neuer wechseln; ihn drinzulassen verursacht Fehler durch mehrdeutige Überladungen gegen die nun integrierten Methoden. Eine Namensänderung, die man kennen sollte: die alten Operatoren SelectAwait/WhereAwait, die asynchrone Lambdas entgegennahmen, sind weg, und Sie übergeben das asynchrone Delegate stattdessen an das reguläre Select/Where. Code, der mehrere ältere Laufzeiten als Ziel hat, sollte das Paket System.Linq.AsyncEnumerable statt System.Linq.Async referenzieren.

Wann Sie dazu greifen sollten

Verwenden Sie IAsyncEnumerable<T>, wenn alle drei dieser Bedingungen zutreffen:

  1. Es gibt viele Elemente, oder eine unbekannte oder unbegrenzte Anzahl.
  2. Das Erzeugen jedes Elements beinhaltet asynchrone E/A (Datenbank, Netzwerk, Datei, Nachrichtenwarteschlange).
  3. Sie möchten mit der Verarbeitung beginnen, bevor das letzte Element eintrifft, oder Sie können es sich nicht leisten, sie alle auf einmal im Speicher zu halten.

Konkrete Fälle, die passen: Zeilen aus einer Datenbank für einen Export streamen, wie behandelt in IAsyncEnumerable mit EF Core 11 verwenden; eine paginierte API Seite für Seite lesen und jedes Element ausgeben, sobald die Seiten eintreffen; ein Log oder einen Nachrichtenstrom verfolgen, der nie endet; Daten in einen Channel oder einen PipeWriter leiten. In ASP.NET Core streamt das Zurückgeben von IAsyncEnumerable<T> aus einer Minimal API oder einer Controller-Aktion das JSON-Array Element für Element an den Client, statt die gesamte Antwort zu puffern.

Wann Sie es nicht sollten

Asynchrone Streams sind nicht kostenlos, und sie sind nicht immer die richtige Form:

Eine nützliche Faustregel: Wenn Sie sich dabei ertappen, ToListAsync() sofort auf dem Stream aufzurufen, wollten Sie keinen Stream, sondern die Liste. Und wenn Sie versucht sind, eine im Speicher liegende Liste als IAsyncEnumerable<T> zu hüllen, nur um eine Methodensignatur zu erfüllen, überdenken Sie die Signatur.

Eine Anmerkung zu Freigabe und vorzeitigem Ausstieg

Da der Enumerator IAsyncDisposable ist, garantiert das await foreach, dass DisposeAsync läuft, wenn die Schleife aus irgendeinem Grund endet: normale Beendigung, ein break, oder eine Ausnahme, die durch den Rumpf reißt. Das ist es, was das using innerhalb eines asynchronen Iterators sicher macht. Die subtile Konsequenz ist, dass vorzeitiges Aussteigen die zugrunde liegende Quelle nicht zwangsläufig sofort stoppt. Eine Datenbank könnte serverseitig bereits Zeilen aufgespult haben; ein gepufferter Netzwerk-Reader könnte das nächste Fragment vorab geholt haben. Die Freigabe sendet das Abbruchsignal, aber ein wenig bereits in Bearbeitung befindliche Arbeit kann sich trotzdem noch abschließen. Das ist fast nie ein Problem, aber es erklärt den gelegentlichen Moment “warum läuft diese Abfrage noch, nachdem meine Schleife ausgestiegen ist?” in einem Profiler.

Asynchrone Streams machten die unbequeme untere rechte Zelle der Wert-/Sammlungsmatrix zu einem erstklassigen Sprachfeature. Das mentale Modell ist das ganze Spiel: es ist IEnumerable<T>, bei dem jeder Schritt await ausführen kann, angetrieben von await foreach, und genau dann lohnend, wenn die Elemente über die Zeit eintreffen und Sie sie lieber verarbeiten möchten, sobald sie kommen, als auf sie alle zu warten.

Verwandt

Quellen

Comments

Sign in with GitHub to comment. Reactions and replies thread back to the comments repo.

< Zurück