Consumer类
/**
 * Kafka consumer whose listener container is only active during a daily time window.
 *
 * <p>The container is created with auto-startup disabled; {@link #startListener()} starts or
 * resumes it at 01:00 and {@link #shutdownListener()} pauses it at 06:00 (server time, per the
 * cron expressions below).
 *
 * <p>NOTE(review): the {@code @Scheduled} methods presumably rely on scheduling being enabled
 * elsewhere in the application (e.g. {@code @EnableScheduling}) — confirm. Also note that if the
 * application is (re)started inside the 01:00–06:00 window, consumption does not begin until the
 * next 01:00 trigger because the container does not auto-start.
 */
@Component
@Slf4j
public class KafkaConsumer {

    /** Id of the listener container that the scheduled methods start, resume and pause. */
    private static final String LISTENER_ID = "task1";

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * Receives one polled batch of messages from {@code topic1} and acknowledges it.
     *
     * <p>The {@code taskFactory} container factory is configured as a batch listener, so the
     * payload parameter is a list of message values rather than a single value. The
     * {@code groupId} given on the annotation takes precedence over the {@code group.id} set in
     * {@link #initProps()} for this listener.
     *
     * @param recordList     values of the records returned by one poll
     * @param acknowledgment handle used to manually acknowledge (commit) the batch
     */
    @KafkaListener(id = LISTENER_ID, topics = {"topic1"}, groupId = "groupId1", containerFactory = "taskFactory")
    public void listen(java.util.List<String> recordList, Acknowledgment acknowledgment) {
        // Your business logic goes here.
        acknowledgment.acknowledge();
    }

    /**
     * Opens the consumption window: starts the container if it is not running yet, then resumes
     * it in case it was paused by {@link #shutdownListener()}. Runs every day at 01:00:00.
     */
    @Scheduled(cron = "0 0 1 * * ?")
    public void startListener() {
        log.info("开启监听");
        MessageListenerContainer container = findContainer();
        if (container == null) {
            return;
        }
        if (!container.isRunning()) {
            container.start();
        }
        // Resume unconditionally: a paused container still reports itself as running.
        container.resume();
    }

    /**
     * Closes the consumption window by pausing the container. Runs every day at 06:00:00.
     *
     * <p>The container is paused rather than stopped, so it is resumed (not restarted) by
     * {@link #startListener()}.
     */
    @Scheduled(cron = "0 0 6 * * ?")
    public void shutdownListener() {
        log.info("关闭监听");
        MessageListenerContainer container = findContainer();
        if (container == null) {
            return;
        }
        container.pause();
    }

    /**
     * Looks up the listener container registered under {@link #LISTENER_ID}.
     *
     * @return the container, or {@code null} (after logging a warning) if none is registered
     */
    private MessageListenerContainer findContainer() {
        MessageListenerContainer container = registry.getListenerContainer(LISTENER_ID);
        if (container == null) {
            log.warn("No Kafka listener container registered with id '{}'", LISTENER_ID);
        }
        return container;
    }

    /**
     * Builds the Kafka consumer configuration used by {@link #taskFactory()}.
     *
     * @return a new mutable map of consumer properties
     */
    private Map<String, Object> initProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroupId");
        // Offsets are committed through the Acknowledgment, not by the client's auto-commit.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Has no effect while auto-commit is disabled; kept as in the original configuration.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 500000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 51200);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 2000);
        // Deserializers for record keys and values.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * Creates the listener container factory referenced by {@link #listen} as
     * {@code taskFactory}: batch delivery, manual acknowledgment, no auto-startup.
     *
     * @return the configured container factory
     */
    @Bean("taskFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> taskFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(initProps()));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        // NOTE(review): ackCount / ackTime apply to the COUNT / TIME ack modes; with MANUAL they
        // appear to be unused. Kept as in the original configuration.
        factory.getContainerProperties().setAckCount(10);
        factory.getContainerProperties().setAckTime(10000);
        // NOTE(review): consumers beyond the topic's partition count receive no partitions —
        // confirm that topic1 has at least 30 partitions.
        factory.setConcurrency(30);
        factory.setBatchListener(true);
        // Do not start with the application context; startListener() starts it on schedule.
        factory.setAutoStartup(false);
        return factory;
    }
}
注意:
cron = "0 0 1 * * ?":每天 01:00:00 开始(恢复)消费
cron = "0 0 6 * * ?":每天 06:00:00 暂停消费(容器只是被暂停,并未停止)
kafka的配置具体可以参考这篇文章:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/71383.html