From d87056e708c9a7e7ee530131ec41a6c8ccde675d Mon Sep 17 00:00:00 2001
From: Tejaswini Bandlamudi <96047043+tejaswini-imply@users.noreply.github.com>
Date: Tue, 22 Aug 2023 12:09:53 +0530
Subject: [PATCH] Upgrade guava version to 31.1-jre (#14767)
Currently, Druid is using Guava 16.0.1 version. This upgrade to 31.1-jre fixes the following issues.
CVE-2018-10237 (Unbounded memory allocation in Google Guava 11.0 through 24.x before 24.1.1 allows remote attackers to conduct denial of service attacks against servers that depend on this library and deserialize attacker-provided data because the AtomicDoubleArray class (when serialized with Java serialization) and the CompoundOrdering class (when serialized with GWT serialization) perform eager allocation without appropriate checks on what a client has sent and whether the data size is reasonable). We don't use Java or GWT serializations. Despite being false positive they're causing red security scans on Druid distribution.
Latest version of google-client-api is incompatible with the existing Guava version. This PR unblocks Update google client apis to latest version #14414
---
codestyle/guava16-forbidden-apis.txt | 3 ---
.../cloudfiles-extensions/pom.xml | 2 +-
.../CloudFilesStorageDruidModule.java | 2 +-
.../compressed-bigdecimal/pom.xml | 1 -
.../overlord/KubernetesTaskRunnerTest.java | 1 +
.../indexing/kafka/KafkaIndexTaskTest.java | 5 ++--
.../kinesis/KinesisIndexTaskTest.java | 5 ++--
.../apache/druid/msq/exec/ControllerImpl.java | 2 +-
.../exec/ExceptionWrappingWorkerClient.java | 4 ++-
.../org/apache/druid/msq/exec/WorkerImpl.java | 25 ++++++++++++-------
.../druid/msq/exec/WorkerSketchFetcher.java | 3 ++-
.../indexing/client/IndexerWorkerClient.java | 4 ++-
.../input/WorkerInputChannelFactory.java | 4 ++-
.../apache/druid/msq/util/MSQFutureUtils.java | 4 ++-
.../msq/test/MSQTestControllerContext.java | 2 +-
.../protobuf/ProtobufInputFormatTest.java | 2 +-
.../AppenderatorDriverRealtimeIndexTask.java | 6 +++--
.../parallel/ParallelIndexPhaseRunner.java | 4 ++-
.../overlord/QuotableWhiteSpaceSplitter.java | 2 +-
.../indexing/overlord/RemoteTaskRunner.java | 6 +++--
.../druid/indexing/overlord/TaskLockbox.java | 9 ++++---
.../overlord/hrtr/HttpRemoteTaskRunner.java | 3 ++-
...eekableStreamIndexTaskClientAsyncImpl.java | 7 ++++--
.../SeekableStreamIndexTaskRunner.java | 14 ++++++++---
.../supervisor/SeekableStreamSupervisor.java | 21 ++++++++++------
.../indexing/worker/WorkerTaskManager.java | 4 ++-
.../worker/executor/ExecutorLifecycle.java | 4 ++-
.../worker/http/TaskManagementResource.java | 4 ++-
.../indexing/common/TaskToolboxTest.java | 1 +
...penderatorDriverRealtimeIndexTaskTest.java | 2 +-
...stractParallelIndexSupervisorTaskTest.java | 3 ++-
.../indexing/overlord/TestTaskRunner.java | 3 ++-
licenses.yaml | 22 +++++++++++++++-
owasp-dependency-check-suppressions.xml | 14 -----------
pom.xml | 9 +++++--
.../druid/common/guava/FutureUtils.java | 10 +++++---
.../processor/FrameProcessorExecutor.java | 5 +++-
.../frame/processor/RunAllFullyWidget.java | 5 +++-
.../org/apache/druid/indexer/TaskStatus.java | 21 ++++++++--------
...erializablePairLongStringColumnHeader.java | 10 ++++----
.../collections/CombiningIteratorTest.java | 1 +
.../spatial/ImmutableRTreeTest.java | 1 +
.../druid/common/utils/JodaUtilsTest.java | 1 +
.../apache/druid/math/expr/ParserTest.java | 2 +-
.../ChainedExecutionQueryRunnerTest.java | 1 +
...etricsEmittingQueryProcessingPoolTest.java | 1 +
.../druid/client/HttpServerInventoryView.java | 2 +-
.../druid/client/indexing/QueryStatus.java | 12 ++++-----
.../metadata/SQLMetadataRuleManager.java | 5 ++--
.../indexing/SpecificTaskServiceLocator.java | 4 ++-
.../appenderator/AppenderatorImpl.java | 4 ++-
.../appenderator/AppenderatorPlumber.java | 7 ++++--
.../appenderator/BaseAppenderatorDriver.java | 3 ++-
.../appenderator/BatchAppenderator.java | 3 ++-
.../appenderator/BatchAppenderatorDriver.java | 6 +++--
.../appenderator/StreamAppenderator.java | 4 ++-
.../StreamAppenderatorDriver.java | 17 ++++++++-----
.../org/apache/druid/server/DruidNode.java | 2 +-
.../server/http/HostAndPortWithScheme.java | 2 +-
.../server/http/SegmentListerResource.java | 7 ++++--
.../cache/LookupCoordinatorManager.java | 3 ++-
.../client/CachingClusteredClientTest.java | 3 ++-
.../druid/client/JsonParserIteratorTest.java | 2 +-
.../discovery/DruidLeaderClientTest.java | 1 +
.../SqlSegmentsMetadataManagerTest.java | 2 +-
.../StreamAppenderatorDriverFailTest.java | 4 ++-
.../ChangeRequestHistoryTest.java | 7 ++++--
67 files changed, 234 insertions(+), 131 deletions(-)
delete mode 100644 codestyle/guava16-forbidden-apis.txt
diff --git a/codestyle/guava16-forbidden-apis.txt b/codestyle/guava16-forbidden-apis.txt
deleted file mode 100644
index 274a12696fb..00000000000
--- a/codestyle/guava16-forbidden-apis.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-# Those signatures are only available in Guava 16:
-com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
-com.google.common.base.Objects#firstNonNull(java.lang.Object, java.lang.Object) @ Use org.apache.druid.common.guava.GuavaUtils#firstNonNull(java.lang.Object, java.lang.Object) instead (probably... the GuavaUtils method return object is nullable)
diff --git a/extensions-contrib/cloudfiles-extensions/pom.xml b/extensions-contrib/cloudfiles-extensions/pom.xml
index 14c98f24e06..25e00afbf86 100644
--- a/extensions-contrib/cloudfiles-extensions/pom.xml
+++ b/extensions-contrib/cloudfiles-extensions/pom.xml
@@ -35,7 +35,7 @@
UTF-8
- 2.0.0
+ 2.5.0
diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java
index c54342fd32d..224be81125f 100644
--- a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java
+++ b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java
@@ -32,7 +32,7 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.jclouds.ContextBuilder;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
-import org.jclouds.openstack.v2_0.config.InternalUrlModule;
+import org.jclouds.openstack.keystone.catalog.config.InternalUrlModule;
import org.jclouds.osgi.ProviderRegistry;
import org.jclouds.rackspace.cloudfiles.uk.CloudFilesUKProviderMetadata;
import org.jclouds.rackspace.cloudfiles.us.CloudFilesUSProviderMetadata;
diff --git a/extensions-contrib/compressed-bigdecimal/pom.xml b/extensions-contrib/compressed-bigdecimal/pom.xml
index a05e757a7a6..76612897ef3 100644
--- a/extensions-contrib/compressed-bigdecimal/pom.xml
+++ b/extensions-contrib/compressed-bigdecimal/pom.xml
@@ -131,7 +131,6 @@
com.google.guava
guava
- 16.0.1
provided
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index 613e3b1031e..511c2de352f 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -67,6 +67,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(EasyMockRunner.class)
+@SuppressWarnings("DoNotMock")
public class KubernetesTaskRunnerTest extends EasyMockSupport
{
private static final String ID = "id";
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 04393cb914b..f1001174266 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1057,12 +1057,13 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
final ListenableFuture normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
- final ListenableFuture staleReplicaFuture = Futures.transform(
+ final ListenableFuture staleReplicaFuture = Futures.transformAsync(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
- (AsyncFunction) this::runTask
+ (AsyncFunction) this::runTask,
+ MoreExecutors.directExecutor()
);
while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index b12ec56de93..69516979f3e 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2004,12 +2004,13 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2);
final ListenableFuture normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
- final ListenableFuture staleReplicaFuture = Futures.transform(
+ final ListenableFuture staleReplicaFuture = Futures.transformAsync(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
- (AsyncFunction) this::runTask
+ (AsyncFunction) this::runTask,
+ MoreExecutors.directExecutor()
);
waitUntil(normalReplica, this::isTaskPaused);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 9aa94e9e476..c9fdc6cf9d9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1199,7 +1199,7 @@ public class ControllerImpl implements Controller
settableFuture.setException(t);
}
}
- });
+ }, MoreExecutors.directExecutor());
taskFutures.add(settableFuture);
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExceptionWrappingWorkerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExceptionWrappingWorkerClient.java
index eb6b1af529e..93dbc008004 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExceptionWrappingWorkerClient.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExceptionWrappingWorkerClient.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.frame.key.ClusterByPartitions;
@@ -158,7 +159,8 @@ public class ExceptionWrappingWorkerClient implements WorkerClient
{
retVal.setException(new MSQException(t, new WorkerRpcFailedFault(workerTaskId)));
}
- }
+ },
+ MoreExecutors.directExecutor()
);
return retVal;
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index e96a933e023..090cc976093 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -32,6 +32,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.common.guava.FutureUtils;
@@ -1320,7 +1321,8 @@ public class WorkerImpl implements Worker
kernelHolder.getStageKernelMap().get(stageDef.getId()).fail(t)
);
}
- }
+ },
+ MoreExecutors.directExecutor()
);
}
@@ -1612,7 +1614,7 @@ public class WorkerImpl implements Worker
};
// Chain futures so we only sort one partition at a time.
- nextFuture = Futures.transform(
+ nextFuture = Futures.transformAsync(
nextFuture,
(AsyncFunction) ignored -> {
final SuperSorter sorter = new SuperSorter(
@@ -1639,7 +1641,8 @@ public class WorkerImpl implements Worker
);
return FutureUtils.transform(sorter.run(), r -> Iterables.getOnlyElement(r.getAllChannels()));
- }
+ },
+ MoreExecutors.directExecutor()
);
sortedChannelFutures.add(nextFuture);
@@ -1665,7 +1668,7 @@ public class WorkerImpl implements Worker
throw new ISE("Not initialized");
}
- return Futures.transform(
+ return Futures.transformAsync(
pipelineFuture,
(AsyncFunction, OutputChannels>) resultAndChannels ->
Futures.transform(
@@ -1673,8 +1676,10 @@ public class WorkerImpl implements Worker
(Function
com.google.inject
@@ -1434,7 +1440,6 @@
${project.parent.basedir}/codestyle/joda-time-forbidden-apis.txt
- ${project.parent.basedir}/codestyle/guava16-forbidden-apis.txt
${project.parent.basedir}/codestyle/druid-forbidden-apis.txt
diff --git a/processing/src/main/java/org/apache/druid/common/guava/FutureUtils.java b/processing/src/main/java/org/apache/druid/common/guava/FutureUtils.java
index cf9a7f0ef71..43ab45ef457 100644
--- a/processing/src/main/java/org/apache/druid/common/guava/FutureUtils.java
+++ b/processing/src/main/java/org/apache/druid/common/guava/FutureUtils.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
@@ -107,11 +108,11 @@ public class FutureUtils
*/
public static ListenableFuture transform(final ListenableFuture future, final Function fn)
{
- return Futures.transform(future, fn::apply);
+ return Futures.transform(future, fn::apply, MoreExecutors.directExecutor());
}
/**
- * Like {@link Futures#transform(ListenableFuture, AsyncFunction)}, but works better with lambdas due to not having
+ * Like {@link Futures#transformAsync(ListenableFuture, AsyncFunction, java.util.concurrent.Executor)}, but works better with lambdas due to not having
* overloads.
*
* One can write {@code FutureUtils.transformAsync(future, v -> ...)} instead of
@@ -119,7 +120,7 @@ public class FutureUtils
*/
public static ListenableFuture transformAsync(final ListenableFuture future, final AsyncFunction fn)
{
- return Futures.transform(future, fn);
+ return Futures.transformAsync(future, fn, MoreExecutors.directExecutor());
}
/**
@@ -200,7 +201,8 @@ public class FutureUtils
retVal.setException(e);
}
- }
+ },
+ MoreExecutors.directExecutor()
);
return retVal;
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
index d278b8f83dd..20f9f7f4fe3 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@@ -61,6 +62,7 @@ import java.util.stream.Collectors;
* If you want single threaded execution, use {@code Execs.singleThreaded()}. It is not a good idea to use this with a
* same-thread executor like {@code Execs.directExecutor()}, because it will lead to deep call stacks.
*/
+@SuppressWarnings("CheckReturnValue")
public class FrameProcessorExecutor
{
private static final Logger log = new Logger(FrameProcessorExecutor.class);
@@ -286,7 +288,8 @@ public class FrameProcessorExecutor
fail(t);
}
}
- }
+ },
+ MoreExecutors.directExecutor()
);
}
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java b/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java
index dfc1dde9bd8..67724cdf804 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.common.guava.FutureUtils;
@@ -51,6 +52,7 @@ import java.util.function.BiFunction;
* The {@code bouncer} and {@code maxOutstandingProcessors} parameters are used to control how many processors are
* executed on the {@link FrameProcessorExecutor} concurrently.
*/
+@SuppressWarnings("CheckReturnValue")
public class RunAllFullyWidget
{
private static final Logger log = new Logger(RunAllFullyWidget.class);
@@ -296,7 +298,8 @@ public class RunAllFullyWidget
cleanupIfNoMoreProcessors();
}
}
- }
+ },
+ MoreExecutors.directExecutor()
);
}
}
diff --git a/processing/src/main/java/org/apache/druid/indexer/TaskStatus.java b/processing/src/main/java/org/apache/druid/indexer/TaskStatus.java
index 714c08be65d..7006f13fdec 100644
--- a/processing/src/main/java/org/apache/druid/indexer/TaskStatus.java
+++ b/processing/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
+import java.util.Objects;
/**
* Represents the status of a task from the perspective of the coordinator. The task may be ongoing
@@ -220,12 +221,12 @@ public class TaskStatus
@Override
public String toString()
{
- return Objects.toStringHelper(this)
- .add("id", id)
- .add("status", status)
- .add("duration", duration)
- .add("errorMsg", errorMsg)
- .toString();
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .add("status", status)
+ .add("duration", duration)
+ .add("errorMsg", errorMsg)
+ .toString();
}
@Override
@@ -239,14 +240,14 @@ public class TaskStatus
}
TaskStatus that = (TaskStatus) o;
return getDuration() == that.getDuration() &&
- java.util.Objects.equals(getId(), that.getId()) &&
+ Objects.equals(getId(), that.getId()) &&
status == that.status &&
- java.util.Objects.equals(getErrorMsg(), that.getErrorMsg());
+ Objects.equals(getErrorMsg(), that.getErrorMsg());
}
@Override
public int hashCode()
{
- return java.util.Objects.hash(getId(), status, getDuration(), getErrorMsg());
+ return Objects.hash(getId(), status, getDuration(), getErrorMsg());
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java
index e0aa21af03c..e9ad87caee5 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java
@@ -19,7 +19,7 @@
package org.apache.druid.query.aggregation;
-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import org.apache.druid.segment.serde.cell.LongSerializer;
@@ -103,9 +103,9 @@ public class SerializablePairLongStringColumnHeader
@Override
public String toString()
{
- return Objects.toStringHelper(this)
- .add("bytes", bytes)
- .add("minValue", minValue)
- .toString();
+ return MoreObjects.toStringHelper(this)
+ .add("bytes", bytes)
+ .add("minValue", minValue)
+ .toString();
}
}
diff --git a/processing/src/test/java/org/apache/druid/collections/CombiningIteratorTest.java b/processing/src/test/java/org/apache/druid/collections/CombiningIteratorTest.java
index 2e25542efed..b2a543a5811 100644
--- a/processing/src/test/java/org/apache/druid/collections/CombiningIteratorTest.java
+++ b/processing/src/test/java/org/apache/druid/collections/CombiningIteratorTest.java
@@ -30,6 +30,7 @@ import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.function.BinaryOperator;
+@SuppressWarnings("DoNotMock")
public class CombiningIteratorTest
{
private CombiningIterator testingIterator;
diff --git a/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java b/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java
index cba7eaff209..4f28ee62642 100644
--- a/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java
+++ b/processing/src/test/java/org/apache/druid/collections/spatial/ImmutableRTreeTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
/**
*/
+@SuppressWarnings("CheckReturnValue")
public class ImmutableRTreeTest
{
@Test
diff --git a/processing/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java b/processing/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
index 2ee6bc31220..b5b9f138a8b 100644
--- a/processing/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
@@ -39,6 +39,7 @@ import java.util.TreeSet;
/**
*
*/
+@SuppressWarnings("CheckReturnValue")
public class JodaUtilsTest
{
@Test
diff --git a/processing/src/test/java/org/apache/druid/math/expr/ParserTest.java b/processing/src/test/java/org/apache/druid/math/expr/ParserTest.java
index f6f5eccd10e..acb6345a2e9 100644
--- a/processing/src/test/java/org/apache/druid/math/expr/ParserTest.java
+++ b/processing/src/test/java/org/apache/druid/math/expr/ParserTest.java
@@ -450,7 +450,7 @@ public class ParserTest extends InitializedNullHandlingTest
public void testFunctions()
{
validateParser("sqrt(x)", "(sqrt [x])", ImmutableList.of("x"));
- validateParser("if(cond,then,else)", "(if [cond, then, else])", ImmutableList.of("cond", "else", "then"));
+ validateParser("if(cond,then,else)", "(if [cond, then, else])", ImmutableList.of("then", "cond", "else"));
validateParser("cast(x, 'STRING')", "(cast [x, STRING])", ImmutableList.of("x"));
validateParser("cast(x, 'LONG')", "(cast [x, LONG])", ImmutableList.of("x"));
validateParser("cast(x, 'DOUBLE')", "(cast [x, DOUBLE])", ImmutableList.of("x"));
diff --git a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
index 8c2cc8573c7..f7401a0dd99 100644
--- a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
@@ -56,6 +56,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+@SuppressWarnings("DoNotMock")
public class ChainedExecutionQueryRunnerTest
{
private final Lock neverRelease = new ReentrantLock();
diff --git a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
index 71b3a1c9e0f..aa5f5473177 100644
--- a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
+++ b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
@@ -31,6 +31,7 @@ import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
+@SuppressWarnings("DoNotMock")
public class MetricsEmittingQueryProcessingPoolTest
{
@Test
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 7b67e8802b3..378706809b9 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -540,7 +540,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
smileMapper,
httpClient,
inventorySyncExecutor,
- new URL(druidServer.getScheme(), hostAndPort.getHostText(), hostAndPort.getPort(), "/"),
+ new URL(druidServer.getScheme(), hostAndPort.getHost(), hostAndPort.getPort(), "/"),
"/druid-internal/v1/segments",
SEGMENT_LIST_RESP_TYPE_REF,
config.getServerTimeout(),
diff --git a/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java b/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java
index b9348731bda..182fed97ec8 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java
@@ -22,7 +22,7 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Objects;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
/**
@@ -80,10 +80,10 @@ public class QueryStatus
@Override
public String toString()
{
- return Objects.toStringHelper(this)
- .add("id", id)
- .add("status", status)
- .add("duration", duration)
- .toString();
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .add("status", status)
+ .add("duration", duration)
+ .toString();
}
}
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
index b7cdb2f7ec9..d0f9799e6aa 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataRuleManager.java
@@ -236,7 +236,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
{
try {
- ImmutableMap> newRules = ImmutableMap.copyOf(
+ Map> newRulesMap =
dbi.withHandle(
handle -> handle.createQuery(
// Return latest version rule by dataSource
@@ -272,9 +272,10 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
}
}
)
- )
);
+ ImmutableMap> newRules = ImmutableMap.copyOf(newRulesMap);
+
final int newRuleCount = newRules.values().stream().mapToInt(List::size).sum();
log.info("Polled and found [%d] rule(s) for [%d] datasource(s).", newRuleCount, newRules.size());
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
index 88f02e43090..0276e768ba9 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
@@ -22,6 +22,7 @@ package org.apache.druid.rpc.indexing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.TaskStatusResponse;
@@ -165,7 +166,8 @@ public class SpecificTaskServiceLocator implements ServiceLocator
}
}
}
- }
+ },
+ MoreExecutors.directExecutor()
);
return Futures.nonCancellationPropagating(retVal);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 1003467c6e1..2268c00e2eb 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -111,6 +111,7 @@ import java.util.stream.Collectors;
* with isLegacy
constructor argument set to false is the default. When {@link BatchAppenderator}
* proves stable then the plan is to remove this class
*/
+@SuppressWarnings("CheckReturnValue")
public class AppenderatorImpl implements Appenderator
{
// Rough estimate of memory footprint of a ColumnHolder based on actual heap dumps
@@ -430,7 +431,8 @@ public class AppenderatorImpl implements Appenderator
{
persistError = t;
}
- }
+ },
+ MoreExecutors.directExecutor()
);
} else {
isPersistRequired = true;
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
index bea2890fcf0..56af6382afa 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.ThreadRenamingCallable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
@@ -330,7 +331,8 @@ public class AppenderatorPlumber implements Plumber
// TODO: Retry?
log.warn(e, "Failed to drop segment: %s", identifier);
}
- }
+ },
+ MoreExecutors.directExecutor()
);
}
@@ -483,7 +485,8 @@ public class AppenderatorPlumber implements Plumber
log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size());
errorHandler.apply(e);
}
- }
+ },
+ MoreExecutors.directExecutor()
);
}
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index c050d9c5e49..9c212336a17 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -545,7 +545,8 @@ public abstract class BaseAppenderatorDriver implements Closeable
segmentsAndCommitMetadata.getSegments(),
metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
);
- }
+ },
+ MoreExecutors.directExecutor()
);
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
index 66cc9b87c58..4b1d384d0f7 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
@@ -398,7 +398,8 @@ public class BatchAppenderator implements Appenderator
{
persistError = t;
}
- }
+ },
+ MoreExecutors.directExecutor()
);
}
return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, false);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
index cf2efef2f88..ace4cd47bc6 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.loading.DataSegmentKiller;
@@ -140,9 +141,10 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
{
final Set requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames);
- final ListenableFuture future = Futures.transform(
+ final ListenableFuture future = Futures.transformAsync(
pushInBackground(null, requestedSegmentIdsForSequences, false),
- (AsyncFunction) this::dropInBackground
+ (AsyncFunction) this::dropInBackground,
+ MoreExecutors.directExecutor()
);
final SegmentsAndCommitMetadata segmentsAndCommitMetadata =
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 70a60a77d85..e95852bfddb 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -101,6 +101,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+@SuppressWarnings("CheckReturnValue")
public class StreamAppenderator implements Appenderator
{
// Rough estimate of memory footprint of a ColumnHolder based on actual heap dumps
@@ -397,7 +398,8 @@ public class StreamAppenderator implements Appenderator
{
persistError = t;
}
- }
+ },
+ MoreExecutors.directExecutor()
);
} else {
isPersistRequired = true;
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index f8226631967..a645f81971e 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
@@ -277,7 +278,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
{
final List theSegments = getSegmentIdsWithShardSpecs(sequenceNames);
- final ListenableFuture publishFuture = Futures.transform(
+ final ListenableFuture publishFuture = Futures.transformAsync(
// useUniquePath=true prevents inconsistencies in segment data when task failures or replicas leads to a second
// version of a segment with the same identifier containing different data; see DataSegmentPusher.push() docs
pushInBackground(wrapCommitter(committer), theSegments, true),
@@ -287,7 +288,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
sam,
publisher,
java.util.function.Function.identity()
- )
+ ),
+ MoreExecutors.directExecutor()
);
return Futures.transform(
publishFuture,
@@ -296,7 +298,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
sequenceNames.forEach(segments::remove);
}
return sam;
- }
+ },
+ MoreExecutors.directExecutor()
);
}
@@ -383,7 +386,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
numRemainingHandoffSegments.decrementAndGet();
resultFuture.setException(e);
}
- }
+ },
+ MoreExecutors.directExecutor()
);
}
);
@@ -399,9 +403,10 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
final Collection sequenceNames
)
{
- return Futures.transform(
+ return Futures.transformAsync(
publish(publisher, committer, sequenceNames),
- (AsyncFunction) this::registerHandoff
+ (AsyncFunction) this::registerHandoff,
+ MoreExecutors.directExecutor()
);
}
diff --git a/server/src/main/java/org/apache/druid/server/DruidNode.java b/server/src/main/java/org/apache/druid/server/DruidNode.java
index 1a360393649..1c077d0cd73 100644
--- a/server/src/main/java/org/apache/druid/server/DruidNode.java
+++ b/server/src/main/java/org/apache/druid/server/DruidNode.java
@@ -156,7 +156,7 @@ public class DruidNode
Integer portFromHostConfig;
if (host != null) {
hostAndPort = HostAndPort.fromString(host);
- host = hostAndPort.getHostText();
+ host = hostAndPort.getHost();
portFromHostConfig = hostAndPort.hasPort() ? hostAndPort.getPort() : null;
if (plainTextPort != null && portFromHostConfig != null && !plainTextPort.equals(portFromHostConfig)) {
throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, plainTextPort);
diff --git a/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java b/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java
index 0c584aab8d0..297e86f3898 100644
--- a/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java
+++ b/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java
@@ -74,7 +74,7 @@ public class HostAndPortWithScheme
public String getHostText()
{
- return hostAndPort.getHostText();
+ return hostAndPort.getHost();
}
public int getPort()
diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index a0281b27ff2..1b6f8326773 100644
--- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.client.HttpServerInventoryView;
@@ -205,7 +206,8 @@ public class SegmentListerResource
log.debug(ex, "Request timed out or closed already.");
}
}
- }
+ },
+ MoreExecutors.directExecutor()
);
asyncContext.setTimeout(timeout);
@@ -317,7 +319,8 @@ public class SegmentListerResource
log.debug(ex, "Request timed out or closed already.");
}
}
- }
+ },
+ MoreExecutors.directExecutor()
);
asyncContext.setTimeout(timeout);
diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java
index 7526ecbcc10..2662de6b981 100644
--- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java
+++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java
@@ -435,7 +435,8 @@ public class LookupCoordinatorManager
LOG.makeAlert(t, "Background lookup manager exited with error!").emit();
}
}
- }
+ },
+ MoreExecutors.directExecutor()
);
LOG.debug("Started");
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 0ebd441360e..4436a5c5899 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -376,7 +376,8 @@ public class CachingClusteredClientTest
{
pair.lhs.setException(t);
}
- }
+ },
+ MoreExecutors.directExecutor()
);
}
}
diff --git a/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java b/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java
index f896067646c..3ac2b4122d2 100644
--- a/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java
@@ -103,7 +103,7 @@ public class JsonParserIteratorTest
OBJECT_MAPPER
);
expectedException.expect(QueryInterruptedException.class);
- expectedException.expectMessage("Immediate cancelled future.");
+ expectedException.expectMessage("Task was cancelled.");
iterator.hasNext();
}
diff --git a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
index cff1aeb9fb0..f0f91469dc8 100644
--- a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
@@ -75,6 +75,7 @@ import java.nio.charset.StandardCharsets;
/**
*/
+@SuppressWarnings("DoNotMock")
public class DruidLeaderClientTest extends BaseJettyTest
{
@Rule
diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index 4995e69d72a..9899df69504 100644
--- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -283,7 +283,7 @@ public class SqlSegmentsMetadataManagerTest
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
- ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"),
+ ImmutableList.of("wikipedia3", "wikipedia", "wikipedia2"),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 1e2aa7d73e4..7e7ce334cc7 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
@@ -497,7 +498,8 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
.collect(Collectors.toList());
return Futures.transform(
persistAll(committer),
- (Function