本文共 2799 字,大约阅读时间需要 9 分钟。
Filtersrv 是 RocketMQ 系列中一个重要的组件,主要负责自定义规则过滤 Consumer 从 Broker 拉取的消息。它通过在 Broker 端减少不必要的消息传输,同时避免了 Consumer 端接收无用消息的负担,在 RocketMQ 的架构中扮演着关键角色。本文将详细介绍 Filtersrv 的工作原理、实现机制以及实际应用场景。
Filtersrv 的核心功能是为 Consumer 提供灵活的过滤规则,通过 Broker 端进行消息过滤。与传统的 Broker 端过滤相比,Filtersrv 的优势在于减少了 Broker 的负担,同时也减少了 Consumer 接收无用消息的开销。它通过将过滤逻辑外部化,实现了高效的消息过滤机制。
官方对 Broker 端过滤消息的理由包括:
Broker 端过滤的优点:
Broker 端过滤的缺点:
Filtersrv 的补充作用:
Filtersrv 与 Broker 的关系是基于高可用性的。一个 Filtersrv 实例只对应一个 Broker,但一个 Broker 可以有多个 Filtersrv 实例。Filtersrv 注册到 Broker 时,需要定期重试,如果注册失败则退出关闭。
public boolean initialize() { // ... this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { FiltersrvController.this.registerFilterServerToBroker(); } }, 15, 10, TimeUnit.SECONDS); // TODO edit by 芋艿:initialDelay时间太短,可能导致初始化失败。从3→15 // ...} Filtersrv 的核心是处理 Consumer 上传的过滤类代码,并进行编译使用。过滤类的实现涉及三个关键步骤:
Consumer 可以针对每个 Topic 设置不同的过滤类代码,实现高度定制化的过滤规则。
Consumer 在心跳注册到 Broker 的同时,会上传对应的过滤类代码到所有相关的 Filtersrv。这种方式确保了过滤规则与消费者的业务逻辑保持一致。
Filtersrv 处理上传的过滤类代码,进行编译并生成对应的 MessageFilter 实例。具体实现如下:
public boolean registerFilterClass(final String consumerGroup, final String topic, final String className, final int classCRC, final byte[] filterSourceBinary) { // ... try { FilterClassManager filterClassManager = this.filtersrvController.getFilterClassManager(); FilterClassInfo filterClassInfoNew = new FilterClassInfo(); filterClassInfoNew.setClassName(className); filterClassInfoNew.setClassCRC(0); filterClassInfoNew.setMessageFilter((MessageFilter) newInstance); filterClassInfoNew.setClassCRC(classCRC); this.filterClassTable.put(key, filterClassInfoNew); return true; } catch (Throwable e) { // ... return false; }} Filtersrv 的核心功能是根据过滤规则决定是否将消息拉取到 Consumer。具体实现包括两种模式:
Consumer 在拉取消息时,会从 Broker 对应的 Filtersrv 列表中随机选择一个实例进行消息拉取。如果没有可用的 Filtersrv,拉取消息将失败。
Filtersrv 作为一个 Consumer,它会从 Broker 拉取消息,并根据过滤规则决定是否将消息传递给实际的 Consumer。
Filtersrv 实现了高可用性,通过多个实例并发工作。当一个 Filtersrv 出现故障时,系统会自动切换到其他可用的实例,确保过滤服务的稳定性。
Filtersrv 是 RocketMQ 中一个重要的组件,其过滤机制为消息系统提供了灵活性和高效性。通过合理配置和管理 Filtersrv,可以显著优化消息系统的性能。在实际应用中,推荐结合 RocketMQ 的其他特性如消息顺序、集群消费等,构建高效可靠的消息系统。
转载地址:http://xiqfk.baihongyu.com/