Don't log the entire task spec (#10278)

* Don't log the entire task spec

* fix lgtm

* fix serde

* address comments and add tests

* fix tests

* remove unnecessary code
Jihoon Son 2020-08-18 11:03:13 -07:00 committed by GitHub
parent 0891b1f833
commit 9a81740281
26 changed files with 401 additions and 159 deletions

View File

@ -19,11 +19,18 @@
package org.apache.druid.common.utils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -68,4 +75,44 @@ public class IdUtils
{
return UNDERSCORE_JOINER.join(prefix, IdUtils.getRandomId());
}
public static String newTaskId(String typeName, String dataSource, @Nullable Interval interval)
{
return newTaskId(null, typeName, dataSource, interval);
}
public static String newTaskId(@Nullable String idPrefix, String typeName, String dataSource, @Nullable Interval interval)
{
return newTaskId(idPrefix, getRandomId(), DateTimes.nowUtc(), typeName, dataSource, interval);
}
/**
* This method is visible outside this class only for testing.
* Use {@link #newTaskId(String, String, Interval)} or {@link #newTaskId(String, String, String, Interval)} instead.
*/
@VisibleForTesting
static String newTaskId(
@Nullable String idPrefix,
String idSuffix,
DateTime now,
String typeName,
String dataSource,
@Nullable Interval interval
)
{
final List<String> objects = new ArrayList<>();
if (idPrefix != null) {
objects.add(idPrefix);
}
objects.add(typeName);
objects.add(dataSource);
objects.add(idSuffix);
if (interval != null) {
objects.add(interval.getStart().toString());
objects.add(interval.getEnd().toString());
}
objects.add(now.toString());
return String.join("_", objects);
}
}
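For reference, the new helper joins its parts with underscores. A minimal usage sketch follows; the prefix, type, datasource, and interval values are illustrative and not taken from this patch:

import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;

public class NewTaskIdExample
{
  public static void main(String[] args)
  {
    final Interval interval = Intervals.of("2020-01-01/2020-02-01");
    // With a prefix and interval, the id looks roughly like:
    // coordinator-issued_kill_wikipedia_<random>_2020-01-01T00:00:00.000Z_2020-02-01T00:00:00.000Z_<now>
    final String withPrefix = IdUtils.newTaskId("coordinator-issued", "kill", "wikipedia", interval);
    // Without a prefix or interval, only type, datasource, random suffix, and timestamp remain:
    // kill_wikipedia_<random>_<now>
    final String withoutInterval = IdUtils.newTaskId("kill", "wikipedia", null);
    System.out.println(withPrefix);
    System.out.println(withoutInterval);
  }
}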

View File

@ -19,6 +19,9 @@
package org.apache.druid.common.utils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -108,4 +111,50 @@ public class IdUtilsTest
expectedException.expectMessage("thingToValidate cannot contain whitespace character except space.");
IdUtils.validateId(THINGO, "form\u000cfeed?");
}
@Test
public void testNewTaskIdWithoutInterval()
{
final String id = IdUtils.newTaskId(
"prefix",
"suffix",
DateTimes.of("2020-01-01"),
"type",
"datasource",
null
);
final String expected = String.join(
"_",
"prefix",
"type",
"datasource",
"suffix",
DateTimes.of("2020-01-01").toString()
);
Assert.assertEquals(expected, id);
}
@Test
public void testNewTaskIdWithInterval()
{
final String id = IdUtils.newTaskId(
"prefix",
"suffix",
DateTimes.of("2020-01-01"),
"type",
"datasource",
Intervals.of("2020-01-01/2020-06-01")
);
final String expected = String.join(
"_",
"prefix",
"type",
"datasource",
"suffix",
DateTimes.of("2020-01-01").toString(),
DateTimes.of("2020-06-01").toString(),
DateTimes.of("2020-01-01").toString()
);
Assert.assertEquals(expected, id);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.druid.common.utils.IdUtils;
@ -29,22 +28,18 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public abstract class AbstractTask implements Task
{
private static final Joiner ID_JOINER = Joiner.on("_");
@JsonIgnore
private final String id;
@ -80,29 +75,18 @@ public abstract class AbstractTask implements Task
this.context = context == null ? new HashMap<>() : new HashMap<>(context);
}
public static String getOrMakeId(String id, final String typeName, String dataSource)
public static String getOrMakeId(@Nullable String id, final String typeName, String dataSource)
{
return getOrMakeId(id, typeName, dataSource, null);
}
static String getOrMakeId(String id, final String typeName, String dataSource, @Nullable Interval interval)
static String getOrMakeId(@Nullable String id, final String typeName, String dataSource, @Nullable Interval interval)
{
if (id != null) {
return id;
}
final List<Object> objects = new ArrayList<>();
final String suffix = IdUtils.getRandomId();
objects.add(typeName);
objects.add(dataSource);
objects.add(suffix);
if (interval != null) {
objects.add(interval.getStart());
objects.add(interval.getEnd());
}
objects.add(DateTimes.nowUtc().toString());
return joinId(objects);
return IdUtils.newTaskId(typeName, dataSource, interval);
}
@JsonProperty
@ -175,23 +159,6 @@ public abstract class AbstractTask implements Task
'}';
}
/**
* Start helper methods
*
* @param objects objects to join
*
* @return string of joined objects
*/
static String joinId(List<Object> objects)
{
return ID_JOINER.join(objects);
}
static String joinId(Object... objects)
{
return ID_JOINER.join(objects);
}
public TaskStatus success()
{
return TaskStatus.success(getId());

View File

@ -183,8 +183,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonCreator
public CompactionTask(
@JsonProperty("id") final String id,
@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("id") @Nullable final String id,
@JsonProperty("resource") @Nullable final TaskResource taskResource,
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") @Deprecated @Nullable final Interval interval,
@JsonProperty("segments") @Deprecated @Nullable final List<DataSegment> segments,

View File

@ -26,11 +26,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
@ -53,7 +48,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Base class for different implementations of {@link ParallelIndexTaskRunner}.
@ -195,8 +189,8 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
if (lastStatus != null) {
LOG.error("Failed because of the failed sub task[%s]", lastStatus.getId());
} else {
final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) taskCompleteEvent.getSpec();
LOG.error("Failed to run sub tasks for inputSplits[%s]", spec.getInputSplit());
final SubTaskSpec<?> spec = taskCompleteEvent.getSpec();
LOG.error("Failed to process spec[%s] with an unknown last status", spec.getId());
}
break;
default:
@ -252,7 +246,7 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
SubTaskSpec<SubTaskType> spec
)
{
LOG.info("Submit a new task for spec[%s] and inputSplit[%s]", spec.getId(), spec.getInputSplit());
LOG.info("Submit a new task for spec[%s]", spec.getId());
final ListenableFuture<SubTaskCompleteEvent<SubTaskType>> future = taskMonitor.submit(spec);
Futures.addCallback(
future,
@ -269,27 +263,13 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
public void onFailure(Throwable t)
{
// this callback is called only when there were some problems in TaskMonitor.
LOG.error(t, "Error while running a task for subTaskSpec[%s]", spec);
LOG.error(t, "Error while running a task for spec[%s]", spec.getId());
taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t));
}
}
);
}
private static List<InputSplit> getSplitsIfSplittable(
InputSource inputSource,
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
) throws IOException
{
if (inputSource instanceof SplittableInputSource) {
final SplittableInputSource<?> splittableInputSource = (SplittableInputSource) inputSource;
return splittableInputSource.createSplits(inputFormat, splitHintSpec).collect(Collectors.toList());
} else {
throw new ISE("inputSource[%s] is not splittable", inputSource.getClass().getSimpleName());
}
}
@Override
public void stopGracefully()
{

View File

@ -39,7 +39,6 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@ -73,7 +72,6 @@ import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.ChatHandlers;
@ -769,7 +767,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private static void publishSegments(TaskToolbox toolbox, Map<String, PushedSegmentsReport> reportsMap)
throws IOException
{
final UsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient());
final Set<DataSegment> oldSegments = new HashSet<>();
final Set<DataSegment> newSegments = new HashSet<>();
reportsMap
@ -788,18 +785,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
if (published) {
LOG.info("Published [%d] segments", newSegments.size());
} else {
LOG.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
final Set<SegmentIdWithShardSpec> segmentsIdentifiers = reportsMap
.values()
.stream()
.flatMap(report -> report.getNewSegments().stream())
.map(SegmentIdWithShardSpec::fromDataSegment)
.collect(Collectors.toSet());
if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers).equals(newSegments)) {
LOG.info("Our segments really do exist, awaiting handoff.");
} else {
throw new ISE("Failed to publish segments[%s]", newSegments);
}
throw new ISE("Failed to publish segments");
}
}

View File

@ -277,13 +277,13 @@ public class TaskMonitor<T extends Task>
{
T task = spec.newSubTask(numAttempts);
try {
indexingServiceClient.runTask(task);
indexingServiceClient.runTask(task.getId(), task);
}
catch (Exception e) {
if (isUnknownTypeIdException(e)) {
log.warn(e, "Got an unknown type id error. Retrying with a backward compatible type.");
task = spec.newSubTaskWithBackwardCompatibleType(numAttempts);
indexingServiceClient.runTask(task);
indexingServiceClient.runTask(task.getId(), task);
} else {
throw e;
}

View File

@ -23,17 +23,22 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
@ -63,10 +68,11 @@ public class ClientCompactionTaskQuerySerdeTest
private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager();
@Test
public void testSerde() throws IOException
public void testClientCompactionTaskQueryToCompactionTask() throws IOException
{
final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery(
"id",
"datasource",
new ClientCompactionIOConfig(
new ClientCompactionIntervalSpec(
@ -90,12 +96,13 @@ public class ClientCompactionTaskQuerySerdeTest
1000L,
100
),
new HashMap<>()
ImmutableMap.of("key", "value")
);
final byte[] json = mapper.writeValueAsBytes(query);
final CompactionTask task = (CompactionTask) mapper.readValue(json, Task.class);
Assert.assertEquals(query.getId(), task.getId());
Assert.assertEquals(query.getDataSource(), task.getDataSource());
Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec);
Assert.assertEquals(
@ -141,6 +148,95 @@ public class ClientCompactionTaskQuerySerdeTest
Assert.assertEquals(query.getContext(), task.getContext());
}
@Test
public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
{
final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
final CompactionTask.Builder builder = new CompactionTask.Builder(
"datasource",
mapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
ROW_INGESTION_METERS_FACTORY,
new NoopIndexingServiceClient(),
COORDINATOR_CLIENT,
new SegmentLoaderFactory(null, mapper),
new RetryPolicyFactory(new RetryPolicyConfig()),
APPENDERATORS_MANAGER
);
final CompactionTask task = builder
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"))
.tuningConfig(
new ParallelIndexTuningConfig(
null,
null,
40000,
2000L,
null,
null,
new SegmentsSplitHintSpec(100000L),
new DynamicPartitionsSpec(100, 30000L),
new IndexSpec(
new DefaultBitmapSerdeFactory(),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
null,
null,
null,
null,
1000L,
null,
null,
100,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build();
final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
task.getId(),
"datasource",
new ClientCompactionIOConfig(
new ClientCompactionIntervalSpec(
Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"
)
),
new ClientCompactionTaskQueryTuningConfig(
100,
40000,
2000L,
30000L,
new SegmentsSplitHintSpec(100000L),
new IndexSpec(
new DefaultBitmapSerdeFactory(),
CompressionStrategy.LZ4,
CompressionStrategy.LZF,
LongEncodingStrategy.LONGS
),
0,
1000L,
100
),
new HashMap<>()
);
final byte[] json = mapper.writeValueAsBytes(task);
final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) mapper.readValue(json, ClientTaskQuery.class);
Assert.assertEquals(expected, actual);
}
private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
{
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class ClientKillUnusedSegmentsTaskQuerySerdeTest
{
private ObjectMapper objectMapper;
@Before
public void setup()
{
objectMapper = new DefaultObjectMapper();
objectMapper.registerSubtypes(
new NamedType(ClientKillUnusedSegmentsTaskQuery.class, ClientKillUnusedSegmentsTaskQuery.TYPE)
);
}
@Test
public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() throws IOException
{
final ClientKillUnusedSegmentsTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D")
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
}
@Test
public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() throws IOException
{
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
null,
"datasource",
Intervals.of("2020-01-01/P1D"),
null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
json,
ClientTaskQuery.class
);
Assert.assertEquals(task.getId(), taskQuery.getId());
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
}
}

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.NoopInputFormat;
@ -383,42 +382,6 @@ public class TaskSerdeTest
Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getInputSource() instanceof LocalInputSource);
}
@Test
public void testKillTaskSerde() throws Exception
{
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
null,
"foo",
Intervals.of("2010-01-01/P1D"),
null
);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final KillUnusedSegmentsTask task2 = (KillUnusedSegmentsTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
final KillUnusedSegmentsTask task3 = (KillUnusedSegmentsTask) jsonMapper.readValue(
jsonMapper.writeValueAsString(
new ClientKillUnusedSegmentsTaskQuery(
"foo",
Intervals.of("2010-01-01/P1D")
)
), Task.class
);
Assert.assertEquals("foo", task3.getDataSource());
Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task3.getInterval());
}
@Test
public void testRealtimeIndexTaskSerde() throws Exception
{

View File

@ -454,7 +454,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
}
@Override
public String runTask(Object taskObject)
public String runTask(String taskId, Object taskObject)
{
final Task task = (Task) taskObject;
return taskRunner.run(injectIfNeeded(task));

View File

@ -83,7 +83,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
false
)
);
getIndexingServiceClient().runTask(task);
getIndexingServiceClient().runTask(task.getId(), task);
while (task.getCurrentRunner() == null) {
Thread.sleep(100);
}

View File

@ -129,7 +129,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
false
)
);
getIndexingServiceClient().runTask(task);
getIndexingServiceClient().runTask(task.getId(), task);
Thread.sleep(1000);
final SinglePhaseParallelIndexTaskRunner runner = (SinglePhaseParallelIndexTaskRunner) task.getCurrentRunner();

View File

@ -245,7 +245,7 @@ public class TaskMonitorTest
private class TestIndexingServiceClient extends NoopIndexingServiceClient
{
@Override
public String runTask(Object taskObject)
public String runTask(String taskId, Object taskObject)
{
final TestTask task = (TestTask) taskObject;
tasks.put(task.getId(), TaskState.RUNNING);

View File

@ -21,6 +21,7 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.Objects;
@ -31,6 +32,9 @@ import java.util.Objects;
*/
public class ClientCompactionTaskQuery implements ClientTaskQuery
{
static final String TYPE = "compact";
private final String id;
private final String dataSource;
private final ClientCompactionIOConfig ioConfig;
private final ClientCompactionTaskQueryTuningConfig tuningConfig;
@ -38,23 +42,32 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
@JsonCreator
public ClientCompactionTaskQuery(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
@JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("context") Map<String, Object> context
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.dataSource = dataSource;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig;
this.context = context;
}
@JsonProperty
@Override
public String getId()
{
return id;
}
@JsonProperty
@Override
public String getType()
{
return "compact";
return TYPE;
}
@JsonProperty
@ -92,7 +105,8 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
return false;
}
ClientCompactionTaskQuery that = (ClientCompactionTaskQuery) o;
return Objects.equals(dataSource, that.dataSource) &&
return Objects.equals(id, that.id) &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(context, that.context);
@ -101,14 +115,15 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
@Override
public int hashCode()
{
return Objects.hash(dataSource, ioConfig, tuningConfig, context);
return Objects.hash(id, dataSource, ioConfig, tuningConfig, context);
}
@Override
public String toString()
{
return "ClientCompactQuery{" +
"dataSource='" + dataSource + '\'' +
return "ClientCompactionTaskQuery{" +
"id='" + id + '\'' +
", dataSource='" + dataSource + '\'' +
", ioConfig=" + ioConfig +
", tuningConfig=" + tuningConfig +
", context=" + context +

View File

@ -21,8 +21,11 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.joda.time.Interval;
import java.util.Objects;
/**
* Client representation of org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. JSON serialization
* fields of this class must correspond to those of org.apache.druid.indexing.common.task.KillUnusedSegmentsTask, except
@ -30,24 +33,36 @@ import org.joda.time.Interval;
*/
public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
{
public static final String TYPE = "kill";
private final String id;
private final String dataSource;
private final Interval interval;
@JsonCreator
public ClientKillUnusedSegmentsTaskQuery(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.dataSource = dataSource;
this.interval = interval;
}
@JsonProperty
@Override
public String getId()
{
return id;
}
@JsonProperty
@Override
public String getType()
{
return "kill";
return TYPE;
}
@JsonProperty
@ -62,4 +77,25 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery
{
return interval;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ClientKillUnusedSegmentsTaskQuery that = (ClientKillUnusedSegmentsTaskQuery) o;
return Objects.equals(id, that.id) &&
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(interval, that.interval);
}
@Override
public int hashCode()
{
return Objects.hash(id, dataSource, interval);
}
}
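As a rough sketch of how the new id field shows up on the wire, serializing the query with Jackson (mirroring the serde test setup earlier in this commit) produces JSON along the lines of the comment below; the id, datasource, and interval values here are illustrative:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;

public class KillQueryJsonExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    mapper.registerSubtypes(
        new NamedType(ClientKillUnusedSegmentsTaskQuery.class, ClientKillUnusedSegmentsTaskQuery.TYPE)
    );
    final ClientKillUnusedSegmentsTaskQuery query = new ClientKillUnusedSegmentsTaskQuery(
        "kill_wikipedia_abc123",
        "wikipedia",
        Intervals.of("2020-01-01/2020-02-01")
    );
    // Expected shape (approximate):
    // {"type":"kill","id":"kill_wikipedia_abc123","dataSource":"wikipedia",
    //  "interval":"2020-01-01T00:00:00.000Z/2020-02-01T00:00:00.000Z"}
    System.out.println(mapper.writeValueAsString(query));
  }
}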

View File

@ -33,11 +33,13 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = "kill", value = ClientKillUnusedSegmentsTaskQuery.class),
@Type(name = "compact", value = ClientCompactionTaskQuery.class)
@Type(name = ClientKillUnusedSegmentsTaskQuery.TYPE, value = ClientKillUnusedSegmentsTaskQuery.class),
@Type(name = ClientCompactionTaskQuery.TYPE, value = ClientCompactionTaskQuery.class)
})
public interface ClientTaskQuery
{
String getId();
String getType();
String getDataSource();

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.DateTimes;
@ -65,13 +66,16 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
}
@Override
public void killUnusedSegments(String dataSource, Interval interval)
public void killUnusedSegments(String idPrefix, String dataSource, Interval interval)
{
runTask(new ClientKillUnusedSegmentsTaskQuery(dataSource, interval));
final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval);
final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval);
runTask(taskId, taskQuery);
}
@Override
public String compactSegments(
String idPrefix,
List<DataSegment> segments,
int compactionTaskPriority,
ClientCompactionTaskQueryTuningConfig tuningConfig,
@ -89,18 +93,19 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
context = context == null ? new HashMap<>() : context;
context.put("priority", compactionTaskPriority);
return runTask(
new ClientCompactionTaskQuery(
dataSource,
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
tuningConfig,
context
)
final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null);
final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
taskId,
dataSource,
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
tuningConfig,
context
);
return runTask(taskId, taskQuery);
}
@Override
public String runTask(Object taskObject)
public String runTask(String taskId, Object taskObject)
{
try {
// Warning, magic: here we may serialize ClientTaskQuery objects, but OverlordResource.taskPost() deserializes
@ -114,11 +119,11 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
if (!Strings.isNullOrEmpty(response.getContent())) {
throw new ISE(
"Failed to post task[%s] with error[%s].",
taskObject,
taskId,
response.getContent()
);
} else {
throw new ISE("Failed to post task[%s]. Please check overlord log", taskObject);
throw new ISE("Failed to post task[%s]. Please check overlord log", taskId);
}
}
@ -126,8 +131,14 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
response.getContent(),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
final String taskId = (String) resultMap.get("task");
return Preconditions.checkNotNull(taskId, "Null task id for task[%s]", taskObject);
final String returnedTaskId = (String) resultMap.get("task");
Preconditions.checkState(
taskId.equals(returnedTaskId),
"Got a different taskId[%s]. Expected taskId[%s]",
returnedTaskId,
taskId
);
return taskId;
}
catch (Exception e) {
throw new RuntimeException(e);

View File

@ -31,11 +31,12 @@ import java.util.Set;
public interface IndexingServiceClient
{
void killUnusedSegments(String dataSource, Interval interval);
void killUnusedSegments(String idPrefix, String dataSource, Interval interval);
int killPendingSegments(String dataSource, DateTime end);
String compactSegments(
String idPrefix,
List<DataSegment> segments,
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@ -44,7 +45,7 @@ public interface IndexingServiceClient
int getTotalWorkerCapacity();
String runTask(Object taskObject);
String runTask(String taskId, Object taskObject);
String cancelTask(String taskId);
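A hypothetical implementation sketch of the new contract: the task id travels alongside the (potentially large) task payload, so implementations can log or report the id without printing the whole spec. LoggingIndexingServiceClient is an invented name for illustration only:

import org.apache.druid.client.indexing.NoopIndexingServiceClient;

// Hypothetical client that extends the no-op implementation (as the tests in this
// patch do) and overrides only runTask to show the id-first logging pattern.
public class LoggingIndexingServiceClient extends NoopIndexingServiceClient
{
  @Override
  public String runTask(String taskId, Object taskObject)
  {
    // Log only the id, never the serialized task spec.
    System.out.println("Submitting task[" + taskId + "]");
    return taskId;
  }
}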

View File

@ -203,6 +203,7 @@ public class CompactSegments implements CoordinatorDuty
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
// make tuningConfig
final String taskId = indexingServiceClient.compactSegments(
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),

View File

@ -111,7 +111,7 @@ public class KillUnusedSegments implements CoordinatorDuty
final Interval intervalToKill = findIntervalForKill(dataSource, maxSegmentsToKill);
if (intervalToKill != null) {
try {
indexingServiceClient.killUnusedSegments(dataSource, intervalToKill);
indexingServiceClient.killUnusedSegments("coordinator-issued", dataSource, intervalToKill);
}
catch (Exception ex) {
log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource);

View File

@ -330,7 +330,7 @@ public class DataSourcesResource
}
final Interval theInterval = Intervals.of(interval.replace('_', '/'));
try {
indexingServiceClient.killUnusedSegments(dataSourceName, theInterval);
indexingServiceClient.killUnusedSegments("api-issued", dataSourceName, theInterval);
return Response.ok().build();
}
catch (Exception e) {

View File

@ -19,6 +19,7 @@
package org.apache.druid.client.indexing;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -27,7 +28,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ClientKillUnusedSegmentsQueryTest
public class ClientKillUnusedSegmentsTaskQueryTest
{
private static final String DATA_SOURCE = "data_source";
public static final DateTime START = DateTimes.nowUtc();
@ -38,7 +39,7 @@ public class ClientKillUnusedSegmentsQueryTest
@Before
public void setUp()
{
clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery(DATA_SOURCE, INTERVAL);
clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL);
}
@After
@ -64,4 +65,13 @@ public class ClientKillUnusedSegmentsQueryTest
{
Assert.assertEquals(INTERVAL, clientKillUnusedSegmentsQuery.getInterval());
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class)
.usingGetClass()
.withNonnullFields("id", "dataSource", "interval")
.verify();
}
}

View File

@ -33,7 +33,7 @@ import java.util.Set;
public class NoopIndexingServiceClient implements IndexingServiceClient
{
@Override
public void killUnusedSegments(String dataSource, Interval interval)
public void killUnusedSegments(String idPrefix, String dataSource, Interval interval)
{
}
@ -46,6 +46,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
@Override
public String compactSegments(
String idPrefix,
List<DataSegment> segments,
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@ -62,7 +63,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
}
@Override
public String runTask(Object taskObject)
public String runTask(String taskId, Object taskObject)
{
return null;
}

View File

@ -356,7 +356,6 @@ public class CompactSegmentsTest
private final ObjectMapper jsonMapper;
private int compactVersionSuffix = 0;
private int idSuffix = 0;
private TestDruidLeaderClient(ObjectMapper jsonMapper)
{
@ -434,15 +433,15 @@ public class CompactSegmentsTest
.flatMap(holder -> Streams.sequentialStreamFrom(holder.getObject()))
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
final String taskId = compactSegments(
compactSegments(
timeline,
segments,
compactionTaskQuery.getTuningConfig()
);
return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskId)));
return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId())));
}
private String compactSegments(
private void compactSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
List<DataSegment> segments,
ClientCompactionTaskQueryTuningConfig tuningConfig
@ -503,8 +502,6 @@ public class CompactSegmentsTest
compactSegment.getShardSpec().createChunk(compactSegment)
);
}
return "task_" + idSuffix++;
}
}

View File

@ -592,7 +592,7 @@ public class DataSourcesResourceTest
Interval theInterval = Intervals.of(interval.replace('_', '/'));
IndexingServiceClient indexingServiceClient = EasyMock.createStrictMock(IndexingServiceClient.class);
indexingServiceClient.killUnusedSegments("datasource1", theInterval);
indexingServiceClient.killUnusedSegments("api-issued", "datasource1", theInterval);
EasyMock.expectLastCall().once();
EasyMock.replay(indexingServiceClient, server);