From b7641f644cc19a80b33b3607ef6ef23d977236c6 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 6 Dec 2020 22:35:11 -0800 Subject: [PATCH] Two fixes related to encoding of % symbols. (#10645) * Two fixes related to encoding of % symbols. 1) TaskResourceFilter: Don't double-decode task ids. request.getPathSegments() returns already-decoded strings. Applying StringUtils.urlDecode on top of that causes erroneous behavior with '%' characters. 2) Update various ThreadFactoryBuilder name formats to escape '%' characters. This fixes situations where substrings starting with '%' are erroneously treated as format specifiers. ITs are updated to include a '%' in extra.datasource.name.suffix. * Avoid String.replace. * Work around surefire bug. * Fix xml encoding. * Another try at the proper encoding. * Give up on the emojis. * Less ambitious testing. * Fix an additional problem. * Adjust encodeForFormat to return null if the input is null. --- .../druid/java/util/common/StringUtils.java | 15 +++++++ .../java/util/common/StringUtilsTest.java | 8 ++++ .../MaterializedViewSupervisor.java | 2 +- .../security/basic/CommonCacheNotifier.java | 4 +- .../lookup/KafkaLookupExtractorFactory.java | 2 +- .../druid/server/lookup/PollingLookup.java | 3 +- .../indexing/common/IndexTaskClient.java | 2 +- .../task/batch/parallel/TaskMonitor.java | 2 +- .../http/security/TaskResourceFilter.java | 13 +----- .../supervisor/SeekableStreamSupervisor.java | 42 +++++++++++++------ integration-tests/pom.xml | 4 +- .../discovery/CuratorDruidLeaderSelector.java | 7 +++- .../CuratorDruidNodeDiscoveryProvider.java | 4 +- .../metadata/SqlSegmentsMetadataManager.java | 2 +- .../appenderator/AppenderatorImpl.java | 9 ++-- .../appenderator/BaseAppenderatorDriver.java | 5 ++- .../druid/sql/avatica/DruidStatement.java | 10 +++-- 17 files changed, 93 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index bf9540015d3..97fb4339495 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -188,6 +188,21 @@ public class StringUtils } } + /** + * Encodes a string "s" for insertion into a format string. + * + * Returns null if the input is null. + */ + @Nullable + public static String encodeForFormat(@Nullable final String s) + { + if (s == null) { + return null; + } else { + return StringUtils.replaceChar(s, '%', "%%"); + } + } + public static String toLowerCase(String s) { return s.toLowerCase(Locale.ENGLISH); diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index bd5c0b2ba6c..4f80740f67f 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -173,6 +173,14 @@ public class StringUtilsTest Assert.assertEquals("", StringUtils.replace("aaaa", "aa", "")); } + @Test + public void testEncodeForFormat() + { + Assert.assertEquals("x %% a %%s", StringUtils.encodeForFormat("x % a %s")); + Assert.assertEquals("", StringUtils.encodeForFormat("")); + Assert.assertNull(StringUtils.encodeForFormat(null)); + } + @Test public void testURLEncodeSpace() { diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index dd3db5008a9..41647d525a5 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -137,7 +137,7 @@ public class MaterializedViewSupervisor implements Supervisor new DerivativeDataSourceMetadata(spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics()) ); } - exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId)); + exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId))); final Duration delay = config.getTaskCheckDuration().toStandardDuration(); future = exec.scheduleWithFixedDelay( MaterializedViewSupervisor.this::run, diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java index 608beb1ea20..17b046eba5f 100644 --- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java +++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java @@ -88,7 +88,9 @@ public class CommonCacheNotifier String callerName ) { - this.exec = Execs.singleThreaded(StringUtils.format("%s-notifierThread-", callerName) + "%d"); + this.exec = Execs.singleThreaded( + StringUtils.format("%s-notifierThread-", StringUtils.encodeForFormat(callerName)) + "%d" + ); this.callerName = callerName; this.updateQueue = new LinkedBlockingQueue<>(); this.itemConfigMap = itemConfigMap; diff --git a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java index 28cbad2a233..a50402a2dd2 100644 --- a/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java +++ b/extensions-core/kafka-extraction-namespace/src/main/java/org/apache/druid/query/lookup/KafkaLookupExtractorFactory.java @@ -100,7 +100,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory this.kafkaTopic = Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required"); this.kafkaProperties = Preconditions.checkNotNull(kafkaProperties, "kafkaProperties required"); executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded( - "kafka-factory-" + kafkaTopic + "-%s", + "kafka-factory-" + StringUtils.encodeForFormat(kafkaTopic) + "-%s", Thread.MIN_PRIORITY )); this.cacheManager = cacheManager; diff --git a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java index 0b1b14c9d09..375f3d06d4a 100644 --- a/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java +++ b/extensions-core/lookups-cached-single/src/main/java/org/apache/druid/server/lookup/PollingLookup.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.lookup.LookupExtractor; @@ -75,7 +76,7 @@ public class PollingLookup extends LookupExtractor refOfCacheKeeper.set(new CacheRefKeeper(this.cacheFactory.makeOf(dataFetcher.fetchAll()))); if (pollPeriodMs > 0) { scheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor( - Execs.makeThreadFactory("PollingLookup-" + id, Thread.MIN_PRIORITY) + Execs.makeThreadFactory("PollingLookup-" + StringUtils.encodeForFormat(id), Thread.MIN_PRIORITY) )); pollFuture = scheduledExecutorService.scheduleWithFixedDelay( pollAndSwap(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java index bb073f4b5eb..d3584622edb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java @@ -117,7 +117,7 @@ public abstract class IndexTaskClient implements AutoCloseable numThreads, StringUtils.format( "IndexTaskClient-%s-%%d", - callerId + StringUtils.encodeForFormat(callerId) ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java index 302d23ac312..2b757826086 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java @@ -54,7 +54,7 @@ public class TaskMonitor { private static final Logger log = new Logger(TaskMonitor.class); - private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d")); + private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded("task-monitor-%d"); /** * A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java index 51b17f81def..da1e5c558ee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java @@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord.http.security; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.inject.Inject; import com.sun.jersey.spi.container.ContainerRequest; @@ -39,7 +38,6 @@ import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.PathSegment; import javax.ws.rs.core.Response; /** @@ -70,18 +68,11 @@ public class TaskResourceFilter extends AbstractResourceFilter .get( Iterables.indexOf( request.getPathSegments(), - new Predicate() - { - @Override - public boolean apply(PathSegment input) - { - return "task".equals(input.getPath()); - } - } + input -> "task".equals(input.getPath()) ) + 1 ).getPath() ); - taskId = StringUtils.urlDecode(taskId); + IdUtils.validateId("taskId", taskId); Optional taskOptional = taskStorageQueryAdapter.getTask(taskId); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index da6dc8b83ad..1a4f35fcb05 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -521,9 +521,9 @@ public abstract class SeekableStreamSupervisortaskId->task row stats + * * @throws InterruptedException * @throws ExecutionException * @throws TimeoutException @@ -1597,7 +1603,8 @@ public abstract class SeekableStreamSupervisortask group mappings, updating * the metadata, the partitionIds list, and the partitionGroups mappings. * - * @param storedPartitions Set of partitions previously tracked, from the metadata store - * @param newlyClosedPartitions Set of partitions that are closed in the metadata store but still present in the - * current {@link #partitionIds} + * @param storedPartitions Set of partitions previously tracked, from the metadata store + * @param newlyClosedPartitions Set of partitions that are closed in the metadata store but still present in the + * current {@link #partitionIds} * @param activePartitionsIdsFromSupplier Set of partitions currently returned by the record supplier, but with * any partitions that are closed/expired in the metadata store removed - * @param previouslyExpiredPartitions Set of partitions that are recorded as expired in the metadata store - * @param partitionIdsFromSupplier Set of partitions currently returned by the record supplier. + * @param previouslyExpiredPartitions Set of partitions that are recorded as expired in the metadata store + * @param partitionIdsFromSupplier Set of partitions currently returned by the record supplier. */ private void cleanupClosedAndExpiredPartitions( Set storedPartitions, @@ -2176,6 +2183,7 @@ public abstract class SeekableStreamSupervisor> recomputePartitionGroupsForExpiration( @@ -2192,6 +2200,7 @@ public abstract class SeekableStreamSupervisor createDataSourceMetadataWithExpiredPartitions( @@ -2804,6 +2813,7 @@ public abstract class SeekableStreamSupervisor> filterExpiredPartitionsFromStartingOffsets( @@ -2832,8 +2842,8 @@ public abstract class SeekableStreamSupervisor maximumMessageTime = (ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of( @@ -3039,7 +3049,8 @@ public abstract class SeekableStreamSupervisor getOffsetsFromMetadataStorage() { - final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource); + final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata( + dataSource); if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata && checkSourceMetadataMatch(dataSourceMetadata)) { @SuppressWarnings("unchecked") @@ -3338,6 +3349,7 @@ public abstract class SeekableStreamSupervisor> createIndexTasks( @@ -3355,6 +3367,7 @@ public abstract class SeekableStreamSupervisor sequence + * * @return specific instance of datasource metadata */ protected abstract SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 4f0fbe63cc2..cdc384914dc 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -372,7 +372,9 @@ false - \ Россия\ 한국\ 中国!? + + + \ %Россия\ 한국\ 中国!? diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java index 10b2a6c8076..f3c0afe326c 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -181,7 +181,12 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector } try { this.listener = listener; - this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath)); + this.listenerExecutor = Execs.singleThreaded( + StringUtils.format( + "LeaderSelector[%s]", + StringUtils.encodeForFormat(latchPath) + ) + ); createNewLeaderLatchWithListener(); leaderLatch.get().start(); diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index 383ec204df3..e9c8a363610 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -204,7 +204,9 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide this.jsonMapper = jsonMapper; // This is required to be single threaded from docs in PathChildrenCache. - this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeRoleWatcher[%s]", nodeRole)); + this.cacheExecutor = Execs.singleThreaded( + StringUtils.format("NodeRoleWatcher[%s]", StringUtils.encodeForFormat(nodeRole.toString())) + ); cache = new PathChildrenCacheFactory.Builder() //NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event. //this is a workaround to solve curator's out-of-order events problem diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 60f7f4b86ee..68c333a1a4f 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -257,7 +257,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager if (exec != null) { return; // Already started } - exec = Execs.scheduledSingleThreaded(getClass().getName() + "-Exec--%d"); + exec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(getClass().getName()) + "-Exec--%d"); } finally { lock.unlock(); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index c2888376bd2..ec9200cf81d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -968,21 +968,24 @@ public class AppenderatorImpl implements Appenderator if (persistExecutor == null) { // use a blocking single threaded executor to throttle the firehose when write to disk is slow persistExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded("[" + myId + "]-appenderator-persist", maxPendingPersists) + Execs.newBlockingSingleThreaded( + "[" + StringUtils.encodeForFormat(myId) + "]-appenderator-persist", + maxPendingPersists + ) ); } if (pushExecutor == null) { // use a blocking single threaded executor to throttle the firehose when write to disk is slow pushExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded("[" + myId + "]-appenderator-merge", 1) + Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(myId) + "]-appenderator-merge", 1) ); } if (intermediateTempExecutor == null) { // use single threaded executor with SynchronousQueue so that all abandon operations occur sequentially intermediateTempExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded("[" + myId + "]-appenderator-abandon", 0) + Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(myId) + "]-appenderator-abandon", 0) ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index dc16d595af3..92b9f3cdd00 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -38,6 +38,7 @@ import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.loading.DataSegmentKiller; @@ -254,7 +255,9 @@ public abstract class BaseAppenderatorDriver implements Closeable this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator"); this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker"); this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller, "dataSegmentKiller"); - this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("[" + appenderator.getId() + "]-publish")); + this.executor = MoreExecutors.listeningDecorator( + Execs.singleThreaded("[" + StringUtils.encodeForFormat(appenderator.getId()) + "]-publish") + ); } @VisibleForTesting diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java index 2b64c3039e6..b618cf6aa62 100644 --- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java @@ -99,7 +99,11 @@ public class DruidStatement implements Closeable this.sqlLifecycle = Preconditions.checkNotNull(sqlLifecycle, "sqlLifecycle"); this.onClose = Preconditions.checkNotNull(onClose, "onClose"); this.yielderOpenCloseExecutor = Execs.singleThreaded( - StringUtils.format("JDBCYielderOpenCloseExecutor-connection-%s-statement-%d", connectionId, statementId) + StringUtils.format( + "JDBCYielderOpenCloseExecutor-connection-%s-statement-%d", + StringUtils.encodeForFormat(connectionId), + statementId + ) ); } @@ -360,11 +364,11 @@ public class DruidStatement implements Closeable type.getSqlTypeName().getJdbcOrdinal(), type.getSqlTypeName().getName(), Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(), - field.getName()); + field.getName() + ); } - private DruidStatement closeAndPropagateThrowable(Throwable t) { this.throwable = t;