From dc47e809243289f3d77adc73c3c311973897f5ae Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Tue, 12 Jan 2016 18:03:16 +0800 Subject: [PATCH] 1. Add newline to pom.xml 2. Change RocketMQ to rocketMQ 3. Make swapRequests methods synchronized in all places. 4. Make comparator static and final and use Long.compare. --- extensions/druid-rocketmq/pom.xml | 2 +- .../rocketmq/RocketMQDruidModule.java | 2 +- .../rocketmq/RocketMQFirehoseFactory.java | 20 +++++++++---------- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/extensions/druid-rocketmq/pom.xml b/extensions/druid-rocketmq/pom.xml index a02bd191467..4f4930eb67c 100644 --- a/extensions/druid-rocketmq/pom.xml +++ b/extensions/druid-rocketmq/pom.xml @@ -47,4 +47,4 @@ - \ No newline at end of file + diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java index f902ea739be..e45c1c254af 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java @@ -34,7 +34,7 @@ public class RocketMQDruidModule implements DruidModule { return ImmutableList.of( new SimpleModule("RocketMQFirehoseModule") .registerSubtypes( - new NamedType(RocketMQFirehoseFactory.class, "RocketMQ") + new NamedType(RocketMQFirehoseFactory.class, "rocketMQ") ) ); } diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index c2bd0b67b9f..430b79e5841 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -39,8 +39,7 @@ import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.metamx.common.logger.Logger; import java.io.IOException; import java.nio.ByteBuffer; @@ -51,7 +50,7 @@ import java.util.concurrent.CountDownLatch; public class RocketMQFirehoseFactory implements FirehoseFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQFirehoseFactory.class); + private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class); /** * Passed in configuration for consumer client. @@ -81,12 +80,13 @@ public class RocketMQFirehoseFactory implements FirehoseFactory> messageQueueTreeSetMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> messageQueueTreeSetMap = + new ConcurrentHashMap<>(); /** * Store message consuming status. */ - private ConcurrentHashMap> windows = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> windows = new ConcurrentHashMap<>(); /** * Default pull batch size. @@ -355,7 +355,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory tmp = requestsWrite; requestsWrite = requestsRead; requestsRead = tmp; @@ -438,9 +438,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory { + static final class MessageComparator implements Comparator { @Override public int compare(MessageExt lhs, MessageExt rhs) { - return lhs.getQueueOffset() < rhs.getQueueOffset() ? -1 : (lhs.getQueueOffset() == rhs.getQueueOffset() ? 0 : 1); + return Long.compare(lhs.getQueueOffset(), lhs.getQueueOffset()); } }