Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


c# - Limiting concurrent requests using Rx and SelectMany

I have a list of URLs of pages I want to download concurrently using HttpClient. The list of URLs can be large (100 or more!)

I currently have this code:

var urls = new List<string>
    {
        @"http://www.amazon.com",
        @"http://www.bing.com",
        @"http://www.facebook.com",
        @"http://www.twitter.com",
        @"http://www.google.com"
    };

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)));

contents.Subscribe(Console.WriteLine);

The problem: because of SelectMany, a large number of Tasks are created almost simultaneously. If the list of URLs is big enough, many of those Tasks time out (I'm getting "A Task was cancelled" exceptions).

So I thought there should be a way, maybe using some kind of Scheduler, to limit the number of concurrent Tasks to no more than 5 or 6 at any given time.

That way I could still download concurrently without launching so many Tasks that they stall, as they do right now.

How can I do that so I don't end up with lots of timed-out Tasks?


1 Reply


Remember that SelectMany() is actually Select().Merge(). While SelectMany() does not have a maxConcurrent parameter, Merge() does, so you can use that instead.

From your example, you can do this:

var urls = new List<string>
    {
        @"http://www.amazon.com",
        @"http://www.bing.com",
        @"http://www.facebook.com",
        @"http://www.twitter.com",
        @"http://www.google.com"
    };

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri)))
    .Merge(2); // 2 maximum concurrent requests!

contents.Subscribe(Console.WriteLine);
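
To see the limit actually being respected, here is a minimal sketch that swaps the real HTTP call for a fake one and records how many calls are in flight at once. FakeDownloadAsync, MergeDemo, RunAsync and the example.com URLs are made up for illustration and are not part of the original code; the sketch assumes the System.Reactive package is referenced. Note that Observable.FromAsync matters here: it does not start the Task until Merge subscribes to it, which is what lets Merge hold back the remaining requests.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class MergeDemo
{
    private static readonly object Gate = new object();
    private static int inFlight; // calls currently running
    private static int peak;     // highest value inFlight has reached

    // Hypothetical stand-in for client.GetStringAsync(url): waits briefly
    // instead of touching the network, and tracks concurrency.
    private static async Task<string> FakeDownloadAsync(string url)
    {
        lock (Gate)
        {
            inFlight++;
            if (inFlight > peak) peak = inFlight;
        }

        await Task.Delay(50);

        lock (Gate) { inFlight--; }
        return "content of " + url;
    }

    // Runs urlCount fake downloads with at most maxConcurrent at a time and
    // returns how many completed plus the peak concurrency observed.
    public static async Task<(int Count, int Peak)> RunAsync(int urlCount, int maxConcurrent)
    {
        lock (Gate) { inFlight = 0; peak = 0; }

        var urls = Enumerable.Range(1, urlCount)
            .Select(i => "http://example.com/page" + i);

        var results = await urls
            .ToObservable()
            // FromAsync is deferred: the call starts only on subscription.
            .Select(url => Observable.FromAsync(() => FakeDownloadAsync(url)))
            // Merge subscribes to at most maxConcurrent inner sequences.
            .Merge(maxConcurrent)
            .ToList();

        return (results.Count, peak);
    }

    public static async Task Main()
    {
        var (count, peakSeen) = await RunAsync(10, 2);
        Console.WriteLine($"downloaded {count} pages, peak concurrency {peakSeen}");
    }
}
```

With this setup the reported peak should never exceed the value passed to Merge. Replacing the Select/Merge pair with a plain SelectMany over the Task would likely show all ten calls in flight at once, which is the behaviour described in the question.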
