博客
关于我
RocketMQ 源码分析 —— Filtersrv
阅读量:796 次
发布时间:2023-03-22

本文共 2799 字,大约阅读时间需要 9 分钟。

RocketMQ Filtersrv 过滤机制深度解析

Filtersrv 是 RocketMQ 系列中一个重要的组件,主要负责自定义规则过滤 Consumer 从 Broker 拉取的消息。它通过在 Broker 端减少不必要的消息传输,同时避免了 Consumer 端接收无用消息的负担,在 RocketMQ 的架构中扮演着关键角色。本文将详细介绍 Filtersrv 的工作原理、实现机制以及实际应用场景。

1. Filtersrv 概述

Filtersrv 的核心功能是为 Consumer 提供灵活的过滤规则,通过 Broker 端进行消息过滤。与传统的 Broker 端过滤相比,Filtersrv 的优势在于减少了 Broker 的负担,同时也减少了 Consumer 接收无用消息的开销。它通过将过滤逻辑外部化,实现了高效的消息过滤机制。

为什么不直接在 Broker 过滤消息?

官方对 Broker 端过滤消息的理由包括:

  • Broker 端过滤的优点

    • 减少 Consumer 无用消息的网络传输。
    • 适合需要复杂过滤规则的场景,如淘宝 Notify 等。
  • Broker 端过滤的缺点

    • 增加了 Broker 的负担,实现复杂。
    • 不适合对消息传输效率要求较高的场景。
  • Filtersrv 的补充作用

    • 在 Broker 端过滤的基础上,Filtersrv 提供了更灵活的过滤规则,减少了无用消息的传输。
    • 通过高可用实现,确保过滤服务的稳定性。
  • 2. Filtersrv 注册到 Broker

    Filtersrv 与 Broker 的关系是基于高可用性的。一个 Filtersrv 实例只对应一个 Broker,但一个 Broker 可以有多个 Filtersrv 实例。Filtersrv 注册到 Broker 时,需要定期重试,如果注册失败则退出关闭。

    核心代码解析

    public boolean initialize() {
    // ...
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    FiltersrvController.this.registerFilterServerToBroker();
    }
    }, 15, 10, TimeUnit.SECONDS); // TODO edit by 芋艿:initialDelay时间太短,可能导致初始化失败。从3→15
    // ...
    }

    注册逻辑

  • 定期注册:Filtersrv 定期向 Broker 注册,确保服务可用。
  • 高可用性:如果注册失败,Filtersrv 会退出并关闭。
  • 重试机制:代码中没有明确显示,但通常会有重试逻辑来应对网络或其他异常。
  • 3. 过滤类

    Filtersrv 的核心是处理 Consumer 上传的过滤类代码,并进行编译使用。过滤类的实现涉及三个关键步骤:

    3.1 Consumer 订阅时设置过滤类代码

    Consumer 可以针对每个 Topic 设置不同的过滤类代码,实现高度定制化的过滤规则。

    3.2 Consumer 上传过滤类代码

    Consumer 在心跳注册到 Broker 的同时,会上传对应的过滤类代码到所有相关的 Filtersrv。这种方式确保了过滤规则与消费者的业务逻辑保持一致。

    3.3 Filter 编译过滤类代码

    Filtersrv 处理上传的过滤类代码,进行编译并生成对应的 MessageFilter 实例。具体实现如下:

    public boolean registerFilterClass(final String consumerGroup,
    final String topic,
    final String className,
    final int classCRC,
    final byte[] filterSourceBinary) {
    // ...
    try {
    FilterClassManager filterClassManager = this.filtersrvController.getFilterClassManager();
    FilterClassInfo filterClassInfoNew = new FilterClassInfo();
    filterClassInfoNew.setClassName(className);
    filterClassInfoNew.setClassCRC(0);
    filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);
    filterClassInfoNew.setClassCRC(classCRC);
    this.filterClassTable.put(key, filterClassInfoNew);
    return true;
    } catch (Throwable e) {
    // ...
    return false;
    }
    }

    4. 过滤消息

    Filtersrv 的核心功能是根据过滤规则决定是否将消息拉取到 Consumer。具体实现包括两种模式:

    4.1 Consumer 从 Filtersrv 拉取消息

    Consumer 在拉取消息时,会从 Broker 对应的 Filtersrv 列表中随机选择一个实例进行消息拉取。如果没有可用的 Filtersrv,拉取消息将失败。

    4.2 Filtersrv 从 Broker 拉取消息

    Filtersrv 作为一个 Consumer,它会从 Broker 拉取消息,并根据过滤规则决定是否将消息传递给实际的 Consumer。

    5. Filtersrv 高可用

    Filtersrv 实现了高可用性,通过多个实例并发工作。当一个 Filtersrv 出现故障时,系统会自动切换到其他可用的实例,确保过滤服务的稳定性。

    结语

    Filtersrv 是 RocketMQ 中一个重要的组件,其过滤机制为消息系统提供了灵活性和高效性。通过合理配置和管理 Filtersrv,可以显著优化消息系统的性能。在实际应用中,推荐结合 RocketMQ 的其他特性如消息顺序、集群消费等,构建高效可靠的消息系统。

    转载地址:http://xiqfk.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现循环队列算法(附完整源码)
    查看>>
    Objective-C实现循环队列链表算法(附完整源码)
    查看>>
    Objective-C实现快速排序(附完整源码)
    查看>>
    Objective-C实现快速排序(附完整源码)
    查看>>
    Objective-C实现快速排序算法(附完整源码)
    查看>>
    Objective-C实现恩尼格玛密码机算法(附完整源码)
    查看>>
    Objective-C实现感知哈希算法(附完整源码)
    查看>>
    Objective-C实现感知哈希算法(附完整源码)
    查看>>
    Objective-C实现截留雨水问题的动态编程方法算法(附完整源码)
    查看>>
    Objective-C实现截留雨水问题的蛮力方法的算法(附完整源码)
    查看>>
    Objective-C实现打印10000以内的完数(附完整源码)
    查看>>
    Objective-C实现打印1000以内的水仙花数(附完整源码)
    查看>>
    Objective-C实现打印九九乘法表(附完整源码)
    查看>>
    Objective-C实现打印从 0 到 n 的卡特兰数算法(附完整源码)
    查看>>
    Objective-C实现打印函数调用堆栈( 附完整源码)
    查看>>
    Objective-C实现打印月份的日历算法(附完整源码)
    查看>>
    Objective-C实现打印杨辉三角(附完整源码)
    查看>>
    Objective-C实现打印某年的历法日期(附完整源码)
    查看>>
    Objective-C实现打印魔方矩阵(附完整源码)
    查看>>
    Objective-C实现打格点算法(附完整源码)
    查看>>