public class ConcurrentKafkaListenerContainerFactoryConfigurer { private KafkaProperties properties; /** * Set the {@link KafkaProperties} to use. * @param properties the properties */ void setKafkaProperties(KafkaProperties properties) { this.properties = properties; } /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory} * instance to configure * @param consumerFactory the {@link ConsumerFactory} to use */ public void configure( ConcurrentKafkaListenerContainerFactory listenerContainerFactory, ConsumerFactory consumerFactory) { listenerContainerFactory.setConsumerFactory(consumerFactory); Listener container = this.properties.getListener(); ContainerProperties containerProperties = listenerContainerFactory .getContainerProperties(); if (container.getAckMode() != null) { containerProperties.setAckMode(container.getAckMode()); } if (container.getAckCount() != null) { containerProperties.setAckCount(container.getAckCount()); } if (container.getAckTime() != null) { containerProperties.setAckTime(container.getAckTime()); } if (container.getPollTimeout() != null) { containerProperties.setPollTimeout(container.getPollTimeout()); } if (container.getConcurrency() != null) { listenerContainerFactory.setConcurrency(container.getConcurrency()); } }}
spring integration是spring关于Enterprise Integration Patterns的实现,而spring integration kafka则基于spring for apache kafka提供了inbound以及outbound channel的适配器
Starting from version 2.0 version this project is a complete rewrite based on the new spring-kafka project which uses the pure java Producer and Consumer clients provided by Kafka 0.9.x.x and 0.10.x.x
这个的话,没有自动配置,又引入了integration相关的概念,整体来讲,相对复杂一些。
consumer配置
@Bean public KafkaMessageListenerContainer container( ConsumerFactory kafkaConsumerFactory) { return new KafkaMessageListenerContainer<>(kafkaConsumerFactory, new ContainerProperties(new TopicPartitionInitialOffset(topic, 0))); } /** * KAFKA consumer. */ @Bean public ConsumerFactory kafkaConsumerFactory() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } /** * Channel adapter for message. */ @Bean public KafkaMessageDrivenChannelAdapter adapter(KafkaMessageListenerContainer container) { KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(container); kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka()); return kafkaMessageDrivenChannelAdapter; } /** * Channel for KAFKA message received. */ @Bean public PollableChannel fromKafka() { return new QueueChannel(); }
producer配置
@Bean @ServiceActivator(inputChannel = "toKafka") public MessageHandler handler() throws Exception { KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(kafkaTemplate()); handler.setTopicExpression(new LiteralExpression(topic)); handler.setMessageKeyExpression(new LiteralExpression(messageKey)); return handler; } @Bean public ProducerFactory kafkaProducerFactory() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(kafkaProducerFactory()); }