diff --git a/core/src/main/java/org/apache/druid/common/utils/IdUtils.java b/core/src/main/java/org/apache/druid/common/utils/IdUtils.java index f0f4ef90a9a..cbafef00e67 100644 --- a/core/src/main/java/org/apache/druid/common/utils/IdUtils.java +++ b/core/src/main/java/org/apache/druid/common/utils/IdUtils.java @@ -19,11 +19,18 @@ package org.apache.druid.common.utils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -68,4 +75,44 @@ public class IdUtils { return UNDERSCORE_JOINER.join(prefix, IdUtils.getRandomId()); } + + public static String newTaskId(String typeName, String dataSource, @Nullable Interval interval) + { + return newTaskId(null, typeName, dataSource, interval); + } + + public static String newTaskId(@Nullable String idPrefix, String typeName, String dataSource, @Nullable Interval interval) + { + return newTaskId(idPrefix, getRandomId(), DateTimes.nowUtc(), typeName, dataSource, interval); + } + + /** + * This method is only visible to outside only for testing. + * Use {@link #newTaskId(String, String, Interval)} or {@link #newTaskId(String, String, String, Interval)} instead. + */ + @VisibleForTesting + static String newTaskId( + @Nullable String idPrefix, + String idSuffix, + DateTime now, + String typeName, + String dataSource, + @Nullable Interval interval + ) + { + final List objects = new ArrayList<>(); + if (idPrefix != null) { + objects.add(idPrefix); + } + objects.add(typeName); + objects.add(dataSource); + objects.add(idSuffix); + if (interval != null) { + objects.add(interval.getStart().toString()); + objects.add(interval.getEnd().toString()); + } + objects.add(now.toString()); + + return String.join("_", objects); + } } diff --git a/core/src/test/java/org/apache/druid/common/utils/IdUtilsTest.java b/core/src/test/java/org/apache/druid/common/utils/IdUtilsTest.java index 2adbd495c30..16a34f15608 100644 --- a/core/src/test/java/org/apache/druid/common/utils/IdUtilsTest.java +++ b/core/src/test/java/org/apache/druid/common/utils/IdUtilsTest.java @@ -19,6 +19,9 @@ package org.apache.druid.common.utils; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -108,4 +111,50 @@ public class IdUtilsTest expectedException.expectMessage("thingToValidate cannot contain whitespace character except space."); IdUtils.validateId(THINGO, "form\u000cfeed?"); } + + @Test + public void testNewTaskIdWithoutInterval() + { + final String id = IdUtils.newTaskId( + "prefix", + "suffix", + DateTimes.of("2020-01-01"), + "type", + "datasource", + null + ); + final String expected = String.join( + "_", + "prefix", + "type", + "datasource", + "suffix", + DateTimes.of("2020-01-01").toString() + ); + Assert.assertEquals(expected, id); + } + + @Test + public void testNewTaskIdWithInterval() + { + final String id = IdUtils.newTaskId( + "prefix", + "suffix", + DateTimes.of("2020-01-01"), + "type", + "datasource", + Intervals.of("2020-01-01/2020-06-01") + ); + final String expected = String.join( + "_", + "prefix", + "type", + "datasource", + "suffix", + DateTimes.of("2020-01-01").toString(), + DateTimes.of("2020-06-01").toString(), + DateTimes.of("2020-01-01").toString() + ); + Assert.assertEquals(expected, id); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index dbc9119736f..8964a1e1c68 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -21,7 +21,6 @@ package org.apache.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import org.apache.druid.common.utils.IdUtils; @@ -29,22 +28,18 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public abstract class AbstractTask implements Task { - private static final Joiner ID_JOINER = Joiner.on("_"); - @JsonIgnore private final String id; @@ -80,29 +75,18 @@ public abstract class AbstractTask implements Task this.context = context == null ? new HashMap<>() : new HashMap<>(context); } - public static String getOrMakeId(String id, final String typeName, String dataSource) + public static String getOrMakeId(@Nullable String id, final String typeName, String dataSource) { return getOrMakeId(id, typeName, dataSource, null); } - static String getOrMakeId(String id, final String typeName, String dataSource, @Nullable Interval interval) + static String getOrMakeId(@Nullable String id, final String typeName, String dataSource, @Nullable Interval interval) { if (id != null) { return id; } - final List objects = new ArrayList<>(); - final String suffix = IdUtils.getRandomId(); - objects.add(typeName); - objects.add(dataSource); - objects.add(suffix); - if (interval != null) { - objects.add(interval.getStart()); - objects.add(interval.getEnd()); - } - objects.add(DateTimes.nowUtc().toString()); - - return joinId(objects); + return IdUtils.newTaskId(typeName, dataSource, interval); } @JsonProperty @@ -175,23 +159,6 @@ public abstract class AbstractTask implements Task '}'; } - /** - * Start helper methods - * - * @param objects objects to join - * - * @return string of joined objects - */ - static String joinId(List objects) - { - return ID_JOINER.join(objects); - } - - static String joinId(Object... objects) - { - return ID_JOINER.join(objects); - } - public TaskStatus success() { return TaskStatus.success(getId()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 5cfaf32c92a..c9e74663564 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -183,8 +183,8 @@ public class CompactionTask extends AbstractBatchIndexTask @JsonCreator public CompactionTask( - @JsonProperty("id") final String id, - @JsonProperty("resource") final TaskResource taskResource, + @JsonProperty("id") @Nullable final String id, + @JsonProperty("resource") @Nullable final TaskResource taskResource, @JsonProperty("dataSource") final String dataSource, @JsonProperty("interval") @Deprecated @Nullable final Interval interval, @JsonProperty("segments") @Deprecated @Nullable final List segments, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index 57c117da578..7c71d5811e8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -26,11 +26,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.indexing.IndexingServiceClient; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputSource; -import org.apache.druid.data.input.InputSplit; -import org.apache.druid.data.input.SplitHintSpec; -import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskToolbox; @@ -53,7 +48,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * Base class for different implementations of {@link ParallelIndexTaskRunner}. @@ -195,8 +189,8 @@ public abstract class ParallelIndexPhaseRunner spec = taskCompleteEvent.getSpec(); + LOG.error("Failed to process spec[%s] with an unknown last status", spec.getId()); } break; default: @@ -252,7 +246,7 @@ public abstract class ParallelIndexPhaseRunner spec ) { - LOG.info("Submit a new task for spec[%s] and inputSplit[%s]", spec.getId(), spec.getInputSplit()); + LOG.info("Submit a new task for spec[%s]", spec.getId()); final ListenableFuture> future = taskMonitor.submit(spec); Futures.addCallback( future, @@ -269,27 +263,13 @@ public abstract class ParallelIndexPhaseRunner getSplitsIfSplittable( - InputSource inputSource, - InputFormat inputFormat, - @Nullable SplitHintSpec splitHintSpec - ) throws IOException - { - if (inputSource instanceof SplittableInputSource) { - final SplittableInputSource splittableInputSource = (SplittableInputSource) inputSource; - return splittableInputSource.createSplits(inputFormat, splitHintSpec).collect(Collectors.toList()); - } else { - throw new ISE("inputSource[%s] is not splittable", inputSource.getClass().getSimpleName()); - } - } - @Override public void stopGracefully() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 28667acbfa4..842d0b6f6a0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -39,7 +39,6 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; -import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import org.apache.druid.indexing.common.Counters; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; @@ -73,7 +72,6 @@ import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; -import org.apache.druid.segment.realtime.appenderator.UsedSegmentChecker; import org.apache.druid.segment.realtime.firehose.ChatHandler; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.firehose.ChatHandlers; @@ -769,7 +767,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen private static void publishSegments(TaskToolbox toolbox, Map reportsMap) throws IOException { - final UsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()); final Set oldSegments = new HashSet<>(); final Set newSegments = new HashSet<>(); reportsMap @@ -788,18 +785,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen if (published) { LOG.info("Published [%d] segments", newSegments.size()); } else { - LOG.info("Transaction failure while publishing segments, checking if someone else beat us to it."); - final Set segmentsIdentifiers = reportsMap - .values() - .stream() - .flatMap(report -> report.getNewSegments().stream()) - .map(SegmentIdWithShardSpec::fromDataSegment) - .collect(Collectors.toSet()); - if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers).equals(newSegments)) { - LOG.info("Our segments really do exist, awaiting handoff."); - } else { - throw new ISE("Failed to publish segments[%s]", newSegments); - } + throw new ISE("Failed to publish segments"); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java index 588f1628d77..302d23ac312 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java @@ -277,13 +277,13 @@ public class TaskMonitor { T task = spec.newSubTask(numAttempts); try { - indexingServiceClient.runTask(task); + indexingServiceClient.runTask(task.getId(), task); } catch (Exception e) { if (isUnknownTypeIdException(e)) { log.warn(e, "Got an unknown type id error. Retrying with a backward compatible type."); task = spec.newSubTaskWithBackwardCompatibleType(numAttempts); - indexingServiceClient.runTask(task); + indexingServiceClient.runTask(task.getId(), task); } else { throw e; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 8be6b85864d..e17119ae17a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -23,17 +23,22 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; +import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.indexing.common.RetryPolicyConfig; +import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; @@ -63,10 +68,11 @@ public class ClientCompactionTaskQuerySerdeTest private static final AppenderatorsManager APPENDERATORS_MANAGER = new TestAppenderatorsManager(); @Test - public void testSerde() throws IOException + public void testClientCompactionTaskQueryToCompactionTask() throws IOException { final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); final ClientCompactionTaskQuery query = new ClientCompactionTaskQuery( + "id", "datasource", new ClientCompactionIOConfig( new ClientCompactionIntervalSpec( @@ -90,12 +96,13 @@ public class ClientCompactionTaskQuerySerdeTest 1000L, 100 ), - new HashMap<>() + ImmutableMap.of("key", "value") ); final byte[] json = mapper.writeValueAsBytes(query); final CompactionTask task = (CompactionTask) mapper.readValue(json, Task.class); + Assert.assertEquals(query.getId(), task.getId()); Assert.assertEquals(query.getDataSource(), task.getDataSource()); Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec); Assert.assertEquals( @@ -141,6 +148,95 @@ public class ClientCompactionTaskQuerySerdeTest Assert.assertEquals(query.getContext(), task.getContext()); } + @Test + public void testCompactionTaskToClientCompactionTaskQuery() throws IOException + { + final ObjectMapper mapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); + final CompactionTask.Builder builder = new CompactionTask.Builder( + "datasource", + mapper, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + new NoopChatHandlerProvider(), + ROW_INGESTION_METERS_FACTORY, + new NoopIndexingServiceClient(), + COORDINATOR_CLIENT, + new SegmentLoaderFactory(null, mapper), + new RetryPolicyFactory(new RetryPolicyConfig()), + APPENDERATORS_MANAGER + ); + final CompactionTask task = builder + .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds")) + .tuningConfig( + new ParallelIndexTuningConfig( + null, + null, + 40000, + 2000L, + null, + null, + new SegmentsSplitHintSpec(100000L), + new DynamicPartitionsSpec(100, 30000L), + new IndexSpec( + new DefaultBitmapSerdeFactory(), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + null, + null, + null, + null, + 1000L, + null, + null, + 100, + null, + null, + null, + null, + null, + null, + null, + null, + null + ) + ) + .build(); + + final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery( + task.getId(), + "datasource", + new ClientCompactionIOConfig( + new ClientCompactionIntervalSpec( + Intervals.of("2019/2020"), + "testSha256OfSortedSegmentIds" + ) + ), + new ClientCompactionTaskQueryTuningConfig( + 100, + 40000, + 2000L, + 30000L, + new SegmentsSplitHintSpec(100000L), + new IndexSpec( + new DefaultBitmapSerdeFactory(), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + 0, + 1000L, + 100 + ), + new HashMap<>() + ); + + final byte[] json = mapper.writeValueAsBytes(task); + final ClientCompactionTaskQuery actual = (ClientCompactionTaskQuery) mapper.readValue(json, ClientTaskQuery.class); + + Assert.assertEquals(expected, actual); + } + private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) { final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java new file mode 100644 index 00000000000..a0498467490 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; +import org.apache.druid.client.indexing.ClientTaskQuery; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class ClientKillUnusedSegmentsTaskQuerySerdeTest +{ + private ObjectMapper objectMapper; + + @Before + public void setup() + { + objectMapper = new DefaultObjectMapper(); + objectMapper.registerSubtypes( + new NamedType(ClientKillUnusedSegmentsTaskQuery.class, ClientKillUnusedSegmentsTaskQuery.TYPE) + ); + } + + @Test + public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() throws IOException + { + final ClientKillUnusedSegmentsTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery( + "killTaskId", + "datasource", + Intervals.of("2020-01-01/P1D") + ); + final byte[] json = objectMapper.writeValueAsBytes(taskQuery); + final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class); + Assert.assertEquals(taskQuery.getId(), fromJson.getId()); + Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource()); + Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); + } + + @Test + public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() throws IOException + { + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( + null, + "datasource", + Intervals.of("2020-01-01/P1D"), + null + ); + final byte[] json = objectMapper.writeValueAsBytes(task); + final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue( + json, + ClientTaskQuery.class + ); + Assert.assertEquals(task.getId(), taskQuery.getId()); + Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource()); + Assert.assertEquals(task.getInterval(), taskQuery.getInterval()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 8c8d4c2b889..6802961bf20 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.NoopInputFormat; @@ -383,42 +382,6 @@ public class TaskSerdeTest Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getInputSource() instanceof LocalInputSource); } - @Test - public void testKillTaskSerde() throws Exception - { - final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( - null, - "foo", - Intervals.of("2010-01-01/P1D"), - null - ); - - final String json = jsonMapper.writeValueAsString(task); - - Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change - final KillUnusedSegmentsTask task2 = (KillUnusedSegmentsTask) jsonMapper.readValue(json, Task.class); - - Assert.assertEquals("foo", task.getDataSource()); - Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task.getInterval()); - - Assert.assertEquals(task.getId(), task2.getId()); - Assert.assertEquals(task.getGroupId(), task2.getGroupId()); - Assert.assertEquals(task.getDataSource(), task2.getDataSource()); - Assert.assertEquals(task.getInterval(), task2.getInterval()); - - final KillUnusedSegmentsTask task3 = (KillUnusedSegmentsTask) jsonMapper.readValue( - jsonMapper.writeValueAsString( - new ClientKillUnusedSegmentsTaskQuery( - "foo", - Intervals.of("2010-01-01/P1D") - ) - ), Task.class - ); - - Assert.assertEquals("foo", task3.getDataSource()); - Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task3.getInterval()); - } - @Test public void testRealtimeIndexTaskSerde() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index a1d26feaa9b..858c6c93f23 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -454,7 +454,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase } @Override - public String runTask(Object taskObject) + public String runTask(String taskId, Object taskObject) { final Task task = (Task) taskObject; return taskRunner.run(injectIfNeeded(task)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index e50b41ffcca..680fab35db5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -83,7 +83,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu false ) ); - getIndexingServiceClient().runTask(task); + getIndexingServiceClient().runTask(task.getId(), task); while (task.getCurrentRunner() == null) { Thread.sleep(100); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 9f8a07dfaeb..acede08ca45 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -129,7 +129,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd false ) ); - getIndexingServiceClient().runTask(task); + getIndexingServiceClient().runTask(task.getId(), task); Thread.sleep(1000); final SinglePhaseParallelIndexTaskRunner runner = (SinglePhaseParallelIndexTaskRunner) task.getCurrentRunner(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java index 07a231e31a1..1e99dd67346 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java @@ -245,7 +245,7 @@ public class TaskMonitorTest private class TestIndexingServiceClient extends NoopIndexingServiceClient { @Override - public String runTask(Object taskObject) + public String runTask(String taskId, Object taskObject) { final TestTask task = (TestTask) taskObject; tasks.put(task.getId(), TaskState.RUNNING); diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java index fd509a3583b..d0dd1756f02 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java @@ -21,6 +21,7 @@ package org.apache.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import java.util.Map; import java.util.Objects; @@ -31,6 +32,9 @@ import java.util.Objects; */ public class ClientCompactionTaskQuery implements ClientTaskQuery { + static final String TYPE = "compact"; + + private final String id; private final String dataSource; private final ClientCompactionIOConfig ioConfig; private final ClientCompactionTaskQueryTuningConfig tuningConfig; @@ -38,23 +42,32 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery @JsonCreator public ClientCompactionTaskQuery( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig, @JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig tuningConfig, @JsonProperty("context") Map context ) { + this.id = Preconditions.checkNotNull(id, "id"); this.dataSource = dataSource; this.ioConfig = ioConfig; this.tuningConfig = tuningConfig; this.context = context; } + @JsonProperty + @Override + public String getId() + { + return id; + } + @JsonProperty @Override public String getType() { - return "compact"; + return TYPE; } @JsonProperty @@ -92,7 +105,8 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery return false; } ClientCompactionTaskQuery that = (ClientCompactionTaskQuery) o; - return Objects.equals(dataSource, that.dataSource) && + return Objects.equals(id, that.id) && + Objects.equals(dataSource, that.dataSource) && Objects.equals(ioConfig, that.ioConfig) && Objects.equals(tuningConfig, that.tuningConfig) && Objects.equals(context, that.context); @@ -101,14 +115,15 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery @Override public int hashCode() { - return Objects.hash(dataSource, ioConfig, tuningConfig, context); + return Objects.hash(id, dataSource, ioConfig, tuningConfig, context); } @Override public String toString() { - return "ClientCompactQuery{" + - "dataSource='" + dataSource + '\'' + + return "ClientCompactionTaskQuery{" + + "id='" + id + '\'' + + ", dataSource='" + dataSource + '\'' + ", ioConfig=" + ioConfig + ", tuningConfig=" + tuningConfig + ", context=" + context + diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java index f5851ba273b..ec008d38568 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java @@ -21,8 +21,11 @@ package org.apache.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; import org.joda.time.Interval; +import java.util.Objects; + /** * Client representation of org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. JSON searialization * fields of this class must correspond to those of org.apache.druid.indexing.common.task.KillUnusedSegmentsTask, except @@ -30,24 +33,36 @@ import org.joda.time.Interval; */ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery { + public static final String TYPE = "kill"; + + private final String id; private final String dataSource; private final Interval interval; @JsonCreator public ClientKillUnusedSegmentsTaskQuery( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) { + this.id = Preconditions.checkNotNull(id, "id"); this.dataSource = dataSource; this.interval = interval; } + @JsonProperty + @Override + public String getId() + { + return id; + } + @JsonProperty @Override public String getType() { - return "kill"; + return TYPE; } @JsonProperty @@ -62,4 +77,25 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery { return interval; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClientKillUnusedSegmentsTaskQuery that = (ClientKillUnusedSegmentsTaskQuery) o; + return Objects.equals(id, that.id) && + Objects.equals(dataSource, that.dataSource) && + Objects.equals(interval, that.interval); + } + + @Override + public int hashCode() + { + return Objects.hash(id, dataSource, interval); + } } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientTaskQuery.java index e57e30903ab..eadda17afbe 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientTaskQuery.java @@ -33,11 +33,13 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { - @Type(name = "kill", value = ClientKillUnusedSegmentsTaskQuery.class), - @Type(name = "compact", value = ClientCompactionTaskQuery.class) + @Type(name = ClientKillUnusedSegmentsTaskQuery.TYPE, value = ClientKillUnusedSegmentsTaskQuery.class), + @Type(name = ClientCompactionTaskQuery.TYPE, value = ClientCompactionTaskQuery.class) }) public interface ClientTaskQuery { + String getId(); + String getType(); String getDataSource(); diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index 8ae9777939c..3b4f0121aa0 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; import com.google.inject.Inject; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.DateTimes; @@ -65,13 +66,16 @@ public class HttpIndexingServiceClient implements IndexingServiceClient } @Override - public void killUnusedSegments(String dataSource, Interval interval) + public void killUnusedSegments(String idPrefix, String dataSource, Interval interval) { - runTask(new ClientKillUnusedSegmentsTaskQuery(dataSource, interval)); + final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval); + final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval); + runTask(taskId, taskQuery); } @Override public String compactSegments( + String idPrefix, List segments, int compactionTaskPriority, ClientCompactionTaskQueryTuningConfig tuningConfig, @@ -89,18 +93,19 @@ public class HttpIndexingServiceClient implements IndexingServiceClient context = context == null ? new HashMap<>() : context; context.put("priority", compactionTaskPriority); - return runTask( - new ClientCompactionTaskQuery( - dataSource, - new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)), - tuningConfig, - context - ) + final String taskId = IdUtils.newTaskId(idPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null); + final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery( + taskId, + dataSource, + new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)), + tuningConfig, + context ); + return runTask(taskId, taskQuery); } @Override - public String runTask(Object taskObject) + public String runTask(String taskId, Object taskObject) { try { // Warning, magic: here we may serialize ClientTaskQuery objects, but OverlordResource.taskPost() deserializes @@ -114,11 +119,11 @@ public class HttpIndexingServiceClient implements IndexingServiceClient if (!Strings.isNullOrEmpty(response.getContent())) { throw new ISE( "Failed to post task[%s] with error[%s].", - taskObject, + taskId, response.getContent() ); } else { - throw new ISE("Failed to post task[%s]. Please check overlord log", taskObject); + throw new ISE("Failed to post task[%s]. Please check overlord log", taskId); } } @@ -126,8 +131,14 @@ public class HttpIndexingServiceClient implements IndexingServiceClient response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT ); - final String taskId = (String) resultMap.get("task"); - return Preconditions.checkNotNull(taskId, "Null task id for task[%s]", taskObject); + final String returnedTaskId = (String) resultMap.get("task"); + Preconditions.checkState( + taskId.equals(returnedTaskId), + "Got a different taskId[%s]. Expected taskId[%s]", + returnedTaskId, + taskId + ); + return taskId; } catch (Exception e) { throw new RuntimeException(e); diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index 43c38884d39..cede9047c02 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -31,11 +31,12 @@ import java.util.Set; public interface IndexingServiceClient { - void killUnusedSegments(String dataSource, Interval interval); + void killUnusedSegments(String idPrefix, String dataSource, Interval interval); int killPendingSegments(String dataSource, DateTime end); String compactSegments( + String idPrefix, List segments, int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @@ -44,7 +45,7 @@ public interface IndexingServiceClient int getTotalWorkerCapacity(); - String runTask(Object taskObject); + String runTask(String taskId, Object taskObject); String cancelTask(String taskId); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 7b8c9b58f45..aa1728ea47b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -203,6 +203,7 @@ public class CompactSegments implements CoordinatorDuty final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); // make tuningConfig final String taskId = indexingServiceClient.compactSegments( + "coordinator-issued", segmentsToCompact, config.getTaskPriority(), ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 29c3f22e6a2..9c94cf4b2d1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -111,7 +111,7 @@ public class KillUnusedSegments implements CoordinatorDuty final Interval intervalToKill = findIntervalForKill(dataSource, maxSegmentsToKill); if (intervalToKill != null) { try { - indexingServiceClient.killUnusedSegments(dataSource, intervalToKill); + indexingServiceClient.killUnusedSegments("coordinator-issued", dataSource, intervalToKill); } catch (Exception ex) { log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index f88d1c70854..444481d2f72 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -330,7 +330,7 @@ public class DataSourcesResource } final Interval theInterval = Intervals.of(interval.replace('_', '/')); try { - indexingServiceClient.killUnusedSegments(dataSourceName, theInterval); + indexingServiceClient.killUnusedSegments("api-issued", dataSourceName, theInterval); return Response.ok().build(); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java similarity index 82% rename from server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsQueryTest.java rename to server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java index b1fdcc22f8f..fdd9f78fd98 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsQueryTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java @@ -19,6 +19,7 @@ package org.apache.druid.client.indexing; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.java.util.common.DateTimes; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -27,7 +28,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class ClientKillUnusedSegmentsQueryTest +public class ClientKillUnusedSegmentsTaskQueryTest { private static final String DATA_SOURCE = "data_source"; public static final DateTime START = DateTimes.nowUtc(); @@ -38,7 +39,7 @@ public class ClientKillUnusedSegmentsQueryTest @Before public void setUp() { - clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery(DATA_SOURCE, INTERVAL); + clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL); } @After @@ -64,4 +65,13 @@ public class ClientKillUnusedSegmentsQueryTest { Assert.assertEquals(INTERVAL, clientKillUnusedSegmentsQuery.getInterval()); } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class) + .usingGetClass() + .withNonnullFields("id", "dataSource", "interval") + .verify(); + } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index 10ae4e04cda..f770acab781 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -33,7 +33,7 @@ import java.util.Set; public class NoopIndexingServiceClient implements IndexingServiceClient { @Override - public void killUnusedSegments(String dataSource, Interval interval) + public void killUnusedSegments(String idPrefix, String dataSource, Interval interval) { } @@ -46,6 +46,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient @Override public String compactSegments( + String idPrefix, List segments, int compactionTaskPriority, @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, @@ -62,7 +63,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient } @Override - public String runTask(Object taskObject) + public String runTask(String taskId, Object taskObject) { return null; } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 43c38556972..755e9159d20 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -356,7 +356,6 @@ public class CompactSegmentsTest private final ObjectMapper jsonMapper; private int compactVersionSuffix = 0; - private int idSuffix = 0; private TestDruidLeaderClient(ObjectMapper jsonMapper) { @@ -434,15 +433,15 @@ public class CompactSegmentsTest .flatMap(holder -> Streams.sequentialStreamFrom(holder.getObject())) .map(PartitionChunk::getObject) .collect(Collectors.toList()); - final String taskId = compactSegments( + compactSegments( timeline, segments, compactionTaskQuery.getTuningConfig() ); - return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskId))); + return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId()))); } - private String compactSegments( + private void compactSegments( VersionedIntervalTimeline timeline, List segments, ClientCompactionTaskQueryTuningConfig tuningConfig @@ -503,8 +502,6 @@ public class CompactSegmentsTest compactSegment.getShardSpec().createChunk(compactSegment) ); } - - return "task_" + idSuffix++; } } diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 39e02ae86de..02747a35497 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -592,7 +592,7 @@ public class DataSourcesResourceTest Interval theInterval = Intervals.of(interval.replace('_', '/')); IndexingServiceClient indexingServiceClient = EasyMock.createStrictMock(IndexingServiceClient.class); - indexingServiceClient.killUnusedSegments("datasource1", theInterval); + indexingServiceClient.killUnusedSegments("api-issued", "datasource1", theInterval); EasyMock.expectLastCall().once(); EasyMock.replay(indexingServiceClient, server);