From 9625993c9ace6e0e2934d5e38fb0d67937f23ef4 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Tue, 30 May 2017 16:25:35 -0500 Subject: [PATCH] Fix bugs in Google extensions and RocketMQ extension (#4340) --- .../rocketmq/RocketMQFirehoseFactory.java | 33 ++++++------------- .../google/GoogleDataSegmentPusher.java | 7 ++-- .../google/GoogleDataSegmentPusherTest.java | 18 ++++++++++ 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index 82af3a159c9..1bd78f8a302 100644 --- a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -38,7 +38,6 @@ import io.druid.data.input.ByteBufferInputRowParser; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; -import io.druid.java.util.common.collect.JavaCompatUtils; import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; @@ -128,7 +127,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory> entry : messageQueueTreeSetMap.entrySet()) { + for (Map.Entry> entry : messageQueueTreeSetMap.entrySet()) { if (!entry.getValue().isEmpty()) { return true; } @@ -206,8 +205,8 @@ public class RocketMQFirehoseFactory implements FirehoseFactory> entry : topicQueueMap.entrySet()) { for (MessageQueue messageQueue : entry.getValue()) { - if (JavaCompatUtils.keySet(messageQueueTreeSetMap).contains(messageQueue) - && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) { + ConcurrentSkipListSet messages = messageQueueTreeSetMap.get(messageQueue); + if (messages != null && !messages.isEmpty()) { hasMore = true; } else { try { @@ -255,10 +254,9 @@ public class RocketMQFirehoseFactory implements FirehoseFactory()); - } - windows.get(entry.getKey()).add(message.getQueueOffset()); + windows + .computeIfAbsent(entry.getKey(), k -> new ConcurrentSkipListSet<>()) + .add(message.getQueueOffset()); return inputRow; } } @@ -438,13 +436,9 @@ public class RocketMQFirehoseFactory implements FirehoseFactory(new MessageComparator()) - ); - } - messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); + messageQueueTreeSetMap + .computeIfAbsent(pullRequest.getMessageQueue(), k -> new ConcurrentSkipListSet<>(MESSAGE_COMPARATOR)) + .addAll(pullResult.getMsgFoundList()); break; case NO_NEW_MSG: @@ -512,14 +506,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory - { - @Override - public int compare(MessageExt lhs, MessageExt rhs) - { - return Long.compare(lhs.getQueueOffset(), lhs.getQueueOffset()); - } - } + private static final Comparator MESSAGE_COMPARATOR = Comparator.comparingLong(MessageExt::getQueueOffset); /** diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java index 8bd9fbb8ee7..5570cdd9d9e 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java @@ -21,6 +21,8 @@ package io.druid.storage.google; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.http.InputStreamContent; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; @@ -142,9 +144,10 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher } } - public String buildPath(final String path) + @VisibleForTesting + String buildPath(final String path) { - if (config.getPrefix() != "") { + if (!Strings.isNullOrEmpty(config.getPrefix())) { return config.getPrefix() + "/" + path; } else { return path; diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java index 862974c513b..d85d57eb101 100644 --- a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java +++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java @@ -132,4 +132,22 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport verifyAll(); } + + @Test + public void testBuildPath() + { + GoogleAccountConfig config = new GoogleAccountConfig(); + StringBuilder sb = new StringBuilder(); + sb.setLength(0); + config.setPrefix(sb.toString()); // avoid cached empty string + GoogleDataSegmentPusher pusher = new GoogleDataSegmentPusher( + storage, + config, + jsonMapper + ); + Assert.assertEquals("/path", pusher.buildPath("/path")); + config.setPrefix(null); + Assert.assertEquals("/path", pusher.buildPath("/path")); + } + }