From 3d46356dc9763aebf09ab1ad9ba8ba27b70c291d Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Mon, 11 Jan 2016 11:38:32 +0800 Subject: [PATCH] Update code on PR comments --- .../rocketmq/RocketMQDruidModule.java | 2 +- .../rocketmq/RocketMQFirehoseFactory.java | 177 ++++++++---------- 2 files changed, 80 insertions(+), 99 deletions(-) diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java index 159928912a9..f902ea739be 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java @@ -34,7 +34,7 @@ public class RocketMQDruidModule implements DruidModule { return ImmutableList.of( new SimpleModule("RocketMQFirehoseModule") .registerSubtypes( - new NamedType(RocketMQFirehoseFactory.class, "RocketMQ-3.2.6") + new NamedType(RocketMQFirehoseFactory.class, "RocketMQ") ) ); } diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index 7d930e5c85b..a8ec73d3717 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -18,6 +18,7 @@ */ package io.druid.firehose.rocketmq; +import com.alibaba.rocketmq.client.Validators; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.MessageQueueListener; import com.alibaba.rocketmq.client.consumer.PullResult; @@ -54,23 +55,28 @@ public class RocketMQFirehoseFactory implements FirehoseFactory feed; + + /** + * Pull batch size. It's optional. */ @JsonProperty - private final String feed; - + private final String pullBatchSize; /** * Store messages that are fetched from brokers but not yet delivered to druid via fire hose. @@ -85,13 +91,15 @@ public class RocketMQFirehoseFactory implements FirehoseFactory feed, + @JsonProperty("pullBatchSize") String pullBatchSize) { this.consumerProps = consumerProps; + this.pullBatchSize = pullBatchSize; for (Map.Entry configItem : this.consumerProps.entrySet()) { System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); } @@ -153,17 +161,12 @@ public class RocketMQFirehoseFactory implements FirehoseFactory(); pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); - String[] topics = feed.split(","); - for (String topic : topics) { - topic = topic.trim(); - if (topic.isEmpty()) { - continue; - } - defaultMQPullConsumer.fetchSubscribeMessageQueues(topic); - topicQueueMap.put(topic, defaultMQPullConsumer.fetchMessageQueuesInBalance(topic)); + for (String topic : feed) { + Validators.checkTopic(topic); + topicQueueMap.put(topic, defaultMQPullConsumer.fetchSubscribeMessageQueues(topic)); } DruidMessageQueueListener druidMessageQueueListener = - new DruidMessageQueueListener(Sets.newHashSet(topics), topicQueueMap, defaultMQPullConsumer); + new DruidMessageQueueListener(Sets.newHashSet(feed), topicQueueMap, defaultMQPullConsumer); defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); defaultMQPullConsumer.start(); pullMessageService.start(); @@ -185,23 +188,23 @@ public class RocketMQFirehoseFactory implements FirehoseFactoryNote: this is a single thread service. */ - class DruidPullMessageService extends ServiceThread { + final class DruidPullMessageService extends ServiceThread { - private volatile List requestsWrite = new ArrayList(); - private volatile List requestsRead = new ArrayList(); + private volatile List requestsWrite = new ArrayList<>(); + private volatile List requestsRead = new ArrayList<>(); private final DefaultMQPullConsumer defaultMQPullConsumer; @@ -404,7 +371,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory(new MessageComparator())); + switch(pullResult.getPullStatus()) { + case FOUND: + // Handle pull result. + if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { + messageQueueTreeSetMap.putIfAbsent(pullRequest.getMessageQueue(), + new ConcurrentSkipListSet<>(new MessageComparator())); + } + messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); + break; + + case NO_NEW_MSG: + case NO_MATCHED_MSG: + break; + + case OFFSET_ILLEGAL: + LOGGER.error("Bad Pull Request: Offset is illegal. Offset used: {}", + pullRequest.getNextBeginOffset()); + break; + + default: + break; } - messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); - } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { LOGGER.error("Failed to pull message from broker.", e); } finally { @@ -475,7 +456,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory { + final class MessageComparator implements Comparator { @Override public int compare(MessageExt lhs, MessageExt rhs) { return lhs.getQueueOffset() < rhs.getQueueOffset() ? -1 : (lhs.getQueueOffset() == rhs.getQueueOffset() ? 0 : 1); @@ -486,9 +467,9 @@ public class RocketMQFirehoseFactory implements FirehoseFactory topics; + private final Set topics; private final ConcurrentHashMap> topicQueueMap;