Modify batch index task naming to accommodate simultaneous tasks (#8612)

* Change hadoop task naming

* Remove unused

* Add timestamp

* Fix build
This commit is contained in:
Atul Mohan 2019-11-18 17:07:16 -06:00 committed by Jihoon Son
parent d60978343a
commit 8515a03c6b
10 changed files with 13 additions and 26 deletions

View File

@@ -19,7 +19,7 @@
package org.apache.druid.indexing.kafka;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.java.util.common.StringUtils;
import java.util.HashMap;

View File

@@ -69,7 +69,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
)
{
super(
id == null ? getFormattedId(dataSchema.getDataSource(), TYPE) : id,
getOrMakeId(id, dataSchema.getDataSource(), TYPE),
taskResource,
dataSchema,
tuningConfig,

View File

@@ -27,6 +27,7 @@ import com.google.common.base.Joiner;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
@@ -49,7 +50,6 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;

View File

@@ -57,7 +57,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
)
{
super(
id == null ? getFormattedId(dataSchema.getDataSource(), TYPE) : id,
getOrMakeId(id, dataSchema.getDataSource(), TYPE),
taskResource,
dataSchema,
tuningConfig,

View File

@@ -30,6 +30,7 @@ import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
@@ -54,7 +55,6 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.DateTime;

View File

@@ -28,6 +28,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@@ -90,8 +91,10 @@ public abstract class AbstractTask implements Task
}
final List<Object> objects = new ArrayList<>();
final String suffix = RandomIdUtils.getRandomId();
objects.add(typeName);
objects.add(dataSource);
objects.add(suffix);
if (interval != null) {
objects.add(interval.getStart());
objects.add(interval.getEnd());

View File

@@ -51,7 +51,6 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -88,6 +87,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
private static final Logger log = new Logger(HadoopIndexTask.class);
private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json";
private static final String TYPE = "index_hadoop";
private TaskConfig taskConfig = null;
private static String getTheDataSource(HadoopIngestionSpec spec)
@@ -152,7 +152,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
)
{
super(
id != null ? id : StringUtils.format("index_hadoop_%s_%s", getTheDataSource(spec), DateTimes.nowUtc()),
getOrMakeId(id, TYPE, getTheDataSource(spec)),
getTheDataSource(spec),
hadoopDependencyCoordinates == null
? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates))

View File

@@ -17,20 +17,17 @@
* under the License.
*/
package org.apache.druid.indexing.seekablestream.utils;
package org.apache.druid.indexing.common.task.utils;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Utility for generating short random identifiers, used to make task ids
 * unique so that simultaneous tasks over the same dataSource do not collide.
 */
public class RandomIdUtils
{
  private RandomIdUtils()
  {
    // utility class, no instances
  }

  /**
   * Returns an 8-character random id composed of the letters 'a' through 'p'
   * (one letter per random nibble).
   *
   * <p>{@link ThreadLocalRandom#current()} is called on every use rather than
   * cached in a static field: the instance returned is only valid on the
   * thread that obtained it, so caching it once at class-load time would
   * hand one thread's generator to every other thread.
   */
  public static String getRandomId()
  {
    final StringBuilder suffix = new StringBuilder(8);
    for (int i = 0; i < Integer.BYTES * 2; ++i) {
      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
    }
    return suffix.toString();
  }
}

View File

@@ -21,7 +21,6 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@@ -40,7 +39,6 @@ import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
@@ -125,17 +123,6 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
: LockGranularity.SEGMENT;
}
/**
 * Builds a task id of the form {@code {type}_{dataSource}_{randomSuffix}},
 * where the suffix comes from {@code RandomIdUtils.getRandomId()}.
 */
private static String makeTaskId(String dataSource, String type)
{
  return Joiner.on("_").join(type, dataSource, RandomIdUtils.getRandomId());
}
/**
 * Returns a freshly generated task id for the given dataSource and type.
 * Delegates to {@code makeTaskId}; each call produces a new random suffix,
 * so two calls with the same arguments yield distinct ids.
 */
protected static String getFormattedId(String dataSource, String type)
{
return makeTaskId(dataSource, type);
}
protected static String getFormattedGroupId(String dataSource, String type)
{
return StringUtils.format("%s_%s", type, dataSource);

View File

@@ -28,8 +28,8 @@ import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.commons.io.IOUtils;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;