From da5d7b93769124f6e65d9332dcce020163449614 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Mon, 18 Jan 2016 16:37:03 +0800 Subject: [PATCH] Reformat code according to Druid Java and Scala style --- extensions/druid-rocketmq/pom.xml | 49 +- .../rocketmq/RocketMQDruidModule.java | 29 +- .../rocketmq/RocketMQFirehoseFactory.java | 897 ++++++++++-------- 3 files changed, 513 insertions(+), 462 deletions(-) diff --git a/extensions/druid-rocketmq/pom.xml b/extensions/druid-rocketmq/pom.xml index 0e17066dae7..adcc523744c 100644 --- a/extensions/druid-rocketmq/pom.xml +++ b/extensions/druid-rocketmq/pom.xml @@ -20,33 +20,28 @@ - 4.0.0 - - druid - io.druid - 0.9.0-SNAPSHOT - ../../pom.xml - + 4.0.0 + + druid + io.druid + 0.9.0-SNAPSHOT + ../../pom.xml + + druid-rocketmq - - 3.2.6 - - - druid-rocketmq - - - - - com.alibaba.rocketmq - rocketmq-client - ${rocketmq.version} - - - - io.druid - druid-api - - - + + 3.2.6 + + + + com.alibaba.rocketmq + rocketmq-client + ${rocketmq.version} + + + io.druid + druid-api + + diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java index e45c1c254af..64414bbcd9f 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java @@ -27,20 +27,23 @@ import io.druid.initialization.DruidModule; import java.util.List; -public class RocketMQDruidModule implements DruidModule { +public class RocketMQDruidModule implements DruidModule +{ - @Override - public List getJacksonModules() { - return ImmutableList.of( - new SimpleModule("RocketMQFirehoseModule") - .registerSubtypes( - new NamedType(RocketMQFirehoseFactory.class, "rocketMQ") - ) - ); - } + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("RocketMQFirehoseModule") + .registerSubtypes( + new NamedType(RocketMQFirehoseFactory.class, "rocketMQ") + ) + ); + } - @Override - public void configure(Binder binder) { + @Override + public void configure(Binder binder) + { - } + } } diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index b1dffc01cdc..f3dcce46dd0 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -48,471 +48,524 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; -public class RocketMQFirehoseFactory implements FirehoseFactory { +public class RocketMQFirehoseFactory implements FirehoseFactory +{ - private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class); + private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class); - /** - * Passed in configuration for consumer client. - * This provides an approach to override default values defined in {@link com.alibaba.rocketmq.common.MixAll}. - */ - @JsonProperty - private final Properties consumerProps; + /** + * Passed in configuration for consumer client. + * This provides an approach to override default values defined in {@link com.alibaba.rocketmq.common.MixAll}. + */ + @JsonProperty + private final Properties consumerProps; - /** - * Consumer group. It's required. - */ - @JsonProperty(required = true) - private final String consumerGroup; + /** + * Consumer group. It's required. + */ + @JsonProperty(required = true) + private final String consumerGroup; - /** - * Topics to consume. It's required. - */ - @JsonProperty(required = true) - private final List feed; + /** + * Topics to consume. It's required. + */ + @JsonProperty(required = true) + private final List feed; - /** - * Pull batch size. It's optional. - */ - @JsonProperty - private final String pullBatchSize; + /** + * Pull batch size. It's optional. + */ + @JsonProperty + private final String pullBatchSize; - /** - * Store messages that are fetched from brokers but not yet delivered to druid via fire hose. - */ - private final ConcurrentHashMap> messageQueueTreeSetMap = - new ConcurrentHashMap<>(); + /** + * Store messages that are fetched from brokers but not yet delivered to druid via fire hose. + */ + private final ConcurrentHashMap> messageQueueTreeSetMap = + new ConcurrentHashMap<>(); - /** - * Store message consuming status. - */ - private final ConcurrentHashMap> windows = new ConcurrentHashMap<>(); + /** + * Store message consuming status. + */ + private final ConcurrentHashMap> windows = new ConcurrentHashMap<>(); - /** - * Default pull batch size. - */ - private static final int DEFAULT_PULL_BATCH_SIZE = 32; + /** + * Default pull batch size. + */ + private static final int DEFAULT_PULL_BATCH_SIZE = 32; - @JsonCreator - public RocketMQFirehoseFactory(@JsonProperty("consumerProps") Properties consumerProps, - @JsonProperty("consumerGroup") String consumerGroup, - @JsonProperty("feed") List feed, - @JsonProperty("pullBatchSize") String pullBatchSize) { - this.consumerProps = consumerProps; - this.pullBatchSize = pullBatchSize; - for (Map.Entry configItem : this.consumerProps.entrySet()) { - System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); - } - this.consumerGroup = consumerGroup; - this.feed = feed; + @JsonCreator + public RocketMQFirehoseFactory( + @JsonProperty("consumerProps") Properties consumerProps, + @JsonProperty("consumerGroup") String consumerGroup, + @JsonProperty("feed") List feed, + @JsonProperty("pullBatchSize") String pullBatchSize + ) + { + this.consumerProps = consumerProps; + this.pullBatchSize = pullBatchSize; + for (Map.Entry configItem : this.consumerProps.entrySet()) { + System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); + } + this.consumerGroup = consumerGroup; + this.feed = feed; + } + + /** + * Check if there are locally pending messages to consume. + * + * @return true if there are some; false otherwise. + */ + private boolean hasMessagesPending() + { + + for (ConcurrentHashMap.Entry> entry : messageQueueTreeSetMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + return true; + } } - /** - * Check if there are locally pending messages to consume. - * @return true if there are some; false otherwise. - */ - private boolean hasMessagesPending() { + return false; + } - for (ConcurrentHashMap.Entry> entry : messageQueueTreeSetMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - return true; + @Override + public Firehose connect(ByteBufferInputRowParser byteBufferInputRowParser) throws IOException, ParseException + { + + Set newDimExclus = Sets.union( + byteBufferInputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), + Sets.newHashSet("feed") + ); + + final ByteBufferInputRowParser theParser = byteBufferInputRowParser.withParseSpec( + byteBufferInputRowParser.getParseSpec() + .withDimensionsSpec( + byteBufferInputRowParser.getParseSpec() + .getDimensionsSpec() + .withDimensionExclusions( + newDimExclus + ) + ) + ); + + /** + * Topic-Queue mapping. + */ + final ConcurrentHashMap> topicQueueMap; + + /** + * Default Pull-style client for RocketMQ. + */ + final DefaultMQPullConsumer defaultMQPullConsumer; + final DruidPullMessageService pullMessageService; + + messageQueueTreeSetMap.clear(); + windows.clear(); + + try { + defaultMQPullConsumer = new DefaultMQPullConsumer(this.consumerGroup); + defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); + topicQueueMap = new ConcurrentHashMap<>(); + + pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); + for (String topic : feed) { + Validators.checkTopic(topic); + topicQueueMap.put(topic, defaultMQPullConsumer.fetchSubscribeMessageQueues(topic)); + } + DruidMessageQueueListener druidMessageQueueListener = + new DruidMessageQueueListener(Sets.newHashSet(feed), topicQueueMap, defaultMQPullConsumer); + defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); + defaultMQPullConsumer.start(); + pullMessageService.start(); + } + catch (MQClientException e) { + LOGGER.error("Failed to start DefaultMQPullConsumer", e); + throw new IOException("Failed to start RocketMQ client", e); + } + + return new Firehose() + { + + @Override + public boolean hasMore() + { + boolean hasMore = false; + DruidPullRequest earliestPullRequest = null; + + for (Map.Entry> entry : topicQueueMap.entrySet()) { + for (MessageQueue messageQueue : entry.getValue()) { + if (messageQueueTreeSetMap.keySet().contains(messageQueue) + && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) { + hasMore = true; + } else { + try { + long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false); + int batchSize = (null == pullBatchSize || pullBatchSize.isEmpty()) ? + DEFAULT_PULL_BATCH_SIZE : Integer.parseInt(pullBatchSize); + + DruidPullRequest newPullRequest = new DruidPullRequest(messageQueue, null, offset, + batchSize, !hasMessagesPending() + ); + + // notify pull message service to pull messages from brokers. + pullMessageService.putRequest(newPullRequest); + + // set the earliest pull in case we need to block. + if (null == earliestPullRequest) { + earliestPullRequest = newPullRequest; + } + } + catch (MQClientException e) { + LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey()); + } } + } } - return false; + // Block only when there is no locally pending messages. + if (!hasMore && null != earliestPullRequest) { + try { + earliestPullRequest.getCountDownLatch().await(); + hasMore = true; + } + catch (InterruptedException e) { + LOGGER.error("CountDownLatch await got interrupted", e); + } + } + return hasMore; + } + + @Override + public InputRow nextRow() + { + for (Map.Entry> entry : messageQueueTreeSetMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + MessageExt message = entry.getValue().pollFirst(); + InputRow inputRow = theParser.parse(ByteBuffer.wrap(message.getBody())); + + if (!windows.keySet().contains(entry.getKey())) { + windows.put(entry.getKey(), new ConcurrentSkipListSet()); + } + windows.get(entry.getKey()).add(message.getQueueOffset()); + return inputRow; + } + } + + // should never happen. + throw new RuntimeException("Unexpected Fatal Error! There should have been one row available."); + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + OffsetStore offsetStore = defaultMQPullConsumer.getOffsetStore(); + Set updated = new HashSet<>(); + // calculate offsets according to consuming windows. + for (ConcurrentHashMap.Entry> entry : windows.entrySet()) { + while (!entry.getValue().isEmpty()) { + + long offset = offsetStore.readOffset(entry.getKey(), ReadOffsetType.MEMORY_FIRST_THEN_STORE); + if (offset + 1 > entry.getValue().first()) { + entry.getValue().pollFirst(); + } else if (offset + 1 == entry.getValue().first()) { + entry.getValue().pollFirst(); + offsetStore.updateOffset(entry.getKey(), offset + 1, true); + updated.add(entry.getKey()); + } else { + break; + } + + } + } + offsetStore.persistAll(updated); + } + }; + } + + @Override + public void close() throws IOException + { + defaultMQPullConsumer.shutdown(); + pullMessageService.shutdown(false); + } + }; + } + + + /** + * Pull request. + */ + final class DruidPullRequest + { + private final MessageQueue messageQueue; + private final String tag; + private final long nextBeginOffset; + private final int pullBatchSize; + private final boolean longPull; + private final CountDownLatch countDownLatch; + + public DruidPullRequest( + final MessageQueue messageQueue, + final String tag, + final long nextBeginOffset, + final int pullBatchSize, + final boolean useLongPull + ) + { + this.messageQueue = messageQueue; + this.tag = (null == tag ? "*" : tag); + this.nextBeginOffset = nextBeginOffset; + this.pullBatchSize = pullBatchSize; + this.longPull = useLongPull; + countDownLatch = new CountDownLatch(1); + } + + public MessageQueue getMessageQueue() + { + return messageQueue; + } + + public long getNextBeginOffset() + { + return nextBeginOffset; + } + + public String getTag() + { + return tag; + } + + public int getPullBatchSize() + { + return pullBatchSize; + } + + public boolean isLongPull() + { + return longPull; + } + + public CountDownLatch getCountDownLatch() + { + return countDownLatch; + } + } + + + /** + * Pull message service for druid. + *

+ * Note: this is a single thread service. + */ + final class DruidPullMessageService extends ServiceThread + { + + private volatile List requestsWrite = new ArrayList<>(); + private volatile List requestsRead = new ArrayList<>(); + + private final DefaultMQPullConsumer defaultMQPullConsumer; + + public DruidPullMessageService(final DefaultMQPullConsumer defaultMQPullConsumer) + { + this.defaultMQPullConsumer = defaultMQPullConsumer; + } + + public void putRequest(final DruidPullRequest request) + { + synchronized (this) { + this.requestsWrite.add(request); + if (!hasNotified) { + hasNotified = true; + notify(); + } + } + } + + private void swapRequests() + { + List tmp = requestsWrite; + requestsWrite = requestsRead; + requestsRead = tmp; } @Override - public Firehose connect(ByteBufferInputRowParser byteBufferInputRowParser) throws IOException, ParseException { - - Set newDimExclus = Sets.union( - byteBufferInputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), - Sets.newHashSet("feed") - ); - - final ByteBufferInputRowParser theParser = byteBufferInputRowParser.withParseSpec( - byteBufferInputRowParser.getParseSpec() - .withDimensionsSpec( - byteBufferInputRowParser.getParseSpec() - .getDimensionsSpec() - .withDimensionExclusions( - newDimExclus - ) - ) - ); - - /** - * Topic-Queue mapping. - */ - final ConcurrentHashMap> topicQueueMap; - - /** - * Default Pull-style client for RocketMQ. - */ - final DefaultMQPullConsumer defaultMQPullConsumer; - final DruidPullMessageService pullMessageService; - - messageQueueTreeSetMap.clear(); - windows.clear(); + public String getServiceName() + { + return getClass().getSimpleName(); + } + /** + * Core message pulling logic code goes here. + */ + private void doPull() + { + for (DruidPullRequest pullRequest : requestsRead) { + PullResult pullResult; try { - defaultMQPullConsumer = new DefaultMQPullConsumer(this.consumerGroup); - defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); - topicQueueMap = new ConcurrentHashMap<>(); + if (!pullRequest.isLongPull()) { + pullResult = defaultMQPullConsumer.pull( + pullRequest.getMessageQueue(), + pullRequest.getTag(), + pullRequest.getNextBeginOffset(), + pullRequest.getPullBatchSize() + ); + } else { + pullResult = defaultMQPullConsumer.pullBlockIfNotFound( + pullRequest.getMessageQueue(), + pullRequest.getTag(), + pullRequest.getNextBeginOffset(), + pullRequest.getPullBatchSize() + ); + } - pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); - for (String topic : feed) { - Validators.checkTopic(topic); - topicQueueMap.put(topic, defaultMQPullConsumer.fetchSubscribeMessageQueues(topic)); - } - DruidMessageQueueListener druidMessageQueueListener = - new DruidMessageQueueListener(Sets.newHashSet(feed), topicQueueMap, defaultMQPullConsumer); - defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); - defaultMQPullConsumer.start(); - pullMessageService.start(); - } catch (MQClientException e) { - LOGGER.error("Failed to start DefaultMQPullConsumer", e); - throw new IOException("Failed to start RocketMQ client", e); + switch (pullResult.getPullStatus()) { + case FOUND: + // Handle pull result. + if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { + messageQueueTreeSetMap.putIfAbsent( + pullRequest.getMessageQueue(), + new ConcurrentSkipListSet<>(new MessageComparator()) + ); + } + messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); + break; + + case NO_NEW_MSG: + case NO_MATCHED_MSG: + break; + + case OFFSET_ILLEGAL: + LOGGER.error( + "Bad Pull Request: Offset is illegal. Offset used: {}", + pullRequest.getNextBeginOffset() + ); + break; + + default: + break; + } + } + catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + LOGGER.error("Failed to pull message from broker.", e); + } + finally { + pullRequest.getCountDownLatch().countDown(); } - return new Firehose() { - - @Override - public boolean hasMore() { - boolean hasMore = false; - DruidPullRequest earliestPullRequest = null; - - for (Map.Entry> entry : topicQueueMap.entrySet()) { - for (MessageQueue messageQueue : entry.getValue()) { - if (messageQueueTreeSetMap.keySet().contains(messageQueue) - && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) { - hasMore = true; - } else { - try { - long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false); - int batchSize = (null == pullBatchSize || pullBatchSize.isEmpty()) ? - DEFAULT_PULL_BATCH_SIZE:Integer.parseInt(pullBatchSize); - - DruidPullRequest newPullRequest = new DruidPullRequest(messageQueue, null, offset, - batchSize, !hasMessagesPending()); - - // notify pull message service to pull messages from brokers. - pullMessageService.putRequest(newPullRequest); - - // set the earliest pull in case we need to block. - if (null == earliestPullRequest) { - earliestPullRequest = newPullRequest; - } - } catch (MQClientException e) { - LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey()); - } - } - } - } - - // Block only when there is no locally pending messages. - if (!hasMore && null != earliestPullRequest) { - try { - earliestPullRequest.getCountDownLatch().await(); - hasMore = true; - } catch (InterruptedException e) { - LOGGER.error("CountDownLatch await got interrupted", e); - } - } - return hasMore; - } - - @Override - public InputRow nextRow() { - for (Map.Entry> entry : messageQueueTreeSetMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - MessageExt message = entry.getValue().pollFirst(); - InputRow inputRow = theParser.parse(ByteBuffer.wrap(message.getBody())); - - if (!windows.keySet().contains(entry.getKey())) { - windows.put(entry.getKey(), new ConcurrentSkipListSet()); - } - windows.get(entry.getKey()).add(message.getQueueOffset()); - return inputRow; - } - } - - // should never happen. - throw new RuntimeException("Unexpected Fatal Error! There should have been one row available."); - } - - @Override - public Runnable commit() { - return new Runnable() { - @Override - public void run() { - OffsetStore offsetStore = defaultMQPullConsumer.getOffsetStore(); - Set updated = new HashSet<>(); - // calculate offsets according to consuming windows. - for (ConcurrentHashMap.Entry> entry : windows.entrySet()) { - while (!entry.getValue().isEmpty()) { - - long offset = offsetStore.readOffset(entry.getKey(), ReadOffsetType.MEMORY_FIRST_THEN_STORE); - if (offset + 1 > entry.getValue().first()) { - entry.getValue().pollFirst(); - } else if (offset + 1 == entry.getValue().first()) { - entry.getValue().pollFirst(); - offsetStore.updateOffset(entry.getKey(), offset + 1, true); - updated.add(entry.getKey()); - } else { - break; - } - - } - } - offsetStore.persistAll(updated); - } - }; - } - - @Override - public void close() throws IOException { - defaultMQPullConsumer.shutdown(); - pullMessageService.shutdown(false); - } - }; + } + requestsRead.clear(); } - /** - * Pull request. + * Thread looping entry. */ - final class DruidPullRequest { - private final MessageQueue messageQueue; - private final String tag; - private final long nextBeginOffset; - private final int pullBatchSize; - private final boolean longPull; - private final CountDownLatch countDownLatch; + @Override + public void run() + { + LOGGER.info(getServiceName() + " starts."); + while (!isStoped()) { + waitForRunning(0); + doPull(); + } - public DruidPullRequest(final MessageQueue messageQueue, - final String tag, - final long nextBeginOffset, - final int pullBatchSize, - final boolean useLongPull) { - this.messageQueue = messageQueue; - this.tag = (null == tag ? "*" : tag); - this.nextBeginOffset = nextBeginOffset; - this.pullBatchSize = pullBatchSize; - this.longPull = useLongPull; - countDownLatch = new CountDownLatch(1); - } + // in case this service is shutdown gracefully without interruption. + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + LOGGER.error("", e); + } - public MessageQueue getMessageQueue() { - return messageQueue; - } + synchronized (this) { + swapRequests(); + } - public long getNextBeginOffset() { - return nextBeginOffset; - } - - public String getTag() { - return tag; - } - - public int getPullBatchSize() { - return pullBatchSize; - } - - public boolean isLongPull() { - return longPull; - } - - public CountDownLatch getCountDownLatch() { - return countDownLatch; - } + doPull(); + LOGGER.info(getServiceName() + " terminated."); } + @Override + protected void onWaitEnd() + { + swapRequests(); + } + } - /** - * Pull message service for druid. - * - * Note: this is a single thread service. - */ - final class DruidPullMessageService extends ServiceThread { - private volatile List requestsWrite = new ArrayList<>(); - private volatile List requestsRead = new ArrayList<>(); + /** + * Compare messages pulled from same message queue according to queue offset. + */ + static final class MessageComparator implements Comparator + { + @Override + public int compare(MessageExt lhs, MessageExt rhs) + { + return Long.compare(lhs.getQueueOffset(), lhs.getQueueOffset()); + } + } - private final DefaultMQPullConsumer defaultMQPullConsumer; - public DruidPullMessageService(final DefaultMQPullConsumer defaultMQPullConsumer) { - this.defaultMQPullConsumer = defaultMQPullConsumer; - } + /** + * Handle message queues re-balance operations. + */ + final class DruidMessageQueueListener implements MessageQueueListener + { - public void putRequest(final DruidPullRequest request) { - synchronized (this) { - this.requestsWrite.add(request); - if (!hasNotified) { - hasNotified = true; - notify(); - } - } - } + private final Set topics; - private void swapRequests() { - List tmp = requestsWrite; - requestsWrite = requestsRead; - requestsRead = tmp; - } + private final ConcurrentHashMap> topicQueueMap; - @Override - public String getServiceName() { - return getClass().getSimpleName(); - } + private final DefaultMQPullConsumer defaultMQPullConsumer; - /** - * Core message pulling logic code goes here. - */ - private void doPull() { - for (DruidPullRequest pullRequest : requestsRead) { - PullResult pullResult; - try { - if (!pullRequest.isLongPull()) { - pullResult = defaultMQPullConsumer.pull( - pullRequest.getMessageQueue(), - pullRequest.getTag(), - pullRequest.getNextBeginOffset(), - pullRequest.getPullBatchSize()); - } else { - pullResult = defaultMQPullConsumer.pullBlockIfNotFound( - pullRequest.getMessageQueue(), - pullRequest.getTag(), - pullRequest.getNextBeginOffset(), - pullRequest.getPullBatchSize() - ); - } - - switch(pullResult.getPullStatus()) { - case FOUND: - // Handle pull result. - if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { - messageQueueTreeSetMap.putIfAbsent(pullRequest.getMessageQueue(), - new ConcurrentSkipListSet<>(new MessageComparator())); - } - messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); - break; - - case NO_NEW_MSG: - case NO_MATCHED_MSG: - break; - - case OFFSET_ILLEGAL: - LOGGER.error("Bad Pull Request: Offset is illegal. Offset used: {}", - pullRequest.getNextBeginOffset()); - break; - - default: - break; - } - } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { - LOGGER.error("Failed to pull message from broker.", e); - } finally { - pullRequest.getCountDownLatch().countDown(); - } - - } - requestsRead.clear(); - } - - /** - * Thread looping entry. - */ - @Override - public void run() { - LOGGER.info(getServiceName() + " starts."); - while (!isStoped()) { - waitForRunning(0); - doPull(); - } - - // in case this service is shutdown gracefully without interruption. - try { - Thread.sleep(10); - } catch (InterruptedException e) { - LOGGER.error("", e); - } - - synchronized (this) { - swapRequests(); - } - - doPull(); - LOGGER.info(getServiceName() + " terminated."); - } - - @Override - protected void onWaitEnd() { - swapRequests(); - } + public DruidMessageQueueListener( + final Set topics, + final ConcurrentHashMap> topicQueueMap, + final DefaultMQPullConsumer defaultMQPullConsumer + ) + { + this.topics = topics; + this.topicQueueMap = topicQueueMap; + this.defaultMQPullConsumer = defaultMQPullConsumer; } + @Override + public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) + { + if (topics.contains(topic)) { + topicQueueMap.put(topic, mqDivided); - /** - * Compare messages pulled from same message queue according to queue offset. - */ - static final class MessageComparator implements Comparator { - @Override - public int compare(MessageExt lhs, MessageExt rhs) { - return Long.compare(lhs.getQueueOffset(), lhs.getQueueOffset()); - } - } - - - /** - * Handle message queues re-balance operations. - */ - final class DruidMessageQueueListener implements MessageQueueListener { - - private final Set topics; - - private final ConcurrentHashMap> topicQueueMap; - - private final DefaultMQPullConsumer defaultMQPullConsumer; - - public DruidMessageQueueListener(final Set topics, - final ConcurrentHashMap> topicQueueMap, - final DefaultMQPullConsumer defaultMQPullConsumer) { - this.topics = topics; - this.topicQueueMap = topicQueueMap; - this.defaultMQPullConsumer = defaultMQPullConsumer; - } - - @Override - public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { - if (topics.contains(topic)) { - topicQueueMap.put(topic, mqDivided); - - // Remove message queues that are re-assigned to other clients. - Iterator>> it = - messageQueueTreeSetMap.entrySet().iterator(); - while (it.hasNext()) { - if (!mqDivided.contains(it.next().getKey())) { - it.remove(); - } - } - - StringBuilder stringBuilder = new StringBuilder(); - for (MessageQueue messageQueue : mqDivided) { - stringBuilder.append(messageQueue.getBrokerName()) - .append("#") - .append(messageQueue.getQueueId()) - .append(", "); - } - - if (LOGGER.isDebugEnabled() && stringBuilder.length() > 2) { - LOGGER.debug(String.format("%s@%s is consuming the following message queues: %s", - defaultMQPullConsumer.getClientIP(), - defaultMQPullConsumer.getInstanceName(), - stringBuilder.substring(0, stringBuilder.length() - 2) /*Remove the trailing comma*/)); - } - } - + // Remove message queues that are re-assigned to other clients. + Iterator>> it = + messageQueueTreeSetMap.entrySet().iterator(); + while (it.hasNext()) { + if (!mqDivided.contains(it.next().getKey())) { + it.remove(); + } } + + StringBuilder stringBuilder = new StringBuilder(); + for (MessageQueue messageQueue : mqDivided) { + stringBuilder.append(messageQueue.getBrokerName()) + .append("#") + .append(messageQueue.getQueueId()) + .append(", "); + } + + if (LOGGER.isDebugEnabled() && stringBuilder.length() > 2) { + LOGGER.debug(String.format( + "%s@%s is consuming the following message queues: %s", + defaultMQPullConsumer.getClientIP(), + defaultMQPullConsumer.getInstanceName(), + stringBuilder.substring(0, stringBuilder.length() - 2) /*Remove the trailing comma*/ + )); + } + } + } + } }