The author's development environment:
JDK 1.8, Spring Boot 2.1.6, kafka_2.12-2.2.1
Preparation
Add the dependency to pom.xml
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
 
application.yml configuration
    spring:
      kafka:
        bootstrap-servers: kafkaserver:9092
        consumer:
          group-id: kafka_group
          enable-auto-commit: true
 
kafkaserver is a hostname configured by the author.
Consumer
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Consumer {

        // "listener-id" and "topic-to-consume" are placeholders:
        // replace them with the listener's id and the topic to consume.
        @KafkaListener(id = "listener-id", topics = "topic-to-consume")
        public void listen1(String foo) {
            System.out.println(foo);
        }
    }
 
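Consumption is implemented with the @KafkaListener annotation. To subscribe to several topics, list them separated by commas, for example topics = {"topicA", "topicB"}. If the group-id and the KafkaListener id are the same and several consumers read the same Kafka topic at the same time, Kafka spreads the messages across those consumers, because consumers in one group share the topic's partitions. In that case listen1 is not guaranteed to receive a given message.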
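Why listen1 may receive nothing can be illustrated with a small simulation. The sketch below is a simplified model, not Kafka's actual assignment code: it assumes partitions are dealt out round-robin to the consumers of one group, and the class name GroupAssignmentSketch and the consumer name otherConsumer are made up for illustration. It only shows that each partition has exactly one owner within a group, so a consumer left without a partition sees no messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (an assumption, not Kafka's real assignor):
// every partition is owned by exactly one consumer of the group.
class GroupAssignmentSketch {

    // Deals partitions 0..partitionCount-1 out round-robin to the consumers.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign partitions to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("listen1", "otherConsumer");

        // One partition, two consumers: only one of them owns the partition.
        System.out.println(assign(group, 1)); // prints {listen1=[0], otherConsumer=[]}

        // Three partitions: the messages are split between both consumers.
        System.out.println(assign(group, 3)); // prints {listen1=[0, 2], otherConsumer=[1]}
    }
}
```

Which consumer ends up idle depends on the real assignment strategy and on the order in which consumers join, so in practice listen1 could just as well be the one without a partition. Giving each listener its own group makes every listener receive all messages of the topic.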
Producer
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Named KafkaClient so it does not clash with the imported KafkaProducer.
    public class KafkaClient {

        private static final String TOPIC = "test";

        private final KafkaProducer<String, String> producer;

        public KafkaClient() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafkaserver:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }

        // Sends one message to TOPIC; the producer stays open for further sends.
        public void produce(String data) {
            producer.send(new ProducerRecord<>(TOPIC, data));
        }

        // Flushes pending records and releases the producer's resources.
        public void close() {
            producer.close();
        }

        public static void main(String[] args) {
            KafkaClient client = new KafkaClient();
            client.produce("hello world");
            client.close();
        }
    }
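The settings passed to the producer above are easier to reuse and to read when they are built in one place. The following is a minimal sketch using only java.util.Properties; the class name ProducerPropsSketch and the build method are hypothetical helpers, not part of Kafka or Spring. The comments summarize what each setting controls, and the values are the ones used above.

```java
import java.util.Properties;

// Hypothetical helper: collects the producer settings used above in one place.
class ProducerPropsSketch {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Broker address(es) used for the initial connection to the cluster.
        props.put("bootstrap.servers", bootstrapServers);
        // Wait until all in-sync replicas have acknowledged the record.
        props.put("acks", "all");
        // Do not retry failed sends automatically.
        props.put("retries", 0);
        // Maximum size in bytes of one batch of records per partition (16 KiB).
        props.put("batch.size", 16384);
        // Wait up to 1 ms for more records before sending a batch.
        props.put("linger.ms", 1);
        // Total bytes the producer may use to buffer unsent records (32 MiB).
        props.put("buffer.memory", 33554432);
        // Keys and values are plain strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("kafkaserver:9092");
        System.out.println(props.get("bootstrap.servers")); // prints kafkaserver:9092
    }
}
```

The resulting Properties object can be handed to the KafkaProducer constructor as in the producer class above, and taking the broker address as a parameter avoids hard-coding kafkaserver:9092 in more than one place.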