引言
kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中。
版本对应地址:https://spring.io/projects/spring-kafka
基本环境
springboot版本2.1.4
kafka版本2.2.0
jdk 1.8
代码编写
1、基本引用pom
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
<? xml version = "1.0" encoding = "UTF-8" ?> < project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://HdhCmsTestw3.org/2001/XMLSchema-instance" xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > < modelVersion >4.0.0</ modelVersion > < parent > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-parent</ artifactId > < version >2.1.4.RELEASE</ version > < relativePath /> <!-- lookup parent from repository --> </ parent > < groupId >com.example</ groupId > < artifactId >demo</ artifactId > < version >0.0.1-SNAPSHOT</ version > < name >kafkademo</ name > < description >Demo project for Spring Boot</ description >
< properties > < java.version >1.8</ java.version > </ properties >
< dependencies > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-web</ artifactId > </ dependency >
< dependency > < groupId >mysql</ groupId > < artifactId >mysql-connector-java</ artifactId > < scope >runtime</ scope > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-test</ artifactId > < scope >test</ scope > </ dependency >
< dependency > < groupId >org.springframework.kafka</ groupId > < artifactId >spring-kafka</ artifactId > < version >2.2.0.RELEASE</ version > </ dependency >
< dependency > < groupId >com.google.code.gson</ groupId > < artifactId >gson</ artifactId > < version >2.7</ version > </ dependency >
</ dependencies >
< build > < plugins > < plugin > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-maven-plugin</ artifactId > </ plugin > </ plugins > </ build >
</ project > |
2、基本配置
|
1 2 3 4 5 6 7 8 |
spring.kafka.bootstrap-servers= 2.1 . 1.1 : 9092 spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.key-deserializer=org.apache.kafka测试数据mon.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka测试数据mon.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka测试数据mon.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka测试数据mon.serialization.StringSerializer
#logging.level.root=debug |
3、实体类
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
package com.example.demo.model;
import java.util.Date;
public class Messages {
private Long id;
private String msg;
private Date sendTime;
public Long getId() { return id; }
public void setId(Long id) { this .id = id; }
public String getMsg() { return msg; }
public void setMsg(String msg) { this .msg = msg; }
public Date getSendTime() { return sendTime; }
public void setSendTime(Date sendTime) { this .sendTime = sendTime; } } |
4、生产者端
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com.example.demo.service;
import com.example.demo.model.Messages; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture;
import java.util.Date; import java.util.UUID;
@Service public class KafkaSender {
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
public void send() { Messages message = new Messages(); message.setId(System.currentTimeMillis()); message.setMsg( "123" ); message.setSendTime( new Date()); ListenableFuture<SendResult<String, String>> test0 = kafkaTemplate.send( "newtopic" , gson.toJson(message)); } } |
5、消费者
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
package com.example.demo.service;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service;
import java.util.Optional;
@Service public class KafkaReceiver {
@KafkaListener (topics = { "newtopic" }) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println( "record =" + record); System.out.println( "message =" + message);
} }
} |
6、测试
在启动方法中模拟消息生产者,向kafka中发送消息
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
package com.example.demo;
import com.example.demo.service.KafkaSender; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication public class KafkademoApplication {
public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication. class , args);
KafkaSender sender = context.getBean(KafkaSender. class ); for ( int i = 0 ; i < 1000 ; i++) { sender.send();
try { Thread.sleep( 300 ); } catch (InterruptedException e) { e.printStackTrace(); } }
}
} |
效果展示
命令行直接消费消息
遇到的问题
生产端连接kafka超时
at org.apache.kafka测试数据mon.network.NetworkReceive.readFrom(NetworkReceive.java:119)
解决方案:
修改kafka中的server.properties中的下面配置,将原来的默认配置替换成下面ip+端口的形式,重启kafka
到此这篇关于Springboot 2.x集成kafka 2.2.0的示例代码的文章就介绍到这了,更多相关Springboot集成kafka内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
原文链接:https://blog.csdn.net/hao134838/article/details/90242719
查看更多关于Springboot 2.x集成kafka 2.2.0的示例代码的详细内容...