
2023/7/5

# Kafka

# 1 Install Kafka on Windows

Download the binary release from https://kafka.apache.org/downloads and extract it.

# 2 Start ZooKeeper

```
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
```

# 3 Start the Kafka server

```
bin\windows\kafka-server-start.bat config\server.properties
```
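
Once both processes are up, the broker listens on `localhost:9092` by default. If you want to verify connectivity from code rather than from the console scripts, a minimal check with the plain Kafka `AdminClient` could look like the sketch below (not part of the original post; the class name `BrokerCheck` is illustrative):

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // clusterId() only completes if the broker is reachable;
            // otherwise get() eventually fails with a timeout exception
            String clusterId = admin.describeCluster().clusterId().get();
            System.out.println("Connected to cluster: " + clusterId);
        }
    }
}
```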

# 4 Producer: sending messages

```java
/**
 * Kafka message publishing endpoint.
 *
 * @author zhengwen
 **/
@Slf4j
@RestController
@RequestMapping("/kafka/push")
public class KafkaPushController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Push a message to Kafka.
     *
     * @param kafkaMessageVo the Kafka message payload (topic + message body)
     */
    @PostMapping("/sendMsg")
    public void sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {
        String topic = kafkaMessageVo.getTopic();
        String msg = kafkaMessageVo.getMessage();
        // Spring Boot 3 / spring-kafka 3: send() returns a CompletableFuture instead of a ListenableFuture
        CompletableFuture<SendResult<String, Object>> completableFuture =
                kafkaTemplate.send(topic, UUID.randomUUID().toString(), msg);
        // Success callback
        completableFuture.thenAccept(result ->
                log.info("Send succeeded: {}", JSONUtil.toJsonStr(kafkaMessageVo)));
        // Failure callback
        completableFuture.exceptionally(e -> {
            log.error("Send failed: {}", JSONUtil.toJsonStr(kafkaMessageVo), e);
            return null;
        });
    }
}
```
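
The `KafkaMessageVo` request body used above is not shown in the original post. A minimal sketch, assuming it only carries the two fields the controller actually reads (the Lombok `@Data` DTO is an assumption):

```java
/**
 * Request body for the push endpoint (hypothetical sketch: only the fields
 * read by KafkaPushController, i.e. topic and message).
 */
@Data
public class KafkaMessageVo {
    /** Target Kafka topic. */
    private String topic;
    /** Message payload to send. */
    private String message;
}
```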

# 5 Consumer: consuming messages

```java
/**
 * Kafka message consumer.
 *
 * @author lichengcan
 */
@Component
@Slf4j
public class KafkaMessageConsumer {

    /**
     * Consume a message from Kafka.
     *
     * @param record the consumed record (key, value and metadata)
     */
    @KafkaListener(topics = "${xiaocan.analyze.device.flow.topic.consumer}", groupId = "dc-device-flow-analyze-0228")
    public void consumeMessage(ConsumerRecord<String, String> record) {
        String key = record.key();
        String value = record.value();
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();

        log.info("Consuming message - topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                topic, partition, offset, key, value);
    }
}
```
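
Note that the listener settings in section 6 below enable batch consumption (`type: BATCH`) and manual acknowledgment (`ack-mode: MANUAL_IMMEDIATE` with `enable-auto-commit: false`). The single-record listener above does not match that configuration: a batch listener receives a list of records and must acknowledge them itself, otherwise offsets are never committed. A minimal sketch of a listener matching that config (class name and log text are illustrative, not from the original):

```java
@Component
@Slf4j
public class KafkaBatchMessageConsumer {

    /**
     * Batch listener matching listener.type: BATCH and ack-mode: MANUAL_IMMEDIATE.
     */
    @KafkaListener(topics = "${xiaocan.analyze.device.flow.topic.consumer}", groupId = "dc-device-flow-analyze-0228")
    public void consumeBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            log.info("Consumed - topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        // With MANUAL_IMMEDIATE, the offsets are committed only when acknowledge() is called
        ack.acknowledge();
    }
}
```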

# 6 Kafka configuration

```yaml
spring:
  # Kafka configuration
  kafka:
    bootstrap-servers: localhost:9092
    client-id: dc-device-flow-analyze
    consumer:
      group-id: dc-device-flow-analyze-0228
      max-poll-records: 10
      # What to do when there is no initial offset in Kafka, or the current offset no longer
      # exists on the server. Default is latest; options: [latest, earliest, none]
      auto-offset-reset: earliest
      # Whether offsets are committed automatically
      enable-auto-commit: false
      # Auto-commit interval in ms (only used when enable-auto-commit is true)
      auto-commit-interval: 1000
    producer:
      acks: 1
      batch-size: 4096
      buffer-memory: 40960000
      client-id: dc-device-flow-analyze-producer
      compression-type: zstd
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      properties:
        spring.json.add.type.headers: false
        max.request.size: 126951500
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 1  # Recommended: set to the number of partitions of the topic
      type: BATCH     # Enable batch listening

# Consumer topic configuration
xiaocan:
  analyze:
    device:
      flow:
        topic:
          consumer: test
```
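
The listener consumes from the `test` topic configured above. If you want the application itself to create that topic at startup (instead of relying on broker auto-creation or the console scripts), spring-kafka's auto-configured `KafkaAdmin` will create any `NewTopic` beans it finds in the context. A minimal sketch (class, bean name and partition/replica counts are illustrative):

```java
@Configuration
public class KafkaTopicConfig {

    /**
     * Declaring a NewTopic bean lets the auto-configured KafkaAdmin create
     * the topic on startup if it does not already exist.
     */
    @Bean
    public NewTopic deviceFlowTopic() {
        return TopicBuilder.name("test")
                .partitions(1)   // ideally matches spring.kafka.listener.concurrency
                .replicas(1)     // single-broker local setup
                .build();
    }
}
```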

# 7 Maven dependency

```xml
<!-- Spring Boot 3 + Kafka integration (no version needed when using the Spring Boot parent/BOM) -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```