springboot集成kafka

笔者开发环境为:
jdk1.8、springboot2.1.6、kafka_2.12-2.2.1

准备工作

pom.xml引入

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

application.yml配置

1
2
3
4
5
6
spring:
kafka:
bootstrap-servers: kafkaserver:9092
consumer:
group-id: kafka_group
enable-auto-commit: true

kafkaserver是笔者配置的host

消费者

1
2
3
4
5
6
7
8
9
10
11
12
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

@KafkaListener(id = "Listener的id", topics = "消费的主题")
public void listen1(String foo) {
System.out.println(foo);
}

}

使用@KafkaListener注解来实现,多个主题英文逗号隔开。如果group-id和KafkaListener的id相同,同时消费kafka的同一主题,kafka会将消息分发给多个Consumer,此时listen1不一定能够消费数据。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;


public class KafkaProducer {

private KafkaProducer<String, String> producer;

private String TOPIC = "test";

public KafkaClient(Kafkadomain kafkadomain){
Properties props = new Properties();
props.put("bootstrap.servers", kafkaserver:9092);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(props);
}

public void produce(String data){
//如需分区可自己添加key
producer.send(new ProducerRecord<String, String>(TOPIC,data));
producer.close();
}

public static void main(String[] args) {
KafkaProducer p = new KafkaProducer();
p.produce("hello world");
}

}
文章作者: 柯熠
文章链接: https://smallkserver.github.io/2019/08/09/springboot集成kafka/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Smallkserver
打赏
  • 微信
  • 支付宝

评论