1. Add newline to pom.xml

2. Change RocketMQ to rocketMQ
3. Make swapRequests methods synchronized in all places.
4. Make comparator static and final and use Long.compare.
This commit is contained in:
Li Zhanhui 2016-01-12 18:03:16 +08:00
parent abe134bef6
commit dc47e80924
3 changed files with 11 additions and 13 deletions

View File

@ -47,4 +47,4 @@
</dependencies>
</project>
</project>

View File

@ -34,7 +34,7 @@ public class RocketMQDruidModule implements DruidModule {
return ImmutableList.of(
new SimpleModule("RocketMQFirehoseModule")
.registerSubtypes(
new NamedType(RocketMQFirehoseFactory.class, "RocketMQ")
new NamedType(RocketMQFirehoseFactory.class, "rocketMQ")
)
);
}

View File

@ -39,8 +39,7 @@ import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.metamx.common.logger.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -51,7 +50,7 @@ import java.util.concurrent.CountDownLatch;
public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser> {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQFirehoseFactory.class);
private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class);
/**
* Passed in configuration for consumer client.
@ -81,12 +80,13 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
/**
* Store messages that are fetched from brokers but not yet delivered to druid via fire hose.
*/
private ConcurrentHashMap<MessageQueue, ConcurrentSkipListSet<MessageExt>> messageQueueTreeSetMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<MessageQueue, ConcurrentSkipListSet<MessageExt>> messageQueueTreeSetMap =
new ConcurrentHashMap<>();
/**
* Store message consuming status.
*/
private ConcurrentHashMap<MessageQueue, ConcurrentSkipListSet<Long>> windows = new ConcurrentHashMap<>();
private final ConcurrentHashMap<MessageQueue, ConcurrentSkipListSet<Long>> windows = new ConcurrentHashMap<>();
/**
* Default pull batch size.
@ -355,7 +355,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
}
}
private void swapRequests() {
private synchronized void swapRequests() {
List<DruidPullRequest> tmp = requestsWrite;
requestsWrite = requestsRead;
requestsRead = tmp;
@ -438,9 +438,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
LOGGER.error("", e);
}
synchronized (this) {
swapRequests();
}
swapRequests();
doPull();
LOGGER.info(getServiceName() + " terminated.");
@ -456,10 +454,10 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<ByteBufferInputR
/**
* Compare messages pulled from same message queue according to queue offset.
*/
final class MessageComparator implements Comparator<MessageExt> {
static final class MessageComparator implements Comparator<MessageExt> {
@Override
public int compare(MessageExt lhs, MessageExt rhs) {
return lhs.getQueueOffset() < rhs.getQueueOffset() ? -1 : (lhs.getQueueOffset() == rhs.getQueueOffset() ? 0 : 1);
return Long.compare(lhs.getQueueOffset(), lhs.getQueueOffset());
}
}