Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
311 views
in Technique[技术] by (71.8m points)

c# - Can Interlocked CompareExchange be used correctly in this multithreaded round-robin implementation?

I need to round-robin some calls between N different connections because of some rate limits in a multithreaded context. I've decided to implement this functionality using a list and a "counter," which is supposed to "jump by one" between instances on each call.

I'll illustrate this concept with a minimal example (using a class called A to stand in for the connections)

class A
{
    public A()
    {
        var newIndex = Interlocked.Increment(ref index);
        ID = newIndex.ToString();
    }
    private static int index;
    public string ID;
}

static int crt = 0;
static List<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static int itemsCount = Items.Count;

static A GetInstance()
{            
    var newIndex = Interlocked.Increment(ref crt);
    var instance = Items[newIndex % itemsCount];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

static void Test()
{
    var sw = Stopwatch.StartNew();

    var tasks = Enumerable.Range(1, 1000000).Select(i => Task.Run(GetInstance)).ToArray();
    Task.WaitAll(tasks);
}

This works as expected in that it ensures that calls are round-robin-ed between the connections. I will probably stick to this implementation in the "real" code (with a long instead of an int for the counter)

However, even if it is unlikely to reach int.MaxValue in my use case, I wondered if there is a way to "safely overflow" the counter.

I know that "%" in C# is "Remainder" rather than "Modulus," which would mean that some ?: gymnastics would be required to always return positives, which I want to avoid.

So what I wanted to cume up with is instead something like:

static A GetInstance()
{            
    var newIndex = Interlocked.Increment(ref crt);
    Interlocked.CompareExchange(ref crt, 0, itemsCount); //?? the return value is the original value, how to know if it succeeded
    var instance = Items[newIndex];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

What I am expecting is that Interlocked.CompareExchange(ref crt, 0, itemsCount) would be "won" by only one thread, setting the counter back to 0 once it reaches the number of connections available. However, I don't know how to use this in this context.

Can CompareExchange or another mechanism in Interlocked be used here?

question from:https://stackoverflow.com/questions/66047164/can-interlocked-compareexchange-be-used-correctly-in-this-multithreaded-round-ro

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You could probably:

static int crt = -1;
static readonly IReadOnlyList<A> Items = Enumerable.Range(1, 15).Select(i => new A()).ToList();
static readonly int itemsCount = Items.Count;
static readonly int maxItemCount = itemsCount * 100;

static A GetInstance()
{
    int newIndex;

    while (true)
    {
        newIndex = Interlocked.Increment(ref crt);

        if (newIndex >= itemsCount)
        {
            while (newIndex >= itemsCount && Interlocked.CompareExchange(ref crt, -1, newIndex) != newIndex)
            {
                // There is an implicit memory barrier caused by the Interlockd.CompareExchange around the
                // next line
                // See for example https://afana.me/archive/2015/07/10/memory-barriers-in-dot-net.aspx/
                // A full memory barrier is the strongest and interesting one. At least all of the following generate a full memory barrier implicitly:
                // Interlocked class mehods
                newIndex = crt;
            }

            continue;
        }

        break;
    }

    var instance = Items[newIndex % itemsCount];
    //Console.WriteLine($"{DateTime.Now.Ticks}, {Guid.NewGuid()}, Got instance: {instance.ID}");
    return instance;
}

But I have to say the truth... I'm not sure if it is correct (it should be), and explaining it is hard, and if anyone touches it in any way it will break.

The basic idea is to have a "low" ceiling for crt (we don't want to overflow, it would break everything... so we want to keep veeeeeery far from int.MaxValue, or you could use uint).

The maximum possible value is:

maxItemCount = (int.MaxValue - MaximumNumberOfThreads) / itemsCount * itemsCount;

The / itemsCount * itemsCount is because we want the rounds to be equally distributed. In the example I give I use a probably much lower number (itemsCount * 100) because lowering this ceiling will only cause the reset more often, but the reset isn't so much slow that it is truly important (it depends on what you are doing on the threads. If they are very small threads that only use cpu then the reset is slow, but if not then it isn't).

Then when we overflow this ceiling we try to move it back to -1 (our starting point). We know that at the same time other bad bad threads could Interlocked.Increment it and create a race on this reset. Thanks to the Interlocked.CompareExchange only one thread can successfully reset the counter, but the other racing threads will immediately see this and break from their attempts.

Mmmh... The if can be rewritten as:

if (newIndex >= itemsCount)
{
    int newIndex2;
    while (newIndex >= itemsCount && (newIndex2 = Interlocked.CompareExchange(ref crt, 0, newIndex)) != newIndex)
    {
        // If the Interlocked.CompareExchange is successfull, the while will end and so we won't be here,
        // if it fails, newIndex2 is the current value of crt
        newIndex = newIndex2;
    }

    continue;
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...