Update code on PR comments

This commit is contained in:
Li Zhanhui 2016-01-11 11:38:32 +08:00
parent 8eb332c1c4
commit 3d46356dc9
2 changed files with 80 additions and 99 deletions

View File

@ -34,7 +34,7 @@ public class RocketMQDruidModule implements DruidModule {
return ImmutableList.of( return ImmutableList.of(
new SimpleModule("RocketMQFirehoseModule") new SimpleModule("RocketMQFirehoseModule")
.registerSubtypes( .registerSubtypes(
new NamedType(RocketMQFirehoseFactory.class, "RocketMQ-3.2.6") new NamedType(RocketMQFirehoseFactory.class, "RocketMQ")
) )
); );
} }

View File

@ -18,6 +18,7 @@
*/ */
package io.druid.firehose.rocketmq; package io.druid.firehose.rocketmq;
import com.alibaba.rocketmq.client.Validators;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MessageQueueListener; import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.consumer.PullResult;
@ -54,23 +55,28 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
/** /**
* Passed in configuration for consumer client. * Passed in configuration for consumer client.
* This provides an approach to overrides default values defined in {@link com.alibaba.rocketmq.common.MixAll}.
*/ */
@JsonProperty @JsonProperty
private final Properties consumerProps; private final Properties consumerProps;
/** /**
* Consumer group. * Consumer group. It's required.
*/ */
@JsonProperty @JsonProperty(required = true)
private final String consumerGroup; private final String consumerGroup;
/** /**
* Topics to consume. * Topics to consume. It's required.
* Multiple topics are separated by comma ",". */
@JsonProperty(required = true)
private final List<String> feed;
/**
* Pull batch size. It's optional.
*/ */
@JsonProperty @JsonProperty
private final String feed; private final String pullBatchSize;
/** /**
* Store messages that are fetched from brokers but not yet delivered to druid via fire hose. * Store messages that are fetched from brokers but not yet delivered to druid via fire hose.
@ -85,13 +91,15 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
/** /**
* Default pull batch size. * Default pull batch size.
*/ */
private static final int PULL_BATCH_SIZE = 32; private static final int DEFAULT_PULL_BATCH_SIZE = 32;
@JsonCreator @JsonCreator
public RocketMQFirehoseFactory(@JsonProperty("consumerProps") Properties consumerProps, public RocketMQFirehoseFactory(@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("consumerGroup") String consumerGroup, @JsonProperty("consumerGroup") String consumerGroup,
@JsonProperty("feed") String feed) { @JsonProperty("feed") List<String> feed,
@JsonProperty("pullBatchSize") String pullBatchSize) {
this.consumerProps = consumerProps; this.consumerProps = consumerProps;
this.pullBatchSize = pullBatchSize;
for (Map.Entry<Object, Object> configItem : this.consumerProps.entrySet()) { for (Map.Entry<Object, Object> configItem : this.consumerProps.entrySet()) {
System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); System.setProperty(configItem.getKey().toString(), configItem.getValue().toString());
} }
@ -153,17 +161,12 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
topicQueueMap = new ConcurrentHashMap<>(); topicQueueMap = new ConcurrentHashMap<>();
pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); pullMessageService = new DruidPullMessageService(defaultMQPullConsumer);
String[] topics = feed.split(","); for (String topic : feed) {
for (String topic : topics) { Validators.checkTopic(topic);
topic = topic.trim(); topicQueueMap.put(topic, defaultMQPullConsumer.fetchSubscribeMessageQueues(topic));
if (topic.isEmpty()) {
continue;
}
defaultMQPullConsumer.fetchSubscribeMessageQueues(topic);
topicQueueMap.put(topic, defaultMQPullConsumer.fetchMessageQueuesInBalance(topic));
} }
DruidMessageQueueListener druidMessageQueueListener = DruidMessageQueueListener druidMessageQueueListener =
new DruidMessageQueueListener(Sets.newHashSet(topics), topicQueueMap, defaultMQPullConsumer); new DruidMessageQueueListener(Sets.newHashSet(feed), topicQueueMap, defaultMQPullConsumer);
defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener);
defaultMQPullConsumer.start(); defaultMQPullConsumer.start();
pullMessageService.start(); pullMessageService.start();
@ -185,23 +188,23 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
&& !messageQueueTreeSetMap.get(messageQueue).isEmpty()) { && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) {
hasMore = true; hasMore = true;
} else { } else {
DruidPullRequest newPullRequest = new DruidPullRequest();
newPullRequest.setMessageQueue(messageQueue);
try { try {
long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false); long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false);
newPullRequest.setNextBeginOffset(offset); int batchSize = (null == pullBatchSize || pullBatchSize.isEmpty()) ?
DEFAULT_PULL_BATCH_SIZE:Integer.parseInt(pullBatchSize);
DruidPullRequest newPullRequest = new DruidPullRequest(messageQueue, null, offset,
batchSize, !hasMessagesPending());
// notify pull message service to pull messages from brokers.
pullMessageService.putRequest(newPullRequest);
// set the earliest pull in case we need to block.
if (null == earliestPullRequest) {
earliestPullRequest = newPullRequest;
}
} catch (MQClientException e) { } catch (MQClientException e) {
LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey()); LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey());
continue;
}
newPullRequest.setLongPull(!hasMessagesPending());
// notify pull message service to pull messages from brokers.
pullMessageService.putRequest(newPullRequest);
// set the earliest pull in case we need to block.
if (null == earliestPullRequest) {
earliestPullRequest = newPullRequest;
} }
} }
} }
@ -235,7 +238,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
} }
// should never happen. // should never happen.
return null; throw new RuntimeException("Unexpected Fatal Error! There should have been one row available.");
} }
@Override @Override
@ -279,31 +282,31 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
/** /**
* Pull request. * Pull request.
*/ */
class DruidPullRequest { final class DruidPullRequest {
private MessageQueue messageQueue; private final MessageQueue messageQueue;
private String tag; private final String tag;
private long nextBeginOffset; private final long nextBeginOffset;
private int pullBatchSize; private final int pullBatchSize;
private boolean longPull; private final boolean longPull;
private CountDownLatch countDownLatch; private final CountDownLatch countDownLatch;
private PullResult pullResult;
private boolean successful;
public DruidPullRequest() { public DruidPullRequest(final MessageQueue messageQueue,
final String tag,
final long nextBeginOffset,
final int pullBatchSize,
final boolean useLongPull) {
this.messageQueue = messageQueue;
this.tag = (null == tag ? "*" : tag);
this.nextBeginOffset = nextBeginOffset;
this.pullBatchSize = pullBatchSize;
this.longPull = useLongPull;
countDownLatch = new CountDownLatch(1); countDownLatch = new CountDownLatch(1);
tag = "*";
pullBatchSize = PULL_BATCH_SIZE;
successful = false;
} }
public MessageQueue getMessageQueue() { public MessageQueue getMessageQueue() {
return messageQueue; return messageQueue;
} }
public void setMessageQueue(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public long getNextBeginOffset() { public long getNextBeginOffset() {
return nextBeginOffset; return nextBeginOffset;
} }
@ -312,53 +315,17 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
return tag; return tag;
} }
public void setTag(String tag) {
this.tag = tag;
}
public void setNextBeginOffset(long nextBeginOffset) {
this.nextBeginOffset = nextBeginOffset;
}
public int getPullBatchSize() { public int getPullBatchSize() {
return pullBatchSize; return pullBatchSize;
} }
public void setPullBatchSize(int pullBatchSize) {
this.pullBatchSize = pullBatchSize;
}
public boolean isLongPull() { public boolean isLongPull() {
return longPull; return longPull;
} }
public void setLongPull(boolean longPull) {
this.longPull = longPull;
}
public CountDownLatch getCountDownLatch() { public CountDownLatch getCountDownLatch() {
return countDownLatch; return countDownLatch;
} }
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public PullResult getPullResult() {
return pullResult;
}
public void setPullResult(PullResult pullResult) {
this.pullResult = pullResult;
}
public boolean isSuccessful() {
return successful;
}
public void setSuccessful(boolean successful) {
this.successful = successful;
}
} }
@ -367,10 +334,10 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
* *
* <strong>Note: this is a single thread service.</strong> * <strong>Note: this is a single thread service.</strong>
*/ */
class DruidPullMessageService extends ServiceThread { final class DruidPullMessageService extends ServiceThread {
private volatile List<DruidPullRequest> requestsWrite = new ArrayList<DruidPullRequest>(); private volatile List<DruidPullRequest> requestsWrite = new ArrayList<>();
private volatile List<DruidPullRequest> requestsRead = new ArrayList<DruidPullRequest>(); private volatile List<DruidPullRequest> requestsRead = new ArrayList<>();
private final DefaultMQPullConsumer defaultMQPullConsumer; private final DefaultMQPullConsumer defaultMQPullConsumer;
@ -404,7 +371,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
*/ */
private void doPull() { private void doPull() {
for (DruidPullRequest pullRequest : requestsRead) { for (DruidPullRequest pullRequest : requestsRead) {
PullResult pullResult = null; PullResult pullResult;
try { try {
if (!pullRequest.isLongPull()) { if (!pullRequest.isLongPull()) {
pullResult = defaultMQPullConsumer.pull( pullResult = defaultMQPullConsumer.pull(
@ -420,15 +387,29 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
pullRequest.getPullBatchSize() pullRequest.getPullBatchSize()
); );
} }
pullRequest.setPullResult(pullResult);
pullRequest.setSuccessful(true);
if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { switch(pullResult.getPullStatus()) {
messageQueueTreeSetMap.putIfAbsent(pullRequest.getMessageQueue(), case FOUND:
new ConcurrentSkipListSet<>(new MessageComparator())); // Handle pull result.
if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) {
messageQueueTreeSetMap.putIfAbsent(pullRequest.getMessageQueue(),
new ConcurrentSkipListSet<>(new MessageComparator()));
}
messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList());
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
break;
case OFFSET_ILLEGAL:
LOGGER.error("Bad Pull Request: Offset is illegal. Offset used: {}",
pullRequest.getNextBeginOffset());
break;
default:
break;
} }
messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList());
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
LOGGER.error("Failed to pull message from broker.", e); LOGGER.error("Failed to pull message from broker.", e);
} finally { } finally {
@ -475,7 +456,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
/** /**
* Compare messages pulled from same message queue according to queue offset. * Compare messages pulled from same message queue according to queue offset.
*/ */
class MessageComparator implements Comparator<MessageExt> { final class MessageComparator implements Comparator<MessageExt> {
@Override @Override
public int compare(MessageExt lhs, MessageExt rhs) { public int compare(MessageExt lhs, MessageExt rhs) {
return lhs.getQueueOffset() < rhs.getQueueOffset() ? -1 : (lhs.getQueueOffset() == rhs.getQueueOffset() ? 0 : 1); return lhs.getQueueOffset() < rhs.getQueueOffset() ? -1 : (lhs.getQueueOffset() == rhs.getQueueOffset() ? 0 : 1);
@ -486,9 +467,9 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
/** /**
* Handle message queues re-balance operations. * Handle message queues re-balance operations.
*/ */
class DruidMessageQueueListener implements MessageQueueListener { final class DruidMessageQueueListener implements MessageQueueListener {
private Set<String> topics; private final Set<String> topics;
private final ConcurrentHashMap<String, Set<MessageQueue>> topicQueueMap; private final ConcurrentHashMap<String, Set<MessageQueue>> topicQueueMap;