Future-proof some Guava usage (#5414)

* Future-proof some Guava usage

* Use a java-util EmptyIterator instead of Guava's
* Change some of the Guava future handling to do manual async
transforms. Guava 19 deprecates the Futures.transform overload that takes
an AsyncFunction (renaming it transformAsync), and Guava 20 removes the
deprecated overload entirely (see the sketch after this list)

* Use `Collections.emptyIterator()`

* Pretty formatting

* Make listenable future transforms a thing in default druid

* Format fix

* Add forbidden guava apis

* Make the ListenableFutrues.transformAsync have comments

* Undo intellij bad pattern matching in comments

* Futrues --> Futures

* Forbid Guava's empty iterators via forbidden-apis

* Fix extra `A`

* Correct method signature

* Address review comments

* Finish Gian review comments

* Proper syntax from https://github.com/policeman-tools/forbidden-apis/wiki/SignaturesSyntax
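To make the two swaps above concrete, here is a minimal sketch (the class and method are illustrative only, not code from this commit):

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import io.druid.java.util.common.concurrent.ListenableFutures;

    import java.util.Collections;
    import java.util.Iterator;

    class GuavaMigrationSketch
    {
      static ListenableFuture<String> migrated(final ListenableFuture<Integer> in)
      {
        // Empty-iterator replacement: Collections.emptyIterator() target-types to
        // Iterator<String>, exactly as Iterators.<String>emptyIterator() did.
        final Iterator<String> empty = Collections.emptyIterator();
        // Async-transform replacement: same shape as the Guava overload that is
        // deprecated in 19 and removed in 20.
        return ListenableFutures.transformAsync(
            in,
            i -> Futures.immediateFuture(String.valueOf(i))
        );
      }
    }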
Charles Allen 2018-03-20 08:59:33 -07:00 committed by Gian Merlino
parent 17c71a2a60
commit 58f110f7f8
21 changed files with 161 additions and 81 deletions

View File

@@ -1,2 +1,4 @@
com.google.common.collect.MapMaker @ Create java.util.concurrent.ConcurrentHashMap directly
com.google.common.collect.Maps#newConcurrentMap() @ Create java.util.concurrent.ConcurrentHashMap directly
com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use io.druid.java.util.common.concurrent.ListenableFutures#transformAsync
com.google.common.collect.Iterators#emptyIterator() @ Use java.util.Collections#emptyIterator()
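Each line above follows the forbidden-apis signatures syntax linked at the end of the commit message: a class or method signature, then `@` plus the message reported wherever the API is used. A small illustrative sketch of the prescribed ConcurrentHashMap replacement (class name hypothetical):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class ConcurrentMapReplacement
    {
      // Forbidden: new MapMaker().makeMap() and Maps.newConcurrentMap().
      // Prescribed: construct the JDK map directly.
      final ConcurrentMap<String, Long> counters = new ConcurrentHashMap<>();
    }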

View File

@@ -33,7 +33,6 @@ import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@@ -48,6 +47,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
@@ -199,7 +199,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory<InputRowParser<B
return new Firehose()
{
private Iterator<InputRow> nextIterator = Iterators.emptyIterator();
private Iterator<InputRow> nextIterator = Collections.emptyIterator();
@Override
public boolean hasMore()
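The same one-line swap repeats in the kafka, rabbitmq, and irc firehoses below; a minimal sketch of the shared idiom (a simplified stand-in, not the real Firehose interface):

    import java.util.Collections;
    import java.util.Iterator;

    class EmptyIteratorIdiom<T>
    {
      // Seed with an empty iterator instead of null so callers never need a
      // null check; Collections.emptyIterator() infers Iterator<T> from the
      // assignment target, just as Guava's Iterators.emptyIterator() did.
      private Iterator<T> nextIterator = Collections.emptyIterator();

      boolean hasMore(Iterable<T> nextBatch)
      {
        if (!nextIterator.hasNext()) {
          nextIterator = nextBatch.iterator(); // refill from the next parsed batch
        }
        return nextIterator.hasNext();
      }
    }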

View File

@@ -22,7 +22,6 @@ package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import io.druid.data.input.ByteBufferInputRowParser;
@@ -38,6 +37,7 @@ import io.druid.java.util.emitter.EmittingLogger;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -177,7 +177,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
private volatile boolean stopped;
private volatile BytesMessageWithOffset msg = null;
private volatile InputRow row = null;
private volatile Iterator<InputRow> nextIterator = Iterators.emptyIterator();
private volatile Iterator<InputRow> nextIterator = Collections.emptyIterator();
{
lastOffsetPartitions = Maps.newHashMap();
@@ -212,7 +212,7 @@
msg = messageQueue.take();
final byte[] message = msg.message();
nextIterator = message == null
? Iterators.emptyIterator()
? Collections.emptyIterator()
: firehoseParser.parseBatch(ByteBuffer.wrap(message)).iterator();
continue;
}

View File

@@ -21,7 +21,6 @@ package io.druid.firehose.rabbitmq;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -46,6 +45,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -205,7 +205,7 @@ public class RabbitMQFirehoseFactory implements FirehoseFactory<InputRowParser<B
*/
private long lastDeliveryTag;
private Iterator<InputRow> nextIterator = Iterators.emptyIterator();
private Iterator<InputRow> nextIterator = Collections.emptyIterator();
@Override
public boolean hasMore()

View File

@@ -23,7 +23,6 @@ package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@@ -40,6 +39,7 @@ import kafka.message.InvalidMessageException;
import javax.annotation.Nullable;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -107,7 +107,7 @@ public class KafkaEightFirehoseFactory implements FirehoseFactory<InputRowParser
return new Firehose()
{
Iterator<InputRow> nextIterator = Iterators.emptyIterator();
Iterator<InputRow> nextIterator = Collections.emptyIterator();
@Override
public boolean hasMore()

View File

@@ -23,9 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.FileUtils;
@@ -90,6 +88,7 @@ public class JobHelper
{
return new Path(base, "classpath");
}
public static final String INDEX_ZIP = "index.zip";
public static final String DESCRIPTOR_JSON = "descriptor.json";
@@ -277,18 +276,10 @@
static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException
{
log.info("Uploading jar to path[%s]", path);
ByteStreams.copy(
Files.newInputStreamSupplier(jarFile),
new OutputSupplier<OutputStream>()
{
@Override
public OutputStream getOutput() throws IOException
{
return fs.create(path);
}
}
);
try (OutputStream os = fs.create(path)) {
Files.asByteSource(jarFile).copyTo(os);
}
}
static boolean isSnapshot(File jarFile)
{
@@ -562,8 +553,10 @@
DataSegmentPusher dataSegmentPusher
)
{
return new Path(prependFSIfNullScheme(fs, basePath),
dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName));
return new Path(
prependFSIfNullScheme(fs, basePath),
dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName)
);
}
public static Path makeTmpPath(
@@ -576,7 +569,8 @@
{
return new Path(
prependFSIfNullScheme(fs, basePath),
StringUtils.format("./%s.%d",
StringUtils.format(
"./%s.%d",
dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP),
taskAttemptID.getId()
)
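The uploadJar change above trades Guava's InputSupplier/OutputSupplier plumbing, which later Guava versions remove, for try-with-resources plus ByteSource. A self-contained sketch of the new shape, assuming a local File in place of the Hadoop FileSystem stream:

    import com.google.common.io.Files;

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class JarCopySketch
    {
      static void copyJar(File jarFile, File dest) throws IOException
      {
        // The caller owns the output stream's lifecycle; ByteSource only
        // performs the copy. FileOutputStream stands in for fs.create(path).
        try (OutputStream os = new FileOutputStream(dest)) {
          Files.asByteSource(jarFile).copyTo(os);
        }
      }
    }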

View File

@@ -46,6 +46,7 @@ import io.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ListenableFutures;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.emitter.EmittingLogger;
@@ -388,9 +389,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask
/**
* Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
* abruptly stopping.
*
* <p>
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this.
*
* <p>
* Protected for tests.
*/
protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
@@ -431,15 +432,12 @@
String sequenceName
)
{
ListenableFuture<SegmentsAndMetadata> publishFuture = driver.publish(
final ListenableFuture<SegmentsAndMetadata> publishFuture = driver.publish(
publisher,
committerSupplier.get(),
Collections.singletonList(sequenceName)
);
ListenableFuture<SegmentsAndMetadata> handoffFuture = Futures.transform(publishFuture, driver::registerHandoff);
pendingHandoffs.add(handoffFuture);
pendingHandoffs.add(ListenableFutures.transformAsync(publishFuture, driver::registerHandoff));
}
private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException,

View File

@@ -59,7 +59,7 @@ public class FileTaskLogsTest
final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().getInput());
final byte[] bytes = ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", entry.getKey()).get().openStream());
final String string = StringUtils.fromUtf8(bytes);
Assert.assertEquals(StringUtils.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
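This test change tracks streamTaskLog now handing back a Guava ByteSource (openStream) where it previously returned an InputSupplier (getInput); presumably the production signature changed accordingly. A sketch of consuming the new return type (class name hypothetical):

    import com.google.common.io.ByteSource;
    import com.google.common.io.ByteStreams;

    import java.io.IOException;
    import java.io.InputStream;

    class LogReadSketch
    {
      static byte[] readAll(ByteSource logSource) throws IOException
      {
        // InputSupplier#getInput() becomes ByteSource#openStream(); the
        // downstream ByteStreams.toByteArray call is unchanged.
        try (InputStream in = logSource.openStream()) {
          return ByteStreams.toByteArray(in);
        }
      }
    }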

View File

@@ -0,0 +1,75 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common.concurrent;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import javax.annotation.Nullable;
import java.util.function.Function;
public class ListenableFutures
{
/**
* Guava 19 changes the Futures.transform signature so that the async form is different. This is here as a
* compatibility layer until such time as Druid only supports Guava 19 or later, at which point
* Futures.transformAsync should be used.
*
* This is NOT copied from Guava.
*/
public static <I, O> ListenableFuture<O> transformAsync(
final ListenableFuture<I> inFuture,
final Function<I, ListenableFuture<O>> transform
)
{
final SettableFuture<O> finalFuture = SettableFuture.create();
Futures.addCallback(inFuture, new FutureCallback<I>()
{
@Override
public void onSuccess(@Nullable I result)
{
final ListenableFuture<O> transformFuture = transform.apply(result);
Futures.addCallback(transformFuture, new FutureCallback<O>()
{
@Override
public void onSuccess(@Nullable O result)
{
finalFuture.set(result);
}
@Override
public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
}
@Override
public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
return finalFuture;
}
}
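A usage sketch for the helper (stage names hypothetical), chaining an async follow-up the way the appenderator drivers below chain publish onto push. One caveat of the implementation as written: cancelling the returned future does not propagate cancellation to the input future, unlike Guava's own transform.

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import io.druid.java.util.common.concurrent.ListenableFutures;

    class TransformAsyncUsage
    {
      static ListenableFuture<String> pushThenPublish(final ListenableFuture<Integer> pushFuture)
      {
        // The lambda runs only after pushFuture succeeds; a failure in either
        // stage surfaces in the returned future.
        return ListenableFutures.transformAsync(
            pushFuture,
            segmentCount -> Futures.immediateFuture("published " + segmentCount)
        );
      }
    }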

View File

@@ -352,6 +352,11 @@
<artifactId>guice-multibindings</artifactId>
<version>${guice.version}</version>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>

View File

@@ -82,6 +82,10 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>

View File

@@ -22,7 +22,6 @@ package io.druid.query.groupby;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
@@ -57,6 +56,7 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -320,7 +320,7 @@ public class GroupByQueryEngine
this.maxIntermediateRows = querySpecificConfig.getMaxIntermediateRows();
unprocessedKeys = null;
delegate = Iterators.emptyIterator();
delegate = Collections.emptyIterator();
dimensionSpecs = query.getDimensions();
dimensions = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());
dimNames = Lists.newArrayListWithExpectedSize(dimensionSpecs.size());

View File

@@ -20,7 +20,6 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterators;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.parsers.CloseableIterator;
@@ -170,7 +169,7 @@ public class BufferHashGrouper<KeyType> extends AbstractBufferHashGrouper<KeyTyp
if (!initialized) {
// it's possible for iterator() to be called before initialization when
// a nested groupBy's subquery has an empty result set (see testEmptySubquery() in GroupByQueryRunnerTest)
return CloseableIterators.withEmptyBaggage(Iterators.<Entry<KeyType>>emptyIterator());
return CloseableIterators.withEmptyBaggage(Collections.<Entry<KeyType>>emptyIterator());
}
if (sorted) {

View File

@@ -20,7 +20,6 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterators;
import io.druid.java.util.common.CloseableIterators;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
@@ -205,7 +204,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
// it's possible for iterator() to be called before initialization when
// a nested groupBy's subquery has an empty result set (see testEmptySubqueryWithLimitPushDown()
// in GroupByQueryRunnerTest)
return CloseableIterators.withEmptyBaggage(Iterators.<Entry<KeyType>>emptyIterator());
return CloseableIterators.withEmptyBaggage(Collections.<Entry<KeyType>>emptyIterator());
}
if (sortHasNonGroupingFields) {
@@ -377,6 +376,7 @@ public class LimitedBufferHashGrouper<KeyType> extends AbstractBufferHashGrouper
aggregatorFactories,
aggregatorOffsets
);
@Override
public int compare(Integer o1, Integer o2)
{

View File

@@ -19,13 +19,13 @@
package io.druid.query.groupby.orderby;
import com.google.common.collect.Iterators;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.Sequence;
import java.util.Collections;
import java.util.Iterator;
public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
@@ -43,7 +43,7 @@ public class TopNSequence<T> extends BaseSequence<T, Iterator<T>>
public Iterator<T> make()
{
if (limit <= 0) {
return Iterators.emptyIterator();
return Collections.emptyIterator();
}
// Materialize the topN values

View File

@@ -26,7 +26,6 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@@ -40,21 +39,21 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.LazySequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
@@ -255,7 +254,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
Sequence<T> run(final UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter)
{
@Nullable TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
@Nullable
TimelineLookup<String, ServerSelector> timeline = serverView.getTimeline(query.getDataSource());
if (timeline == null) {
return Sequences.empty();
}
@@ -265,10 +265,13 @@
}
final Set<ServerToSegment> segments = computeSegmentsToQuery(timeline);
@Nullable final byte[] queryCacheKey = computeQueryCacheKey();
@Nullable
final byte[] queryCacheKey = computeQueryCacheKey();
if (query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH) != null) {
@Nullable final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
@Nullable final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
@Nullable
final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH);
@Nullable
final String currentEtag = computeCurrentEtag(segments, queryCacheKey);
if (currentEtag != null && currentEtag.equals(prevEtag)) {
return Sequences.empty();
}
@@ -509,7 +512,7 @@
{
try {
if (cachedResult.length == 0) {
return Iterators.emptyIterator();
return Collections.emptyIterator();
}
return objectMapper.readValues(

View File

@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -117,7 +116,7 @@ public class CachingQueryRunner<T> implements QueryRunner<T>
{
try {
if (cachedResult.length == 0) {
return Iterators.emptyIterator();
return Collections.emptyIterator();
}
return mapper.readValues(

View File

@@ -22,10 +22,10 @@ package io.druid.segment.realtime.appenderator;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ListenableFutures;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import io.druid.timeline.DataSegment;
@@ -41,11 +41,11 @@ import java.util.stream.Collectors;
/**
* This class is specialized for batch ingestion. In batch ingestion, the segment lifecycle is like:
*
* <p>
* <pre>
* APPENDING -> PUSHED_AND_DROPPED -> PUBLISHED
* </pre>
*
* <p>
* <ul>
* <li>APPENDING: Segment is available for appending.</li>
* <li>PUSHED_AND_DROPPED: Segment is pushed to deep storage and dropped from the local storage.</li>
@@ -72,7 +72,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
/**
* This method always returns null because batch ingestion doesn't support restoring tasks on failures.
*
* @return always null
*/
@Override
@@ -132,7 +132,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
.map(SegmentWithState::getSegmentIdentifier)
.collect(Collectors.toList());
final ListenableFuture<SegmentsAndMetadata> future = Futures.transform(
final ListenableFuture<SegmentsAndMetadata> future = ListenableFutures.transformAsync(
pushInBackground(null, segmentIdentifierList),
this::dropInBackground
);
@@ -195,10 +195,12 @@
.values()
.stream()
.flatMap(SegmentsForSequence::segmentStateStream)
.map(segmentWithState -> Preconditions.checkNotNull(
.map(segmentWithState -> Preconditions
.checkNotNull(
segmentWithState.getDataSegment(),
"dataSegment for segmentId[%s]",
segmentWithState.getSegmentIdentifier())
segmentWithState.getSegmentIdentifier()
)
)
.collect(Collectors.toList()),
null

View File

@@ -24,7 +24,6 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -33,6 +32,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ListenableFutures;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.realtime.FireDepartmentMetrics;
@@ -210,7 +210,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
/**
* Persist all data indexed through this driver so far. Blocks until complete.
* <p>
*
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
*
* @param committer committer representing all data that has been added so far
@@ -236,7 +236,7 @@
/**
* Persist all data indexed through this driver so far. Returns a future of persisted commitMetadata.
* <p>
*
* Should be called after all data has been added through {@link #add(InputRow, String, Supplier, boolean, boolean)}.
*
* @param committer committer representing all data that has been added so far
@@ -269,21 +269,20 @@
.map(SegmentWithState::getSegmentIdentifier)
.collect(Collectors.toList());
final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
final ListenableFuture<SegmentsAndMetadata> publishFuture = ListenableFutures.transformAsync(
pushInBackground(wrapCommitter(committer), theSegments),
(AsyncFunction<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> publishInBackground(
segmentsAndMetadata,
sam -> publishInBackground(
sam,
publisher
)
);
return Futures.transform(
publishFuture,
(Function<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> {
(Function<? super SegmentsAndMetadata, ? extends SegmentsAndMetadata>) sam -> {
synchronized (segments) {
sequenceNames.forEach(segments::remove);
}
return segmentsAndMetadata;
return sam;
}
);
}
@@ -378,7 +377,7 @@
final Collection<String> sequenceNames
)
{
return Futures.transform(
return ListenableFutures.transformAsync(
publish(publisher, committer, sequenceNames),
this::registerHandoff
);

View File

@@ -21,7 +21,6 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.ircclouds.irc.api.Callback;
import com.ircclouds.irc.api.IRCApi;
@@ -42,6 +41,7 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -188,7 +188,7 @@ public class IrcFirehoseFactory implements FirehoseFactory<InputRowParser<Pair<D
return new Firehose()
{
InputRow nextRow = null;
Iterator<InputRow> nextIterator = Iterators.emptyIterator();
Iterator<InputRow> nextIterator = Collections.emptyIterator();
@Override
public boolean hasMore()