Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
693 views
in Technique[技术] by (71.8m points)

go - how to Authenticate kafka with SASL_PLAIN

Using segmentio/kafka-go to implement kafka consumer producer. https://github.com/segmentio/kafka-go

  1. create topic
func CreateTopicname() {
    fmt.Println("start ")
    mechanism := plain.Mechanism{
        Username: "****",
        Password: "***",
    }

    dialer := &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        SASLMechanism: mechanism,
    }

    _, err = dialer.DialContext(context.Background(), "tcp", "localhost:9093")

    if err != nil {
        fmt.Println(err.Error())
    } else {
        fmt.Println("created successfully")
    }
  1. List of Topics
func ListofTopics() {
        mechanism := plain.Mechanism{
            Username: "***",
            Password: "****",
        }
    
        dialer := &kafka.Dialer{
            Timeout:       10 * time.Second,
            DualStack:     true,
            SASLMechanism: mechanism,
        }
    
        conn, err := dialer.DialContext(context.Background(), "tcp", "localhost:9092")
    
        
        if err != nil {
            fmt.Println(err.Error())
        }
        defer conn.Close()
    
        _, err = conn.ReadPartitions()
        if err != nil {
            fmt.Println(err.Error())
        }
        
        fmt.Println(" end of list")
    }

Inside Kafka/config directory created /kafka_server_jaas.conf and added following lines

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="***"
  password="***"
  user_admin="***";
};

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="***"
  password="***";
};

Added the following values in the config/server.properties file.

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://:9092

Then ran this command export KAFKA_OPTS="-Djava.security.auth.login.config=PATH_TO_JASS.CONF_FILE/kafka_server_jaas.conf"

then started

  1. sudo systemctl start zookeeper
  2. sudo systemctl start kafka the status of kafka server is active and have checked the status after running above command.

but when I execute go run main.go

getting error like dial tcp 127.0.0.1:9092: connect: connection refused or dial tcp 127.0.0.1:9092: connect: server missbehaving.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...