笔者开发环境为:
jdk1.8、springboot2.1.6、kafka_2.12-2.2.1
准备工作
pom.xml引入
1 2 3 4
| <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
  </dependency>
|
application.yml配置
1 2 3 4 5 6
| spring:
    kafka:
      bootstrap-servers: kafkaserver:9092
      consumer:
        group-id: kafka_group
        enable-auto-commit: true
|
kafkaserver是笔者配置的host
消费者
1 2 3 4 5 6 7 8 9 10 11 12
| import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;
/**
 * Spring-managed Kafka consumer that echoes every received message to standard output.
 *
 * <p>The {@code id} and {@code topics} values on the listener are placeholder strings
 * and are meant to be replaced with a real listener id and topic name.
 */
@Component
public class Consumer {

    /**
     * Handles one message delivered by the listener container.
     *
     * @param foo the message payload as a string
     */
    @KafkaListener(id = "Listener的id", topics = "消费的主题")
    public void listen1(String foo) {
        System.out.println(foo);
    }
}
|
使用@KafkaListener注解来实现消息消费,多个主题之间用英文逗号隔开。如果多个消费者属于同一个消费组(例如配置的group-id与KafkaListener的id相同),并且同时消费kafka的同一主题,kafka会将消息分发给组内的多个Consumer,此时listen1不一定能够消费到数据。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
public class KafkaProducer {
private KafkaProducer<String, String> producer; private String TOPIC = "test"; public KafkaClient(Kafkadomain kafkadomain){ Properties props = new Properties(); props.put("bootstrap.servers", kafkaserver:9092); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(props); } public void produce(String data){ producer.send(new ProducerRecord<String, String>(TOPIC,data)); producer.close(); } public static void main(String[] args) { KafkaProducer p = new KafkaProducer(); p.produce("hello world"); }
}
|