在阅读本文前,建议先阅读本系列的上一篇文章『理解 yield 关键字』。我们知道,使用 C# 的 yield
关键字可以实现一个迭代器(Iterator),使用 async/await
关键字可以实现一个异步方法。异步流(Asynchronous Stream)就是这两种功能的结合体,它实现了以异步的方式生成和消费一组数据系列的迭代器。
异步流的支持主要建立在 C# 8 引入的两个接口上:
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator (...);
}
public interface IAsyncEnumerator<out T>: IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
}
所以理解了上一篇我们讲的 yield
关键字,就很容易理解异步流,它只是在原来的基础上支持通过 yield return
返回异步得到的一系列结果值而已。从序列中获取每个元素的行为(MoveNextAsync
)是一个异步操作,元素是以零散的方式到达,这就形成了所谓的“异步流”(比如视频流中的数据)。
这里 IAsyncEnumerator
接口的 ValueTask<T>
是 Task<T>
类型轻量化的封装,它是结构类型(值类型)。使用方式与 Task<T>
相似,但它在同步完成任务或返回立即可用的结果时(这在列举序列时会经常发生),可以避免不必要的内存开销,比 Task<T>
更高效。
在上一篇文章中的 Fibonacci
方法中,Thread.Sleep(1000)
用来模拟一个耗时操作,它是“同步”的:
IEnumerable<int> Fibonacci(int count)
{
int prev = 1;
int curr = 1;
for (int i = 0; i < count; i++)
{
yield return prev;
Thread.Sleep(1000);
int temp = prev + curr;
prev = curr;
curr = temp;
}
}
为了提高程序执行效率,我们很有可能需要把 Thread.Sleep(1000)
改成“异步”的。如果使它生成异步的数据流,该怎么做呢?这就需要同时用到迭代器和异步方法了,也就是说方法中要同时使用 yield return
和 async/await
关键字。要支持这一特性,就要使用 IAsyncEnumerable<T>
作为方法的返回类型。于是,前文的 Fibonacci
方法可以这样改造:
async IAsyncEnumerable<int> FibonacciAsync(int count)
{
int prev = 1;
int curr = 1;
Random r = new();
for (int i = 0; i < count; i++)
{
yield return prev;
await Task.Delay(1000);
int temp = prev + curr;
prev = curr;
curr = temp;
}
}
不同的是,这个方法允许调用者以异步的方式消费它生成的数字系列,换句话说就是使用 await foreach
来遍历消费这个方法的返回结果(IAsyncEnumerable<int>
),如下所示:
await foreach (var n in FibonacciAsync(10))
Console.Write("{0} ", n);
但,如果要在 LINQ 查询语句中消费异步流,需要先引入 System.Linq.Async
NuGet 包,除此之外,使用方式没有差别:
IAsyncEnumerable<int> query =
from i in FibonacciAsync(10)
where i % 2 == 0
select i * 10;
await foreach (var number in query)
Console.WriteLine(number);
另外,在 ASP.NET Core 的 Action 方法中也支持返回异步流,如下面示例:
[HttpGet]
public async IAsyncEnumerable<string> Get()
{
using var dbContext = new BookContext();
await foreach (var title in dbContext.Books
.Select(b => b.Title)
.AsAsyncEnumerable())
yield return title;
}
综上,可以看到,异步流解决了零散数据系列的异步生成和消费问题。要知道,在 C# 还没有异步流时,一组数据系列(IEnumerable<T>
)只能以整体异步的方式(Task<IEnumerable<T>>
)返回给调用者。
参考:
《C# 9.0 in a nutshell》
Generate and consume async streams