Two fixes related to encoding of % symbols. (#10645)

1) TaskResourceFilter: Don't double-decode task ids. request.getPathSegments()
   returns already-decoded strings. Applying StringUtils.urlDecode on
   top of that causes erroneous behavior with '%' characters.

2) Update various ThreadFactoryBuilder name formats to escape '%'
   characters. This fixes situations where substrings starting with '%'
   are erroneously treated as format specifiers (see the sketch below).

The integration tests are updated to include a '%' in extra.datasource.name.suffix.
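
To make fix 2) concrete, here is a minimal standalone sketch (not part of the patch; the id value is made up, and encodeForFormat below is a simplified stand-in for the new StringUtils.encodeForFormat, which uses replaceChar rather than String.replace):

```java
import java.util.IllegalFormatException;

public class PercentEscapeDemo
{
  // Simplified stand-in for StringUtils.encodeForFormat: escape '%' so the value
  // can be embedded safely in a java.util.Formatter format string.
  private static String encodeForFormat(String s)
  {
    return s == null ? null : s.replace("%", "%%");
  }

  public static void main(String[] args)
  {
    // A supervisor/datasource-derived id that happens to contain '%'.
    final String id = "wiki_%Россия";

    try {
      // "%Р" is parsed as an (unknown) conversion specifier and throws.
      String.format(id + "-Worker-%d", 0);
    }
    catch (IllegalFormatException e) {
      System.out.println("unescaped id: " + e);
    }

    // Escaping first leaves %d as the only specifier; prints "wiki_%Россия-Worker-0".
    System.out.println(String.format(encodeForFormat(id) + "-Worker-%d", 0));
  }
}
```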

* Avoid String.replace.

* Work around surefire bug.

* Fix xml encoding.

* Another try at the proper encoding.

* Give up on the emojis.

* Less ambitious testing.

* Fix an additional problem.

* Adjust encodeForFormat to return null if the input is null.
Authored by Gian Merlino on 2020-12-06 22:35:11 -08:00; committed by GitHub
parent 17f39ab91e
commit b7641f644c
17 changed files with 93 additions and 41 deletions


@@ -188,6 +188,21 @@ public class StringUtils
}
}
/**
* Encodes a string "s" for insertion into a format string.
*
* Returns null if the input is null.
*/
@Nullable
public static String encodeForFormat(@Nullable final String s)
{
if (s == null) {
return null;
} else {
return StringUtils.replaceChar(s, '%', "%%");
}
}
public static String toLowerCase(String s)
{
return s.toLowerCase(Locale.ENGLISH);


@@ -173,6 +173,14 @@ public class StringUtilsTest
Assert.assertEquals("", StringUtils.replace("aaaa", "aa", ""));
}
@Test
public void testEncodeForFormat()
{
Assert.assertEquals("x %% a %%s", StringUtils.encodeForFormat("x % a %s"));
Assert.assertEquals("", StringUtils.encodeForFormat(""));
Assert.assertNull(StringUtils.encodeForFormat(null));
}
@Test
public void testURLEncodeSpace()
{


@@ -137,7 +137,7 @@ public class MaterializedViewSupervisor implements Supervisor
new DerivativeDataSourceMetadata(spec.getBaseDataSource(), spec.getDimensions(), spec.getMetrics())
);
}
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(supervisorId));
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId)));
final Duration delay = config.getTaskCheckDuration().toStandardDuration();
future = exec.scheduleWithFixedDelay(
MaterializedViewSupervisor.this::run,


@@ -88,7 +88,9 @@ public class CommonCacheNotifier
String callerName
)
{
this.exec = Execs.singleThreaded(StringUtils.format("%s-notifierThread-", callerName) + "%d");
this.exec = Execs.singleThreaded(
StringUtils.format("%s-notifierThread-", StringUtils.encodeForFormat(callerName)) + "%d"
);
this.callerName = callerName;
this.updateQueue = new LinkedBlockingQueue<>();
this.itemConfigMap = itemConfigMap;
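
For context on why these name strings are treated as format strings at all: Druid's Execs helpers build thread factories via Guava's ThreadFactoryBuilder.setNameFormat, which runs the name through String.format together with a per-thread counter. A hedged sketch using Guava directly (the callerName-style value is made up, and the exact point where the exception surfaces can vary by Guava version):

```java
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.IllegalFormatException;
import java.util.concurrent.ThreadFactory;

public class NameFormatDemo
{
  public static void main(String[] args)
  {
    // Fine: %d is the intended per-thread counter.
    ThreadFactory ok = new ThreadFactoryBuilder().setNameFormat("notifier-%d").build();
    System.out.println(ok.newThread(() -> {}).getName()); // notifier-0

    try {
      // A callerName containing '%' turns the whole name into a broken format string.
      new ThreadFactoryBuilder()
          .setNameFormat("wiki_%25-notifierThread-%d")
          .build()
          .newThread(() -> {});
    }
    catch (IllegalFormatException e) {
      System.out.println("unescaped callerName: " + e);
    }
  }
}
```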


@@ -100,7 +100,7 @@ public class KafkaLookupExtractorFactory implements LookupExtractorFactory
this.kafkaTopic = Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");
this.kafkaProperties = Preconditions.checkNotNull(kafkaProperties, "kafkaProperties required");
executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded(
"kafka-factory-" + kafkaTopic + "-%s",
"kafka-factory-" + StringUtils.encodeForFormat(kafkaTopic) + "-%s",
Thread.MIN_PRIORITY
));
this.cacheManager = cacheManager;


@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.LookupExtractor;
@@ -75,7 +76,7 @@ public class PollingLookup extends LookupExtractor
refOfCacheKeeper.set(new CacheRefKeeper(this.cacheFactory.makeOf(dataFetcher.fetchAll())));
if (pollPeriodMs > 0) {
scheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(
Execs.makeThreadFactory("PollingLookup-" + id, Thread.MIN_PRIORITY)
Execs.makeThreadFactory("PollingLookup-" + StringUtils.encodeForFormat(id), Thread.MIN_PRIORITY)
));
pollFuture = scheduledExecutorService.scheduleWithFixedDelay(
pollAndSwap(),


@@ -117,7 +117,7 @@ public abstract class IndexTaskClient implements AutoCloseable
numThreads,
StringUtils.format(
"IndexTaskClient-%s-%%d",
callerId
StringUtils.encodeForFormat(callerId)
)
)
);


@@ -54,7 +54,7 @@ public class TaskMonitor<T extends Task>
{
private static final Logger log = new Logger(TaskMonitor.class);
private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded(("task-monitor-%d"));
private final ScheduledExecutorService taskStatusChecker = Execs.scheduledSingleThreaded("task-monitor-%d");
/**
* A map of subTaskSpecId to {@link MonitorEntry}. This map stores the state of running {@link SubTaskSpec}s. This is


@@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord.http.security;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ContainerRequest;
@@ -39,7 +38,6 @@ import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.PathSegment;
import javax.ws.rs.core.Response;
/**
@@ -70,18 +68,11 @@ public class TaskResourceFilter extends AbstractResourceFilter
.get(
Iterables.indexOf(
request.getPathSegments(),
new Predicate<PathSegment>()
{
@Override
public boolean apply(PathSegment input)
{
return "task".equals(input.getPath());
}
}
input -> "task".equals(input.getPath())
) + 1
).getPath()
);
taskId = StringUtils.urlDecode(taskId);
IdUtils.validateId("taskId", taskId);
Optional<Task> taskOptional = taskStorageQueryAdapter.getTask(taskId);
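
To illustrate the double-decoding problem this hunk removes, a standalone sketch (not part of the patch; the task id is made up):

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

public class DoubleDecodeDemo
{
  public static void main(String[] args) throws Exception
  {
    // A task id that legitimately contains "%25", percent-encoded once in the URL path.
    final String rawPathSegment = "index_wiki%2525_2020";

    // Jersey's request.getPathSegments() already returns the decoded form.
    final String taskId = URLDecoder.decode(rawPathSegment, StandardCharsets.UTF_8.name());
    System.out.println(taskId); // index_wiki%25_2020  (correct)

    // Decoding again, as the removed urlDecode call effectively did, silently
    // corrupts the id (and throws outright for ids such as "foo%zz").
    System.out.println(URLDecoder.decode(taskId, StandardCharsets.UTF_8.name())); // index_wiki%_2020
  }
}
```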


@@ -521,9 +521,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;
this.exec = Execs.singleThreaded(supervisorId);
this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + "-Scheduler-%d");
this.reportingExec = Execs.scheduledSingleThreaded(supervisorId + "-Reporting-%d");
this.exec = Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId));
this.scheduledExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Scheduler-%d");
this.reportingExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Reporting-%d");
this.stateManager = new SeekableStreamSupervisorStateManager(
spec.getSupervisorStateManagerConfig(),
spec.isSuspended()
@@ -533,7 +533,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
? this.tuningConfig.getWorkerThreads()
: Math.min(10, this.ioConfig.getTaskCount()));
this.workerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads, supervisorId + "-Worker-%d"));
this.workerExec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(
workerThreads,
StringUtils.encodeForFormat(supervisorId) + "-Worker-%d"
)
);
log.info("Created worker pool with [%d] threads for dataSource [%s]", workerThreads, this.dataSource);
this.taskInfoProvider = new TaskInfoProvider()
@@ -921,6 +926,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* Collect row ingestion stats from all tasks managed by this supervisor.
*
* @return A map of groupId->taskId->task row stats
*
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
@@ -1597,7 +1603,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
throw new RuntimeException(e);
}
final DataSourceMetadata rawDataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
final DataSourceMetadata rawDataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
dataSource);
if (rawDataSourceMetadata != null && !checkSourceMetadataMatch(rawDataSourceMetadata)) {
throw new IAE(
@@ -2088,13 +2095,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* It will mark the expired partitions in metadata and recompute the partition->task group mappings, updating
* the metadata, the partitionIds list, and the partitionGroups mappings.
*
* @param storedPartitions Set of partitions previously tracked, from the metadata store
* @param newlyClosedPartitions Set of partitions that are closed in the metadata store but still present in the
* current {@link #partitionIds}
* @param storedPartitions Set of partitions previously tracked, from the metadata store
* @param newlyClosedPartitions Set of partitions that are closed in the metadata store but still present in the
* current {@link #partitionIds}
* @param activePartitionsIdsFromSupplier Set of partitions currently returned by the record supplier, but with
* any partitions that are closed/expired in the metadata store removed
* @param previouslyExpiredPartitions Set of partitions that are recorded as expired in the metadata store
* @param partitionIdsFromSupplier Set of partitions currently returned by the record supplier.
* @param previouslyExpiredPartitions Set of partitions that are recorded as expired in the metadata store
* @param partitionIdsFromSupplier Set of partitions currently returned by the record supplier.
*/
private void cleanupClosedAndExpiredPartitions(
Set<PartitionIdType> storedPartitions,
@@ -2176,6 +2183,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* by this method.
*
* @param availablePartitions
*
* @return a remapped copy of partitionGroups, containing only the partitions in availablePartitions
*/
protected Map<Integer, Set<PartitionIdType>> recomputePartitionGroupsForExpiration(
@@ -2192,6 +2200,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
*
* @param currentMetadata The current DataSourceMetadata from metadata storage
* @param expiredPartitionIds The set of expired partition IDs.
*
* @return currentMetadata but with any expired partitions removed.
*/
protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(
@@ -2804,6 +2813,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* should be removed from the starting offsets sent to the tasks.
*
* @param startingOffsets
*
* @return startingOffsets with entries for expired partitions removed
*/
protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> filterExpiredPartitionsFromStartingOffsets(
@@ -2832,8 +2842,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
minimumMessageTime = Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
} else {
minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
) : Optional.absent());
DateTimes.nowUtc().minus(ioConfig.getLateMessageRejectionPeriod().get())
) : Optional.absent());
}
Optional<DateTime> maximumMessageTime = (ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of(
@@ -3039,7 +3049,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
final DataSourceMetadata dataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
dataSource);
if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata
&& checkSourceMetadataMatch(dataSourceMetadata)) {
@SuppressWarnings("unchecked")
@@ -3338,6 +3349,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* the given replicas count
*
* @return list of specific kafka/kinesis index tasks
*
* @throws JsonProcessingException
*/
protected abstract List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> createIndexTasks(
@@ -3355,6 +3367,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* different between Kafka/Kinesis since Kinesis uses String as partition id
*
* @param partition partition id
*
* @return taskgroup id
*/
protected abstract int getTaskGroupIdForPartition(PartitionIdType partition);
@@ -3364,6 +3377,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* of [kafka/kinesis]DataSourceMetadata
*
* @param metadata datasource metadata
*
* @return true if isInstance else false
*/
protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata metadata);
@@ -3373,6 +3387,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* [Kafka/Kinesis]IndexTask
*
* @param task task
*
* @return true if isInstance else false
*/
protected abstract boolean doesTaskTypeMatchSupervisor(Task task);
@@ -3382,6 +3397,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
*
* @param stream stream name
* @param map partitionId -> sequence
*
* @return specific instance of datasource metadata
*/
protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(


@@ -372,7 +372,9 @@
<docker.build.skip>false</docker.build.skip>
<override.config.path />
<resource.file.dir.path />
<extra.datasource.name.suffix>\ Россия\ 한국\ 中国!?</extra.datasource.name.suffix>
<!-- Would like to put emojis in here too, but they throw "Input buffer too short" errors due to https://issues.apache.org/jira/browse/SUREFIRE-1865 -->
<extra.datasource.name.suffix>\ %Россия\ 한국\ 中国!?</extra.datasource.name.suffix>
</properties>
<build>
<plugins>


@@ -181,7 +181,12 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector
}
try {
this.listener = listener;
this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath));
this.listenerExecutor = Execs.singleThreaded(
StringUtils.format(
"LeaderSelector[%s]",
StringUtils.encodeForFormat(latchPath)
)
);
createNewLeaderLatchWithListener();
leaderLatch.get().start();


@@ -204,7 +204,9 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
this.jsonMapper = jsonMapper;
// This is required to be single threaded from docs in PathChildrenCache.
this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeRoleWatcher[%s]", nodeRole));
this.cacheExecutor = Execs.singleThreaded(
StringUtils.format("NodeRoleWatcher[%s]", StringUtils.encodeForFormat(nodeRole.toString()))
);
cache = new PathChildrenCacheFactory.Builder()
//NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event.
//this is a workaround to solve curator's out-of-order events problem


@@ -257,7 +257,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
if (exec != null) {
return; // Already started
}
exec = Execs.scheduledSingleThreaded(getClass().getName() + "-Exec--%d");
exec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(getClass().getName()) + "-Exec--%d");
}
finally {
lock.unlock();


@@ -968,21 +968,24 @@ public class AppenderatorImpl implements Appenderator
if (persistExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
persistExecutor = MoreExecutors.listeningDecorator(
Execs.newBlockingSingleThreaded("[" + myId + "]-appenderator-persist", maxPendingPersists)
Execs.newBlockingSingleThreaded(
"[" + StringUtils.encodeForFormat(myId) + "]-appenderator-persist",
maxPendingPersists
)
);
}
if (pushExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
pushExecutor = MoreExecutors.listeningDecorator(
Execs.newBlockingSingleThreaded("[" + myId + "]-appenderator-merge", 1)
Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(myId) + "]-appenderator-merge", 1)
);
}
if (intermediateTempExecutor == null) {
// use single threaded executor with SynchronousQueue so that all abandon operations occur sequentially
intermediateTempExecutor = MoreExecutors.listeningDecorator(
Execs.newBlockingSingleThreaded("[" + myId + "]-appenderator-abandon", 0)
Execs.newBlockingSingleThreaded("[" + StringUtils.encodeForFormat(myId) + "]-appenderator-abandon", 0)
);
}
}


@@ -38,6 +38,7 @@ import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller;
@@ -254,7 +255,9 @@ public abstract class BaseAppenderatorDriver implements Closeable
this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller, "dataSegmentKiller");
this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("[" + appenderator.getId() + "]-publish"));
this.executor = MoreExecutors.listeningDecorator(
Execs.singleThreaded("[" + StringUtils.encodeForFormat(appenderator.getId()) + "]-publish")
);
}
@VisibleForTesting


@@ -99,7 +99,11 @@ public class DruidStatement implements Closeable
this.sqlLifecycle = Preconditions.checkNotNull(sqlLifecycle, "sqlLifecycle");
this.onClose = Preconditions.checkNotNull(onClose, "onClose");
this.yielderOpenCloseExecutor = Execs.singleThreaded(
StringUtils.format("JDBCYielderOpenCloseExecutor-connection-%s-statement-%d", connectionId, statementId)
StringUtils.format(
"JDBCYielderOpenCloseExecutor-connection-%s-statement-%d",
StringUtils.encodeForFormat(connectionId),
statementId
)
);
}
@@ -360,11 +364,11 @@
type.getSqlTypeName().getJdbcOrdinal(),
type.getSqlTypeName().getName(),
Calcites.sqlTypeNameJdbcToJavaClass(type.getSqlTypeName()).getName(),
field.getName());
field.getName()
);
}
private DruidStatement closeAndPropagateThrowable(Throwable t)
{
this.throwable = t;