mirror of https://github.com/apache/druid.git
check paths used for shuffle intermediary data manager get and delete (#9630)
* check paths used for shuffle intermediary data manager get and delete
* add test
* newline
* meh
This commit is contained in:
parent 79522f3e25
commit d267b1c414
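The hunks below show the fix: supervisor task ids are now validated before being joined onto shuffle data paths. As the new tests further down demonstrate with ids like "../" + siblingLocation.getName(), an unvalidated id could point the shuffle GET and DELETE handlers at directories outside the configured storage location. A minimal standalone sketch of the underlying problem (my illustration, not part of the commit; the directory names are invented):

import java.io.File;
import java.io.IOException;

public class PathEscapeSketch
{
  public static void main(String[] args) throws IOException
  {
    // Stand-in for a shuffle data location; the path is hypothetical.
    File shuffleDir = new File("/tmp/druid/shuffle-data");
    // An attacker-controlled "task id" containing a relative path component.
    String craftySupervisorTaskId = "../sibling-dir";
    // This is how the id was used: joined straight onto the location.
    File resolved = new File(shuffleDir, craftySupervisorTaskId).getCanonicalFile();
    // Prints /tmp/druid/sibling-dir, which is outside the shuffle directory,
    // so a GET could read (or a DELETE remove) unrelated files.
    System.out.println(resolved);
  }
}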
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class TaskIdUtils
+{
+  private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");
+
+  public static void validateId(String thingToValidate, String stringToValidate)
+  {
+    Preconditions.checkArgument(
+        !Strings.isNullOrEmpty(stringToValidate),
+        StringUtils.format("%s cannot be null or empty. Please provide a %s.", thingToValidate, thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.startsWith("."),
+        StringUtils.format("%s cannot start with the '.' character.", thingToValidate)
+    );
+    Preconditions.checkArgument(
+        !stringToValidate.contains("/"),
+        StringUtils.format("%s cannot contain the '/' character.", thingToValidate)
+    );
+    Matcher m = INVALIDCHARS.matcher(stringToValidate);
+    Preconditions.checkArgument(
+        !m.matches(),
+        StringUtils.format("%s cannot contain whitespace characters other than space.", thingToValidate)
+    );
+  }
+
+  public static String getRandomId()
+  {
+    final StringBuilder suffix = new StringBuilder(8);
+    for (int i = 0; i < Integer.BYTES * 2; ++i) {
+      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
+    }
+    return suffix.toString();
+  }
+}
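A short usage sketch for the new utility (my illustration, not part of the diff; the id prefix is invented):

import org.apache.druid.indexer.TaskIdUtils;

public class TaskIdUtilsSketch
{
  public static void main(String[] args)
  {
    // getRandomId() builds an 8-character suffix; each loop iteration takes one
    // nibble of a fresh random int and maps it into the range 'a'..'p'.
    String id = "index_kafka_wikipedia_" + TaskIdUtils.getRandomId();
    TaskIdUtils.validateId("taskId", id); // passes: no '/', no leading '.', no whitespace besides space

    try {
      TaskIdUtils.validateId("supervisorTaskId", "../sneaky");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // supervisorTaskId cannot start with the '.' character.
    }
  }
}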
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TaskIdUtilsTest
+{
+  private static final String THINGO = "thingToValidate";
+  public static final String VALID_ID_CHARS = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testValidIdName()
+  {
+    TaskIdUtils.validateId(THINGO, VALID_ID_CHARS);
+  }
+
+  @Test
+  public void testInvalidNull()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, null);
+  }
+
+  @Test
+  public void testInvalidEmpty()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot be null or empty. Please provide a thingToValidate.");
+    TaskIdUtils.validateId(THINGO, "");
+  }
+
+  @Test
+  public void testInvalidSlashes()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain the '/' character.");
+    TaskIdUtils.validateId(THINGO, "/paths/are/bad/since/we/make/files/from/stuff");
+  }
+
+  @Test
+  public void testInvalidLeadingDot()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot start with the '.' character.");
+    TaskIdUtils.validateId(THINGO, "./nice/try");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexTabs()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace characters other than space.");
+    TaskIdUtils.validateId(THINGO, "spaces\tare\tbetter\tthan\ttabs\twhich\tare\tillegal");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexNewline()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace characters other than space.");
+    TaskIdUtils.validateId(THINGO, "new\nline");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexCarriageReturn()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace characters other than space.");
+    TaskIdUtils.validateId(THINGO, "does\rexist\rby\ritself");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexLineTabulation()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace characters other than space.");
+    TaskIdUtils.validateId(THINGO, "wtf\u000Bis line tabulation");
+  }
+
+  @Test
+  public void testInvalidSpacesRegexFormFeed()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("thingToValidate cannot contain whitespace characters other than space.");
+    TaskIdUtils.validateId(THINGO, "form\u000cfeed?");
+  }
+}
@@ -19,7 +19,7 @@

 package org.apache.druid.indexing.kafka;

-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.java.util.common.StringUtils;

 import java.util.HashMap;
@@ -35,7 +35,7 @@ public class KafkaConsumerConfigs
  {
    final Map<String, Object> props = new HashMap<>();
    props.put("metadata.max.age.ms", "10000");
-   props.put("group.id", StringUtils.format("kafka-supervisor-%s", RandomIdUtils.getRandomId()));
+   props.put("group.id", StringUtils.format("kafka-supervisor-%s", TaskIdUtils.getRandomId()));
    props.put("auto.offset.reset", "none");
    props.put("enable.auto.commit", "false");
    props.put("isolation.level", "read_committed");
@@ -24,10 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
 import org.apache.druid.indexing.kafka.KafkaIndexTask;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
@@ -221,7 +221,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>

     List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+      String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
       taskList.add(new KafkaIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
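Since getRandomId() moved verbatim into TaskIdUtils (compare the deleted RandomIdUtils below), the generated task ids keep the same shape. A tiny sketch (the base sequence name is invented):

import com.google.common.base.Joiner;
import org.apache.druid.indexer.TaskIdUtils;

public class TaskIdJoinSketch
{
  public static void main(String[] args)
  {
    String baseSequenceName = "index_kafka_wikipedia_abc123def456"; // hypothetical
    // Prints something like "index_kafka_wikipedia_abc123def456_fpeknhdg".
    System.out.println(Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId()));
  }
}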
@@ -25,10 +25,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
 import org.apache.druid.indexing.kinesis.KinesisIndexTask;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
@@ -169,7 +169,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>

     List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
+      String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
       taskList.add(new KinesisIndexTask(
           taskId,
           new TaskResource(baseSequenceName, 1),
@@ -24,11 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -92,7 +92,7 @@ public abstract class AbstractTask implements Task
     }

     final List<Object> objects = new ArrayList<>();
-    final String suffix = RandomIdUtils.getRandomId();
+    final String suffix = TaskIdUtils.getRandomId();
     objects.add(typeName);
     objects.add(dataSource);
     objects.add(suffix);
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.utils;
-
-import java.util.concurrent.ThreadLocalRandom;
-
-public class RandomIdUtils
-{
-  public static String getRandomId()
-  {
-    final StringBuilder suffix = new StringBuilder(8);
-    for (int i = 0; i < Integer.BYTES * 2; ++i) {
-      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
-    }
-    return suffix.toString();
-  }
-}
@@ -430,13 +430,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
     }
   }

-  // Map<{group RandomIdUtils}, {actively reading task group}>; see documentation for TaskGroup class
+  // Map<{group id}, {actively reading task group}>; see documentation for TaskGroup class
   private final ConcurrentHashMap<Integer, TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>();

   // After telling a taskGroup to stop reading and begin publishing a segment, it is moved from [activelyReadingTaskGroups] to here so
   // we can monitor its status while we queue new tasks to read the next range of sequences. This is a list since we could
   // have multiple sets of tasks publishing at once if time-to-publish > taskDuration.
-  // Map<{group RandomIdUtils}, List<{pending completion task groups}>>
+  // Map<{group id}, List<{pending completion task groups}>>
   private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();

   // We keep two separate maps for tracking the current state of partition->task group mappings [partitionGroups] and partition->offset
@@ -998,7 +998,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
     group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
     if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
       throw new ISE(
-          "trying to add taskGroup with RandomIdUtils [%s] to actively reading task groups, but group already exists.",
+          "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.",
          taskGroupId
      );
    }
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskStatus;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
@@ -336,6 +337,7 @@ public class IntermediaryDataManager
   @Nullable
   public File findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int partitionId)
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, partitionId));
       if (partitionDir.exists()) {
@@ -364,6 +366,7 @@ public class IntermediaryDataManager

   public void deletePartitions(String supervisorTaskId) throws IOException
   {
+    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
       final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
       if (supervisorTaskPath.exists()) {
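Both entry points now share the same guard: validate the id before resolving any path. A hedged sketch of the resulting behavior (GuardedDeleteSketch and its directory are my stand-ins, not the real IntermediaryDataManager):

import java.io.File;
import org.apache.druid.indexer.TaskIdUtils;

public class GuardedDeleteSketch
{
  // Hypothetical stand-in for deletePartitions(): what matters is the ordering,
  // i.e. validation happens before any file path is resolved or touched.
  static void deletePartitions(File shuffleLocation, String supervisorTaskId)
  {
    TaskIdUtils.validateId("supervisorTaskId", supervisorTaskId);
    File supervisorTaskPath = new File(shuffleLocation, supervisorTaskId);
    // ... recursive delete of supervisorTaskPath would go here; a crafted id never reaches it ...
  }

  public static void main(String[] args)
  {
    try {
      deletePartitions(new File("/tmp/druid/shuffle-data"), "../sibling-dir");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // supervisorTaskId cannot start with the '.' character.
    }
  }
}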
@@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -51,11 +52,15 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
   public ExpectedException expectedException = ExpectedException.none();

   private IntermediaryDataManager intermediaryDataManager;
+  private File intermediarySegmentsLocation;
+  private File siblingLocation;

   @Before
   public void setup() throws IOException
   {
     final WorkerConfig workerConfig = new WorkerConfig();
+    intermediarySegmentsLocation = tempDir.newFolder();
+    siblingLocation = tempDir.newFolder();
     final TaskConfig taskConfig = new TaskConfig(
         null,
         null,
@@ -65,7 +70,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 600L, null))
+        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null))
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
     intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
@@ -157,6 +162,63 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
     intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
   }

+  @Test
+  public void testFailsWithCraftyFabricatedNamesForDelete() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final String someFile = "sneaky-snake.txt";
+    File dataFile = new File(siblingLocation, someFile);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+    intermediaryDataManager.deletePartitions(supervisorTaskId);
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(dataFile.exists());
+  }
+
+  @Test
+  public void testFailsWithCraftyFabricatedNamesForFind() throws IOException
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("supervisorTaskId cannot start with the '.' character.");
+    final String supervisorTaskId = "../" + siblingLocation.getName();
+    final Interval interval = Intervals.of("2018/2019");
+    final int partitionId = 0;
+    final String intervalAndPart =
+        StringUtils.format("%s/%s/%s", interval.getStart().toString(), interval.getEnd().toString(), partitionId);
+
+    final String someFile = "sneaky-snake.txt";
+
+    final String someFilePath = intervalAndPart + "/" + someFile;
+
+    // can only traverse to find files that are in a path of the form {start}/{end}/{partitionId}, so write a data file
+    // in a location like that
+    File dataFile = new File(siblingLocation, someFilePath);
+    FileUtils.write(
+        dataFile,
+        "test data",
+        StandardCharsets.UTF_8
+    );
+
+    Assert.assertTrue(new File(intermediarySegmentsLocation, supervisorTaskId).exists());
+    Assert.assertTrue(
+        new File(intermediarySegmentsLocation, supervisorTaskId + "/" + someFilePath).exists());
+
+    final File foundFile1 = intermediaryDataManager.findPartitionFile(
+        supervisorTaskId,
+        someFile,
+        interval,
+        partitionId
+    );
+    Assert.assertNull(foundFile1);
+  }
+
   private File generateSegmentDir(String fileName) throws IOException
   {
     // Each file size is 138 bytes after compression
@@ -28,7 +28,7 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -165,7 +165,7 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
     properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
     if (txnEnabled) {
       properties.setProperty("enable.idempotence", "true");
-      properties.setProperty("transactional.id", RandomIdUtils.getRandomId());
+      properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
     }

     KafkaProducer<String, String> producer = new KafkaProducer<>(
@@ -27,12 +27,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -45,7 +45,6 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;

@@ -148,16 +147,7 @@ public class DataSchema

   private static void validateDatasourceName(String dataSource)
   {
-    Preconditions.checkArgument(
-        !Strings.isNullOrEmpty(dataSource),
-        "dataSource cannot be null or empty. Please provide a dataSource."
-    );
-    Matcher m = INVALIDCHARS.matcher(dataSource);
-    Preconditions.checkArgument(
-        !m.matches(),
-        "dataSource cannot contain whitespace character except space."
-    );
-    Preconditions.checkArgument(!dataSource.contains("/"), "dataSource cannot contain the '/' character.");
+    TaskIdUtils.validateId("dataSource", dataSource);
   }

   private static DimensionsSpec computeDimensionsSpec(
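The inline checks are replaced by the shared utility; validateId enforces the same rules (non-empty, no non-space whitespace, no '/') and additionally rejects a leading '.', which the old inline version did not. A small sketch (my illustration) of what the refactored method now accepts and rejects:

import org.apache.druid.indexer.TaskIdUtils;

public class DataSourceNameSketch
{
  public static void main(String[] args)
  {
    TaskIdUtils.validateId("dataSource", "wikipedia"); // ok
    reject("wiki/pedia"); // '/' was already rejected by the old inline checks
    reject(".hidden");    // leading '.' is newly rejected via the shared utility
    reject("tab\tname");  // non-space whitespace was already rejected
  }

  private static void reject(String name)
  {
    try {
      TaskIdUtils.validateId("dataSource", name);
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}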
@@ -26,12 +26,12 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.text.StringEscapeUtils;
-import org.apache.druid.common.config.NullHandlingTest;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskIdUtilsTest;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
@@ -46,6 +46,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -60,11 +61,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;

-public class DataSchemaTest extends NullHandlingTest
+public class DataSchemaTest extends InitializedNullHandlingTest
 {
-
-  private static final String VALID_DATASOURCE_CHARS_NAME = "alpha123..*~!@#&%^&*()-+ Россия\\ 한국 中国!";
-
   @Rule
   public ExpectedException expectedException = ExpectedException.none();

@@ -86,7 +84,7 @@ public class DataSchemaTest extends NullHandlingTest
     );

     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -123,7 +121,7 @@ public class DataSchemaTest extends NullHandlingTest
     );

     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -160,7 +158,7 @@ public class DataSchemaTest extends NullHandlingTest
     );

     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parserMap,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -218,7 +216,7 @@ public class DataSchemaTest extends NullHandlingTest
     );

     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -251,7 +249,7 @@ public class DataSchemaTest extends NullHandlingTest
     );

     DataSchema schema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -269,7 +267,7 @@ public class DataSchemaTest extends NullHandlingTest
   public void testSerdeWithInvalidParserMap() throws Exception
   {
     String jsonStr = "{"
-                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(VALID_DATASOURCE_CHARS_NAME) + "\","
+                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
                     + "\"parser\":{\"type\":\"invalid\"},"
                     + "\"metricsSpec\":[{\"type\":\"doubleSum\",\"name\":\"metric1\",\"fieldName\":\"col1\"}],"
                     + "\"granularitySpec\":{"
@@ -372,7 +370,7 @@ public class DataSchemaTest extends NullHandlingTest
   public void testSerde() throws Exception
   {
     String jsonStr = "{"
-                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(VALID_DATASOURCE_CHARS_NAME) + "\","
+                     + "\"dataSource\":\"" + StringEscapeUtils.escapeJson(TaskIdUtilsTest.VALID_ID_CHARS) + "\","
                     + "\"parser\":{"
                     + "\"type\":\"string\","
                     + "\"parseSpec\":{"
@@ -396,7 +394,7 @@ public class DataSchemaTest extends NullHandlingTest
         DataSchema.class
     );

-    Assert.assertEquals(actual.getDataSource(), VALID_DATASOURCE_CHARS_NAME);
+    Assert.assertEquals(actual.getDataSource(), TaskIdUtilsTest.VALID_ID_CHARS);
     Assert.assertEquals(
         actual.getParser().getParseSpec(),
         new JSONParseSpec(
@@ -476,7 +474,7 @@ public class DataSchemaTest extends NullHandlingTest
     );

     DataSchema originalSchema = new DataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         parser,
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1"),
@@ -515,7 +513,7 @@ public class DataSchemaTest extends NullHandlingTest
     );

     TestModifiedDataSchema originalSchema = new TestModifiedDataSchema(
-        VALID_DATASOURCE_CHARS_NAME,
+        TaskIdUtilsTest.VALID_ID_CHARS,
         null,
         null,
         new AggregatorFactory[]{