Fix bugs in Google extensions and RocketMQ extension (#4340)

This commit is contained in:
Roman Leventov 2017-05-30 16:25:35 -05:00 committed by Fangjin Yang
parent 3400f601db
commit 9625993c9a
3 changed files with 33 additions and 25 deletions

View File

@ -38,7 +38,6 @@ import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.collect.JavaCompatUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
@ -128,7 +127,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
private boolean hasMessagesPending()
{
for (ConcurrentHashMap.Entry<MessageQueue, ConcurrentSkipListSet<MessageExt>> entry : messageQueueTreeSetMap.entrySet()) {
for (Map.Entry<MessageQueue, ConcurrentSkipListSet<MessageExt>> entry : messageQueueTreeSetMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
return true;
}
@ -206,8 +205,8 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
for (Map.Entry<String, Set<MessageQueue>> entry : topicQueueMap.entrySet()) {
for (MessageQueue messageQueue : entry.getValue()) {
if (JavaCompatUtils.keySet(messageQueueTreeSetMap).contains(messageQueue)
&& !messageQueueTreeSetMap.get(messageQueue).isEmpty()) {
ConcurrentSkipListSet<MessageExt> messages = messageQueueTreeSetMap.get(messageQueue);
if (messages != null && !messages.isEmpty()) {
hasMore = true;
} else {
try {
@ -255,10 +254,9 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
MessageExt message = entry.getValue().pollFirst();
InputRow inputRow = theParser.parse(ByteBuffer.wrap(message.getBody()));
if (!JavaCompatUtils.keySet(windows).contains(entry.getKey())) {
windows.put(entry.getKey(), new ConcurrentSkipListSet<Long>());
}
windows.get(entry.getKey()).add(message.getQueueOffset());
windows
.computeIfAbsent(entry.getKey(), k -> new ConcurrentSkipListSet<>())
.add(message.getQueueOffset());
return inputRow;
}
}
@ -438,13 +436,9 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
switch (pullResult.getPullStatus()) {
case FOUND:
// Handle pull result.
if (!JavaCompatUtils.keySet(messageQueueTreeSetMap).contains(pullRequest.getMessageQueue())) {
messageQueueTreeSetMap.putIfAbsent(
pullRequest.getMessageQueue(),
new ConcurrentSkipListSet<>(new MessageComparator())
);
}
messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList());
messageQueueTreeSetMap
.computeIfAbsent(pullRequest.getMessageQueue(), k -> new ConcurrentSkipListSet<>(MESSAGE_COMPARATOR))
.addAll(pullResult.getMsgFoundList());
break;
case NO_NEW_MSG:
@ -512,14 +506,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
/**
* Compare messages pulled from same message queue according to queue offset.
*/
static final class MessageComparator implements Comparator<MessageExt>
{
@Override
public int compare(MessageExt lhs, MessageExt rhs)
{
return Long.compare(lhs.getQueueOffset(), lhs.getQueueOffset());
}
}
private static final Comparator<MessageExt> MESSAGE_COMPARATOR = Comparator.comparingLong(MessageExt::getQueueOffset);
/**

View File

@ -21,6 +21,8 @@ package io.druid.storage.google;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.http.InputStreamContent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
@ -142,9 +144,10 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
}
}
public String buildPath(final String path)
@VisibleForTesting
String buildPath(final String path)
{
if (config.getPrefix() != "") {
if (!Strings.isNullOrEmpty(config.getPrefix())) {
return config.getPrefix() + "/" + path;
} else {
return path;

View File

@ -132,4 +132,22 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
verifyAll();
}
@Test
public void testBuildPath()
{
GoogleAccountConfig config = new GoogleAccountConfig();
StringBuilder sb = new StringBuilder();
sb.setLength(0);
config.setPrefix(sb.toString()); // avoid cached empty string
GoogleDataSegmentPusher pusher = new GoogleDataSegmentPusher(
storage,
config,
jsonMapper
);
Assert.assertEquals("/path", pusher.buildPath("/path"));
config.setPrefix(null);
Assert.assertEquals("/path", pusher.buildPath("/path"));
}
}