好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

被kafka-client和springkafka版本坑到自闭及解决

被kafka-client和springkafka 版本 坑

上周刚刚欢天喜地的在linux上部了kafka,这周打算用spring-boot框架写个简单demo跑一下,结果悲剧就此展开。

首先建立maven工程:pom中添加spring boot kafka依赖:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

<? xml version = "1.0" encoding = "UTF-8" ?>

< project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >

    < modelVersion >4.0.0</ modelVersion >

    < parent >

      < groupId >org.springframework.boot</ groupId >

      < artifactId >spring-boot-starter-parent</ artifactId >

      < version >2.1.5.RELEASE</ version >

      < relativePath /> <!-- lookup parent from repository -->

    </ parent >

    < groupId >com.example</ groupId >

    < artifactId >kafkaproducer</ artifactId >

    < version >0.0.1-SNAPSHOT</ version >

    < name >kafkaproducer</ name >

    < description >Demo project for Spring Boot</ description >

 

    < properties >

      < java.version >1.8</ java.version >

    </ properties >

 

    < dependencies >

      < dependency >

          < groupId >org.springframework.boot</ groupId >

          < artifactId >spring-boot-starter-web</ artifactId >

      </ dependency >

 

      < dependency >

          < groupId >org.projectlombok</ groupId >

          < artifactId >lombok</ artifactId >

          < optional >true</ optional >

      </ dependency >

      < dependency >

          < groupId >org.springframework.boot</ groupId >

          < artifactId >spring-boot-starter-test</ artifactId >

          < scope >test</ scope >

      </ dependency >

      < dependency >

          < groupId >org.springframework.kafka</ groupId >

          < artifactId >spring-kafka</ artifactId >

      </ dependency >

    </ dependencies >

 

    < build >

      < plugins >

          < plugin >

            < groupId >org.springframework.boot</ groupId >

            < artifactId >spring-boot-maven-plugin</ artifactId >

          </ plugin >

      </ plugins >

    </ build >

 

 

</ project >

配置文件如下:

?

1

2

3

4

5

6

7

8

server.port= 8089

spring.kafka.bootstrap-servers=ip:port

spring.kafka.producer.retries= 0

spring.kafka.producer.batch-size= 16384

spring.kafka.producer.buffer-memory= 33554432

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.linger.ms= 1

然后新建一个Producer类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

package com.example.kafkaproducer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.stereotype.Component;

 

@Component

public class KafkaProducer {

    @Autowired

    KafkaTemplate kafkaTemplate;

    public void produce(){

        kafkaTemplate.send( "test" , "hello word" );

        System.out.println( "发送消息" );

    }

}

在test类中调用

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

package com.example.kafkaproducer;  

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner; 

 

@RunWith (SpringRunner. class )

@SpringBootTest

public class KafkaproducerApplicationTests { 

    @Autowired KafkaProducer kafkaProducer; 

    @Test

    public void contextLoads() {

      kafkaProducer.produce();

    } 

}

然后控制台就会打印一个莫名奇妙的错误,没有打印任何堆栈信息,大概意思只是表达了连接不上。

Exception thrown when sending a message with key='null' and payload='' to topic

telnet ip+port 是可以通的

随后发现,xshell上启动的kafka-server在报这样一个错,更详细的没有留存。

ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18

百度了一下,很可能是Linux上的kafka版本和pom中引入的spring-kafka依赖不匹配造成的,于是 查看对应关系 。

查看kafka,发现装的是一个0.8.2.1 版本的kafka,该版本的kafka是2015年3月发布的版本,可以说是十分古老,真是不知道为什么当初要选这么老的版本。

换了几次spring-kafka的pom之后,依然在报这个问题,于是我选择换更新的kafka的包。

换了2.2.0版本kafka的包,问题得到解决。

其中consumer的创建命令和老版本的不太一样,且consumer和producer需使用相同的端口号,而不是像之前producer配置为broker的端口,consumer配置为zookeeper的端口号。

?

1

./bin/kafka-console-consumer.sh --bootstrap-server ip:9092  --topic test

且config文件夹下server.properties文件中的一些配置和之前不太一样,需要注意的是,以下两行配置原来是被注解了的,需要在这里取消掉注解,并配置自己的ip。

?

1

2

listeners = PLAINTEXT://your.host.name:9092

advertised.listeners=PLAINTEXT://your.host.name:9092

springboot、spring-kafka、kafka-client三者兼容性关系

spring官方描述的spring-kafka的版本和kafka-clients的版本对应关系:

官方地址: https://spring.io/projects/spring-kafka

中间列:[Spring Integration for Apache Kafka Version 可忽略不看:

也就是说spring-kafka与spring-client是存在在一对多关系的,那是不是他所有的spring-client都可以选呢?

接着往下看(摘自官网):

他说啥 ?

springboot 1.5 你应该用的是spring-kafka 1.3.x. springboot2.0你应该使用的是spring-kafka2.0.x. 如果用的是spring boot2.1.x,那么你必须使用spring-kafka的版本是2.2.x。否则就会出现noClass等等各种异常。 spring-kafka的版本是2.1默认使用的spring-client是1.1.x,当你要使用另外两个时,你就要使用如下的版本配置. 如果你用的是2.2.x的spring-kafka,只看第一张图,你会以为2.1.x的kafka-clients也可以用。但是spring说了,此时默认用的kafka-clients是2.0.x,如果你想用2.1.x,必须看文档附录,下图的大概意思,必须换掉下图所示的所有依赖版本。

也就是说并不是一对多 他默认的还是只有一个kafka-client来给你的,你要选其他的可以的,你添加一些额外配置

例如:

 Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本为 2.2.0.RELEASE,kafka-clients 的默认版本为2.0.0,所以 kafka 的版本选用为 kafka_2.11-2.1.0 (前面的2.11代表的是Scala的版本后面为kafka的版本号)

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://blog.csdn.net/qq_27707957/article/details/90553235

查看更多关于被kafka-client和springkafka版本坑到自闭及解决的详细内容...

  阅读:25次