好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

使用spring boot 整合kafka,延迟启动消费者

spring boot 整合kafka,延迟启动消费者

spring boot整合kafka的时候一般使用@KafkaListener来设置消费者,但是这种方式在spring启动的时候就会立即开启消费者。如果有需要根据配置信息延迟开启指定的消费者就不能使用这种方式。

参考 KafkaListenerAnnotationBeanPostProcessor 类,我提取了其中一部分代码,这样就可以根据需要随时动态地开启消费者,也可以很方便地启动多个消费者。

为了方便使用,我自定义了一个注解:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

import org.springframework.kafka.annotation.TopicPartition;

import java.lang.annotation.ElementType;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

@Target ({ ElementType.METHOD})

@Retention (RetentionPolicy.RUNTIME)

public @interface DelayKafkaConsumer {

     String id() default "" ;

     String[] topics() default {};

     String errorHandler() default "" ;

     String groupId() default "" ;

     TopicPartition[] topicPartitions() default {};

     String beanRef() default "__listener" ;

}

配合注解使用的factory:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.aop.framework.Advised;

import org.springframework.aop.support.AopUtils;

import org.springframework.beans.BeansException;

import org.springframework.beans.factory.BeanFactory;

import org.springframework.beans.factory.BeanFactoryAware;

import org.springframework.beans.factory.ListableBeanFactory;

import org.springframework.beans.factory.ObjectFactory;

import org.springframework.beans.factory.config.*;

import org.springframework.context.expression.StandardBeanExpressionResolver;

import org.springframework.core.MethodIntrospector;

import org.springframework.core.annotation.AnnotationUtils;

import org.springframework.core.convert.converter.Converter;

import org.springframework.core.convert.converter.GenericConverter;

import org.springframework.format.Formatter;

import org.springframework.format.FormatterRegistry;

import org.springframework.format.support.DefaultFormattingConversionService;

import org.springframework.kafka.annotation.KafkaListenerConfigurer;

import org.springframework.kafka.annotation.PartitionOffset;

import org.springframework.kafka.annotation.TopicPartition;

import org.springframework.kafka.config.KafkaListenerEndpoint;

import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;

import org.springframework.kafka.config.MethodKafkaListenerEndpoint;

import org.springframework.kafka.listener.KafkaListenerErrorHandler;

import org.springframework.kafka.support.KafkaNull;

import org.springframework.kafka.support.TopicPartitionInitialOffset;

import org.springframework.messaging.converter.GenericMessageConverter;

import org.springframework.messaging.handler.annotation.support.*;

import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;

import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

import org.springframework.stereotype.Service;

import org.springframework.util.Assert;

import org.springframework.util.ReflectionUtils;

import org.springframework.util.StringUtils;

import java.lang.reflect.Method;

import java.util.*;

import java.util.concurrent.atomic.AtomicInteger;

 

@Service

public class MyKafkaConsumerFactory implements KafkaListenerConfigurer,BeanFactoryAware {

     private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory. class );

     private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar;

     private final AtomicInteger counter = new AtomicInteger();

     private BeanFactory beanFactory;

     private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();

     private BeanExpressionContext expressionContext;

     private final ListenerScope listenerScope = new ListenerScope();

     private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =

             new KafkaHandlerMethodFactoryAdapter();

 

     @Override

     public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {

         this .kafkaListenerEndpointRegistrar = registrar;

         addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService);

     }

 

     public void startConsumer(KafkaListenerEndpoint endpoint){

         kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);

     }

 

     public void startConsumer(Object target){

         logger.info( "start consumer {} ..." ,target.getClass());

         Class<?> targetClass = AopUtils.getTargetClass(target);

         Map<Method, Set<DelayKafkaConsumer>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,

                 new MethodIntrospector.MetadataLookup<Set<DelayKafkaConsumer>>() {

 

                     @Override

                     public Set<DelayKafkaConsumer> inspect(Method method) {

                         Set<DelayKafkaConsumer> listenerMethods = findListenerAnnotations(method);

                         return (!listenerMethods.isEmpty() ? listenerMethods : null );

                     }

 

                 });

         if (annotatedMethods.size()== 0 )

             throw new IllegalArgumentException(target.getClass()+ " need have method with @DelayKafkaConsumer" );

         for (Map.Entry<Method, Set<DelayKafkaConsumer>> entry : annotatedMethods.entrySet()) {

             Method method = entry.getKey();

             logger.info( "find message listen handler method : {} , object : {}" ,method.getName(),target.getClass());

             for (DelayKafkaConsumer listener : entry.getValue()) {

                 if (listener.topics().length== 0 ) {

                     logger.info( "topics value is empty , will skip it , method : {} , target object : {}" ,method.getName(),target.getClass());

                     continue ;

                 }

                 processKafkaListener(listener,method,target);

                 logger.info( "register method {} success , target object : {}" ,method.getName(),target.getClass());

             }

         }

         logger.info( "{} consumer start complete ." ,target.getClass());

     }

 

     protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) {

         Method methodToUse = checkProxy(method, bean);

         MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint();

         endpoint.setMethod(methodToUse);

         endpoint.setBeanFactory( this .beanFactory);

         String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler" );

         if (StringUtils.hasText(errorHandlerBeanName)) {

             endpoint.setErrorHandler( this .beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler. class ));

         }

         processListener(endpoint, kafkaListener, bean, methodToUse);

     }

 

     protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, DelayKafkaConsumer kafkaListener, Object bean,

                                    Object adminTarget) {

         String beanRef = kafkaListener.beanRef();

         if (StringUtils.hasText(beanRef)) {

             this .listenerScope.addListener(beanRef, bean);

         }

         endpoint.setBean(bean);

         endpoint.setMessageHandlerMethodFactory( this .messageHandlerMethodFactory);

         endpoint.setId(getEndpointId(kafkaListener));

         endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));

         endpoint.setTopics(resolveTopics(kafkaListener));

         endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));

         kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);

         if (StringUtils.hasText(beanRef)) {

             this .listenerScope.removeListener(beanRef);

         }

     }

 

     private String getEndpointId(DelayKafkaConsumer kafkaListener) {

         if (StringUtils.hasText(kafkaListener.id())) {

             return resolve(kafkaListener.id());

         }

         else {

             return "Custom-Consumer" + this .counter.getAndIncrement();

         }

     }

 

     private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) {

         String groupId = null ;

         if (StringUtils.hasText(kafkaListener.groupId())) {

             groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId" );

         }

         if (groupId == null && StringUtils.hasText(kafkaListener.id())) {

             groupId = id;

         }

         return groupId;

     }

 

     private String[] resolveTopics(DelayKafkaConsumer kafkaListener) {

         String[] topics = kafkaListener.topics();

         List<String> result = new ArrayList<>();

         if (topics.length > 0 ) {

             for ( int i = 0 ; i < topics.length; i++) {

                 Object topic = resolveExpression(topics[i]);

                 resolveAsString(topic, result);

             }

         }

         return result.toArray( new String[result.size()]);

     }

 

     private void resolveAsString(Object resolvedValue, List<String> result) {

         if (resolvedValue instanceof String[]) {

             for (Object object : (String[]) resolvedValue) {

                 resolveAsString(object, result);

             }

         }

         else if (resolvedValue instanceof String) {

             result.add((String) resolvedValue);

         }

         else if (resolvedValue instanceof Iterable) {

             for (Object object : (Iterable<Object>) resolvedValue) {

                 resolveAsString(object, result);

             }

         }

         else {

             throw new IllegalArgumentException(String.format(

                     "@DelayKafkaConsumer can't resolve '%s' as a String" , resolvedValue));

         }

     }

 

     private TopicPartitionInitialOffset[] resolveTopicPartitions(DelayKafkaConsumer kafkaListener) {

         TopicPartition[] topicPartitions = kafkaListener.topicPartitions();

         List<TopicPartitionInitialOffset> result = new ArrayList<>();

         if (topicPartitions.length > 0 ) {

             for (TopicPartition topicPartition : topicPartitions) {

                 result.addAll(resolveTopicPartitionsList(topicPartition));

             }

         }

         return result.toArray( new TopicPartitionInitialOffset[result.size()]);

     }

 

     private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {

         Object topic = resolveExpression(topicPartition.topic());

         Assert.state(topic instanceof String,

                 "topic in @TopicPartition must resolve to a String, not " + topic.getClass());

         Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty" );

         String[] partitions = topicPartition.partitions();

         PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();

         Assert.state(partitions.length > 0 || partitionOffsets.length > 0 ,

                 "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'" );

         List<TopicPartitionInitialOffset> result = new ArrayList<>();

         for ( int i = 0 ; i < partitions.length; i++) {

             resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);

         }

 

         for (PartitionOffset partitionOffset : partitionOffsets) {

             Object partitionValue = resolveExpression(partitionOffset.partition());

             Integer partition;

             if (partitionValue instanceof String) {

                 Assert.state(StringUtils.hasText((String) partitionValue),

                         "partition in @PartitionOffset for topic '" + topic + "' cannot be empty" );

                 partition = Integer.valueOf((String) partitionValue);

             }

             else if (partitionValue instanceof Integer) {

                 partition = (Integer) partitionValue;

             }

             else {

                 throw new IllegalArgumentException(String.format(

                         "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'" ,

                         topic, partitionOffset.partition(), partitionValue.getClass()));

             }

 

             Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());

             Long initialOffset;

             if (initialOffsetValue instanceof String) {

                 Assert.state(StringUtils.hasText((String) initialOffsetValue),

                         "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty" );

                 initialOffset = Long.valueOf((String) initialOffsetValue);

             }

             else if (initialOffsetValue instanceof Long) {

                 initialOffset = (Long) initialOffsetValue;

             }

             else {

                 throw new IllegalArgumentException(String.format(

                         "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'" ,

                         topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));

             }

 

             Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());

             Boolean relativeToCurrent;

             if (relativeToCurrentValue instanceof String) {

                 relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);

             }

             else if (relativeToCurrentValue instanceof Boolean) {

                 relativeToCurrent = (Boolean) relativeToCurrentValue;

             }

             else {

                 throw new IllegalArgumentException(String.format(

                         "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'" ,

                         topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));

             }

 

             TopicPartitionInitialOffset topicPartitionOffset =

                     new TopicPartitionInitialOffset((String) topic, partition, initialOffset, relativeToCurrent);

             if (!result.contains(topicPartitionOffset)) {

                 result.add(topicPartitionOffset);

             }

             else {

                 throw new IllegalArgumentException(

                         String.format( "@TopicPartition can't have the same partition configuration twice: [%s]" ,

                                 topicPartitionOffset));

             }

         }

         return result;

     }

 

     private void resolvePartitionAsInteger(String topic, Object resolvedValue,

                                            List<TopicPartitionInitialOffset> result) {

         if (resolvedValue instanceof String[]) {

             for (Object object : (String[]) resolvedValue) {

                 resolvePartitionAsInteger(topic, object, result);

             }

         }

         else if (resolvedValue instanceof String) {

             Assert.state(StringUtils.hasText((String) resolvedValue),

                     "partition in @TopicPartition for topic '" + topic + "' cannot be empty" );

             result.add( new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));

         }

         else if (resolvedValue instanceof Integer[]) {

             for (Integer partition : (Integer[]) resolvedValue) {

                 result.add( new TopicPartitionInitialOffset(topic, partition));

             }

         }

         else if (resolvedValue instanceof Integer) {

             result.add( new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));

         }

         else if (resolvedValue instanceof Iterable) {

             for (Object object : (Iterable<Object>) resolvedValue) {

                 resolvePartitionAsInteger(topic, object, result);

             }

         }

         else {

             throw new IllegalArgumentException(String.format(

                     "@DelayKafkaConsumer for topic '%s' can't resolve '%s' as an Integer or String" , topic, resolvedValue));

         }

     }

 

     private Set<DelayKafkaConsumer> findListenerAnnotations(Method method) {

         Set<DelayKafkaConsumer> listeners = new HashSet<>();

         DelayKafkaConsumer ann = AnnotationUtils.findAnnotation(method, DelayKafkaConsumer. class );

         if (ann != null ) {

             listeners.add(ann);

         }

         return listeners;

     }

 

     private Method checkProxy(Method methodArg, Object bean) {

         Method method = methodArg;

         if (AopUtils.isJdkDynamicProxy(bean)) {

             try {

                 method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());

                 Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();

                 for (Class<?> iface : proxiedInterfaces) {

                     try {

                         method = iface.getMethod(method.getName(), method.getParameterTypes());

                         break ;

                     }

                     catch (NoSuchMethodException noMethod) {

                     }

                 }

             }

             catch (SecurityException ex) {

                 ReflectionUtils.handleReflectionException(ex);

             }

             catch (NoSuchMethodException ex) {

                 throw new IllegalStateException(String.format(

                         "target method '%s' found on bean target class '%s', " +

                                 "but not found in any interface(s) for bean JDK proxy. Either " +

                                 "pull the method up to an interface or switch to subclass (CGLIB) " +

                                 "proxies by setting proxy-target-class/proxyTargetClass " +

                                 "attribute to 'true'" , method.getName(), method.getDeclaringClass().getSimpleName()), ex);

             }

         }

         return method;

     }

 

     @Override

     public void setBeanFactory(BeanFactory beanFactory) throws BeansException {

         this .beanFactory = beanFactory;

         if (beanFactory instanceof ConfigurableListableBeanFactory) {

             this .resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();

             this .expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,

                     this .listenerScope);

         }

     }

 

     private String resolveExpressionAsString(String value, String attribute) {

         Object resolved = resolveExpression(value);

         if (resolved instanceof String) {

             return (String) resolved;

         }

         else {

             throw new IllegalStateException( "The [" + attribute + "] must resolve to a String. "

                     + "Resolved to [" + resolved.getClass() + "] for [" + value + "]" );

         }

     }

 

     private Object resolveExpression(String value) {

         String resolvedValue = resolve(value);

         return this .resolver.evaluate(resolvedValue, this .expressionContext);

     }

 

     /**

      * Resolve the specified value if possible.

      * @param value the value to resolve

      * @return the resolved value

      * @see ConfigurableBeanFactory#resolveEmbeddedValue

      */

     private String resolve(String value) {

         if ( this .beanFactory instanceof ConfigurableBeanFactory) {

             return ((ConfigurableBeanFactory) this .beanFactory).resolveEmbeddedValue(value);

         }

         return value;

     }

 

     private void addFormatters(FormatterRegistry registry) {

         for (Converter<?, ?> converter : getBeansOfType(Converter. class )) {

             registry.addConverter(converter);

         }

         for (GenericConverter converter : getBeansOfType(GenericConverter. class )) {

             registry.addConverter(converter);

         }

         for (org.springframework.format.Formatter<?> formatter : getBeansOfType(Formatter. class )) {

             registry.addFormatter(formatter);

         }

     }

 

     private <T> Collection<T> getBeansOfType(Class<T> type) {

         if ( this .beanFactory instanceof ListableBeanFactory) {

             return ((ListableBeanFactory) this .beanFactory).getBeansOfType(type).values();

         } else {

             return Collections.emptySet();

         }

     }

 

     private static class ListenerScope implements Scope {

         private final Map<String, Object> listeners = new HashMap<>();

         ListenerScope() {

             super ();

         }

 

         public void addListener(String key, Object bean) {

             this .listeners.put(key, bean);

         }

 

         public void removeListener(String key) {

             this .listeners.remove(key);

         }

 

         @Override

         public Object get(String name, ObjectFactory<?> objectFactory) {

             return this .listeners.get(name);

         }

 

         @Override

         public Object remove(String name) {

             return null ;

         }

 

         @Override

         public void registerDestructionCallback(String name, Runnable callback) {

         }

 

         @Override

         public Object resolveContextualObject(String key) {

             return this .listeners.get(key);

         }

 

         @Override

         public String getConversationId() {

             return null ;

         }

 

     }

 

     private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

 

         private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();

 

         private MessageHandlerMethodFactory messageHandlerMethodFactory;

 

         public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {

             this .messageHandlerMethodFactory = kafkaHandlerMethodFactory1;

         }

 

         @Override

         public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {

             return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);

         }

 

         private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {

             if ( this .messageHandlerMethodFactory == null ) {

                 this .messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();

             }

             return this .messageHandlerMethodFactory;

         }

 

         private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {

             DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();

             defaultFactory.setBeanFactory(MyKafkaConsumerFactory. this .beanFactory);

 

             ConfigurableBeanFactory cbf =

                     (MyKafkaConsumerFactory. this .beanFactory instanceof ConfigurableBeanFactory ?

                             (ConfigurableBeanFactory) MyKafkaConsumerFactory. this .beanFactory : null );

 

             defaultFactory.setConversionService( this .defaultFormattingConversionService);

             List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();

 

             // Annotation-based argument resolution

             argumentResolvers.add( new HeaderMethodArgumentResolver( this .defaultFormattingConversionService, cbf));

             argumentResolvers.add( new HeadersMethodArgumentResolver());

 

             // Type-based argument resolution

             final GenericMessageConverter messageConverter = new GenericMessageConverter( this .defaultFormattingConversionService);

             argumentResolvers.add( new MessageMethodArgumentResolver(messageConverter));

             argumentResolvers.add( new PayloadArgumentResolver(messageConverter) {

 

                 @Override

                 protected boolean isEmptyPayload(Object payload) {

                     return payload == null || payload instanceof KafkaNull;

                 }

 

             });

             defaultFactory.setArgumentResolvers(argumentResolvers);

             defaultFactory.afterPropertiesSet();

             return defaultFactory;

         }

     }

}

通过startConsumer来启动一个消费者(多次调用会启动多个消费者)。target必须至少包含一个有@DelayKafkaConsumer注解的方法。这里类似@KafkaListener。我去掉了一部分功能,保留了比较常用的部分。

这里提供了一个通过注解的方式在spring boot项目中动态控制consumer的方法。还有其他的方法来达到这种效果,不过我觉得这种方法比较方便。

java项目集成springboot使用kafka消费者,启动失败报错 Failed to construct kafka consumer

之前的博客里提到过,本公司做的是物联网项目。项目中使用mqtt+kafka与设备端进行通讯,之前的协议格式为json格式,现在改成了字节数组byte[]格式。

集成springboot后,具体的demo网上很多,接下来有时间会出一份kafka的demo。

报错信息如下:

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is org.apache.kafka.common.KafkaException:Failed to construct kafka consumer

原因分析:

改用byte[]格式通信之后,ConcurrentMessageListenerContainer的key仍为String类型,而value变成了byte[]类型;但构建kafka消费者工厂时沿用的还是之前json格式通信时的类型,所以构建消费者工厂的时候需要指定正确的value类型。

代码如下:

?

1

2

3

4

5

6

7

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte []>> kafkaListenerContainerByteFactory() {

     ConcurrentKafkaListenerContainerFactory<String, byte []> factory = new ConcurrentKafkaListenerContainerFactory<String, byte []>();

     factory.setConsumerFactory(consumerByteFactory());

     factory.setConcurrency(concurrency);

     factory.getContainerProperties().setPollTimeout( 1500 );

     return factory;

    }

整体kafka生产者+kafka消费者的demo会在接下来的博客中陆续整理。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

原文链接:https://blog.csdn.net/weixin_42170534/article/details/80892411

查看更多关于使用spring boot 整合kafka,延迟启动消费者的详细内容...

  阅读:30次