博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用
阅读量:7070 次
发布时间:2019-06-28

本文共 5276 字,大约阅读时间需要 17 分钟。

在消息队列中,当消费者去消费消息的时候,无论是通过 pull 的方式还是 push 的方式,都可能会出现大批量的消息突刺。如果此时要处理所有消息,很可能会导致系统负载过高,影响稳定性。但其实可能后面几秒之内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息,从而起到“削峰填谷”的效果:

削峰填谷

上图中红色的部分代表超出消息处理能力的部分。

我们可以看到消息突刺往往都是瞬时的、不规律的,其后一段时间系统往往都会有空闲资源。我们希望把红色的那部分消息平摊到后面空闲时去处理,这样既可以保证系统负载处在一个稳定的水位,又可以尽可能地处理更多消息,这时候我们就需要一个能够控制消费端消息匀速处理的利器 — ,来为消息队列削峰填谷,保驾护航。

AHAS 是如何削峰填谷的

AHAS 的流控降级是面向分布式服务架构的专业流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统保护等多个维度来帮助您保障服务的稳定性,同时提供强大的聚合监控和历史监控查询功能。

AHAS 专门为这种场景提供了的控制特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。

比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多余的处理任务将排队;同时设置了超时时间,预计排队时长超过超时时间的处理任务将会直接被拒绝。示意图如下图所示:

Uniform rate

RocketMQ Consumer 接入示例

本部分将引导您快速在 RocketMQ 消费端接入 AHAS 流控降级 Sentinel。

1. 开通 AHAS

首先您需要到开通 AHAS 功能(免费)。可以根据 里面的指引进行开通。

2. 代码改造

在结合阿里云 RocketMQ Client 使用 Sentinel 时,用户需要引入 AHAS Sentinel 的依赖 ahas-sentinel-client (以 Maven 为例):

com.alibaba.csp
ahas-sentinel-client
1.1.0

由于 RocketMQ Client 未提供相应拦截机制,而且每次收到都可能是批量的消息,因此用户在处理消息时需要手动进行资源定义(埋点)。我们可以在处理消息的逻辑处手动进行埋点,资源名可以根据需要来确定(如 groupId + topic 的组合):

private static Action handleMessage(Message message, String groupId, String topic) {        Entry entry = null;        try {            // 资源名称为 groupId 和 topic 的组合,便于标识,同时可以针对不同的 groupId 和 topic 配置不同的规则            entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic);                      // 在此处编写真实的处理逻辑            System.out.println(System.currentTimeMillis() + " | handling message: " + message);            return Action.CommitMessage;        } catch (BlockException ex) {            // 在编写处理被流控的逻辑            // 示例:可以在此处记录错误或进行重试            System.err.println("Blocked, will retry later: " + message);            return Action.ReconsumeLater; // 会触发消息重新投递        } finally {            if (entry != null) {                entry.exit();            }        }    }

消费者订阅消息的逻辑示例:

Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(topic, "*", (message, context) -> {    return handleMessage(message);});consumer.start();

更多关于 RocketMQ SDK 的信息可以参考 。

3. 获取 AHAS 启动参数

注意:若在本地运行接入 AHAS Sentinel 控制台需要在页面左上角选择 公网 环境,若在阿里云 ECS 环境则在页面左上角选择对应的 Region 环境。

我们可以进入 ,点击左侧侧边栏的 流控降级,进入 AHAS 流控降级控制台应用总览页面。在页面右上角,单击添加应用,选择 SDK 接入页签,到 配置启动参数 页签拿到需要的启动参数(详情请参考 ),类似于:

-Dproject.name=AppName -Dahas.license=

其中 project.name 配置项代表应用名(会显示在控制台,比如 MqConsumerDemo),ahas.license 配置项代表自己的授权 license(ECS 环境不需要此项)。

4. 启动 Consumer,配置规则

接下来我们添加获取到的启动参数,启动修改好的 Consumer 应用。由于 AHAS 流控降级需要进行资源调用才能触发初始化,因此首先需要向对应 group/topic 发送一条消息触发初始化。消费端接收到消息后,我们就可以在 AHAS Sentinel 控制台上看到我们的应用了。点击应用卡片,进入详情页面后点击左侧侧边栏的“机器列表”。我们可以在机器列表页面看到刚刚接入的机器,代表接入成功:

1550735297356_88465105_1099_4a15_9028_bde07c20dc84

点击“请求链路”页面,我们可以看到之前定义的资源。点击右边的“流控”按钮添加新的流控规则:

1550733424354_d626e59b_2cc5_4c19_8da6_3544e7eb6e99

我们在“流控方式”中选择“排队等待”,设置 QPS 为 10,代表每 100ms 匀速通过一个请求;并且设置最大超时时长为 2000ms,超出此超时时间的请求将不会排队,立即拒绝。配置完成后点击新建按钮。

5. 发送消息,查看效果

下面我们可以在 Producer 端批量发送消息,然后在 Consumer 端的控制台输出处观察效果。可以看到消息消费的速率是匀速的,大约每 100 ms 消费一条消息:

1550732955137 | handling message: Hello MQ 24531550732955236 | handling message: Hello MQ 91621550732955338 | handling message: Hello MQ 49441550732955438 | handling message: Hello MQ 55821550732955538 | handling message: Hello MQ 44931550732955637 | handling message: Hello MQ 30361550732955738 | handling message: Hello MQ 13811550732955834 | handling message: Hello MQ 14501550732955937 | handling message: Hello MQ 5871

同时不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。注意在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息。

我们也可以点击左侧侧边栏的“监控详情”进入监控详情页面,查看处理消息的监控曲线:

1550733864124_bdc35a38_cee3_4f77_a599_9898fa685fa2

对比普通限流模式的监控曲线(最右面的部分):

1550733991366_2bb63f68_df03_444f_bb4e_d3abb1395ea0

如果不开启匀速模式,只是普通的限流模式,则只会同时处理 10 条消息,其余的全部被拒绝,即使后面的时间系统资源充足多余的请求也无法被处理,因而浪费了许多空闲资源。两种模式对比说明匀速模式下消息处理能力得到了更好的利用。

Kafka 接入代码示例

Kafka 消费端接入 AHAS 流控降级的思路与上面的 RocketMQ 类似,这里给出一个简单的代码示例:

private static void handleMessage(ConsumerRecord
record, String groupId, String topic) { pool.submit(() -> { Entry entry = null; try { // 资源名称为 groupId 和 topic 的组合,便于标识,同时可以针对不同的 groupId 和 topic 配置不同的规则 entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic); // 在此处理消息. System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString()); } catch (BlockException ex) { // Blocked. // NOTE: 在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息 System.err.println("Blocked: " + record.toString()); } finally { if (entry != null) { entry.exit(); } } });}

消费消息的逻辑:

while (true) {    try {        ConsumerRecords
records = consumer.poll(1000); // 必须在下次 poll 之前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG // 建议开一个单独的线程池来消费消息,然后异步返回结果 for (ConsumerRecord
record : records) { handleMessage(record, groupId, topic); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); }}

其它

以上介绍的只是 AHAS 流控降级的其中一个场景 —— 请求匀速,它还可以处理更复杂的各种情况,比如:

  • 流量控制:可以针对不同的调用关系,以不同的运行指标(如 QPS、线程数、系统负载等)为基准,对资源调用进行流量控制,将随机的请求调整成合适的形状(请求匀速、Warm Up 等)。
  • 熔断降级:当调用链路中某个资源出现不稳定的情况,如平均 RT 增高、异常比例升高的时候,会使对此资源的调用请求快速失败,避免影响其它的资源导致级联失败。
  • 系统负载保护:对系统的维度提供保护。当系统负载较高的时候,提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。

您可以参考 来挖掘更多的场景。

转载地址:http://gyell.baihongyu.com/

你可能感兴趣的文章
Jquery获取input表单的内容
查看>>
Paint、Canvas
查看>>
关于读写锁算法的java实现及思考
查看>>
最小堆排序
查看>>
iOS-网络爬虫
查看>>
jira
查看>>
elasticsearch rpm 安装
查看>>
Python基础总结(字符串常用,数字类型转换,基本运算符与流程控制)
查看>>
数据预处理——剔除异常值,平滑,归一化
查看>>
Visual stuido 项目路径的奇怪问题
查看>>
java局部变量和临时变量
查看>>
返回杨辉三角前 n 层的数字
查看>>
布局(2、相对布局)
查看>>
在 Eclipse 上配置tomcat7.0并创建工程发布
查看>>
腾讯移动分析 签名代码示例
查看>>
重新回归博客园,写在开始的话。
查看>>
JavaScript定义函数
查看>>
AJAX的基础
查看>>
安全使用电子邮件十三法
查看>>
开发人员必知的20+HTML5技巧
查看>>