diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
new file mode 100644
index 00000000000..a88341b0833
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TaskIdUtils
+{
+  private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
+
+  public static void validateId(String thingToValidate, String stringToValidate)
+  {
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(stringToValidate),
+        StringUtils.format("%s cannot be null or empty. Please provide a %s.", thingToValidate, thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.startsWith("."),
+        StringUtils.format("%s cannot start with the '.' character.", thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.contains("/"),
+        StringUtils.format("%s cannot contain the '/' character.", thingToValidate)
+    );
+    Matcher m = INVALIDCHARS.matcher(stringToValidate);
+    Preconditions.checkArgument(
+        !m.matches(),
+        StringUtils.format("%s cannot contain whitespace character except space.", thingToValidate)
+    );
+  }
+
+  public static String getRandomId()
+  {
+    final StringBuilder suffix = new StringBuilder(8);
+    for (int i = 0; i < Integer.BYTES * 2; ++i) {
+      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
+    }
+    return suffix.toString();
+  }
+}
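[Reviewer note] The two public entry points above are easiest to understand in isolation. getRandomId() draws a fresh random int on each of its 8 iterations, keeps a 4-bit nibble (0 to 15), and maps it onto 'a'..'p', so the result is always 8 lowercase letters. A minimal usage sketch, assuming only the class above; the sample ids and printed output are illustrative, not from this patch:

import org.apache.druid.indexer.TaskIdUtils;

public class TaskIdUtilsSketch
{
  public static void main(String[] args)
  {
    // Always 8 chars drawn from 'a'..'p', e.g. "kcbmfpah" (made-up value).
    String suffix = TaskIdUtils.getRandomId();
    System.out.println(suffix.length()); // 8

    // Passes: plain spaces and most punctuation are allowed.
    TaskIdUtils.validateId("taskId", "index_wikipedia_" + suffix);

    // Fails fast: the leading-dot check runs before the slash check.
    try {
      TaskIdUtils.validateId("taskId", "../escape");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // taskId cannot start with the '.' character.
    }
  }
}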
diff --git a/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
new file mode 100644
index 00000000000..5fed8fb34fb
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/TaskIdUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TaskIdUtilsTest
+{
+  private static final String THINGO = "thingToValidate";
+  public static final String VALID_ID_CHARS = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testValidIdName()
+  {
+    TaskIdUtils.validateId(THINGO, VALID_ID_CHARS);
+  }
+
+  @Test
+  public void testInvalidNull()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, null);
+  }
+
+  @Test
+  public void testInvalidEmpty()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, "");
+  }
+
+  @Test
+  public void testInvalidSlashes()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain the '/' character.");
+    TaskIdUtils.validateId(THINGO, "/paths/are/bad/since/we/make/files/from/stuff");
+  }
+
+  @Test
+  public void testInvalidLeadingDot()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot start with the '.' character.");
+    TaskIdUtils.validateId(THINGO, "./nice/try");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexTabs()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "spaces\tare\tbetter\tthan\ttabs\twhich\tare\tillegal");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexNewline()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "new\nline");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexCarriageReturn()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "does\rexist\rby\ritself");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexLineTabulation()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "wtf\u000Bis line tabulation");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexFormFeed()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
+    TaskIdUtils.validateId(THINGO, "form\u000cfeed?");
+  }
+}
character."); + TaskIdUtils.validateId(THINGO, "./nice/try"); + } + + @Test + public void testInvalidSpacesRegexTabs() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "spaces\tare\tbetter\tthan\ttabs\twhich\tare\tillegal"); + } + + @Test + public void testInvalidSpacesRegexNewline() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "new\nline"); + } + + @Test + public void testInvalidSpacesRegexCarriageReturn() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "does\rexist\rby\ritself"); + } + + @Test + public void testInvalidSpacesRegexLineTabulation() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "wtf\u000Bis line tabulation"); + } + + @Test + public void testInvalidSpacesRegexFormFeed() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); + TaskIdUtils.validateId(THINGO, "form\u000cfeed?"); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java index 39174d562a6..7339d26f7e6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerConfigs.java @@ -19,7 +19,7 @@ package org.apache.druid.indexing.kafka; -import org.apache.druid.indexing.common.task.utils.RandomIdUtils; +import org.apache.druid.indexer.TaskIdUtils; import org.apache.druid.java.util.common.StringUtils; import java.util.HashMap; @@ -35,7 +35,7 @@ public class KafkaConsumerConfigs { final Map props = new HashMap<>(); props.put("metadata.max.age.ms", "10000"); - props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId())); + props.put("group.id", StringUtils.format("kafka-supervisor-%s", TaskIdUtils.getRandomId())); props.put("auto.offset.reset", "none"); props.put("enable.auto.commit", "false"); props.put("isolation.level", "read_committed"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index f5cec3d9147..ba12128ab1e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -24,10 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import org.apache.druid.indexer.TaskIdUtils; 
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 13f94a5d345..c789fc7f3ac 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -25,10 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
 import org.apache.druid.indexing.kinesis.KinesisIndexTask;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
@@ -169,7 +169,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor
 
     List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+      String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
       taskList.add(new KinesisIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index d47f9003737..40745bfe32f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -24,11 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -92,7 +92,7 @@ public abstract class AbstractTask implements Task
     }
 
     final List<Object> objects = new ArrayList<>();
-    final String suffix = RandomIdUtils.getRandomId();
+    final String suffix = TaskIdUtils.getRandomId();
     objects.add(typeName);
     objects.add(dataSource);
     objects.add(suffix);
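[Reviewer note] All three migrated call sites (both supervisors and AbstractTask.makeId) compose ids the same way: a deterministic base joined to the 8-letter random suffix with '_'. A minimal sketch of the resulting shape, assuming an illustrative base name:

import com.google.common.base.Joiner;
import org.apache.druid.indexer.TaskIdUtils;

public class TaskIdShapeSketch
{
  public static void main(String[] args)
  {
    // Base name is illustrative; real base sequence names vary by task type and group.
    String baseSequenceName = "index_kafka_wikipedia_abc123";
    String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
    System.out.println(taskId); // e.g. "index_kafka_wikipedia_abc123_kcbmfpah"
  }
}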
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
deleted file mode 100644
index a782b668899..00000000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/utils/RandomIdUtils.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.utils;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-public class RandomIdUtils
-{
-  public static String getRandomId()
-  {
-    final StringBuilder suffix = new StringBuilder(8);
-    for (int i = 0; i < Integer.BYTES * 2; ++i) {
-      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
-    }
-    return suffix.toString();
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 948b687ea56..682a560e8ee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -430,13 +430,13 @@ public abstract class SeekableStreamSupervisor
 
-  // Map<{group RandomIdUtils}, {actively reading task group}>; see documentation for TaskGroup class
+  // Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
   private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>();
 
   // After telling a taskGroup to stop reading and begin publishing a segment, it is moved from [activelyReadingTaskGroups] to here so
   // we can monitor its status while we queue new tasks to read the next range of sequences. This is a list since we could
   // have multiple sets of tasks publishing at once if time-to-publish > taskDuration.
-  // Map<{group RandomIdUtils}, List<{pending completion task groups}>>
+  // Map<{group id}, List<{pending completion task groups}>>
   private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
 
   // We keep two separate maps for tracking the current state of partition->task group mappings [partitionGroups] and partition->offset
@@ -998,7 +998,7 @@ public abstract class SeekableStreamSupervisor
     group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
     if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
       throw new ISE(
-          "trying to add taskGroup with RandomIdUtils [%s] to actively reading task groups, but group already exists.",
+          "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.",
          taskGroupId
       );
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
index b35b1918d33..78090ca5181 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskStatus;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
@@ -336,6 +337,7 @@ public class IntermediaryDataManager
   @Nullable
   public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int partitionId)
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
       if (partitionDir.exists()) {
@@ -364,6 +366,7 @@ public class IntermediaryDataManager
 
   public void deletePartitions(String supervisorTaskId) throws IOException
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
       if (supervisorTaskPath.exists()) {
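[Reviewer note] These two hunks are the security-relevant part of the patch: supervisorTaskId arrives over the shuffle HTTP API and was previously concatenated straight into a File path, so an id like "../foo" escaped the shuffle location. A minimal sketch of the failure mode the new validateId calls close; the paths are illustrative:

import java.io.File;
import java.io.IOException;

public class TraversalSketch
{
  public static void main(String[] args) throws IOException
  {
    File shuffleLocation = new File("/tmp/druid/shuffle"); // illustrative base dir
    String craftedId = "../other-task";                    // hostile supervisorTaskId

    // Naive concatenation walks out of the base directory:
    File resolved = new File(shuffleLocation, craftedId);
    System.out.println(resolved.getCanonicalPath()); // /tmp/druid/other-task

    // TaskIdUtils.validateId("supervisorTaskId", craftedId) now throws
    // IllegalArgumentException before any path is built, since the id
    // starts with '.' and contains '/'.
  }
}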
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
index 1e1eab4e7ff..15aad92b6a3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -51,11 +52,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
   public ExpectedException expectedException = ExpectedException.none();
 
   private IntermediaryDataManager intermediaryDataManager;
+  private File intermediarySegmentsLocation;
+  private File siblingLocation;
 
   @Before
   public void setup() throws IOException
   {
     final WorkerConfig workerConfig = new WorkerConfig();
+    intermediarySegmentsLocation = tempDir.newFolder();
+    siblingLocation = tempDir.newFolder();
     final TaskConfig taskConfig = new TaskConfig(
         null,
         null,
@@ -65,7 +70,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null))
+        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null))
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
     intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
@@ -157,6 +162,63 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
     intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
   }
 
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForDelete() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final String someFile = "sneaky-snake.txt";
+    File dataFile = new File(siblingLocation, someFile);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+    intermediaryDataManager.deletePartitions(supervisorTaskId);
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+  }
+
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForFind() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final Interval interval = Intervals.of("2018/2019");
+    final int partitionId = 0;
+    final String intervalAndPart =
+        StringUtils.format("%s/%s/%s", interval.getStart().toString(), interval.getEnd().toString(), partitionId);
+
+    final String someFile = "sneaky-snake.txt";
+
+    final String someFilePath = intervalAndPart + "/" + someFile;
+
+    // can only traverse to find files that are in a path of the form {start}/{end}/{partitionId}, so write a data file
+    // in a location like that
+    File dataFile = new File(siblingLocation, someFilePath);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(
+        new File(intermediarySegmentsLocation, supervisorTaskId + "/" + someFilePath).exists());
+
+    final File foundFile1 = intermediaryDataManager.findPartitionFile(
+        supervisorTaskId,
+        someFile,
+        interval,
+        partitionId
+    );
+    Assert.assertNull(foundFile1);
+  }
+
   private File generateSegmentDir(String fileName) throws IOException
   {
     // Each file size is 138 bytes after compression
character."); + final String supervisorTaskId = "../" + siblingLocation.getName(); + final Interval interval = Intervals.of("2018/2019"); + final int partitionId = 0; + final String intervalAndPart = + StringUtils.format("%s/%s/%s", interval.getStart().toString(), interval.getEnd().toString(), partitionId); + + final String someFile = "sneaky-snake.txt"; + + final String someFilePath = intervalAndPart + "/" + someFile; + + // can only traverse to find files that are in a path of the form {start}/{end}/{partitionId}, so write a data file + // in a location like that + File dataFile = new File(siblingLocation, someFilePath); + FileUtils.write( + dataFile, + "test data", + StandardCharsets.UTF_8 + ); + + Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists()); + Assert.assertTrue( + new File(intermediarySegmentsLocation, supervisorTaskId + "/" + someFilePath).exists()); + + final File foundFile1 = intermediaryDataManager.findPartitionFile( + supervisorTaskId, + someFile, + interval, + partitionId + ); + Assert.assertNull(foundFile1); + } + private File generateSegmentDir(String fileName) throws IOException { // Each file size is 138 bytes after compression diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java index eea0f7fb2ae..a20254d2cf0 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java @@ -28,7 +28,7 @@ import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.io.IOUtils; -import org.apache.druid.indexing.common.task.utils.RandomIdUtils; +import org.apache.druid.indexer.TaskIdUtils; import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -165,7 +165,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest properties.setProperty("value.serializer", ByteArraySerializer.class.getName()); if (txnEnabled) { properties.setProperty("enable.idempotence", "true"); - properties.setProperty("transactional.id", RandomIdUtils.getRandomId()); + properties.setProperty("transactional.id", TaskIdUtils.getRandomId()); } KafkaProducer producer = new KafkaProducer<>( diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java index 37009da2bda..6be9a7141bf 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java @@ -27,12 +27,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.Sets; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskIdUtils; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; import 
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 37009da2bda..6be9a7141bf 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -27,12 +27,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -45,7 +45,6 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -148,16 +147,7 @@ public class DataSchema
 
   private static void validateDatasourceName(String dataSource)
   {
-    Preconditions.checkArgument(
-        !Strings.isNullOrEmpty(dataSource),
-        "dataSource cannot be null or empty. Please provide a dataSource."
-    );
-    Matcher m = INVALIDCHARS.matcher(dataSource);
-    Preconditions.checkArgument(
-        !m.matches(),
-        "dataSource cannot contain whitespace character except space."
-    );
-    Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot contain the '/' character.");
+    TaskIdUtils.validateId("dataSource", dataSource);
   }
 
   private static DimensionsSpec computeDimensionsSpec(
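[Reviewer note] With this change the dataSource rules are exactly the TaskIdUtils rules (the leading-dot check is new for datasources). Both hinge on the pattern "(?s).*[^\S ].*": "(?s)" lets '.' cross newlines, and "[^\S ]" matches any whitespace character other than a plain space. A standalone check of that reading, not part of the patch:

import java.util.regex.Pattern;

public class WhitespacePatternCheck
{
  public static void main(String[] args)
  {
    // Same pattern as TaskIdUtils.INVALIDCHARS: matches iff the input
    // contains any whitespace character that is not a plain space.
    Pattern invalid = Pattern.compile("(?s).*[^\\S ].*");

    System.out.println(invalid.matcher("plain spaces are fine").matches()); // false
    System.out.println(invalid.matcher("tab\tcharacter").matches());        // true
    System.out.println(invalid.matcher("new\nline").matches());             // true, thanks to (?s)
    System.out.println(invalid.matcher("form\u000Cfeed").matches());        // true
  }
}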
DoubleSumAggregatorFactory("metric1", "col1"), @@ -218,7 +216,7 @@ public class DataSchemaTest extends NullHandlingTest ); DataSchema schema = new DataSchema( - VALID_DATASOURCE_CHARS_NAME, + TaskIdUtilsTest.VALID_ID_CHARS, parser, new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1"), @@ -251,7 +249,7 @@ public class DataSchemaTest extends NullHandlingTest ); DataSchema schema = new DataSchema( - VALID_DATASOURCE_CHARS_NAME, + TaskIdUtilsTest.VALID_ID_CHARS, parser, new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1"), @@ -269,7 +267,7 @@ public class DataSchemaTest extends NullHandlingTest public void testSerdeWithInvalidParserMap() throws Exception { String jsonStr = "{" - + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(VALID_DATASOURCE_CHARS_NAME) + "\"," + + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\"," + "\"parser\":{\"type\":\"invalid\"}," + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}]," + "\"granularitySpec\":{" @@ -372,7 +370,7 @@ public class DataSchemaTest extends NullHandlingTest public void testSerde() throws Exception { String jsonStr = "{" - + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(VALID_DATASOURCE_CHARS_NAME) + "\"," + + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\"," + "\"parser\":{" + "\"type\":\"string\"," + "\"parseSpec\":{" @@ -396,7 +394,7 @@ public class DataSchemaTest extends NullHandlingTest DataSchema.class ); - Assert.assertEquals(actual.getDataSource(), VALID_DATASOURCE_CHARS_NAME); + Assert.assertEquals(actual.getDataSource(), TaskIdUtilsTest.VALID_ID_CHARS); Assert.assertEquals( actual.getParser().getParseSpec(), new JSONParseSpec( @@ -476,7 +474,7 @@ public class DataSchemaTest extends NullHandlingTest ); DataSchema originalSchema = new DataSchema( - VALID_DATASOURCE_CHARS_NAME, + TaskIdUtilsTest.VALID_ID_CHARS, parser, new AggregatorFactory[]{ new DoubleSumAggregatorFactory("metric1", "col1"), @@ -515,7 +513,7 @@ public class DataSchemaTest extends NullHandlingTest ); TestModifiedDataSchema originalSchema = new TestModifiedDataSchema( - VALID_DATASOURCE_CHARS_NAME, + TaskIdUtilsTest.VALID_ID_CHARS, null, null, new AggregatorFactory[]{