kafka
承灿 2023/7/5
# kafka
# 1 window 安装 kafka
https://kafka.apache.org/downloads (opens new window)
# 2 启动Zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
# 3 启动kafka服务端命令
bin\windows\kafka-server-start.bat config\server.properties
# 4 生产者-发送信息
/**
* kafka信息管理
*
* @author zhengwen
**/
@Slf4j
@RestController
@RequestMapping("/kafka/push")
public class KafkaPushController {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* kafka的信息push发送
*
* @param kafkaMessageVo kafka信息对象
* @return 推送结果
*/
@PostMapping("/sendMsg")
public void sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {
String topic = kafkaMessageVo.getTopic();
String msg = kafkaMessageVo.getMessage();
//SpringBoot3的写法
CompletableFuture<SendResult<String, Object>> completableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msg);
//执行成功回调
completableFuture.thenAccept(result -> {
log.info("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
});
//执行失败回调
completableFuture.exceptionally(e -> {
log.info("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), e);
return null;
});
}
}
# 5 消费者-消费信息
/**
* @author lichengcan
*/
@Component
@Slf4j
public class KafkaMessageConsumer {
/**
* kafka的信息消费
* @param record
*/
@KafkaListener(topics = "${xiaocan.analyze.device.flow.topic.consumer}", groupId = "dc-device-flow-analyze-0228")
public void consumeMessage(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
log.info("开始消费");
log.info("Topic: " + topic);
log.info("Partition: " + partition);
log.info("Offset: " + offset);
log.info("Key: " + key);
log.info("Value: " + value);
}
}
# 6 kafka配置信息
#kafka配置
kafka:
bootstrap-servers: localhost:9092
client-id: dc-device-flow-analyze
consumer:
group-id: dc-device-flow-analyze-0228
max-poll-records: 10
#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
auto-offset-reset: earliest
#是否开启自动提交
enable-auto-commit: false
#自动提交的时间间隔
auto-commit-interval: 1000
producer:
acks: 1
batch-size: 4096
buffer-memory: 40960000
client-id: dc-device-flow-analyze-producer
compression-type: zstd
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 3
properties:
spring.json.add.type.headers: false
max.request.size: 126951500
listener:
ack-mode: MANUAL_IMMEDIATE
concurrency: 1 #推荐设置为topic的分区数
type: BATCH #开启批量监听
#消费topic配置
xiaocan:
analyze:
device:
flow:
topic:
consumer: test
# 7 maven依赖
<!--SpringBoot 3 整合kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>