【RabbitMQ】rabbitmq在spring boot中的使用

news/2025/3/18 11:36:11

rabbitmq官网地址:https://www.rabbitmq.com/tutorials

下面介绍rabbitmq官网中七种使用方式在spring boot中如何使用

下面是基于 Spring Boot 使用 RabbitMQ 实现这七种模式的示例代码。假设已经引入了以下依赖:

Maven 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 

1. Hello World! 模式

生产者和消费者直接发送和接收消息。

配置类

@Configuration
public class RabbitConfig {
    public static final String QUEUE_NAME = "hello";

    @Bean
    public Queue helloQueue() {
        return new Queue(QUEUE_NAME);
    }
}

生产者

@RestController
@RequestMapping("/hello")
public class HelloProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send() {
        rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, "Hello RabbitMQ!");
        return "Message sent!";
    }
}

消费者

@Component
public class HelloConsumer {

    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void receive(String message) {
        System.out.println("Received: " + message);
    }
}

2. Work Queues(工作队列)

多个消费者从同一个队列中获取任务,进行任务分发。

配置类

@Configuration
public class WorkQueueConfig {
    public static final String WORK_QUEUE = "work_queue";

    @Bean
    public Queue workQueue() {
        return new Queue(WORK_QUEUE);
    }
}

生产者

@RestController
@RequestMapping("/work")
public class WorkQueueProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, msg);
        return "Work message sent!";
    }
}

消费者(多个)

@Component
public class WorkConsumer {

    @RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
    public void receive(String message) throws InterruptedException {
        System.out.println("Worker received: " + message);
        Thread.sleep(1000);  // 模拟任务耗时
    }
}

3. Publish/Subscribe(发布/订阅)

使用 Fanout Exchange 实现广播。

配置类

@Configuration
public class FanoutConfig {
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public Queue queue1() {
        return new Queue("fanout.queue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("fanout.queue2");
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}

生产者

@RestController
@RequestMapping("/fanout")
public class FanoutProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping("/send")
    public String send() {
        rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE, "", "Fanout message!");
        return "Fanout message sent!";
    }
}

4. Routing(路由模式)

使用 Direct Exchange 和路由键实现定向投递。

配置类

@Configuration
public class DirectConfig {
    public static final String DIRECT_EXCHANGE = "direct_exchange";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("direct.error");
    }

    @Bean
    public Queue infoQueue() {
        return new Queue("direct.info");
    }

    @Bean
    public Binding errorBinding() {
        return BindingBuilder.bind(errorQueue()).to(directExchange()).with("error");
    }

    @Bean
    public Binding infoBinding() {
        return BindingBuilder.bind(infoQueue()).to(directExchange()).with("info");
    }
}

5. Topics(主题模式)

使用 Topic Exchange 实现多级路由。

配置类

@Configuration
public class TopicConfig {
    public static final String TOPIC_EXCHANGE = "topic_exchange";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        return new Queue("topic.queue1");
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topic.queue2");
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.error");
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("log.#");
    }
}

6. RPC(远程过程调用)

生产者发送请求,消费者处理后返回响应。

RPC 服务端

@Component
public class RpcServer {

    @RabbitListener(queues = "rpc_queue")
    public String process(String message) {
        return "Processed: " + message;
    }
}

RPC 客户端

@RestController
@RequestMapping("/rpc")
public class RpcClient {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        Object response = rabbitTemplate.convertSendAndReceive("rpc_queue", msg);
        return "RPC response: " + response;
    }
}

7. Publisher Confirms(发布者确认)

确保消息成功发送到 RabbitMQ 服务器。

配置类

@Configuration
public class ConfirmConfig {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setPublisherConfirms(true);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed");
            } else {
                System.err.println("Message failed: " + cause);
            }
        });
        return template;
    }
}


http://www.niftyadmin.cn/n/5890688.html

相关文章

蓝桥杯刷题 Day1 高精度加法

蓝桥杯刷题 Day1 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 蓝桥杯刷题 Day1前言一、大数加法1. 解题思路2. 代码2.1主函数2.2 去除前导02.3 大数相加2.4 完整代码 二、KMP字符串匹配0. 知识点速记1. 解题思路…

MyBatis一对多查询方式

在 MyBatis 中&#xff0c;一对多查询是指一个实体对象&#xff08;如 Order&#xff09;关联多个子对象&#xff08;如 OrderItem&#xff09;。这种关系在数据库中通常通过外键实现&#xff0c;而在 MyBatis 中可以通过 resultMap 的嵌套集合&#xff08;<collection>&…

超越限制:大模型token管理与优化实践

前言 在大型语言模型&#xff08;LLM&#xff09;的应用中&#xff0c;token数量的管理是一个核心挑战。无论是模型的输入限制、计算资源的分配&#xff0c;还是成本的控制&#xff0c;token计数都至关重要。然而&#xff0c;当调用超过预期范围时&#xff0c;我们该如何应对&…

AI+python零代码机器学习小项目挖掘教程:鱼体特征数据挖掘与可视化实例

对于一组新的行业数据,我们要怎么挖掘呢? 以鱼体特征数据为例进行说明。 1.不同的领域的数据有不同的要求 制造业数据挖掘框架。 ##2. 零售业数据挖掘路径人货场三维分析: 顾客动线热力图 + 货架陈列优化(计算机视觉) 价格弹性模型 天气数据 → 动态定价策略 RFID数据 …

评委打分5个评委 去掉一个最高分和一个最低分 取平均分

一键替换max用min 按shiftF6 public static int getMin(int[]scores){int min scores[0];for (int i 0; i < scores.length; i) {if(scores[i]> min){min scores[i];}}return min;} 这里有和c/c不一样的知识点 c/c调用函数类似于java的方法,但是c/c的函数调用需要声明…

记第一次跟踪seatunnel的任务运行过程四——getJobConfigParser().parse()的动作

前绪 记第一次跟踪seatunnel的任务运行过程三——解析配置的具体方法getLogicalDag 正文 书接上文 ImmutablePair<List<Action>, Set<URL>> immutablePair getJobConfigParser().parse(null);在前一篇文章中说到getLogicDag()方法的第一行&#xff08;如…

结构型——装饰器模式

装饰器模式 装饰器是指能动态地为对象添加额外的功能的一种结构型设计模式。 特点 不修改原有代码的情况下&#xff0c;动态地扩展一个对象的功能。支持多个装饰器叠加使用透明性&#xff0c;装饰后的对象与原对象保持一致&#xff0c;客户端无需感知装饰过程 结构模式与实…

Word 小黑第21套

对应大猫22 设置表格为页面的80%&#xff1a;表布局 -属性 -表格 指定宽度80% 度量单位改成百分比 段落组 -中文版式 在表格上下方留一行空段&#xff08;如果表格太大改一下样式&#xff09;插入横线 边框线 &#xff08;右击横线 -图片 修改样式&#xff09; 段落 -取消对于…