Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
976 views
in Technique[技术] by (71.8m points)

c# - How to return AggregateException from async method

I got an async method working like an enhanced Task.WhenAll. It takes a bunch of tasks and returns when all are completed.

public async Task MyWhenAll(Task[] tasks) {
    ...
    await Something();
    ...

    // all tasks are completed
    if (someTasksFailed)
        throw ??
}

My question is how do I get the method to return a Task looking like the one returned from Task.WhenAll when one or more tasks has failed?

If I collect the exceptions and throw an AggregateException it will be wrapped in another AggregateException.

Edit: Full Example

async Task Main() {
    try {
        Task.WhenAll(Throw(1), Throw(2)).Wait();
    }
    catch (Exception ex) {
        ex.Dump();
    }

    try {
        MyWhenAll(Throw(1), Throw(2)).Wait();
    }
    catch (Exception ex) {
        ex.Dump();
    }
}

public async Task MyWhenAll(Task t1, Task t2) {
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    try {
        await Task.WhenAll(t1, t2);
    }
    catch {
        throw new AggregateException(new[] { t1.Exception, t2.Exception });
    }
}
public async Task Throw(int id) {
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    throw new InvalidOperationException("Inner" + id);
}

For Task.WhenAll the exception is AggregateException with 2 inner exceptions.

For MyWhenAll the exception is AggregateException with one inner AggregateException with 2 inner exceptions.

Edit: Why I am doing this

I often need to call paging API:s and want to limit number of simultaneous connections.

The actual method signatures are

public static async Task<TResult[]> AsParallelAsync<TResult>(this IEnumerable<Task<TResult>> source, int maxParallel)
public static async Task<TResult[]> AsParallelUntilAsync<TResult>(this IEnumerable<Task<TResult>> source, int maxParallel, Func<Task<TResult>, bool> predicate)

It means I can do paging like this

var pagedRecords = await Enumerable.Range(1, int.MaxValue)
                                   .Select(x => GetRecordsAsync(pageSize: 1000, pageNumber: x)
                                   .AsParallelUntilAsync(maxParallel: 5, x => x.Result.Count < 1000);
var records = pagedRecords.SelectMany(x => x).ToList();

It all works fine, the aggregate within aggregate is just a minor inconvenience.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

async methods are designed to only every set at most a single exception on the returned task, not multiple.

This leaves you with two options, you can either not use an async method to start with, instead relying on other means of performing your method:

public Task MyWhenAll(Task t1, Task t2)
{
    return Task.Delay(TimeSpan.FromMilliseconds(100))
        .ContinueWith(_ => Task.WhenAll(t1, t2))
        .Unwrap();
}

If you have a more complex method that would be harder to write without using await, then you'll need to unwrap the nested aggregate exceptions, which is tedious, although not overly complex, to do:

    public static Task UnwrapAggregateException(this Task taskToUnwrap)
    {
        var tcs = new TaskCompletionSource<bool>();

        taskToUnwrap.ContinueWith(task =>
        {
            if (task.IsCanceled)
                tcs.SetCanceled();
            else if (task.IsFaulted)
            {
                if (task.Exception is AggregateException aggregateException)
                    tcs.SetException(Flatten(aggregateException));
                else
                    tcs.SetException(task.Exception);
            }
            else //successful
                tcs.SetResult(true);
        });

        IEnumerable<Exception> Flatten(AggregateException exception)
        {
            var stack = new Stack<AggregateException>();
            stack.Push(exception);
            while (stack.Any())
            {
                var next = stack.Pop();
                foreach (Exception inner in next.InnerExceptions)
                {
                    if (inner is AggregateException innerAggregate)
                        stack.Push(innerAggregate);
                    else
                        yield return inner;
                }
            }
        }

        return tcs.Task;
    }

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...