好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

springboot之配置双kafka全过程

springboot配置双kafka

使用spring boot 2.0.8.RELEASE 版本

引入Maven kafka jar、准备两个kafka;

?

1

2

3

4

< dependency >

    < groupId >org.springframework.kafka</ groupId >

    < artifactId >spring-kafka</ artifactId >

</ dependency >

配置yml配置文件

?

1

2

3

4

5

6

7

8

9

10

11

spring:

   kafka :

    bootstrap-servers : 180 .167.180.242: 9092 #kafka的访问地址,多个用","隔开

    consumer :

      enable-auto-commit : true

      group-id : kafka #群组ID

   outkafka :

    bootstrap-servers : localhost : 9092 #kafka的访问地址,多个用","隔开

    consumer :

      enable-auto-commit : true

      group-id : kafka_1 #群组ID

配置KafkaConfig类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

import java.util.HashMap;

import java.util.Map;

 

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka测试数据mon.serialization.StringDeserializer;

import org.apache.kafka测试数据mon.serialization.StringSerializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Primary;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.config.KafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;

import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

 

@Configuration

@EnableKafka

public class KafkaConfig {

    @Value ( "${spring.kafka.bootstrap-servers}" )

    private String innerServers;

    @Value ( "${spring.kafka.consumer.group-id}" )

    private String innerGroupid;

    @Value ( "${spring.kafka.consumer.enable-auto-commit}" )

    private String innerEnableAutoCommit;

 

    @Bean

    @Primary //理解为默认优先选择当前容器下的消费者工厂

    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        factory.setConcurrency( 3 );

        factory.getContainerProperties().setPollTimeout( 3000 );

        return factory;

    }

 

    @Bean //第一个消费者工厂的bean

    public ConsumerFactory<Integer, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs());

    }

 

    @Bean

    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, innerGroupid);

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, innerEnableAutoCommit);

//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

//        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class );

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class );

        return props;

    }

    

    @Bean //生产者工厂配置

    public ProducerFactory<String, String> producerFactory() {

        return new DefaultKafkaProducerFactory<>(senderProps());

    }

    

    @Bean //kafka发送消息模板

    public KafkaTemplate<String, String> kafkaTemplate() {

        return new KafkaTemplate<String, String>(producerFactory());

    }

    

    /**

      * 生产者配置方法

      *

      * 生产者有三个必选属性

      * <p>

      * 1.bootstrap.servers broker地址清单,清单不要包含所有的broker地址,

      * 生产者会从给定的broker里查找到其他broker的信息。不过建议至少提供两个broker信息,一旦 其中一个宕机,生产者仍能能够连接到集群上。

      * </p>

      * <p>

      * 2.key.serializer broker希望接收到的消息的键和值都是字节数组。 生产者用对应的类把键对象序列化成字节数组。

      * </p>

      * <p>

      * 3.value.serializer 值得序列化方式

      * </p>

      *

      *

      * @return

      */

    private Map<String, Object> senderProps() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, innerServers);

        /**

          * 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限

          * 制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。

          * 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改

          */

        props.put(ProducerConfig.RETRIES_CONFIG, 0 );

        /**

          * 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置:

          * <ul>

          * <li>

          * <code> acks = 0 </ code>如果设置为零,则生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且

          * <code>retries </ code>配置将不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量始终设置为-1。

          * <li> <code> acks = 1 </code>

          * 这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下,

          * 如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。

          * <li><code> acks = all </code>

          * 这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。

          * 这相当于acks = -1设置

          */

        props.put(ProducerConfig.ACKS_CONFIG, "1" );

        /**

          * 当有多条消息要被发送到统一分区是,生产者会把他们放到统一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。

          */

        // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息

//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去

//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer. class );

        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer. class );

        return props;

    }

    

    @Value ( "${spring.outkafka.bootstrap-servers}" )

    private String outServers;

    @Value ( "${spring.outkafka.consumer.group-id}" )

    private String outGroupid;

    @Value ( "${spring.outkafka.consumer.enable-auto-commit}" )

    private String outEnableAutoCommit;

    

 

    static {

        

    }

    

    /**

      * 连接第二个kafka集群的配置

      */

    @Bean

    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryOutSchedule() {

        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactoryOutSchedule());

        factory.setConcurrency( 3 );

        factory.getContainerProperties().setPollTimeout( 3000 );

        return factory;

    }

 

    @Bean

    public ConsumerFactory<Integer, String> consumerFactoryOutSchedule() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigsOutSchedule());

    }

 

    /**

      * 连接第二个集群的消费者配置

      */

    @Bean

    public Map<String, Object> consumerConfigsOutSchedule() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);

        props.put(ConsumerConfig.GROUP_ID_CONFIG, outGroupid);

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, outEnableAutoCommit);

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class );

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class );

        return props;

    }

    

    @Bean //生产者工厂配置

    public ProducerFactory<String, String> producerOutFactory() {

        return new DefaultKafkaProducerFactory<>(senderOutProps());

    }

    

    @Bean //kafka发送消息模板

    public KafkaTemplate<String, String> kafkaOutTemplate() {

        return new KafkaTemplate<String, String>(producerOutFactory());

    }

    

    /**

      * 生产者配置方法

      *

      * 生产者有三个必选属性

      * <p>

      * 1.bootstrap.servers broker地址清单,清单不要包含所有的broker地址,

      * 生产者会从给定的broker里查找到其他broker的信息。不过建议至少提供两个broker信息,一旦 其中一个宕机,生产者仍能能够连接到集群上。

      * </p>

      * <p>

      * 2.key.serializer broker希望接收到的消息的键和值都是字节数组。 生产者用对应的类把键对象序列化成字节数组。

      * </p>

      * <p>

      * 3.value.serializer 值得序列化方式

      * </p>

      *

      *

      * @return

      */

    private Map<String, Object> senderOutProps() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, outServers);

        /**

          * 当从broker接收到的是临时可恢复的异常时,生产者会向broker重发消息,但是不能无限

          * 制重发,如果重发次数达到限制值,生产者将不会重试并返回错误。

          * 通过retries属性设置。默认情况下生产者会在重试后等待100ms,可以通过 retries.backoff.ms属性进行修改

          */

        props.put(ProducerConfig.RETRIES_CONFIG, 0 );

        /**

          * 在考虑完成请求之前,生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置:

          * <ul>

          * <li>

          * <code> acks = 0 </ code>如果设置为零,则生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且

          * <code>retries </ code>配置将不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量始终设置为-1。

          * <li> <code> acks = 1 </code>

          * 这意味着leader会将记录写入其本地日志,但无需等待所有follower的完全确认即可做出回应。在这种情况下,

          * 如果leader在确认记录后立即失败但在关注者复制之前,则记录将丢失。

          * <li><code> acks = all </code>

          * 这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活,记录就不会丢失。这是最强有力的保证。

          * 这相当于acks = -1设置

          */

        props.put(ProducerConfig.ACKS_CONFIG, "1" );

        /**

          * 当有多条消息要被发送到统一分区是,生产者会把他们放到统一批里。kafka通过批次的概念来 提高吞吐量,但是也会在增加延迟。

          */

        // 以下配置当缓存数量达到16kb,就会触发网络请求,发送消息

//        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

        // 每条消息在缓存中的最长时间,如果超过这个时间就会忽略batch.size的限制,由客户端立即将消息发送出去

//        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);

//        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer. class );

        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer. class );

        return props;

    }

}

发送工具类MyKafkaProducer

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.scheduling.annotation.EnableScheduling;

import org.springframework.stereotype.Component;

import org.springframework.util.concurrent.ListenableFuture;

 

import lombok.extern.slf4j.Slf4j;

 

/**

  * <p>

  * <b>KafkaProducer Description:</b> kafka生产者

  * </p>

  *

  * @author douzaixing<b>DATE</b> 2019年7月8日 下午4:09:29

  */

@Component // 这个必须加入容器不然,不会执行

@EnableScheduling // 这里是为了测试加入定时调度

@Slf4j

public class MyKafkaProducer {

 

    @Autowired

    private KafkaTemplate<String, String> kafkaTemplate;

 

    @Autowired

    private KafkaTemplate<String, String> kafkaOutTemplate;

 

    public ListenableFuture<SendResult<String, String>> send(String topic, String key, String json) {

        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, json);

        log.info( "inner kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功===========" );

        return result;

    }

 

    public ListenableFuture<SendResult<String, String>> sendOut(String topic, String key, String json) {

        ListenableFuture<SendResult<String, String>> result = kafkaOutTemplate.send(topic, key, json);

        log.info( "out kafka send #topic=" + topic + "#key=" + key + "#json=" + json + "#推送成功===========" );

        return result;

    }

 

}

测试类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

@Slf4j

@RunWith (SpringJUnit4ClassRunner. class )

@SpringBootTest (classes={OesBcServiceApplication. class })

public class MoreKafkaTest {

    

    @Autowired

    private MyKafkaProducer kafkaProducer;

    

    @Test

    public void sendInner() {

        for ( int i = 0 ; i < 1 ; i++) {

            kafkaProducer.send( "inner_test" , "douzi" + i, "liyuehua" + i);

            kafkaProducer.sendOut( "out_test" , "douziout" + i, "fanbingbing" + i);

        }

    }

}

接收类

?

1

2

3

4

5

6

7

8

9

10

11

12

13

@Component

@Slf4j

public class KafkaConsumer {  

    @KafkaListener (topics={ "inner_test" }, containerFactory= "kafkaListenerContainerFactory" )

    public void innerlistener(ConsumerRecord<String, String> record) {

        log.info( "inner kafka receive #key=" + record.key() + "#value=" + record.value());

    }

    

    @KafkaListener (topics={ "out_test" }, containerFactory= "kafkaListenerContainerFactoryOutSchedule" )

    public void outListener(ConsumerRecord<String, String> record) {

        log.info( "out kafka receive #key=" + record.key() + "#value=" + record.value());

    }

}

测试结果

07-11 12:41:27.811 INFO  [com.wondertek.oes.bc.service.send.MyKafkaProducer] - inner kafka send #topic=inner_test#key=douzi0#json=liyuehua0#推送成功===========
 
07-11 12:41:27.995 INFO  [com.wondertek.oes.bc.service.send.KafkaConsumer] - inner kafka receive #key=douzi0#value=liyuehua0
07-11 12:41:28.005 INFO  [com.wondertek.oes.bc.service.send.MyKafkaProducer] - out kafka send #topic=out_test#key=douziout0#json=fanbingbing0#推送成功===========
07-11 12:41:28.013 INFO  [com.wondertek.oes.bc.service.send.KafkaConsumer] - out kafka receive #key=douziout0#value=fanbingbing0

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://blog.csdn.net/qq_16116549/article/details/95465169

查看更多关于springboot之配置双kafka全过程的详细内容...

  阅读:15次