diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt index abc3d6e00f4..96db0882600 100644 --- a/codestyle/druid-forbidden-apis.txt +++ b/codestyle/druid-forbidden-apis.txt @@ -1,2 +1,4 @@ com.google.common.collect.MapMaker @ Create java.util.concurrent.ConcurrentHashMap directly -com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly \ No newline at end of file +com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly +com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync +com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator() \ No newline at end of file diff --git a/common/src/main/java/io/druid/common/utils/SerializerUtils.java b/common/src/main/java/io/druid/common/utils/SerializerUtils.java index 03dbc670649..652ab8c8525 100644 --- a/common/src/main/java/io/druid/common/utils/SerializerUtils.java +++ b/common/src/main/java/io/druid/common/utils/SerializerUtils.java @@ -61,7 +61,7 @@ public class SerializerUtils final int length = in.getInt(); return StringUtils.fromUtf8(readBytes(in, length)); } - + public byte[] readBytes(ByteBuffer in, int length) { byte[] bytes = new byte[length]; diff --git a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index c942b1d037d..5874298bf32 100644 --- a/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions-contrib/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -33,7 +33,6 @@ import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; import com.alibaba.rocketmq.remoting.exception.RemotingException; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -48,6 +47,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -199,7 +199,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory - private Iterator<InputRow> nextIterator = Iterators.emptyIterator(); + private Iterator<InputRow> nextIterator = Collections.emptyIterator(); @Override public boolean hasMore()
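The two new forbidden-API entries above are enforced mechanically, and the fix is the same one-line swap everywhere: java.util.Collections#emptyIterator() has been in the JDK since Java 7 and behaves like the banned Guava call. A minimal before/after sketch (the class and field names here are illustrative, not from this patch):

import java.util.Collections;
import java.util.Iterator;

class EmptyIteratorMigration
{
  // Before (now forbidden): Iterators.emptyIterator() from Guava.
  // After: the JDK equivalent, no Guava import needed.
  private Iterator<String> rows = Collections.emptyIterator();

  boolean hasMore()
  {
    // Safe to call before any batch has been loaded: an empty
    // iterator simply reports that it has no elements.
    return rows.hasNext();
  }
}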
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java index f593f2c5a27..c6533e6d105 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java @@ -22,7 +22,6 @@ package io.druid.firehose.kafka; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.io.Closeables; import io.druid.data.input.ByteBufferInputRowParser; @@ -38,6 +37,7 @@ import io.druid.java.util.emitter.EmittingLogger; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -177,7 +177,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements private volatile boolean stopped; private volatile BytesMessageWithOffset msg = null; private volatile InputRow row = null; - private volatile Iterator<InputRow> nextIterator = Iterators.emptyIterator(); + private volatile Iterator<InputRow> nextIterator = Collections.emptyIterator(); { lastOffsetPartitions = Maps.newHashMap(); @@ -212,7 +212,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements msg = messageQueue.take(); final byte[] message = msg.message(); nextIterator = message == null - ? Iterators.emptyIterator() + ? Collections.emptyIterator() : firehoseParser.parseBatch(ByteBuffer.wrap(message)).iterator(); continue; } diff --git a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java index d348f5e6ece..612f7ee54d6 100644 --- a/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java +++ b/extensions-contrib/rabbitmq/src/main/java/io/druid/firehose/rabbitmq/RabbitMQFirehoseFactory.java @@ -21,7 +21,6 @@ package io.druid.firehose.rabbitmq; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Iterators; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; @@ -46,6 +45,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -205,7 +205,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory - private Iterator<InputRow> nextIterator = Iterators.emptyIterator(); + private Iterator<InputRow> nextIterator = Collections.emptyIterator(); @Override public boolean hasMore()
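All of these firehoses rely on the same idiom: nextIterator starts out empty and is replaced by a freshly parsed batch on demand, so hasMore() never has to special-case the initial state. A simplified, hypothetical sketch of the loop (fetchNextBatch() stands in for the queue/consumer polling done by the real classes):

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

abstract class BatchingFirehoseSketch<T>
{
  private Iterator<T> nextIterator = Collections.emptyIterator();

  // Stand-in for messageQueue.take() plus parseBatch(...) in the real
  // firehoses; returns null when no more batches are available.
  protected abstract List<T> fetchNextBatch();

  public boolean hasMore()
  {
    while (!nextIterator.hasNext()) {
      final List<T> batch = fetchNextBatch();
      if (batch == null) {
        return false;
      }
      nextIterator = batch.iterator();
    }
    return true;
  }

  public T nextRow()
  {
    return nextIterator.next();
  }
}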
diff --git a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java index 205d8b27c64..c50badd4c7c 100644 --- a/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/extensions-core/kafka-eight/src/main/java/io/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -23,7 +23,6 @@ package io.druid.firehose.kafka; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -40,6 +39,7 @@ import kafka.message.InvalidMessageException; import javax.annotation.Nullable; import java.io.File; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -107,7 +107,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory - Iterator<InputRow> nextIterator = Iterators.emptyIterator(); + Iterator<InputRow> nextIterator = Collections.emptyIterator(); @Override public boolean hasMore() diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 16809561cbb..db44e01e422 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -23,9 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; -import com.google.common.io.ByteStreams; import com.google.common.io.Files; -import com.google.common.io.OutputSupplier; import io.druid.indexer.updater.HadoopDruidConverterConfig; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.FileUtils; @@ -90,6 +88,7 @@ public class JobHelper { return new Path(base, "classpath"); } + public static final String INDEX_ZIP = "index.zip"; public static final String DESCRIPTOR_JSON = "descriptor.json"; @@ -277,17 +276,9 @@ public class JobHelper static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException { log.info("Uploading jar to path[%s]", path); - ByteStreams.copy( - Files.newInputStreamSupplier(jarFile), - new OutputSupplier<OutputStream>() - { - @Override - public OutputStream getOutput() throws IOException - { - return fs.create(path); - } - } - ); + try (OutputStream os = fs.create(path)) { + Files.asByteSource(jarFile).copyTo(os); + } } static boolean isSnapshot(File jarFile) @@ -562,8 +553,10 @@ public class JobHelper DataSegmentPusher dataSegmentPusher ) { - return new Path(prependFSIfNullScheme(fs, basePath), - dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName)); + return new Path( + prependFSIfNullScheme(fs, basePath), + dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName) + ); } public static Path makeTmpPath( @@ -576,9 +569,10 @@ { return new Path( prependFSIfNullScheme(fs, basePath), - StringUtils.format("./%s.%d", - dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), - taskAttemptID.getId() + StringUtils.format( + "./%s.%d", + dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), + taskAttemptID.getId() ) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 4af077fe501..3181b252544 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -46,6 +46,7 @@ import io.druid.indexing.common.index.RealtimeAppenderatorTuningConfig; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.ListenableFutures; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.emitter.EmittingLogger; @@ -388,9 +389,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask /** * Is a firehose from this factory drainable 
by closing it? If so, we should drain on stopGracefully rather than * abruptly stopping. - * + *
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this. - * + *
* Protected for tests. */ protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) @@ -431,19 +432,16 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask String sequenceName ) { - ListenableFuture publishFuture = driver.publish( + final ListenableFuture publishFuture = driver.publish( publisher, committerSupplier.get(), Collections.singletonList(sequenceName) ); - - ListenableFuture handoffFuture = Futures.transform(publishFuture, driver::registerHandoff); - - pendingHandoffs.add(handoffFuture); + pendingHandoffs.add(ListenableFutures.transformAsync(publishFuture, driver::registerHandoff)); } private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException, - TimeoutException + TimeoutException { if (!pendingHandoffs.isEmpty()) { ListenableFuture allHandoffs = Futures.allAsList(pendingHandoffs); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java index 066f01d05ae..cb7b2282a68 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java @@ -59,7 +59,7 @@ public class FileTaskLogsTest final Map expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah"); for (Map.Entry entry : expected.entrySet()) { - final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput()); + final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().openStream()); final String string = StringUtils.fromUtf8(bytes); Assert.assertEquals(StringUtils.format("Read with offset %,d", entry.getKey()), string, entry.getValue()); } diff --git a/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java new file mode 100644 index 00000000000..44cda890382 --- /dev/null +++ b/java-util/src/main/java/io/druid/java/util/common/concurrent/ListenableFutures.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.java.util.common.concurrent; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import javax.annotation.Nullable; +import java.util.function.Function; + +public class ListenableFutures +{ + /** + * Guava 19 changes the Futures.transform signature so that the async form is different. 
This is here as a + * compatibility layer until such a time as Druid only supports Guava 19 or later, in which case + * Futures.transformAsync should be used. + * + * This is NOT copied from Guava. + */ + public static <I, O> ListenableFuture<O> transformAsync( + final ListenableFuture<I> inFuture, + final Function<I, ListenableFuture<O>> transform + ) + { + final SettableFuture<O> finalFuture = SettableFuture.create(); + Futures.addCallback(inFuture, new FutureCallback<I>() + { + @Override + public void onSuccess(@Nullable I result) + { + final ListenableFuture<O> transformFuture = transform.apply(result); + Futures.addCallback(transformFuture, new FutureCallback<O>() + { + @Override + public void onSuccess(@Nullable O result) + { + finalFuture.set(result); + } + + @Override + public void onFailure(Throwable t) + { + finalFuture.setException(t); + } + }); + } + + @Override + public void onFailure(Throwable t) + { + finalFuture.setException(t); + } + }); + return finalFuture; + } +}
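For call sites, migrating to the helper is a drop-in swap of Guava's async overload; only the import and the method name change. A hedged sketch (the future and lookup function are illustrative, not from this patch):

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.java.util.common.concurrent.ListenableFutures;

class TransformAsyncMigration
{
  ListenableFuture<Long> chain(ListenableFuture<String> idFuture)
  {
    // Before (now forbidden): Futures.transform(idFuture, (AsyncFunction<String, Long>) this::lookup)
    // After: the version-agnostic helper with the same semantics.
    return ListenableFutures.transformAsync(idFuture, this::lookup);
  }

  ListenableFuture<Long> lookup(String id)
  {
    return Futures.immediateFuture((long) id.hashCode()); // stand-in for a real async call
  }
}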
diff --git a/pom.xml b/pom.xml index 77b17ff454d..095d58e2e2b 100644 --- a/pom.xml +++ b/pom.xml @@ -352,6 +352,11 @@ guice-multibindings ${guice.version} + <dependency> + <groupId>com.google.errorprone</groupId> + <artifactId>error_prone_annotations</artifactId> + <version>2.2.0</version> + </dependency> com.ibm.icu icu4j diff --git a/processing/pom.xml b/processing/pom.xml index 220abc4192f..accd83ebe64 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -82,6 +82,10 @@ commons-io commons-io + <dependency> + <groupId>com.google.errorprone</groupId> + <artifactId>error_prone_annotations</artifactId> + </dependency> com.ibm.icu icu4j diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java index c2c1322b999..9e87e76e2b8 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryEngine.java @@ -22,7 +22,6 @@ package io.druid.query.groupby; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -57,6 +56,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -320,7 +320,7 @@ public class GroupByQueryEngine this.maxIntermediateRows = querySpecificConfig.getMaxIntermediateRows(); unprocessedKeys = null; - delegate = Iterators.emptyIterator(); + delegate = Collections.emptyIterator(); dimensionSpecs = query.getDimensions(); dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size()); dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size()); diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java index 2dfff2c9308..894e78ea36a 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -20,7 +20,6 @@ package io.druid.query.groupby.epinephelinae; import com.google.common.base.Supplier; -import com.google.common.collect.Iterators; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.IAE; import io.druid.java.util.common.parsers.CloseableIterator; @@ -170,7 +169,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType> - return CloseableIterators.withEmptyBaggage(Iterators.<Entry<KeyType>>emptyIterator()); + return CloseableIterators.withEmptyBaggage(Collections.<Entry<KeyType>>emptyIterator()); } if (sorted) { diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java index 3058e17c4a1..f6e3e4daa66 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java @@ -20,7 +20,6 @@ package io.druid.query.groupby.epinephelinae; import com.google.common.base.Supplier; -import com.google.common.collect.Iterators; import io.druid.java.util.common.CloseableIterators; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; @@ -205,7 +204,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType> // it's possible for iterator() to be called before initialization when // a nested groupBy's subquery has an empty result set (see testEmptySubqueryWithLimitPushDown() // in GroupByQueryRunnerTest) - return CloseableIterators.withEmptyBaggage(Iterators.<Entry<KeyType>>emptyIterator()); + return CloseableIterators.withEmptyBaggage(Collections.<Entry<KeyType>>emptyIterator()); } if (sortHasNonGroupingFields) { @@ -377,6 +376,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType> aggregatorFactories, aggregatorOffsets ); + @Override public int compare(Integer o1, Integer o2) { @@ -452,7 +452,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyType> subHashTable2Buffer.limit(tableArenaSize); subHashTable2Buffer = subHashTable2Buffer.slice(); - subHashTableBuffers = new ByteBuffer[] {subHashTable1Buffer, subHashTable2Buffer}; + subHashTableBuffers = new ByteBuffer[]{subHashTable1Buffer, subHashTable2Buffer}; } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java b/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java index 574204d7a51..0bb5f6aab25 100644 --- a/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java +++ b/processing/src/main/java/io/druid/query/groupby/orderby/TopNSequence.java @@ -19,13 +19,13 @@ package io.druid.query.groupby.orderby; -import com.google.common.collect.Iterators; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.Sequence; +import java.util.Collections; import java.util.Iterator; public class TopNSequence<T> extends BaseSequence<T, Iterator<T>> @@ -43,7 +43,7 @@ public class TopNSequence<T> extends BaseSequence<T, Iterator<T>> public Iterator<T> make() { if (limit <= 0) { - return Iterators.emptyIterator(); + return Collections.emptyIterator(); } // Materialize the topN values
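The limit <= 0 guard matters because the accumulation below it assumes a positive queue capacity. The materialization step works roughly like this standalone sketch, a simplification of what TopNSequence does with Guava's MinMaxPriorityQueue (names here are illustrative):

import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class TopNSketch
{
  // Keeps only the `limit` smallest elements under `ordering`, then yields
  // them in ascending order, roughly what TopNSequence.make() produces.
  static <T> Iterator<T> topN(Iterable<T> input, Ordering<T> ordering, int limit)
  {
    if (limit <= 0) {
      return Collections.emptyIterator();
    }
    final MinMaxPriorityQueue<T> queue = MinMaxPriorityQueue
        .orderedBy(ordering)
        .maximumSize(limit)   // evicts the greatest element when over capacity
        .create();
    for (T element : input) {
      queue.offer(element);
    }
    final List<T> sorted = new ArrayList<>(limit);
    while (!queue.isEmpty()) {
      sorted.add(queue.poll());  // poll() removes the least element first
    }
    return sorted.iterator();
  }
}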
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index 0fd9762dc7e..1e2e441949d 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -26,7 +26,6 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -40,21 +39,21 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.client.selector.QueryableDruidServer; import io.druid.client.selector.ServerSelector; -import io.druid.java.util.common.concurrent.Execs; import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Pair; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.guava.BaseSequence; import io.druid.java.util.common.guava.LazySequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.query.BySegmentResultValueClass; import io.druid.query.CacheStrategy; import io.druid.query.Query; @@ -255,7 +254,8 @@ public class CachingClusteredClient implements QuerySegmentWalker Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter) { - @Nullable TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource()); + @Nullable + TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource()); if (timeline == null) { return Sequences.empty(); } @@ -265,10 +265,13 @@ public class CachingClusteredClient implements QuerySegmentWalker } final Set<ServerToSegment> segments = computeSegmentsToQuery(timeline); - @Nullable final byte[] queryCacheKey = computeQueryCacheKey(); + @Nullable + final byte[] queryCacheKey = computeQueryCacheKey(); if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) { - @Nullable final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH); - @Nullable final String currentEtag = computeCurrentEtag(segments, queryCacheKey); + @Nullable + final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH); + @Nullable + final String currentEtag = computeCurrentEtag(segments, queryCacheKey); if (currentEtag != null && currentEtag.equals(prevEtag)) { return Sequences.empty(); } @@ -509,7 +512,7 @@ { try { if (cachedResult.length == 0) { - return Iterators.emptyIterator(); + return Collections.emptyIterator(); } return objectMapper.readValues( diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 17df72a5996..54ecc5c35a9 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -117,7 +116,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T> { try { if (cachedResult.length == 0) { - return Iterators.emptyIterator(); + return 
Collections.emptyIterator(); } return mapper.readValues( diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index c67b31054bf..73fa2c48ff4 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -22,10 +22,10 @@ package io.druid.segment.realtime.appenderator; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.ListenableFutures; import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import io.druid.timeline.DataSegment; @@ -41,11 +41,11 @@ import java.util.stream.Collectors; /** * This class is specifialized for batch ingestion. In batch ingestion, the segment lifecycle is like: - * + *
 * <pre>
 * APPENDING -> PUSHED_AND_DROPPED -> PUBLISHED
 * </pre>
- *
+ *
 * <ul>
 * <li>APPENDING: Segment is available for appending.</li>
 * <li>PUSHED_AND_DROPPED: Segment is pushed to deep storage and dropped from the local storage.</li>
 * <li>PUBLISHED: Segment's metadata is published to the metastore.</li>
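The lifecycle above maps directly onto the future chaining this patch introduces in the hunks that follow: a segment leaves APPENDING when its push completes, and the local drop runs as an async continuation of that push. A hedged, self-contained sketch of the pattern (only ListenableFutures.transformAsync is real here; the push/drop methods and their simplified signatures are stand-ins for the driver's pushInBackground/dropInBackground):

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.java.util.common.concurrent.ListenableFutures;
import java.util.List;

class LifecycleChainingSketch
{
  // APPENDING -> PUSHED_AND_DROPPED: push to deep storage, then drop the
  // local copy as an async continuation of the push future.
  ListenableFuture<List<String>> pushAndDrop(List<String> segmentIds)
  {
    return ListenableFutures.transformAsync(pushInBackground(segmentIds), this::dropInBackground);
  }

  ListenableFuture<List<String>> pushInBackground(List<String> segmentIds)
  {
    return Futures.immediateFuture(segmentIds); // stand-in for the real push
  }

  ListenableFuture<List<String>> dropInBackground(List<String> pushed)
  {
    return Futures.immediateFuture(pushed); // stand-in for the real drop
  }
}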
@@ -57,9 +57,9 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver /** * Create a driver. * - * @param appenderator appenderator - * @param segmentAllocator segment allocator - * @param usedSegmentChecker used segment checker + * @param appenderator appenderator + * @param segmentAllocator segment allocator + * @param usedSegmentChecker used segment checker */ public BatchAppenderatorDriver( Appenderator appenderator, @@ -72,7 +72,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver /** * This method always returns null because batch ingestion doesn't support restoring tasks on failures. - + * * @return always null */ @Override @@ -132,7 +132,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver .map(SegmentWithState::getSegmentIdentifier) .collect(Collectors.toList()); - final ListenableFuture<SegmentsAndMetadata> future = Futures.transform( + final ListenableFuture<SegmentsAndMetadata> future = ListenableFutures.transformAsync( pushInBackground(null, segmentIdentifierList), this::dropInBackground ); @@ -195,10 +195,12 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver .values() .stream() .flatMap(SegmentsForSequence::segmentStateStream) - .map(segmentWithState -> Preconditions.checkNotNull( - segmentWithState.getDataSegment(), - "dataSegment for segmentId[%s]", - segmentWithState.getSegmentIdentifier()) + .map(segmentWithState -> Preconditions + .checkNotNull( + segmentWithState.getDataSegment(), + "dataSegment for segmentId[%s]", + segmentWithState.getSegmentIdentifier() + ) ) .collect(Collectors.toList()), null diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index ab2c0262e9f..3f7fdde3291 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -24,7 +24,6 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -33,6 +32,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.concurrent.ListenableFutures; import io.druid.java.util.common.logger.Logger; import io.druid.query.SegmentDescriptor; import io.druid.segment.realtime.FireDepartmentMetrics; @@ -210,7 +210,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver /** * Persist all data indexed through this driver so far. Blocks until complete. - *
    + * * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}. * * @param committer committer representing all data that has been added so far @@ -236,7 +236,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver /** * Persist all data indexed through this driver so far. Returns a future of persisted commitMetadata. - *
+ * * Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}. * * @param committer committer representing all data that has been added so far * @@ -269,21 +269,20 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver .map(SegmentWithState::getSegmentIdentifier) .collect(Collectors.toList()); - final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform( + final ListenableFuture<SegmentsAndMetadata> publishFuture = ListenableFutures.transformAsync( pushInBackground(wrapCommitter(committer), theSegments), - (AsyncFunction<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> publishInBackground( - segmentsAndMetadata, + sam -> publishInBackground( + sam, publisher ) ); - return Futures.transform( publishFuture, - (Function<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> { + (Function<SegmentsAndMetadata, SegmentsAndMetadata>) sam -> { synchronized (segments) { sequenceNames.forEach(segments::remove); } - return segmentsAndMetadata; + return sam; } ); } @@ -378,7 +377,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver final Collection<String> sequenceNames ) { - return Futures.transform( + return ListenableFutures.transformAsync( publish(publisher, committer, sequenceNames), this::registerHandoff ); } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java index dcd233463a7..c1239e9a0cb 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IrcFirehoseFactory.java @@ -21,7 +21,6 @@ package io.druid.segment.realtime.firehose; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.ircclouds.irc.api.Callback; import com.ircclouds.irc.api.IRCApi; @@ -42,6 +41,7 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; import java.io.File; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -188,7 +188,7 @@ public class IrcFirehoseFactory implements FirehoseFactory - Iterator<InputRow> nextIterator = Iterators.emptyIterator(); + Iterator<InputRow> nextIterator = Collections.emptyIterator(); @Override public boolean hasMore()
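Given how many call sites now route through ListenableFutures.transformAsync, its failure propagation deserves a quick regression check. A hedged JUnit-style sketch, not part of this patch:

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.java.util.common.concurrent.ListenableFutures;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ExecutionException;

public class ListenableFuturesSketchTest
{
  @Test
  public void testSuccessChains() throws Exception
  {
    final ListenableFuture<Integer> result = ListenableFutures.transformAsync(
        Futures.immediateFuture("42"),
        s -> Futures.immediateFuture(Integer.parseInt(s))
    );
    Assert.assertEquals(42, (int) result.get());
  }

  @Test
  public void testFailurePropagates()
  {
    final ListenableFuture<Integer> result = ListenableFutures.transformAsync(
        Futures.immediateFailedFuture(new RuntimeException("boom")),
        s -> Futures.immediateFuture(0)
    );
    try {
      result.get();
      Assert.fail("expected ExecutionException");
    }
    catch (Exception e) {
      // The helper sets the input future's exception on the returned future,
      // so get() wraps it in an ExecutionException.
      Assert.assertTrue(e instanceof ExecutionException);
      Assert.assertEquals("boom", e.getCause().getMessage());
    }
  }
}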