A look at how Spring integrates with Kafka
Published: 2019-06-22


This post briefly walks through the options for producing and consuming Kafka messages in a Java application.

Available libraries

  • kafka client
  • spring for apache kafka
  • spring integration kafka
  • spring cloud stream binder kafka

Besides the official Java API library, the Spring ecosystem adds several wrappers of its own. Each is introduced briefly below.

spring for apache kafka

It integrates the Java Kafka client with Spring.

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.2.RELEASE</version>
    </dependency>

Integration with Spring Boot

Before Spring Boot 1.5 you had to write the Java configuration yourself. From 1.5 onward, auto-configuration is provided. See the org.springframework.boot.autoconfigure.kafka package for details; its main classes are:

  • KafkaAutoConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

    @Configuration
    @ConditionalOnClass(KafkaTemplate.class)
    @EnableConfigurationProperties(KafkaProperties.class)
    @Import(KafkaAnnotationDrivenConfiguration.class)
    public class KafkaAutoConfiguration {

        private final KafkaProperties properties;

        public KafkaAutoConfiguration(KafkaProperties properties) {
            this.properties = properties;
        }

        @Bean
        @ConditionalOnMissingBean(KafkaTemplate.class)
        public KafkaTemplate<?, ?> kafkaTemplate(
                ProducerFactory<Object, Object> kafkaProducerFactory,
                ProducerListener<Object, Object> kafkaProducerListener) {
            KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
                    kafkaProducerFactory);
            kafkaTemplate.setProducerListener(kafkaProducerListener);
            kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
            return kafkaTemplate;
        }

        @Bean
        @ConditionalOnMissingBean(ProducerListener.class)
        public ProducerListener<Object, Object> kafkaProducerListener() {
            return new LoggingProducerListener<Object, Object>();
        }

        @Bean
        @ConditionalOnMissingBean(ConsumerFactory.class)
        public ConsumerFactory<?, ?> kafkaConsumerFactory() {
            return new DefaultKafkaConsumerFactory<Object, Object>(
                    this.properties.buildConsumerProperties());
        }

        @Bean
        @ConditionalOnMissingBean(ProducerFactory.class)
        public ProducerFactory<?, ?> kafkaProducerFactory() {
            return new DefaultKafkaProducerFactory<Object, Object>(
                    this.properties.buildProducerProperties());
        }
    }
  • KafkaAnnotationDrivenConfiguration
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

    @Configuration
    @ConditionalOnClass(EnableKafka.class)
    class KafkaAnnotationDrivenConfiguration {

        private final KafkaProperties properties;

        KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
            this.properties = properties;
        }

        @Bean
        @ConditionalOnMissingBean
        public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
            configurer.setKafkaProperties(this.properties);
            return configurer;
        }

        @Bean
        @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
        public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
            configurer.configure(factory, kafkaConsumerFactory);
            return factory;
        }

        @EnableKafka
        @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
        protected static class EnableKafkaConfiguration {
        }
    }
  • ConcurrentKafkaListenerContainerFactoryConfigurer
    spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

    public class ConcurrentKafkaListenerContainerFactoryConfigurer {

        private KafkaProperties properties;

        /**
         * Set the {@link KafkaProperties} to use.
         * @param properties the properties
         */
        void setKafkaProperties(KafkaProperties properties) {
            this.properties = properties;
        }

        /**
         * Configure the specified Kafka listener container factory. The factory can be
         * further tuned and default settings can be overridden.
         * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
         * instance to configure
         * @param consumerFactory the {@link ConsumerFactory} to use
         */
        public void configure(
                ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
                ConsumerFactory<Object, Object> consumerFactory) {
            listenerContainerFactory.setConsumerFactory(consumerFactory);
            Listener container = this.properties.getListener();
            ContainerProperties containerProperties = listenerContainerFactory
                    .getContainerProperties();
            if (container.getAckMode() != null) {
                containerProperties.setAckMode(container.getAckMode());
            }
            if (container.getAckCount() != null) {
                containerProperties.setAckCount(container.getAckCount());
            }
            if (container.getAckTime() != null) {
                containerProperties.setAckTime(container.getAckTime());
            }
            if (container.getPollTimeout() != null) {
                containerProperties.setPollTimeout(container.getPollTimeout());
            }
            if (container.getConcurrency() != null) {
                listenerContainerFactory.setConcurrency(container.getConcurrency());
            }
        }
    }

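ConcurrentKafkaListenerContainerFactory builds containers that run several KafkaMessageListenerContainer instances concurrently, which amounts to one application instance creating multiple consumers.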
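Since a partition is read by at most one consumer within a consumer group, a concurrency value above the topic's partition count leaves some of those consumers idle. The following plain-Java sketch illustrates the effect. It does not use Kafka or Spring at all: the class ConcurrencySketch and its assign method are hypothetical names, and the round-robin rule is only a stand-in for whatever assignment strategy the Kafka consumer is actually configured with.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a simplified stand-in, NOT Kafka's real partition assignor.
class ConcurrencySketch {

    /**
     * Spreads partitions 0..partitionCount-1 over consumerCount consumers
     * in round-robin order and returns one partition list per consumer.
     */
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            // Each partition goes to exactly one consumer.
            assignment.get(p % consumerCount).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions, concurrency 3: every consumer gets work.
        System.out.println(assign(6, 3)); // [[0, 3], [1, 4], [2, 5]]
        // 2 partitions, concurrency 3: the third consumer stays idle.
        System.out.println(assign(2, 3)); // [[0], [1], []]
    }
}
```

In practice this suggests keeping the listener concurrency at or below the number of partitions of the topic being consumed.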

With Spring Boot 1.5 or later, usage is straightforward: inject KafkaTemplate to send messages, and add a small amount of configuration to consume them.

spring integration kafka

Spring Integration is Spring's implementation of the Enterprise Integration Patterns. Spring Integration Kafka builds on spring for apache kafka and provides inbound and outbound channel adapters.

Starting from version 2.0 version this project is a complete rewrite based on the new spring-kafka project which uses the pure java Producer and Consumer clients provided by Kafka 0.9.x.x and 0.10.x.x

There is no auto-configuration for this one, and it brings in Spring Integration concepts, so overall it is somewhat more complex.

Consumer configuration

    @Bean
    public KafkaMessageListenerContainer<String, String> container(
            ConsumerFactory<String, String> kafkaConsumerFactory) {
        // Listen on partition 0 of the topic only.
        return new KafkaMessageListenerContainer<>(kafkaConsumerFactory,
                new ContainerProperties(new TopicPartitionInitialOffset(topic, 0)));
    }

    /**
     * KAFKA consumer.
     */
    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Channel adapter for message.
     */
    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> adapter(
            KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka());
        return kafkaMessageDrivenChannelAdapter;
    }

    /**
     * Channel for KAFKA message received.
     */
    @Bean
    public PollableChannel fromKafka() {
        return new QueueChannel();
    }

Producer configuration

    @Bean
    @ServiceActivator(inputChannel = "toKafka")
    public MessageHandler handler() throws Exception {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression(topic));
        handler.setMessageKeyExpression(new LiteralExpression(messageKey));
        return handler;
    }

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(kafkaProducerFactory());
    }

Sending and receiving messages

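    @Autowired
    @Qualifier("fromKafka")
    private PollableChannel fromKafka;

    @Autowired
    @Qualifier("toKafka")
    MessageChannel toKafka;

    // Inside a method: wait up to 10 seconds for a message from Kafka.
    Message<?> msg = fromKafka.receive(10000L);
    // Inside a method: send a random UUID string to Kafka.
    toKafka.send(new GenericMessage<>(UUID.randomUUID().toString()));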
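The fromKafka channel above is a QueueChannel, so receive with a timeout blocks until a message is available and returns null if none arrives in time. The plain-Java sketch below models only that behaviour with a BlockingQueue. It is not Spring Integration code: the class QueueChannelSketch is a hypothetical stand-in, written here to show why the result of receive should be null-checked.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pollable, queue-backed channel (not a Spring class).
class QueueChannelSketch<T> {

    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    /** Puts a message on the channel; true if it was accepted. */
    boolean send(T message) {
        return queue.offer(message);
    }

    /**
     * Waits up to timeoutMillis for a message.
     * Returns null when nothing arrives in time or the thread is interrupted.
     */
    T receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        QueueChannelSketch<String> fromKafka = new QueueChannelSketch<>();
        fromKafka.send("hello");
        System.out.println(fromKafka.receive(100)); // hello
        System.out.println(fromKafka.receive(100)); // null (channel is empty)
    }
}
```

By the same reasoning, code that calls fromKafka.receive(10000L) on the real channel should handle a null result before touching the message payload.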

spring cloud stream

It is built on Spring Integration, with some further adaptation and a little extra encapsulation for the Spring Cloud environment.


Reposted from: http://cnwoa.baihongyu.com/
