Upgrade guava version to 31.1-jre (#14767)

Currently, Druid is using Guava 16.0.1 version. This upgrade to 31.1-jre fixes the following issues.

CVE-2018-10237 (Unbounded memory allocation in Google Guava 11.0 through 24.x before 24.1.1 allows remote attackers to conduct denial of service attacks against servers that depend on this library and deserialize attacker-provided data because the AtomicDoubleArray class (when serialized with Java serialization) and the CompoundOrdering class (when serialized with GWT serialization) perform eager allocation without appropriate checks on what a client has sent and whether the data size is reasonable). We don't use Java or GWT serializations. Despite being false positive they're causing red security scans on Druid distribution.
Latest version of google-client-api is incompatible with the existing Guava version. This PR unblocks Update google client apis to latest version #14414
This commit is contained in:
Tejaswini Bandlamudi 2023-08-22 12:09:53 +05:30 committed by GitHub
parent 18f7cb6926
commit d87056e708
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
67 changed files with 234 additions and 131 deletions

View File

@ -1,3 +0,0 @@
# Those signatures are only available in Guava 16:
com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.base.Objects#firstNonNull(java.lang.Object, java.lang.Object) @ Use org.apache.druid.common.guava.GuavaUtils#firstNonNull(java.lang.Object, java.lang.Object) instead (probably... the GuavaUtils method return object is nullable)

View File

@ -35,7 +35,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jclouds.version>2.0.0</jclouds.version>
<jclouds.version>2.5.0</jclouds.version>
</properties>
<dependencies>

View File

@ -32,7 +32,7 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.logger.Logger;
import org.jclouds.ContextBuilder;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
import org.jclouds.openstack.v2_0.config.InternalUrlModule;
import org.jclouds.openstack.keystone.catalog.config.InternalUrlModule;
import org.jclouds.osgi.ProviderRegistry;
import org.jclouds.rackspace.cloudfiles.uk.CloudFilesUKProviderMetadata;
import org.jclouds.rackspace.cloudfiles.us.CloudFilesUSProviderMetadata;

View File

@ -131,7 +131,6 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@ -67,6 +67,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(EasyMockRunner.class)
@SuppressWarnings("DoNotMock")
public class KubernetesTaskRunnerTest extends EasyMockSupport
{
private static final String ID = "id";

View File

@ -1057,12 +1057,13 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transformAsync(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
(AsyncFunction<Task, TaskStatus>) this::runTask
(AsyncFunction<Task, TaskStatus>) this::runTask,
MoreExecutors.directExecutor()
);
while (normalReplica.getRunner().getStatus() != Status.PAUSED) {

View File

@ -2004,12 +2004,13 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2);
final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transformAsync(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
(AsyncFunction<Task, TaskStatus>) this::runTask
(AsyncFunction<Task, TaskStatus>) this::runTask,
MoreExecutors.directExecutor()
);
waitUntil(normalReplica, this::isTaskPaused);

View File

@ -1199,7 +1199,7 @@ public class ControllerImpl implements Controller
settableFuture.setException(t);
}
}
});
}, MoreExecutors.directExecutor());
taskFutures.add(settableFuture);
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.frame.key.ClusterByPartitions;
@ -158,7 +159,8 @@ public class ExceptionWrappingWorkerClient implements WorkerClient
{
retVal.setException(new MSQException(t, new WorkerRpcFailedFault(workerTaskId)));
}
}
},
MoreExecutors.directExecutor()
);
return retVal;

View File

@ -32,6 +32,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.common.guava.FutureUtils;
@ -1320,7 +1321,8 @@ public class WorkerImpl implements Worker
kernelHolder.getStageKernelMap().get(stageDef.getId()).fail(t)
);
}
}
},
MoreExecutors.directExecutor()
);
}
@ -1612,7 +1614,7 @@ public class WorkerImpl implements Worker
};
// Chain futures so we only sort one partition at a time.
nextFuture = Futures.transform(
nextFuture = Futures.transformAsync(
nextFuture,
(AsyncFunction<OutputChannel, OutputChannel>) ignored -> {
final SuperSorter sorter = new SuperSorter(
@ -1639,7 +1641,8 @@ public class WorkerImpl implements Worker
);
return FutureUtils.transform(sorter.run(), r -> Iterables.getOnlyElement(r.getAllChannels()));
}
},
MoreExecutors.directExecutor()
);
sortedChannelFutures.add(nextFuture);
@ -1665,7 +1668,7 @@ public class WorkerImpl implements Worker
throw new ISE("Not initialized");
}
return Futures.transform(
return Futures.transformAsync(
pipelineFuture,
(AsyncFunction<ResultAndChannels<?>, OutputChannels>) resultAndChannels ->
Futures.transform(
@ -1673,8 +1676,10 @@ public class WorkerImpl implements Worker
(Function<Object, OutputChannels>) input -> {
sanityCheckOutputChannels(resultAndChannels.getOutputChannels());
return resultAndChannels.getOutputChannels();
}
)
},
MoreExecutors.directExecutor()
),
MoreExecutors.directExecutor()
);
}
@ -1742,7 +1747,8 @@ public class WorkerImpl implements Worker
}
);
}
}
},
MoreExecutors.directExecutor()
);
return new ResultAndChannels<>(
@ -1772,7 +1778,7 @@ public class WorkerImpl implements Worker
}
pipelineFuture = FutureUtils.transform(
Futures.transform(
Futures.transformAsync(
pipelineFuture,
new AsyncFunction<ResultAndChannels<?>, ResultAndChannels<?>>()
{
@ -1781,7 +1787,8 @@ public class WorkerImpl implements Worker
{
return fn.apply(t);
}
}
},
MoreExecutors.directExecutor()
),
resultAndChannels -> new ResultAndChannels<>(
resultAndChannels.getResultFuture(),

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.ISE;
@ -210,7 +211,7 @@ public class WorkerSketchFetcher implements AutoCloseable
}
}
});
}, MoreExecutors.directExecutor());
FutureUtils.getUnchecked(kernelActionFuture, true);
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.common.guava.FutureUtils;
@ -268,7 +269,8 @@ public class IndexerWorkerClient implements WorkerClient
{
retVal.setException(t);
}
}
},
MoreExecutors.directExecutor()
);
return retVal;

View File

@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.java.util.common.StringUtils;
@ -90,7 +91,8 @@ public class WorkerInputChannelFactory implements InputChannelFactory
{
channel.setError(t);
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.msq.util;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.FutureUtils;
import javax.annotation.Nullable;
@ -61,7 +62,8 @@ public class MSQFutureUtils
inputFuture.cancel(true);
}
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -156,7 +156,7 @@ public class MSQTestControllerContext implements ControllerContext
log.error(t, "error running worker task %s", task.getId());
statusMap.put(task.getId(), TaskStatus.failure(task.getId(), t.getMessage()));
}
});
}, MoreExecutors.directExecutor());
return task.getId();
}

View File

@ -370,7 +370,6 @@ public class ProtobufInputFormatTest
Assert.assertEquals(
ImmutableList.of(
"someOtherId",
"bar",
"someIntColumn",
"isValid",
"foo",
@ -378,6 +377,7 @@ public class ProtobufInputFormatTest
"someLongColumn",
"someFloatColumn",
"eventType",
"bar",
"id",
"someBytesColumn"
),

View File

@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
@ -692,9 +693,10 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
committerSupplier.get(),
Collections.singletonList(sequenceName)
);
pendingHandoffs.add(Futures.transform(
pendingHandoffs.add(Futures.transformAsync(
publishFuture,
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff,
MoreExecutors.directExecutor()
));
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
@ -262,7 +263,8 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
LOG.error(t, "Error while running a task for spec[%s]", spec.getId());
taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t));
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -54,7 +54,7 @@ public class QuotableWhiteSpaceSplitter implements Iterable<String>
if (inQuotes) {
return false;
}
return CharMatcher.BREAKING_WHITESPACE.matches(c);
return CharMatcher.breakingWhitespace().matches(c);
}
}
).omitEmptyStrings().split(string).iterator();

View File

@ -266,7 +266,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
waitingForMonitor.notifyAll();
}
}
}
},
MoreExecutors.directExecutor()
);
break;
case CHILD_UPDATED:
@ -1308,7 +1309,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{
removedWorkerCleanups.remove(worker, cleanupTask);
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
@ -1560,10 +1561,10 @@ public class TaskLockbox
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("taskLock", taskLock)
.add("taskIds", taskIds)
.toString();
return MoreObjects.toStringHelper(this)
.add("taskLock", taskLock)
.add("taskIds", taskIds)
.toString();
}
}

View File

@ -757,7 +757,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{
removedWorkerCleanups.remove(workerHostAndPort, cleanupTask);
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -26,6 +26,7 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
@ -407,7 +408,8 @@ public abstract class SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, Se
{
retVal.setException(t);
}
}
},
MoreExecutors.directExecutor()
),
sleepTime,
TimeUnit.MILLISECONDS
@ -584,7 +586,8 @@ public abstract class SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, Se
retVal.set(either.valueOrThrow());
}
}
}
},
MoreExecutors.directExecutor()
);
return retVal;

View File

@ -35,6 +35,7 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputFormat;
@ -137,6 +138,7 @@ import java.util.stream.Collectors;
* @param <PartitionIdType> Partition Number Type
* @param <SequenceOffsetType> Sequence Number Type
*/
@SuppressWarnings("CheckReturnValue")
public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> implements ChatHandler
{
public enum Status
@ -697,7 +699,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
log.error("Persist failed, dying");
backgroundThreadException = t;
}
}
},
MoreExecutors.directExecutor()
);
}
@ -970,7 +973,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
} else {
return publishedSegmentsAndMetadata;
}
}
},
MoreExecutors.directExecutor()
);
publishWaitList.add(publishFuture);
@ -1026,7 +1030,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
handoffFuture.set(handoffSegmentsAndCommitMetadata);
return null;
}
}
},
MoreExecutors.directExecutor()
);
// emit segment count metric:
int segmentCount = 0;
@ -1047,7 +1052,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
log.error(t, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata);
handoffFuture.setException(t);
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -1356,7 +1356,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
groupId,
taskId,
currentStats
)
),
MoreExecutors.directExecutor()
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
@ -1374,7 +1375,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
groupId,
taskId,
currentStats
)
),
MoreExecutors.directExecutor()
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
@ -1428,7 +1430,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
groupId,
taskId,
taskErrors
)
),
MoreExecutors.directExecutor()
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
@ -1446,7 +1449,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
groupId,
taskId,
taskErrors
)
),
MoreExecutors.directExecutor()
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
@ -2488,7 +2492,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
return null;
}
}
},
MoreExecutors.directExecutor()
);
}
@ -3207,7 +3212,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
{
return null;
}
}
},
MoreExecutors.directExecutor()
);
}
@ -3944,7 +3950,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
return null;
}
},
MoreExecutors.directExecutor()
)
).collect(Collectors.toList());

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
@ -260,7 +261,8 @@ public class WorkerTaskManager
)
);
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@ -198,7 +199,8 @@ public class ExecutorLifecycle
throw new RuntimeException(e);
}
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.guice.annotations.Json;
@ -191,7 +192,8 @@ public class TaskManagementResource
log.debug(ex, "Request timed out or closed already.");
}
}
}
},
MoreExecutors.directExecutor()
);
asyncContext.setTimeout(timeout);

View File

@ -67,6 +67,7 @@ import org.mockito.Mockito;
import java.io.IOException;
@SuppressWarnings("DoNotMock")
public class TaskToolboxTest
{

View File

@ -326,7 +326,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
// handoff would timeout, resulting in exception
TaskStatus status = statusFuture.get();
Assert.assertTrue(status.getErrorMsg()
.contains("java.util.concurrent.TimeoutException: Timeout waiting for task."));
.contains("java.util.concurrent.TimeoutException: Waited 100 milliseconds"));
}
@Test(timeout = 60_000L)

View File

@ -478,7 +478,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
(Function<TaskStatus, TaskStatus>) status -> {
shutdownTask(task);
return status;
}
},
MoreExecutors.directExecutor()
);
return cleanupFuture;
}

View File

@ -232,7 +232,8 @@ public class TestTaskRunner implements TaskRunner, QuerySegmentWalker
{
runningItems.remove(taskRunnerWorkItem);
}
}
},
MoreExecutors.directExecutor()
);
return statusFuture;

View File

@ -364,12 +364,32 @@ name: Guava
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 16.0.1
version: 31.1-jre
libraries:
- com.google.guava: guava
---
name: Failureaccess
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 1.0.1
libraries:
- com.google.guava: failureaccess
---
name: j2objc
license_category: binary
module: core
license_name: Apache License version 2.0
version: 1.3
libraries:
- com.google.j2objc: j2objc-annotations
---
name: Guice
license_category: binary
module: java-core

View File

@ -785,18 +785,4 @@
<packageUrl regex="true">^pkg:maven/.*/.*@.*$</packageUrl>
<cve>CVE-2021-4277</cve>
</suppress>
<suppress>
<notes><![CDATA[
file name: guava-16.0.1.jar
]]></notes>
<packageUrl regex="true">^pkg:maven/com\.google\.guava/guava@16.0.1$</packageUrl>
<!--
~ We don't either use AtomicDoubleArray (when serialized with Java serialization) or
~ CompoundOrdering (when serialized with GWT serialization) nor do we use Java or GWT serialization. https://nvd.nist.gov/vuln/detail/cve-2018-10237
-->
<cve>CVE-2018-10237</cve>
<cve>CVE-2020-8908</cve> <!-- We do not use com.google.common.io.Files.createTempDir() https://nvd.nist.gov/vuln/detail/CVE-2020-8908 -->
<cve>CVE-2023-2976</cve> <!-- We do not use com.google.common.io.FileBackedOutputStream https://nvd.nist.gov/vuln/detail/CVE-2023-2976 -->
</suppress>
</suppressions>

View File

@ -92,7 +92,7 @@
<dropwizard.metrics.version>4.2.19</dropwizard.metrics.version>
<errorprone.version>2.20.0</errorprone.version>
<fastutil.version>8.5.4</fastutil.version>
<guava.version>16.0.1</guava.version>
<guava.version>31.1-jre</guava.version>
<guice.version>4.1.0</guice.version>
<hamcrest.version>1.3</hamcrest.version>
<jetty.version>9.4.51.v20230217</jetty.version>
@ -490,6 +490,12 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>listenablefuture</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
@ -1434,7 +1440,6 @@
</bundledSignatures>
<signaturesFiles>
<signaturesFile>${project.parent.basedir}/codestyle/joda-time-forbidden-apis.txt</signaturesFile>
<signaturesFile>${project.parent.basedir}/codestyle/guava16-forbidden-apis.txt</signaturesFile>
<signaturesFile>${project.parent.basedir}/codestyle/druid-forbidden-apis.txt</signaturesFile>
</signaturesFiles>
<excludes>

View File

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
@ -107,11 +108,11 @@ public class FutureUtils
*/
public static <T, R> ListenableFuture<R> transform(final ListenableFuture<T> future, final Function<T, R> fn)
{
return Futures.transform(future, fn::apply);
return Futures.transform(future, fn::apply, MoreExecutors.directExecutor());
}
/**
* Like {@link Futures#transform(ListenableFuture, AsyncFunction)}, but works better with lambdas due to not having
* Like {@link Futures#transformAsync(ListenableFuture, AsyncFunction, java.util.concurrent.Executor)}, but works better with lambdas due to not having
* overloads.
*
* One can write {@code FutureUtils.transformAsync(future, v -> ...)} instead of
@ -119,7 +120,7 @@ public class FutureUtils
*/
public static <T, R> ListenableFuture<R> transformAsync(final ListenableFuture<T> future, final AsyncFunction<T, R> fn)
{
return Futures.transform(future, fn);
return Futures.transformAsync(future, fn, MoreExecutors.directExecutor());
}
/**
@ -200,7 +201,8 @@ public class FutureUtils
retVal.setException(e);
}
}
},
MoreExecutors.directExecutor()
);
return retVal;

View File

@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
@ -61,6 +62,7 @@ import java.util.stream.Collectors;
* If you want single threaded execution, use {@code Execs.singleThreaded()}. It is not a good idea to use this with a
* same-thread executor like {@code Execs.directExecutor()}, because it will lead to deep call stacks.
*/
@SuppressWarnings("CheckReturnValue")
public class FrameProcessorExecutor
{
private static final Logger log = new Logger(FrameProcessorExecutor.class);
@ -286,7 +288,8 @@ public class FrameProcessorExecutor
fail(t);
}
}
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.common.guava.FutureUtils;
@ -51,6 +52,7 @@ import java.util.function.BiFunction;
* The {@code bouncer} and {@code maxOutstandingProcessors} parameters are used to control how many processors are
* executed on the {@link FrameProcessorExecutor} concurrently.
*/
@SuppressWarnings("CheckReturnValue")
public class RunAllFullyWidget<T, ResultType>
{
private static final Logger log = new Logger(RunAllFullyWidget.class);
@ -296,7 +298,8 @@ public class RunAllFullyWidget<T, ResultType>
cleanupIfNoMoreProcessors();
}
}
}
},
MoreExecutors.directExecutor()
);
}
}

View File

@ -23,10 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Represents the status of a task from the perspective of the coordinator. The task may be ongoing
@ -220,12 +221,12 @@ public class TaskStatus
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("id", id)
.add("status", status)
.add("duration", duration)
.add("errorMsg", errorMsg)
.toString();
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("status", status)
.add("duration", duration)
.add("errorMsg", errorMsg)
.toString();
}
@Override
@ -239,14 +240,14 @@ public class TaskStatus
}
TaskStatus that = (TaskStatus) o;
return getDuration() == that.getDuration() &&
java.util.Objects.equals(getId(), that.getId()) &&
Objects.equals(getId(), that.getId()) &&
status == that.status &&
java.util.Objects.equals(getErrorMsg(), that.getErrorMsg());
Objects.equals(getErrorMsg(), that.getErrorMsg());
}
@Override
public int hashCode()
{
return java.util.Objects.hash(getId(), status, getDuration(), getErrorMsg());
return Objects.hash(getId(), status, getDuration(), getErrorMsg());
}
}

View File

@ -19,7 +19,7 @@
package org.apache.druid.query.aggregation;
import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import org.apache.druid.segment.serde.cell.LongSerializer;
@ -103,9 +103,9 @@ public class SerializablePairLongStringColumnHeader
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("bytes", bytes)
.add("minValue", minValue)
.toString();
return MoreObjects.toStringHelper(this)
.add("bytes", bytes)
.add("minValue", minValue)
.toString();
}
}

View File

@ -30,6 +30,7 @@ import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.function.BinaryOperator;
@SuppressWarnings("DoNotMock")
public class CombiningIteratorTest
{
private CombiningIterator<String> testingIterator;

View File

@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
/**
*/
@SuppressWarnings("CheckReturnValue")
public class ImmutableRTreeTest
{
@Test

View File

@ -39,6 +39,7 @@ import java.util.TreeSet;
/**
*
*/
@SuppressWarnings("CheckReturnValue")
public class JodaUtilsTest
{
@Test

View File

@ -450,7 +450,7 @@ public class ParserTest extends InitializedNullHandlingTest
public void testFunctions()
{
validateParser("sqrt(x)", "(sqrt [x])", ImmutableList.of("x"));
validateParser("if(cond,then,else)", "(if [cond, then, else])", ImmutableList.of("cond", "else", "then"));
validateParser("if(cond,then,else)", "(if [cond, then, else])", ImmutableList.of("then", "cond", "else"));
validateParser("cast(x, 'STRING')", "(cast [x, STRING])", ImmutableList.of("x"));
validateParser("cast(x, 'LONG')", "(cast [x, LONG])", ImmutableList.of("x"));
validateParser("cast(x, 'DOUBLE')", "(cast [x, DOUBLE])", ImmutableList.of("x"));

View File

@ -56,6 +56,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@SuppressWarnings("DoNotMock")
public class ChainedExecutionQueryRunnerTest
{
private final Lock neverRelease = new ReentrantLock();

View File

@ -31,6 +31,7 @@ import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
@SuppressWarnings("DoNotMock")
public class MetricsEmittingQueryProcessingPoolTest
{
@Test

View File

@ -540,7 +540,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
smileMapper,
httpClient,
inventorySyncExecutor,
new URL(druidServer.getScheme(), hostAndPort.getHostText(), hostAndPort.getPort(), "/"),
new URL(druidServer.getScheme(), hostAndPort.getHost(), hostAndPort.getPort(), "/"),
"/druid-internal/v1/segments",
SEGMENT_LIST_RESP_TYPE_REF,
config.getServerTimeout(),

View File

@ -22,7 +22,7 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
/**
@ -80,10 +80,10 @@ public class QueryStatus
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("id", id)
.add("status", status)
.add("duration", duration)
.toString();
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("status", status)
.add("duration", duration)
.toString();
}
}

View File

@ -236,7 +236,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
{
try {
ImmutableMap<String, List<Rule>> newRules = ImmutableMap.copyOf(
Map<String, List<Rule>> newRulesMap =
dbi.withHandle(
handle -> handle.createQuery(
// Return latest version rule by dataSource
@ -272,9 +272,10 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
}
}
)
)
);
ImmutableMap<String, List<Rule>> newRules = ImmutableMap.copyOf(newRulesMap);
final int newRuleCount = newRules.values().stream().mapToInt(List::size).sum();
log.info("Polled and found [%d] rule(s) for [%d] datasource(s).", newRuleCount, newRules.size());

View File

@ -22,6 +22,7 @@ package org.apache.druid.rpc.indexing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.TaskStatusResponse;
@ -165,7 +166,8 @@ public class SpecificTaskServiceLocator implements ServiceLocator
}
}
}
}
},
MoreExecutors.directExecutor()
);
return Futures.nonCancellationPropagating(retVal);

View File

@ -111,6 +111,7 @@ import java.util.stream.Collectors;
* with <code>isLegacy</code> constructor argument set to false is the default. When {@link BatchAppenderator}
* proves stable then the plan is to remove this class
*/
@SuppressWarnings("CheckReturnValue")
public class AppenderatorImpl implements Appenderator
{
// Rough estimate of memory footprint of a ColumnHolder based on actual heap dumps
@ -430,7 +431,8 @@ public class AppenderatorImpl implements Appenderator
{
persistError = t;
}
}
},
MoreExecutors.directExecutor()
);
} else {
isPersistRequired = true;

View File

@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.ThreadRenamingCallable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
@ -330,7 +331,8 @@ public class AppenderatorPlumber implements Plumber
// TODO: Retry?
log.warn(e, "Failed to drop segment: %s", identifier);
}
}
},
MoreExecutors.directExecutor()
);
}
@ -483,7 +485,8 @@ public class AppenderatorPlumber implements Plumber
log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size());
errorHandler.apply(e);
}
}
},
MoreExecutors.directExecutor()
);
}
}

View File

@ -545,7 +545,8 @@ public abstract class BaseAppenderatorDriver implements Closeable
segmentsAndCommitMetadata.getSegments(),
metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
);
}
},
MoreExecutors.directExecutor()
);
}

View File

@ -398,7 +398,8 @@ public class BatchAppenderator implements Appenderator
{
persistError = t;
}
}
},
MoreExecutors.directExecutor()
);
}
return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, false);

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.loading.DataSegmentKiller;
@ -140,9 +141,10 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
{
final Set<SegmentIdWithShardSpec> requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames);
final ListenableFuture<SegmentsAndCommitMetadata> future = Futures.transform(
final ListenableFuture<SegmentsAndCommitMetadata> future = Futures.transformAsync(
pushInBackground(null, requestedSegmentIdsForSequences, false),
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) this::dropInBackground
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) this::dropInBackground,
MoreExecutors.directExecutor()
);
final SegmentsAndCommitMetadata segmentsAndCommitMetadata =

View File

@ -101,6 +101,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@SuppressWarnings("CheckReturnValue")
public class StreamAppenderator implements Appenderator
{
// Rough estimate of memory footprint of a ColumnHolder based on actual heap dumps
@ -397,7 +398,8 @@ public class StreamAppenderator implements Appenderator
{
persistError = t;
}
}
},
MoreExecutors.directExecutor()
);
} else {
isPersistRequired = true;

View File

@ -27,6 +27,7 @@ import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
@ -277,7 +278,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
{
final List<SegmentIdWithShardSpec> theSegments = getSegmentIdsWithShardSpecs(sequenceNames);
final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = Futures.transform(
final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = Futures.transformAsync(
// useUniquePath=true prevents inconsistencies in segment data when task failures or replicas leads to a second
// version of a segment with the same identifier containing different data; see DataSegmentPusher.push() docs
pushInBackground(wrapCommitter(committer), theSegments, true),
@ -287,7 +288,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
sam,
publisher,
java.util.function.Function.identity()
)
),
MoreExecutors.directExecutor()
);
return Futures.transform(
publishFuture,
@ -296,7 +298,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
sequenceNames.forEach(segments::remove);
}
return sam;
}
},
MoreExecutors.directExecutor()
);
}
@ -383,7 +386,8 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
numRemainingHandoffSegments.decrementAndGet();
resultFuture.setException(e);
}
}
},
MoreExecutors.directExecutor()
);
}
);
@ -399,9 +403,10 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
final Collection<String> sequenceNames
)
{
return Futures.transform(
return Futures.transformAsync(
publish(publisher, committer, sequenceNames),
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) this::registerHandoff
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) this::registerHandoff,
MoreExecutors.directExecutor()
);
}

View File

@ -156,7 +156,7 @@ public class DruidNode
Integer portFromHostConfig;
if (host != null) {
hostAndPort = HostAndPort.fromString(host);
host = hostAndPort.getHostText();
host = hostAndPort.getHost();
portFromHostConfig = hostAndPort.hasPort() ? hostAndPort.getPort() : null;
if (plainTextPort != null && portFromHostConfig != null && !plainTextPort.equals(portFromHostConfig)) {
throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, plainTextPort);

View File

@ -74,7 +74,7 @@ public class HostAndPortWithScheme
public String getHostText()
{
return hostAndPort.getHostText();
return hostAndPort.getHost();
}
public int getPort()

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.client.HttpServerInventoryView;
@ -205,7 +206,8 @@ public class SegmentListerResource
log.debug(ex, "Request timed out or closed already.");
}
}
}
},
MoreExecutors.directExecutor()
);
asyncContext.setTimeout(timeout);
@ -317,7 +319,8 @@ public class SegmentListerResource
log.debug(ex, "Request timed out or closed already.");
}
}
}
},
MoreExecutors.directExecutor()
);
asyncContext.setTimeout(timeout);

View File

@ -435,7 +435,8 @@ public class LookupCoordinatorManager
LOG.makeAlert(t, "Background lookup manager exited with error!").emit();
}
}
}
},
MoreExecutors.directExecutor()
);
LOG.debug("Started");

View File

@ -376,7 +376,8 @@ public class CachingClusteredClientTest
{
pair.lhs.setException(t);
}
}
},
MoreExecutors.directExecutor()
);
}
}

View File

@ -103,7 +103,7 @@ public class JsonParserIteratorTest
OBJECT_MAPPER
);
expectedException.expect(QueryInterruptedException.class);
expectedException.expectMessage("Immediate cancelled future.");
expectedException.expectMessage("Task was cancelled.");
iterator.hasNext();
}

View File

@ -75,6 +75,7 @@ import java.nio.charset.StandardCharsets;
/**
*/
@SuppressWarnings("DoNotMock")
public class DruidLeaderClientTest extends BaseJettyTest
{
@Rule

View File

@ -283,7 +283,7 @@ public class SqlSegmentsMetadataManagerTest
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"),
ImmutableList.of("wikipedia3", "wikipedia", "wikipedia2"),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)

View File

@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
@ -497,7 +498,8 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
.collect(Collectors.toList());
return Futures.transform(
persistAll(committer),
(Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> new SegmentsAndCommitMetadata(segments, commitMetadata)
(Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> new SegmentsAndCommitMetadata(segments, commitMetadata),
MoreExecutors.directExecutor()
);
} else {
if (interruptPush) {

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Assert;
import org.junit.Test;
@ -148,7 +149,8 @@ public class ChangeRequestHistoryTest
{
callbackExcecuted.set(true);
}
}
},
MoreExecutors.directExecutor()
);
future.cancel(true);
@ -205,7 +207,8 @@ public class ChangeRequestHistoryTest
{
callbackExcecuted.set(true);
}
}
},
MoreExecutors.directExecutor()
);
history.stop();