remove ListenableFutures and revert to using the Guava implementation (#9944)

This change removes ListenableFutures.transformAsync in favor of the
existing Guava Futures.transform implementation. Our own implementation
had a bug which did not fail the future if the applied function threw an
exception, resulting in the future never completing.

An attempt was made to fix this bug, however when running againts Guava's own
tests, our version failed another half dozen tests, so it was decided to not
continue down that path and scrap our own implementation.

Explanation for how was this bug manifested itself:

An exception thrown in BaseAppenderatorDriver.publishInBackground when
invoked via transformAsync in StreamAppenderatorDriver.publish will
cause the resulting future to never complete.

This explains why when encountering https://github.com/apache/druid/issues/9845
the task will never complete, forever waiting for the publishFuture to
register the handoff. As a result, the corresponding "Error while
publishing segments ..." message only gets logged once the index task
times out and is forcefully shutdown when the future is force-cancelled
by the executor.
This commit is contained in:
Xavier Léauté 2020-06-03 10:46:03 -07:00 committed by GitHub
parent 3d81564a14
commit a934b2664c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 22 additions and 92 deletions

View File

@ -24,7 +24,6 @@ com.google.common.io.Files#createTempDir() @ Use org.apache.druid.java.util.comm
com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#newDirectExecutorService() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#directExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use org.apache.druid.java.util.common.concurrent.ListenableFutures#transformAsync
java.io.File#toURL() @ Use java.io.File#toURI() and java.net.URI#toURL() instead
java.lang.String#matches(java.lang.String) @ Use startsWith(), endsWith(), contains(), or compile and cache a Pattern explicitly
java.lang.String#replace(java.lang.CharSequence,java.lang.CharSequence) @ Use one of the appropriate methods in StringUtils instead

View File

@ -1,75 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.concurrent;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import javax.annotation.Nullable;
import java.util.function.Function;
public class ListenableFutures
{
/**
* Guava 19 changes the Futures.transform signature so that the async form is different. This is here as a
* compatability layer until such a time as druid only supports Guava 19 or later, in which case
* Futures.transformAsync should be used
*
* This is NOT copied from guava.
*/
public static <I, O> ListenableFuture<O> transformAsync(
final ListenableFuture<I> inFuture,
final Function<I, ListenableFuture<O>> transform
)
{
final SettableFuture<O> finalFuture = SettableFuture.create();
Futures.addCallback(inFuture, new FutureCallback<I>()
{
@Override
public void onSuccess(@Nullable I result)
{
final ListenableFuture<O> transformFuture = transform.apply(result);
Futures.addCallback(transformFuture, new FutureCallback<O>()
{
@Override
public void onSuccess(@Nullable O result)
{
finalFuture.set(result);
}
@Override
public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
}
@Override
public void onFailure(Throwable t)
{
finalFuture.setException(t);
}
});
return finalFuture;
}
}

View File

@ -27,6 +27,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.curator.test.TestingCluster;
@ -74,7 +76,6 @@ import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
@ -900,12 +901,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
final ListenableFuture<TaskStatus> staleReplicaFuture = ListenableFutures.transformAsync(
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
this::runTask
(AsyncFunction<Task, TaskStatus>) this::runTask
);
while (normalReplica.getRunner().getStatus() != Status.PAUSED) {

View File

@ -30,6 +30,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.name.Named;
@ -78,7 +80,6 @@ import org.apache.druid.indexing.test.TestDataSegmentKiller;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -2429,12 +2430,12 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2);
final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
final ListenableFuture<TaskStatus> staleReplicaFuture = ListenableFutures.transformAsync(
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
this::runTask
(AsyncFunction<Task, TaskStatus>) this::runTask
);
while (normalReplica.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {

View File

@ -29,6 +29,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.io.FileUtils;
@ -65,7 +66,6 @@ import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -682,7 +682,10 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
committerSupplier.get(),
Collections.singletonList(sequenceName)
);
pendingHandoffs.add(ListenableFutures.transformAsync(publishFuture, driver::registerHandoff));
pendingHandoffs.add(Futures.transform(
publishFuture,
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff
));
}
private void waitForSegmentPublishAndHandoff(long timeout) throws InterruptedException, ExecutionException,

View File

@ -22,10 +22,11 @@ package org.apache.druid.segment.realtime.appenderator;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.DataSegment;
@ -138,9 +139,9 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
{
final Set<SegmentIdWithShardSpec> requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames);
final ListenableFuture<SegmentsAndCommitMetadata> future = ListenableFutures.transformAsync(
final ListenableFuture<SegmentsAndCommitMetadata> future = Futures.transform(
pushInBackground(null, requestedSegmentIdsForSequences, false),
this::dropInBackground
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) this::dropInBackground
);
final SegmentsAndCommitMetadata segmentsAndCommitMetadata =

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -32,7 +33,6 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ListenableFutures;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.SegmentDescriptor;
@ -273,11 +273,11 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
{
final List<SegmentIdWithShardSpec> theSegments = getSegmentIdsWithShardSpecs(sequenceNames);
final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = ListenableFutures.transformAsync(
final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = Futures.transform(
// useUniquePath=true prevents inconsistencies in segment data when task failures or replicas leads to a second
// version of a segment with the same identifier containing different data; see DataSegmentPusher.push() docs
pushInBackground(wrapCommitter(committer), theSegments, true),
sam -> publishInBackground(
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) sam -> publishInBackground(
null,
sam,
publisher
@ -386,9 +386,9 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
final Collection<String> sequenceNames
)
{
return ListenableFutures.transformAsync(
return Futures.transform(
publish(publisher, committer, sequenceNames),
this::registerHandoff
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) this::registerHandoff
);
}