rabbitmq

2023/7/5

# rabbitmq

安装参考链接 (opens new window)

springboot整合rabbitmq参考 (opens new window)

# 1 依赖

<!--        rabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

# 2 配置文件

# 配置rabbitmq服务
  rabbitmq:
    username: guest
    password: guest
    virtual-host: test
    host: localhost
    port: 5672

# 3 生产者

package com.example.community.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: lichengcan
 * @date: 2023-07-04 17:26
 * @description rabbitmq 生产者
 **/
@Slf4j
@RestController
@RequestMapping("/firm")
public class SendFirmMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 1、定义交换机
     */
    private String exchangeName = "exchange_firm";
    // 2、定义路由key
    private String routeKey1 = "爱国.小灿";
    private String routeKey2 = "哈哈.小峰";
    private String routeKey3 = "动作.小花";
    private String routeKey4 = "喜剧.小艹";

    @PostMapping("/send")
    public void sendMsg(){

        //发送信息到指定的交换器
        //并使用路由键进行匹配
        for (int i = 1; i <=50; i++) {
            // @params1: 交换机名称
            // @params2: 表示消息的路由键
            // @params3: 表示要发送的消息内容
            if(i%4==0){
                rabbitTemplate.convertAndSend(exchangeName,routeKey1,("爱国.小灿,说第"+i+"遍。").getBytes());
            }else if(i%4 ==1){
                rabbitTemplate.convertAndSend(exchangeName,routeKey2,("哈哈.小峰,说第"+i+"遍。").getBytes());
            }else if(i%4 ==2){
                rabbitTemplate.convertAndSend(exchangeName,routeKey3,("动作.小花,说第"+i+"遍。").getBytes());
            }else if(i%4 ==3){
                rabbitTemplate.convertAndSend(exchangeName,routeKey4,("喜剧.小艹,说第"+i+"遍。").getBytes());
            }
            log.info("发送第"+i);
        }
    }
}

# 4 消费者

package com.example.community.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: lichengcan
 * @date: 2023-07-04 17:26
 * @description rabbitmq 消费者
 **/
@Slf4j
@RestController
public class MsgController {

    @RabbitListener(bindings = @QueueBinding(
            // 指定队列名字
            value = @Queue(value = "queue1",autoDelete = "false"),
            // 指定交换机的名字
            exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC)
    ))
    @RabbitHandler
    public void consumrmsg1(String msg){
        log.info(" -------------->" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue2",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC)))
    @RabbitHandler
    public void consumrmsg2(String msg){
        log.info(" -------------->" + msg);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue3",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC)))
    @RabbitHandler
    public void consumrmsg3(String msg,String id){
        log.info(" -------------->" + msg+"------------"+id);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue4",autoDelete = "false"),exchange = @Exchange(value = "exchange_firm",type = ExchangeTypes.TOPIC)))
    @RabbitHandler
    public void consumrmsg4(String msg){
        log.info(" -------------->" + msg);
    }


    /**
     * 手动消费
     */
    public void consumrmsg6(String msg){

    }
}

# 5 消费者-生产者的绑定关系

package com.example.community.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 主题消费类型配置
 * @Author xiaolei
 * @Date 2021/10/29 11:03
 **/
@Configuration
public class TopicRabbitConfig {

    /**
     *  给队列取名字
     * @return
     */
    @Bean
    public Queue firstQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue ("queue1",true);
    }
    @Bean
    public Queue SecondQueue() {
        return new Queue ("queue2",true);
    }

    @Bean
    public Queue ThreeQueue() {
        return new Queue ("queue3",true);
    }

    @Bean
    public Queue FourQueue() {
        return new Queue ("queue4",true);
    }


    /**
     * 给交换机取名
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("exchange_firm",true,false);
    }

    /**
     * 绑定键可以使用通配符 * 和 # 进行模糊匹配,其中 * 匹配一个词,# 匹配零个或多个词
     * @return
     */
    @Bean
    public Binding bindingTopic1(){
        return BindingBuilder.bind(firstQueue()).to(topicExchange()).with("*.小灿");
    }

    @Bean
    public Binding bindingTopic2(){
        return BindingBuilder.bind(SecondQueue()).to(topicExchange()).with("哈哈.小峰");
    }

    @Bean
    public Binding bindingTopic3(){
        return BindingBuilder.bind(ThreeQueue()).to(topicExchange()).with("动作.小花");
    }
    @Bean
    public Binding bindingTopic4(){
        return BindingBuilder.bind(FourQueue()).to(topicExchange()).with("#.小艹");
    }

}