常见问题一
问: RocketMQ消费者订阅了tag,但却收不到消息无法消费,并且根据 msgid 去查询,发现这条消息的状态为 CONSUMED_BUT_FILTERED,那这是为什么?
答: 在RocketMQ中,一个消费组能同时订阅多个 tag,但一个消费组的不同消费者不能分开订阅不同的tag,即同一个消费组的订阅关系必须保持一样。例如:常见错误使用方式同一个项目中,一段消费代码订阅tagA,然后拷贝到这段代码再更改为tagB。
正确用法
1 2 3 4
| public void subscribe(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer"); consumer.subscribe("melon_online_test","tag1 || tag2 || tag3"); }
|
错误用法
1 2 3 4 5 6 7 8 9 10 11
| public class SubscribeTest { public void subscribeA(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer"); consumer.subscribe("melon_online_test","tag1"); }
public void subscribeB(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_online_test_consumer"); consumer.subscribe("melon_online_test","tag2"); } }
|
常见问题二
问: 发现大量的RocketMQ client 大量的info日志输出,我不关心,如何禁用呢?

答: 尝试以下设置,项目中使用了Slf4j
@1 可以配置RocketmqClient的logger设置优先级为warn
@2 也可以通过-Drocketmq.client.logUseSlf4j=false 和 -Drocketmq.client.logLevel=WARN 关闭MQ客户端使用Slf4j并提高日志等级
项目中没有使用Slf4j,可以通过-Drocketmq.client.logLevel=WARN调高日志等级。
常见问题三
问: 我的服务消费后需要调用第三方接口,别人的接口调用有限制,Rocketmq消费可以限流吗?
答: RocketMQ本身没有类似每秒消费多少条数据的精确限流,我们可以结合Sentienl来实现,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| private String KEY = "melon_topic:melon_consumer";
public static void main(String[] args) throws InterruptedException, MQClientException { initFlowControlRule(); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("melon_consumer"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("melon_topic", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { Entry entry = null; try { ContextUtil.enter(KEY); entry = SphU.entry(KEY, EntryType.OUT); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg); } catch (BlockException ex) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } finally { if (entry != null) { entry.exit(); } ContextUtil.exit(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }
private static void initFlowControlRule() { FlowRule rule = new FlowRule(); rule.setResource(KEY); rule.setCount(5); rule.setGrade(RuleConstant.FLOW_GRADE_QPS); rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); rule.setMaxQueueingTimeMs(5 * 1000); FlowRuleManager.loadRules(Collections.singletonList(rule)); }
|
常见问题四
问:RocketMQ默认延迟等级有18个,我可以扩增吗?
答: 可以的,但是不建议扩增太多等级,可以通过修改broker属性messageDelayLevel来实现,注意修改了后需要重启broker。例如:
1
| messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d 3d 7d 14d 21d
|