I am using segmentio/kafka-go to implement a Kafka consumer and producer.
https://github.com/segmentio/kafka-go
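- Create topic
func CreateTopicname() {
	fmt.Println("start")
	// SASL/PLAIN credentials (redacted).
	mechanism := plain.Mechanism{
		Username: "****",
		Password: "***",
	}
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
	}
	// Dial the broker's SASL_PLAINTEXT listener (port 9092 in server.properties).
	conn, err := dialer.DialContext(context.Background(), "tcp", "localhost:9092")
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	defer conn.Close()
	// This only opens the authenticated connection; no topic is created on it yet.
	fmt.Println("connected successfully")
}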
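- List of topics
func ListofTopics() {
	mechanism := plain.Mechanism{
		Username: "***",
		Password: "****",
	}
	dialer := &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: mechanism,
	}
	conn, err := dialer.DialContext(context.Background(), "tcp", "localhost:9092")
	if err != nil {
		fmt.Println(err.Error())
		return // conn is nil here, so it must not be closed or used
	}
	defer conn.Close()
	partitions, err := conn.ReadPartitions()
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	// Each topic appears once per partition, so print every name only once.
	seen := map[string]bool{}
	for _, p := range partitions {
		if !seen[p.Topic] {
			seen[p.Topic] = true
			fmt.Println(p.Topic)
		}
	}
	fmt.Println("end of list")
}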
Inside the Kafka config directory I created kafka_server_jaas.conf and added the following lines:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="***"
password="***"
user_admin="***";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="***"
password="***";
};
I added the following values to the config/server.properties file:
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://:9092
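To make it easier to compare these listener values with the address the Go client dials, here is a small sketch that splits a listener entry into its security protocol, host, and port. The function name parseListener is hypothetical, and the sketch assumes the PROTOCOL://host:port form used in the lines above.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseListener splits a Kafka listener entry such as
// "SASL_PLAINTEXT://localhost:9092" into protocol, host, and port.
// The host may be empty, as in "SASL_PLAINTEXT://:9092".
func parseListener(listener string) (string, string, string, error) {
	protocol, addr, ok := strings.Cut(strings.TrimSpace(listener), "://")
	if !ok {
		return "", "", "", fmt.Errorf("listener %q has no protocol prefix", listener)
	}
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return "", "", "", fmt.Errorf("listener %q: %w", listener, err)
	}
	return protocol, host, port, nil
}

func main() {
	// The two values from the server.properties lines above.
	entries := []string{
		"SASL_PLAINTEXT://localhost:9092",
		"SASL_PLAINTEXT://:9092",
	}
	for _, entry := range entries {
		protocol, host, port, err := parseListener(entry)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("protocol=%s host=%q port=%s\n", protocol, host, port)
	}
}
```

With the values above, the only configured port is 9092, and the advertised listener has an empty host part.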
Then I ran this command:
export KAFKA_OPTS="-Djava.security.auth.login.config=PATH_TO_JASS.CONF_FILE/kafka_server_jaas.conf"
and then started the services:
- sudo systemctl start zookeeper
- sudo systemctl start kafka
The Kafka service status is active; I checked it after running the commands above.
But when I execute go run main.go, I get an error like "dial tcp 127.0.0.1:9092: connect: connection refused" or "dial tcp 127.0.0.1:9092: connect: server misbehaving".