【RabbitMQ】消息堆积、推拉模式

消息堆积

原因

消息堆积是指在消息队列中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象。通常有以下几种原因:

消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。

消费者处理能力不足:消费者消费消息的速度跟不上消息生产的速度,也会导致消息在队列中积压。可能的原因有:

  • 消费端业务逻辑复杂、耗时长。
  • 消费端代码性能低。
  • 系统资源限制,如CPU、内存、磁盘等也会限制消费者处理消息的速率。
  • 异常处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认。

网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压。

RabbitMQ服务器配置偏低

解决方案

消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃。因此,及时发现消息积压并解决对于维护系统稳定性至关重要。

遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。通常从以下几个方面考虑:

提高消费者效率

  • 增加消费者实例数量,比如新增机器。
  • 优化业务逻辑,比如使用多线程来处理业务。
  • 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者。
  • 消息发生异常时,设置合适的重试策略,或者转入到死信队列。

限制生产者效率:比如流量控制、限流算法等。

  • 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送效率。
  • 限流:使用限流工具,为消息发送效率设置一个上限。
  • 设置过期时间:如果消息过期未被消费,可以配置死信队列,以避免消息丢失,同时减少对主队列的压力。

资源与配置优化:比如省级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等。

推拉模式

概述

RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull)。

推模式:消息中间件主动将消息推送给消费者(对消息的获取更加实时,适合对数据实时性要求较高的业务,例如实时数据处理:监控系统、报表系统等)。

拉模式:消费者主动从消息中间件拉取消息(消费端可以按照自己的处理速度来消费,避免消息积压,适合需要流量控制,或者需要大量计算资源的任务,拉取模式允许消费者准备好之后再请求消息,避免资源浪费)。

RabbitMQ主要就是基于推模式工作的,例如最开始谈到的几种工作模式,都是基于推模式来进行实现。它的设计核心是让消息队列中的消费者接收到由生产者发送的消息,使用channel.basicConsume方式订阅队列,MQ就会把消息推送到订阅该队列的消费者。如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方式来进行消费消息。

SpringBoot方式

@Configuration
public class PushAndPull {

    @Bean("pushQueue")
    public Queue pushQueue() {
        return QueueBuilder.durable(Constants.PUSH_QUEUE).build();
    }

    @Bean("pushExchange")
    public Exchange pushExchange() {
        return ExchangeBuilder.directExchange(Constants.PUSH_EXCHANGE).durable(true).build();
    }

    @Bean("pushQueueBind")
    public Binding pushQueueBind(@Qualifier("pushExchange") Exchange exchange,
                                 @Qualifier("pushQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("push").noargs();
    }

    @Bean("pullQueue")
    public Queue pullQueue() {
        return QueueBuilder.durable(Constants.PULL_QUEUE).build();
    }

    @Bean("pullExchange")
    public Exchange pullExchange() {
        return ExchangeBuilder.directExchange(Constants.PULL_EXCHANGE).durable(true).build();
    }

    @Bean("pullQueueBind")
    public Binding pullQueueBind(@Qualifier("pullExchange") Exchange exchange,
                                 @Qualifier("pullQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("pull").noargs();
    }
}
@Slf4j
@RestController
@RequestMapping("/pushAndPull")
public class PushAndPullController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    // 推模式
    @RequestMapping("/push")
    public void push() {
        this.rabbitTemplate.convertAndSend(Constants.PUSH_EXCHANGE, "push", "hello push");
        System.out.println("推模式消息发送成功!");
    }

    // 拉模式
    @RequestMapping("/pull")
    public void pull() {
        this.rabbitTemplate.convertAndSend(Constants.PULL_EXCHANGE, "pull", "hello pull");
        System.out.println("拉模式消息发送成功!");
    }

}
@Configuration
@RestController
public class PushAndPullListener {

    // 推模式
    @RabbitListener(queues = Constants.PUSH_QUEUE)
    public void push(String message) {
        System.out.println("推模式: " + message);
    }

    @Resource
    private RabbitTemplate rabbitTemplate;

    // 拉模式
    @RequestMapping("/pullConsumer")
    public void pull(String message) {
        Message receive = this.rabbitTemplate.receive(Constants.PULL_QUEUE);
        System.out.println("拉模式: " + receive);
    }

}

SDK方式

// 推模式生产者
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("113.45.220.15"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");

        // TODO 发送消息
        channel.basicPublish(Constants.PUSH_EXCHANGE, "push", null, "推模式".getBytes());
        System.out.println("推模式发送消息成功!");

        // TODO 释放资源
        channel.close();
        connection.close();
    }

}
// 推模式消费者
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("113.45.220.15"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.PUSH_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.PUSH_QUEUE, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.PUSH_QUEUE, Constants.PUSH_EXCHANGE, "push");

        // TODO 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("推模式消费者接收到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.PUSH_QUEUE, true, consumer);


        // TODO 释放资源

    }

}
// 拉模式生产者
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("113.45.220.15"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");

        // TODO 发送消息
        channel.basicPublish(Constants.PULL_EXCHANGE, "pull", null, "拉模式".getBytes());
        System.out.println("拉模式发送消息成功!");

        // TODO 释放资源
        channel.close();
        connection.close();
    }

}
// 拉模式消费者
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // TODO 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("113.45.220.15"); // IP
        connectionFactory.setPort(5672); // PORT
        connectionFactory.setUsername("admin"); // 用户名
        connectionFactory.setPassword("admin"); // 密码
        connectionFactory.setVirtualHost("mq-sdk-test"); // 虚拟主机

        // TODO 创建连接
        Connection connection = connectionFactory.newConnection();

        // TODO 获取信道
        Channel channel = connection.createChannel();

        // TODO 声明交换机
        channel.exchangeDeclare(Constants.PULL_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        // TODO 声明队列
        channel.queueDeclare(Constants.PULL_QUEUE, true, false, false, null);

        // TODO 绑定交换机和队列
        channel.queueBind(Constants.PULL_QUEUE, Constants.PULL_EXCHANGE, "pull");

        // TODO 接收消息
        GetResponse getResponse = channel.basicGet(Constants.PULL_QUEUE, true);
        if (getResponse != null) {
            System.out.println("拉模式收到消息:" + new String(getResponse.getBody()));
        }


        // TODO 释放资源

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/885419.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue-element 表格组合查询 - fc-table-search 组件封装

开发目的 解决搜索form参数读取,配合异步请求,更新渲染数据;支持自适应高度,分页查询,搜索查询/重置。 额外提供formater类型:标签定义,金额,时间格式化,跨页勾选&#x…

uniapp/vue项目 import 导入文件时提示Module is not installed,‘@/views/xxx‘路径无法追踪

文章目录 背景解决方案1.IDE配置2.alias(别名)配置webpackvue-clivite 3.检查 jsconfig.json 或 tsconfig.json 写在最后 前往闪闪の小窝以获得更好的阅读和评论体验 背景 Vue3在我自学Vue的时候看过一点,实操过一点,但是太久没用…

基于php的酒店管理系

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏:Java精选实战项目…

动手学深度学习(李沐)PyTorch 第 3 章 线性神经网络

3.1 线性回归 线性回归是对n维输入的加权,外加偏差 线性回归可以看作是单层神经网络 回归问题中最常用的损失函数是平方误差函数。 平方误差可以定义为以下公式: 常数1/2不会带来本质的差别,但这样在形式上稍微简单一些 (因为当…

实时语音交互,打造更加智能便捷的应用

随着人工智能和自然语言处理技术的进步,用户对智能化和便捷化应用的需求不断增加。语音交互技术以其直观的语音指令,革新了传统的手动输入方式,简化了用户操作,让应用变得更加易用和高效。 通过语音交互,用户可以在不…

Android入门

下载Android studio,创建第一个项目 模板可以选择empty views Activity 在这个界面可以修改,使用语言,项目名字,存储路径以及适用版本 完成后,得到一个最初始的Android 项目,红色标记的两个文件&#xf…

七星创客:重塑商业模式认知

近期,一个普遍存在的疑问困扰着许多人:“商业模式是否仅仅等同于拉人头或传销活动?”这样的联想或许源于对商业模式概念的片面理解,使得一些人错误地将所有商业模式都笼罩在负面阴影之下。 商业模式,这一商业领域的核心…

(IDEA)spring项目导入本地jar包方法和项目打包时找不到引入本地jar包的问题解决方案

系列文章目录 文章目录 系列文章目录一、(IDEA)spring项目导入本地jar包方法和项目打包时找不到引入本地jar包的问题解决方案1.资料 一、(IDEA)spring项目导入本地jar包方法和项目打包时找不到引入本地jar包的问题解决方案 1.资料…

Windows11系统下SkyWalking环境搭建教程

目录 前言SkyWalking简介SkyWalking下载Agent监控实现启动配置SkyWalking启动Java应用程序启动Elasticsearch安装总结 前言 本文为博主在项目环境搭建时记录的SkyWalking安装流程,希望对大家能够有所帮助,不足之处欢迎批评指正🤝&#x1f91…

828华为云征文|华为云Flexus云服务器X实例部署 即时通讯IM聊天交友软件——高性能服务器实现120W并发连接

营运版的即时通讯IM聊天交友系统:特点可发红包,可添加多条链接到用户网站和应用,安卓苹果APPPC端H5四合一 后端开发语言:PHP, 前端开发语言:uniapp混合开发。 集安卓苹果APPPC端H5四合一APP源码&#xff0…

微信小程序——婚礼邀请函

目的 1.掌握微信小程序的开发技术,包括页面布局、交互设计、数据存储等。 2.学会运用微信小程序的各种组件和 API,实现个性化的婚礼邀请函功能。 3.通过制作婚礼邀请函小程序,提升创意设计和用户体验优化的能力。 4.了解如何在小程序中整…

JAVA并发编程系列(11)线程池底层原理架构剖析

面试官:说说JAVA线程池的几个核心参数? 之前我们用了10篇文章详细剖析了synchronized、volatile、CAS、AQS、ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier、并发锁、Condition等各个核心基础原理,今天开始我们说说并发领域的各种…

信息安全数学基础(24)模为奇数的平方剩余与平方非剩余

前言 在信息安全数学基础中,模为奇数的平方剩余与平方非剩余是数论中的一个重要概念,特别是在密码学和安全协议中扮演着关键角色。当模数为奇数时,我们通常关注的是模为奇素数的平方剩余与平方非剩余,因为奇合数的情况更为复杂且…

自己做个国庆75周年头像生成器

版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 下载相关代码:【免费】《自己做个国庆75周年头像生成器》代码资源-CSDN文库 又是一年国庆节,今年使用国旗做…

智慧城市交通管理中的云端多车调度与控制

城市交通管理中的云端多车调度与控制 智慧城市是 21世纪的城市基本发展方向,为了实现智慧城市建设的目标,人们需要用现代化的手段去管理和控制城市中的各种资源和设施。智能交通控制与管理是智慧城市中不可缺少的一部分,因为现代城市交通系统…

【2024工业3D异常检测文献】CMDIAD: 基于跨模态蒸馏驱动的多模态工业异常检测

Incomplete Multimodal Industrial Anomaly Detection via Cross-Modal Distillation 1、Background 近年来,基于3D点云和RGB图像的多模态工业异常检测(IAD)研究强调了利用模态间的冗余性和互补性对于精确分类和分割的重要性。 在项目中,提出了CMDIAD方…

亲身体验Llama 3.1:开源模型的部署与应用之旅

文章目录 1 Llama 3.1系列的诞生2 大型模型的未来发展3 使用教程4 Llama 3.1在客户服务中的运用 1 Llama 3.1系列的诞生 在人工智能的浪潮中,大型语言模型(LLM)正以其独特的魅力和潜力,成为深度学习领域的一颗耀眼明星。 这些模…

大模型增量训练--基于transformer制作一个大模型聊天机器人

针对夸夸闲聊数据集,利用UniLM模型进行模型训练及测试,更深入地了解预训练语言模型的使用方法,完成一个生成式闲聊机器人任务。 项目主要结构如下: data 存放数据的文件夹 dirty_word.txt 敏感词数据douban_kuakua_qa.txt 原始语…

【算法篇】二叉树类(2)(笔记)

目录 一、Leetcode 题目 1. 左叶子之和 (1)迭代法 (2)递归法 2. 找树左下角的值 (1)广度优先算法 (2)递归法 3. 路径总和 (1)递归法 (2…

JavaScript类型转换和相等性详解

类型转换 10"objects" //10objects,数字10转换为字符串 "7"*"4" //28,两个字符串均转为数字,只要不是加,其他都按两个数字算 var n 1-"x"// NaN,字符串x无法转化为数字 n"objects"//…