Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
419 views
in Technique[技术] by (71.8m points)

multithreading - Java, divide incoming work uniformly via hashing in multithreaded evnironments

I've implemented a java code to execute incoming tasks (as Runnable) with n Threads based on their hashCode module nThreads. The work should spread, ideally - uniformly, among those threads. Specifically, we have a dispatchId as a string for each Task.

Here is this java code snippet:

int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
    return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}

Important: In most cases a dispatchId is:

String dispatchId = 'SomePrefix' + counter.next()

But, I have a concern that modulo division by nThreads is not a good choice, because nThreads should be a prime number for a more uniform distribution of dispatId keys.

Are there any other options on how to spread the work better?

Update 1:

Each Worker has a queue: Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();

The worker gets tasks from it and executes them. Tasks can be added to this queue from other threads.

Update 2:

Tasks with the same dispatchId can come in multiple times, therefore we need to find their thread by dispatchId.

Most importantly, each Worker thread must process its incoming tasks sequentially. Hence, there is data structure Queue in the update 1 above.

Update 3: Also, some threads can be busy, while others are free. Thus, we need to somehow decouple Queues from Threads, but maintain the FIFO order for the same dispatchId for tasks execution.

Solution: I've implemented Ben Manes' idea (his answer below), the code can be found here.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

It sounds like you need FIFO ordering per dispatch id, so the ideal would be to have dispatch queues as the abstraction. That would explain your concern about hashing as not providing uniform distribution, as some dispatch queues may be more active than others and unfairly balanced among workers. By separating the queue from the worker, you retain FIFO semantics and evenly spread out the work.

An inactive library that provides this abstraction is HawtDispatch. It is Java 6 compatible.

A very simple Java 8 approach is to use CompletableFuture as a queuing mechanism, ConcurrentHashMap for registration, and an Executor (e.g. ForkJoinPool) for computing. See EventDispatcher for an implementation of this idea, where registration is explicit. If your dispatchers are more dynamic then you may need to periodically prune the map. The basic idea is as follows.

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  return dispatchQueues.compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)
        : queue.thenRunAsync(task);
  });
}

Update (JDK7)

A backport of the above idea would be translated with Guava into something like,

ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName);
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        try {
          task.run();
        } finally {
          next.set(null);
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
  } finally {
    lock.unlock();
  }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...