Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

scale spring-kafka consumers app horizontally

I'm wondering what a good approach would be for choosing the number of partitions in relation to the maximum number of horizontally scaled instances.

Suppose I have one topic with 6 partitions.

I have one application that uses the ConcurrentKafkaListenerContainerFactory with setConcurrency set to 6. That means I will have 6 KafkaMessageListenerContainer instances, each using one thread, with my partitions spread evenly across them (one partition per container).

If the above is correct, what happens if I scale the app horizontally by adding another instance? If the new instance has the same configuration (a concurrency of 6 and, of course, the same consumer group), I believe the second instance will not consume any messages, because no rebalance will happen: each existing consumer already has one partition assigned to it.

But what if we go back to the first example and have 6 partitions with one instance running with a concurrency of 3? Then each consumer thread/KafkaMessageListenerContainer will have 2 partitions assigned. If we scale this app by adding a second instance (same consumer group id and also a concurrency of 3), I believe a rebalance will happen and each instance will consume from 3 partitions.

Are these assumptions correct, and if not, how should such a case be handled?

question from:https://stackoverflow.com/questions/65946520/scale-spring-kafka-consumers-app-horizontally

1 Reply

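In general, your assumptions are correct for the default behavior, which is based on the RangeAssignor. One nuance: a new member joining the group always triggers a rebalance; when there are already as many consumers as partitions, the extra consumers simply end up with no partitions assigned. The RangeAssignor JavaDocs describe the algorithm: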

/**
 * <p>The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and
 * <code>t1</code>, and each topic has 3 partitions, resulting in partitions <code>t0p0</code>, <code>t0p1</code>,
 * <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
 *
 * <p>The assignment will be:
 * <ul>
 * <li><code>C0: [t0p0, t0p1, t1p0, t1p1]</code></li>
 * <li><code>C1: [t0p2, t1p2]</code></li>
 * </ul>
 *
 * Since the introduction of static membership, we could leverage <code>group.instance.id</code> to make the assignment behavior more sticky.
 * For the above example, after one rolling bounce, group coordinator will attempt to assign new <code>member.id</code> towards consumers,
 * for example <code>C0</code> -&gt; <code>C3</code> <code>C1</code> -&gt; <code>C2</code>.
 *
 * <p>The assignment could be completely shuffled to:
 * <ul>
 * <li><code>C3 (was C0): [t0p2, t1p2] (before was [t0p0, t0p1, t1p0, t1p1])</code>
 * <li><code>C2 (was C1): [t0p0, t0p1, t1p0, t1p1] (before was [t0p2, t1p2])</code>
 * </ul>
 *
 * The assignment change was caused by the change of <code>member.id</code> relative order, and
 * can be avoided by setting the group.instance.id.
 * Consumers will have individual instance ids <code>I1</code>, <code>I2</code>. As long as
 * 1. Number of members remain the same across generation
 * 2. Static members' identities persist across generation
 * 3. Subscription pattern doesn't change for any member
 *
 * <p>The assignment will always be:
 * <ul>
 * <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
 * <li><code>I1: [t0p2, t1p2]</code>
 * </ul>
 */
public class RangeAssignor extends AbstractPartitionAssignor {
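
To make the arithmetic in that JavaDoc concrete, here is a minimal sketch of range assignment for a single topic. It is an illustrative re-implementation, not Kafka's code: the class name RangeAssignmentSketch and the consumer ids such as "app1-0" are hypothetical, and in a real group the ordering is by the member ids that Kafka generates, so the consumers left idle are not necessarily all in the newly started instance.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Illustrative range assignment for ONE topic (hypothetical helper, not Kafka's implementation). */
final class RangeAssignmentSketch {

    /**
     * Sorts the consumers lexicographically, gives each numPartitions / n partitions,
     * and gives one extra partition to the first numPartitions % n consumers.
     */
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumerIds) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        List<String> sorted = new ArrayList<>(new TreeSet<>(consumerIds));
        if (sorted.isEmpty()) {
            return result;
        }
        int perConsumer = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        int next = 0; // next partition number to hand out
        for (int i = 0; i < sorted.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    /** Builds hypothetical consumer ids ("app1-0", "app1-1", ...) for one application instance. */
    static List<String> consumers(String instance, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            ids.add(instance + "-" + i);
        }
        return ids;
    }

    public static void main(String[] args) {
        // One instance, concurrency 3, 6 partitions: two partitions per consumer.
        System.out.println(assign(6, consumers("app1", 3)));

        // Two instances, concurrency 3 each: one partition per consumer, three per instance.
        List<String> two = new ArrayList<>(consumers("app1", 3));
        two.addAll(consumers("app2", 3));
        System.out.println(assign(6, two));

        // Two instances, concurrency 6 each: 12 consumers for 6 partitions, so 6 stay idle.
        List<String> twelve = new ArrayList<>(consumers("app1", 6));
        twelve.addAll(consumers("app2", 6));
        System.out.println(assign(6, twelve));
    }
}
```

With these made-up ids, the last scenario leaves every "app2" consumer with an empty list, which matches the situation described in the question: the total concurrency across instances exceeds the partition count, so the surplus consumers have nothing to read.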

However, you can plug in any ConsumerPartitionAssignor via the partition.assignment.strategy consumer property: https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy

See also the ConsumerPartitionAssignor JavaDocs and its implementations for more information, so you can choose the one that fits your use case.
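
As a rough sketch of how the partition.assignment.strategy property could be set, the snippet below builds a plain map of consumer properties. The class name AssignorConfigSketch, the group id and the broker address are assumptions made for illustration; the assignor named here, org.apache.kafka.clients.consumer.CooperativeStickyAssignor, is one of the implementations shipped with the Kafka client, alongside RoundRobinAssignor and StickyAssignor. Deserializers and other required settings are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds consumer properties selecting a non-default assignor. */
final class AssignorConfigSketch {

    static Map<String, Object> consumerProps(String groupId, String assignorClassName) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", groupId);
        // Fully qualified class name of the ConsumerPartitionAssignor to use.
        props.put("partition.assignment.strategy", assignorClassName);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = consumerProps(
                "my-group", // assumed group id
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        System.out.println(props.get("partition.assignment.strategy"));
    }
}
```

In a Spring Kafka application, a map like this would typically be passed to the consumer factory (for example DefaultKafkaConsumerFactory) that backs the ConcurrentKafkaListenerContainerFactory, so every instance in the group uses the same strategy.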