Refactor: Cleanup NoopTask (#14938)

Changes:
- Simplify static `create` methods for `NoopTask`
- Remove `FirehoseFactory`, `IsReadyResult`, `readyTime` from `NoopTask`
as these fields were not being used anywhere
- Update tests
This commit is contained in:
Kashif Faraz 2023-09-05 09:15:41 +05:30 committed by GitHub
parent d4e972e1e4
commit 289ee1e011
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 234 additions and 465 deletions

View File

@ -30,10 +30,10 @@ import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.tasklogs.TaskLogs;
@ -73,7 +73,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void setup()
{
mapper = new TestUtils().getTestObjectMapper();
task = NoopTask.create(ID, 0);
task = K8sTestUtils.createTask(ID, 0);
k8sTaskId = new K8sTaskId(task);
EasyMock.expect(logWatch.getOutput()).andReturn(IOUtils.toInputStream("", StandardCharsets.UTF_8)).anyTimes();
}

View File

@ -29,7 +29,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.java.util.common.Pair;
@ -38,6 +37,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.easymock.EasyMock;
@ -89,7 +89,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
.withCapacity(1)
.build();
task = NoopTask.create(ID, 0);
task = K8sTestUtils.createTask(ID, 0);
runner = new KubernetesTaskRunner(
taskAdapter,
@ -367,7 +367,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
@Test
public void test_getRunningTasks()
{
Task pendingTask = NoopTask.create("pending-id", 0);
Task pendingTask = K8sTestUtils.createTask("pending-id", 0);
KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) {
@Override
protected RunnerTaskState getRunnerTaskState()
@ -377,7 +377,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
};
runner.tasks.put(pendingTask.getId(), pendingWorkItem);
Task runningTask = NoopTask.create("running-id", 0);
Task runningTask = K8sTestUtils.createTask("running-id", 0);
KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) {
@Override
protected RunnerTaskState getRunnerTaskState()
@ -396,7 +396,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
@Test
public void test_getPendingTasks()
{
Task pendingTask = NoopTask.create("pending-id", 0);
Task pendingTask = K8sTestUtils.createTask("pending-id", 0);
KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) {
@Override
protected RunnerTaskState getRunnerTaskState()
@ -406,7 +406,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport
};
runner.tasks.put(pendingTask.getId(), pendingWorkItem);
Task runningTask = NoopTask.create("running-id", 0);
Task runningTask = K8sTestUtils.createTask("running-id", 0);
KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) {
@Override
protected RunnerTaskState getRunnerTaskState()

View File

@ -44,7 +44,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
@Before
public void setup()
{
task = NoopTask.create("id", 0);
task = NoopTask.create();
workItem = new KubernetesWorkItem(task, null);
}

View File

@ -29,7 +29,9 @@ import org.apache.druid.data.input.impl.NoopInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -39,6 +41,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import java.io.File;
import java.util.Collections;
public class K8sTestUtils
@ -120,4 +123,9 @@ public class K8sTestUtils
type
);
}
public static NoopTask createTask(String id, int priority)
{
return new NoopTask(id, null, null, 0, 0, Collections.singletonMap(Tasks.PRIORITY_KEY, priority));
}
}

View File

@ -354,7 +354,7 @@ class K8sTaskAdapterTest
node,
jsonMapper
);
NoopTask task = NoopTask.create("id", 1);
NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
@ -426,4 +426,5 @@ class K8sTaskAdapterTest
);
assertEquals(1, additionalProperties.getAdditionalProperties().size());
}
}

View File

@ -100,7 +100,7 @@ class MultiContainerTaskAdapterTest
druidNode,
jsonMapper
);
NoopTask task = NoopTask.create("id", 1);
NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
@ -148,7 +148,7 @@ class MultiContainerTaskAdapterTest
druidNode,
jsonMapper
);
NoopTask task = NoopTask.create("id", 1);
NoopTask task = K8sTestUtils.createTask("id", 1);
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, "primary");
Job actual = adapter.createJobFromPodSpec(
@ -197,7 +197,7 @@ class MultiContainerTaskAdapterTest
startupLoggingConfig,
druidNode,
jsonMapper);
NoopTask task = NoopTask.create("id", 1);
NoopTask task = K8sTestUtils.createTask("id", 1);
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, config.getPrimaryContainerName());
Job actual = adapter.createJobFromPodSpec(

View File

@ -130,16 +130,7 @@ public class PodTemplateTaskAdapterTest
props
);
Task task = new NoopTask(
"id",
"id",
"datasource",
0,
0,
null,
null,
null
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
Job actual = adapter.fromTask(task);
Job expected = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
@ -171,17 +162,7 @@ public class PodTemplateTaskAdapterTest
props
);
Task task = new NoopTask(
"id",
"id",
"datasource",
0,
0,
null,
null,
null
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
Job actual = adapter.fromTask(task);
Job expected = K8sTestUtils.fileToResource("expectedNoopJobTlsEnabled.yaml", Job.class);
@ -226,17 +207,7 @@ public class PodTemplateTaskAdapterTest
props
);
Task task = new NoopTask(
"id",
"id",
"datasource",
0,
0,
null,
null,
null
);
Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
Job actual = adapter.fromTask(task);
Job expected = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
@ -316,7 +287,7 @@ public class PodTemplateTaskAdapterTest
Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class);
Task actual = adapter.toTask(job);
Task expected = NoopTask.create("id", 1);
Task expected = K8sTestUtils.createTask("id", 1);
Assertions.assertEquals(expected, actual);
}
@ -345,8 +316,6 @@ public class PodTemplateTaskAdapterTest
"data_source",
0,
0,
null,
null,
null
);
@ -406,7 +375,7 @@ public class PodTemplateTaskAdapterTest
expectedAnnotations.remove(DruidK8sConstants.TASK);
expected.getSpec().getTemplate().getMetadata().setAnnotations(expectedAnnotations);
Assertions.assertEquals(actual, expected);
Assertions.assertEquals(expected, actual);
Assertions.assertEquals(
Base64Compression.decompressBase64(actualTaskAnnotation),
Base64Compression.decompressBase64(expectedTaskAnnotation)

View File

@ -98,7 +98,7 @@ class SingleContainerTaskAdapterTest
druidNode,
jsonMapper
);
NoopTask task = NoopTask.create("id", 1);
NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,

View File

@ -26,7 +26,7 @@ spec:
druid.task.group.id: "id"
druid.task.datasource: "datasource"
annotations:
task: "H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
task: "H4sIAAAAAAAAAD2MvQ4CIRCE32VqijsTG1qLi7W+wArEbHICrmC8EN7dJf40k/lmJtNQthxgEVPKMGCvXsXgKqnm4x89FTqlKm6MBzw+YCA1nvmm8W4/TQYuxRJeBbZ17cJ3ZhvoSbzShVcu2zLOf9cS7pUl+ANlclrCzr2/AQUK0FqZAAAA"
tls.enabled: "false"
task.id: "id"
task.type: "noop"

View File

@ -26,7 +26,7 @@ spec:
druid.task.group.id: "apiissuedkillwikipedia3omjobnbc10000101t000000000z20230514t0000"
druid.task.datasource: "data_source"
annotations:
task: "H4sIAAAAAAAAAMVQu07EMBD8F9fJae0QIblFCNHepeEay4kNLOezjR9AFOXf2XBIVNQnbbEzs6/ZhZU5WiaZDyGyhqGhXEdsMedqjTqhc+oTTxitQd2pcH4Lox8nxQGgBU4xAMif2BF1VAJE10Lf8pv/hH7gtxI6CXwnBBxp60sKNT5eZbXRRR9CTdP2hA2ofEENS9UPeCZe9AD0mry32swX6g/vba6uUPPT/YGanjHZ15CpxFfnGjYFX+wX6ctKE+3vcLkw/aHR6REdlvlh838N98m+VzrY3OmoJzqESb6u3yiWc3MUAgAA"
task: "H4sIAAAAAAAAAMVQPa/CMAz8L55b5KRUSFkZnt5Mpy6R20RPhtKENOFDVf87KbC+GekG352l83mG+PAWFIzOeSiATZ7Jc8nTlKzRJx4GfeMTe2uYKu3OR9eNXa8FIpYoMhpE9cImS62WKKsS61Js/zPqRuwUVgrFRkpsc+pfcMn/fiXaUKSDS6Ffn7ASPb1ZASGNDZ+zLmvEAno3RnuPoOYle/azpmagK/FAHQ8cHz9rk2/0CPaSOFizJ099PgSUWJYnqMIU2d4BAAA="
tls.enabled: "false"
task.id: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
task.type: "noop"

View File

@ -26,7 +26,7 @@ spec:
druid.task.group.id: "id"
druid.task.datasource: "datasource"
annotations:
task: "H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
task: "H4sIAAAAAAAAAD2MvQ4CIRCE32VqijsTG1qLi7W+wArEbHICrmC8EN7dJf40k/lmJtNQthxgEVPKMGCvXsXgKqnm4x89FTqlKm6MBzw+YCA1nvmm8W4/TQYuxRJeBbZ17cJ3ZhvoSbzShVcu2zLOf9cS7pUl+ANlclrCzr2/AQUK0FqZAAAA"
tls.enabled: "true"
task.id: "id"
task.type: "noop"

View File

@ -22,18 +22,13 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
@ -46,39 +41,18 @@ import java.util.UUID;
*/
public class NoopTask extends AbstractTask
{
private static final Logger log = new Logger(NoopTask.class);
private static final int DEFAULT_RUN_TIME = 2500;
private static final int DEFAULT_IS_READY_TIME = 0;
private static final IsReadyResult DEFAULT_IS_READY_RESULT = IsReadyResult.YES;
enum IsReadyResult
{
YES,
NO,
EXCEPTION
}
@JsonIgnore
private final long runTime;
@JsonIgnore
private final long isReadyTime;
@JsonIgnore
private final IsReadyResult isReadyResult;
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@JsonCreator
public NoopTask(
@JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("runTime") long runTime,
@JsonProperty("runTime") long runTimeMillis,
@JsonProperty("isReadyTime") long isReadyTime,
@JsonProperty("isReadyResult") String isReadyResult,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("context") Map<String, Object> context
)
{
@ -90,12 +64,7 @@ public class NoopTask extends AbstractTask
context
);
this.runTime = (runTime == 0) ? DEFAULT_RUN_TIME : runTime;
this.isReadyTime = (isReadyTime == 0) ? DEFAULT_IS_READY_TIME : isReadyTime;
this.isReadyResult = (isReadyResult == null)
? DEFAULT_IS_READY_RESULT
: IsReadyResult.valueOf(StringUtils.toUpperCase(isReadyResult));
this.firehoseFactory = firehoseFactory;
this.runTime = (runTimeMillis == 0) ? DEFAULT_RUN_TIME : runTimeMillis;
}
@Override
@ -118,37 +87,10 @@ public class NoopTask extends AbstractTask
return runTime;
}
@JsonProperty
public long getIsReadyTime()
{
return isReadyTime;
}
@JsonProperty
public IsReadyResult getIsReadyResult()
{
return isReadyResult;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
switch (isReadyResult) {
case YES:
return true;
case NO:
return false;
case EXCEPTION:
throw new ISE("Not ready. Never will be ready. Go away!");
default:
throw new AssertionError("#notreached");
}
return true;
}
@Override
@ -159,17 +101,8 @@ public class NoopTask extends AbstractTask
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
if (firehoseFactory != null) {
log.info("Connecting firehose");
}
try (Firehose firehose = firehoseFactory != null ? firehoseFactory.connect(null, null) : null) {
log.info("Running noop task[%s]", getId());
log.info("Sleeping for %,d millis.", runTime);
Thread.sleep(runTime);
log.info("Woke up!");
return TaskStatus.success(getId());
}
Thread.sleep(runTime);
return TaskStatus.success(getId());
}
@Override
@ -180,33 +113,18 @@ public class NoopTask extends AbstractTask
public static NoopTask create()
{
return new NoopTask(null, null, null, 0, 0, null, null, null);
return forDatasource(null);
}
public static NoopTask withGroupId(String groupId)
public static NoopTask forDatasource(String datasource)
{
return new NoopTask(null, groupId, null, 0, 0, null, null, null);
return new NoopTask(null, null, datasource, 0, 0, null);
}
@VisibleForTesting
public static NoopTask create(String dataSource)
{
return new NoopTask(null, null, dataSource, 0, 0, null, null, null);
}
@VisibleForTesting
public static NoopTask create(int priority)
public static NoopTask ofPriority(int priority)
{
final Map<String, Object> context = new HashMap<>();
context.put(Tasks.PRIORITY_KEY, priority);
return new NoopTask(null, null, null, 0, 0, null, null, context);
}
@VisibleForTesting
public static NoopTask create(String id, int priority)
{
final Map<String, Object> context = new HashMap<>();
context.put(Tasks.PRIORITY_KEY, priority);
return new NoopTask(id, null, null, 0, 0, null, null, context);
return new NoopTask(null, null, null, 0, 0, context);
}
}

View File

@ -87,7 +87,7 @@ public class RemoteTaskActionClientTest
final BytesFullResponseHolder responseHolder = new BytesFullResponseHolder(httpResponse);
responseHolder.addChunk(objectMapper.writeValueAsBytes(expectedResponse));
final Task task = NoopTask.create("id", 0);
final Task task = NoopTask.create();
final LockListAction action = new LockListAction();
EasyMock.expect(
@ -123,7 +123,7 @@ public class RemoteTaskActionClientTest
StandardCharsets.UTF_8
).addChunk("testSubmitWithIllegalStatusCode");
final Task task = NoopTask.create("id", 0);
final Task task = NoopTask.create();
final LockListAction action = new LockListAction();
EasyMock.expect(
directOverlordClient.request(

View File

@ -358,7 +358,7 @@ public class SegmentAllocationQueueTest
private Task createTask(String datasource, String groupId)
{
Task task = new NoopTask(null, groupId, datasource, 0, 0, null, null, null);
Task task = new NoopTask(null, groupId, datasource, 0, 0, null);
taskActionTestKit.getTaskLockbox().add(task);
return task;
}

View File

@ -90,7 +90,7 @@ public class AbstractTaskTest
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
{
@Nullable
@Override
@ -138,7 +138,7 @@ public class AbstractTaskTest
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
{
@Nullable
@Override
@ -182,7 +182,7 @@ public class AbstractTaskTest
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
{
@Override
public TaskStatus runTask(TaskToolbox toolbox)

View File

@ -27,7 +27,7 @@ public class NoopTaskTest
@Test
public void testNullInputSources()
{
NoopTask task = new NoopTask("myID", null, null, 1, 0, null, null, null);
NoopTask task = NoopTask.create();
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}
}

View File

@ -232,8 +232,6 @@ public class ParallelIndexPhaseRunnerTest extends AbstractParallelIndexSuperviso
null,
10,
0,
null,
null,
Collections.singletonMap(AbstractParallelIndexSupervisorTaskTest.DISABLE_TASK_INJECT_CONTEXT_KEY, true)
);
this.phaseRunner = phaseRunner;

View File

@ -231,7 +231,7 @@ public class TaskMonitorTest
TestTask(String id, long runTime, boolean shouldFail, boolean throwUnknownTypeIdError)
{
super(id, null, "testDataSource", runTime, 0, null, null, null);
super(id, null, "testDataSource", runTime, 0, null);
this.shouldFail = shouldFail;
this.throwUnknownTypeIdError = throwUnknownTypeIdError;
}

View File

@ -47,8 +47,8 @@ public class HeapMemoryTaskStorageTest
@Test
public void testRemoveTasksOlderThan()
{
final NoopTask task1 = NoopTask.create("foo");
final NoopTask task2 = NoopTask.create("bar");
final NoopTask task1 = NoopTask.create();
final NoopTask task2 = NoopTask.create();
storage.insert(task1, TaskStatus.success(task1.getId()));
storage.insert(task2, TaskStatus.running(task2.getId()));
@ -64,8 +64,8 @@ public class HeapMemoryTaskStorageTest
@Test
public void testGetTaskInfos()
{
final NoopTask task1 = NoopTask.create("foo");
final NoopTask task2 = NoopTask.create("bar");
final NoopTask task1 = NoopTask.create();
final NoopTask task2 = NoopTask.create();
storage.insert(task1, TaskStatus.success(task1.getId()));
storage.insert(task2, TaskStatus.running(task2.getId()));

View File

@ -66,14 +66,14 @@ public class IndexerMetadataStorageAdapterTest
DateTimes.of("2017-12-01"),
TaskStatus.running("id1"),
"dataSource",
NoopTask.create("id1", 0)
NoopTask.create()
),
new TaskInfo<>(
"id1",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
NoopTask.create("id2", 0)
NoopTask.create()
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
@ -101,14 +101,14 @@ public class IndexerMetadataStorageAdapterTest
DateTimes.of("2017-11-01"),
TaskStatus.running("id1"),
"dataSource",
NoopTask.create("id1", 0)
NoopTask.create()
),
new TaskInfo<>(
"id1",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
NoopTask.create("id2", 0)
NoopTask.create()
)
);

View File

@ -154,21 +154,7 @@ public class SingleTaskBackgroundRunnerTest
@Test
public void testRun() throws ExecutionException, InterruptedException
{
NoopTask task = new NoopTask(null, null, null, 500L, 0, null, null, null)
{
@Nullable
@Override
public String setup(TaskToolbox toolbox)
{
return null;
}
@Override
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus)
{
// do nothing
}
};
NoopTask task = new NoopTask(null, null, null, 500L, 0, null);
Assert.assertEquals(
TaskState.SUCCESS,
runner.run(task).get().getStatusCode()
@ -178,7 +164,7 @@ public class SingleTaskBackgroundRunnerTest
@Test
public void testGetQueryRunner() throws ExecutionException, InterruptedException
{
runner.run(new NoopTask(null, null, "foo", 500L, 0, null, null, null)).get().getStatusCode();
runner.run(new NoopTask(null, null, "foo", 500L, 0, null)).get().getStatusCode();
final QueryRunner<ScanResultValue> queryRunner =
Druids.newScanQueryBuilder()
@ -194,7 +180,7 @@ public class SingleTaskBackgroundRunnerTest
public void testStop() throws ExecutionException, InterruptedException, TimeoutException
{
final ListenableFuture<TaskStatus> future = runner.run(
new NoopTask(null, null, null, Long.MAX_VALUE, 0, null, null, null) // infinite task
new NoopTask(null, null, null, Long.MAX_VALUE, 0, null) // infinite task
);
runner.stop();
Assert.assertEquals(
@ -320,8 +306,6 @@ public class SingleTaskBackgroundRunnerTest
"datasource",
10000, // 10 sec
0,
null,
null,
null
)
);

View File

@ -66,6 +66,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
@ -79,6 +80,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.Task;
@ -88,6 +90,7 @@ import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.test.TestDataSegmentAnnouncer;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
@ -98,6 +101,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -129,18 +133,16 @@ import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.JoinableFactoryWrapperTest;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentTest;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
@ -452,8 +454,6 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
case HEAP_TASK_STORAGE: {
taskStorage = new HeapMemoryTaskStorage(
new TaskStorageConfig(null)
{
}
);
break;
}
@ -479,9 +479,8 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
break;
}
default: {
default:
throw new RE("Unknown task storage type [%s]", taskStorageType);
}
}
tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox);
return taskStorage;
@ -622,29 +621,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
emitter,
dataSegmentPusher,
new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig()),
new DataSegmentMover()
{
@Override
public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec)
{
return dataSegment;
}
},
new DataSegmentArchiver()
{
@Override
public DataSegment archive(DataSegment segment)
{
return segment;
}
@Override
public DataSegment restore(DataSegment segment)
{
return segment;
}
},
new DataSegmentAnnouncer()
(dataSegment, targetLoadSpec) -> dataSegment,
new NoopDataSegmentArchiver(),
new TestDataSegmentAnnouncer()
{
@Override
public void announceSegment(DataSegment segment)
@ -652,24 +631,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
announcedSinks++;
}
@Override
public void unannounceSegment(DataSegment segment)
{
}
@Override
public void announceSegments(Iterable<DataSegment> segments)
{
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments)
{
}
}, // segment announcer
},
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
@ -740,7 +702,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testIndexTask() throws Exception
public void testIndexTask()
{
final Task indexTask = new IndexTask(
null,
@ -824,7 +786,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testIndexTaskFailure() throws Exception
public void testIndexTaskFailure()
{
final Task indexTask = new IndexTask(
null,
@ -1070,7 +1032,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testRealtimeishTask() throws Exception
public void testRealtimeishTask()
{
final Task rtishTask = new RealtimeishTask();
final TaskStatus status = runTask(rtishTask);
@ -1082,13 +1044,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testNoopTask() throws Exception
public void testNoopTask()
{
final Task noopTask = new DefaultObjectMapper().readValue(
"{\"type\":\"noop\", \"runTime\":\"100\"}\"",
Task.class
);
final TaskStatus status = runTask(noopTask);
final TaskStatus status = runTask(NoopTask.create());
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
@ -1097,12 +1055,16 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testNeverReadyTask() throws Exception
public void testNeverReadyTask()
{
final Task neverReadyTask = new DefaultObjectMapper().readValue(
"{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"",
Task.class
);
final Task neverReadyTask = new NoopTask(null, null, null, 0, 0, null)
{
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
throw new ISE("Task will never be ready");
}
};
final TaskStatus status = runTask(neverReadyTask);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
@ -1112,7 +1074,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testSimple() throws Exception
public void testSimple()
{
final Task task = new AbstractFixedIntervalTask(
"id1",
@ -1169,7 +1131,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testBadInterval() throws Exception
public void testBadInterval()
{
final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", Intervals.of("2012-01-01/P1D"), null)
{
@ -1211,7 +1173,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testBadVersion() throws Exception
public void testBadVersion()
{
final Task task = new AbstractFixedIntervalTask("id1", "id1", "ds", Intervals.of("2012-01-01/P1D"), null)
{
@ -1541,7 +1503,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
}
@Test
public void testLockRevoked() throws Exception
public void testLockRevoked()
{
final Task task = new AbstractFixedIntervalTask(
"id1",
@ -1595,15 +1557,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
private TaskStatus runTask(final Task task) throws Exception
private TaskStatus runTask(final Task task)
{
final Task dummyTask = new DefaultObjectMapper().readValue(
"{\"type\":\"noop\", \"isReadyResult\":\"exception\"}\"",
Task.class
);
final long startTime = System.currentTimeMillis();
Preconditions.checkArgument(!task.getId().equals(dummyTask.getId()));
final Stopwatch taskRunDuration = Stopwatch.createStarted();
// Since multiple tasks can be run in a single unit test using runTask(), hence this check and synchronization
synchronized (this) {
@ -1611,29 +1567,26 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
taskQueue.start();
}
}
taskQueue.add(dummyTask);
taskQueue.add(task);
final String taskId = task.getId();
TaskStatus retVal = null;
for (final String taskId : ImmutableList.of(dummyTask.getId(), task.getId())) {
try {
TaskStatus status;
while ((status = tsqa.getStatus(taskId).get()).isRunnable()) {
if (System.currentTimeMillis() > startTime + 10 * 1000) {
throw new ISE("Where did the task go?!: %s", task.getId());
}
Thread.sleep(100);
}
if (taskId.equals(task.getId())) {
retVal = status;
try {
TaskStatus status;
while ((status = tsqa.getStatus(taskId).get()).isRunnable()) {
if (taskRunDuration.millisElapsed() > 10_000) {
throw new ISE("Where did the task go?!: %s", task.getId());
}
Thread.sleep(100);
}
catch (Exception e) {
throw new RuntimeException(e);
if (taskId.equals(task.getId())) {
retVal = status;
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
return retVal;
}

View File

@ -102,8 +102,8 @@ public class TaskLockBoxConcurrencyTest
throws ExecutionException, InterruptedException, EntryExistsException
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task lowPriorityTask = NoopTask.create(10);
final Task highPriorityTask = NoopTask.create(100);
final Task lowPriorityTask = NoopTask.ofPriority(10);
final Task highPriorityTask = NoopTask.ofPriority(100);
lockbox.add(lowPriorityTask);
lockbox.add(highPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));

View File

@ -221,9 +221,9 @@ public class TaskLockboxTest
@Test
public void testTryMixedLocks() throws EntryExistsException
{
final Task lowPriorityTask = NoopTask.create(0);
final Task lowPriorityTask2 = NoopTask.create(0);
final Task highPiorityTask = NoopTask.create(10);
final Task lowPriorityTask = NoopTask.ofPriority(0);
final Task lowPriorityTask2 = NoopTask.ofPriority(0);
final Task highPiorityTask = NoopTask.ofPriority(10);
final Interval interval1 = Intervals.of("2017-01-01/2017-01-02");
final Interval interval2 = Intervals.of("2017-01-02/2017-01-03");
final Interval interval3 = Intervals.of("2017-01-03/2017-01-04");
@ -476,13 +476,13 @@ public class TaskLockboxTest
{
final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator);
final Task task1 = NoopTask.create("task1", 10);
final Task task1 = NoopTask.ofPriority(10);
taskStorage.insert(task1, TaskStatus.running(task1.getId()));
originalBox.add(task1);
Assert.assertTrue(originalBox.tryLock(task1, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task1, Intervals.of("2017/2018"), null)).isOk());
// task2 revokes task1
final Task task2 = NoopTask.create("task2", 100);
final Task task2 = NoopTask.ofPriority(100);
taskStorage.insert(task2, TaskStatus.running(task2.getId()));
originalBox.add(task2);
Assert.assertTrue(originalBox.tryLock(task2, new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task2, Intervals.of("2017/2018"), null)).isOk());
@ -492,11 +492,11 @@ public class TaskLockboxTest
.stream()
.collect(Collectors.toMap(Task::getId, task -> taskStorage.getLocks(task.getId())));
final List<TaskLock> task1Locks = beforeLocksInStorage.get("task1");
final List<TaskLock> task1Locks = beforeLocksInStorage.get(task1.getId());
Assert.assertEquals(1, task1Locks.size());
Assert.assertTrue(task1Locks.get(0).isRevoked());
final List<TaskLock> task2Locks = beforeLocksInStorage.get("task1");
final List<TaskLock> task2Locks = beforeLocksInStorage.get(task1.getId());
Assert.assertEquals(1, task2Locks.size());
Assert.assertTrue(task2Locks.get(0).isRevoked());
@ -578,7 +578,7 @@ public class TaskLockboxTest
Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, task, interval).isOk());
}
final Task highPriorityTask = NoopTask.create(100);
final Task highPriorityTask = NoopTask.ofPriority(100);
lockbox.add(highPriorityTask);
taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
final TaskLock lock = tryTimeChunkLock(TaskLockType.EXCLUSIVE, highPriorityTask, interval).getTaskLock();
@ -597,8 +597,8 @@ public class TaskLockboxTest
public void testDoInCriticalSectionWithRevokedLock() throws Exception
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task lowPriorityTask = NoopTask.create("task1", 0);
final Task highPriorityTask = NoopTask.create("task2", 10);
final Task lowPriorityTask = NoopTask.ofPriority(0);
final Task highPriorityTask = NoopTask.ofPriority(10);
lockbox.add(lowPriorityTask);
lockbox.add(highPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
@ -622,8 +622,8 @@ public class TaskLockboxTest
public void testAcquireLockAfterRevoked() throws EntryExistsException, InterruptedException
{
final Interval interval = Intervals.of("2017-01-01/2017-01-02");
final Task lowPriorityTask = NoopTask.create("task1", 0);
final Task highPriorityTask = NoopTask.create("task2", 10);
final Task lowPriorityTask = NoopTask.ofPriority(0);
final Task highPriorityTask = NoopTask.ofPriority(10);
lockbox.add(lowPriorityTask);
lockbox.add(highPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
@ -650,7 +650,7 @@ public class TaskLockboxTest
final List<Task> highPriorityTasks = new ArrayList<>();
for (int i = 0; i < 8; i++) {
final Task task = NoopTask.create(10);
final Task task = NoopTask.ofPriority(10);
lowPriorityTasks.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
lockbox.add(task);
@ -665,7 +665,7 @@ public class TaskLockboxTest
// Revoke some locks
for (int i = 0; i < 4; i++) {
final Task task = NoopTask.create(100);
final Task task = NoopTask.ofPriority(100);
highPriorityTasks.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
lockbox.add(task);
@ -711,8 +711,8 @@ public class TaskLockboxTest
@Test
public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() throws EntryExistsException
{
final Task lowPriorityTask = NoopTask.create(0);
final Task highPriorityTask = NoopTask.create(10);
final Task lowPriorityTask = NoopTask.ofPriority(0);
final Task highPriorityTask = NoopTask.ofPriority(10);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
@ -821,11 +821,11 @@ public class TaskLockboxTest
@Test
public void testSegmentAndTimeChunkLockForSameIntervalWithDifferentPriority() throws EntryExistsException
{
final Task task1 = NoopTask.create(10);
final Task task1 = NoopTask.ofPriority(10);
lockbox.add(task1);
taskStorage.insert(task1, TaskStatus.running(task1.getId()));
final Task task2 = NoopTask.create(100);
final Task task2 = NoopTask.ofPriority(100);
lockbox.add(task2);
taskStorage.insert(task2, TaskStatus.running(task2.getId()));
@ -1086,8 +1086,8 @@ public class TaskLockboxTest
@Test
public void testGetTimeChunkAndSegmentLockForSameGroup()
{
final Task task1 = NoopTask.withGroupId("groupId");
final Task task2 = NoopTask.withGroupId("groupId");
final Task task1 = new NoopTask(null, "groupId", null, 0, 0, null);
final Task task2 = new NoopTask(null, "groupId", null, 0, 0, null);
lockbox.add(task1);
lockbox.add(task2);
@ -1130,8 +1130,8 @@ public class TaskLockboxTest
@Test
public void testGetTimeChunkAndSegmentLockForDifferentGroup()
{
final Task task1 = NoopTask.withGroupId("groupId");
final Task task2 = NoopTask.withGroupId("groupId2");
final Task task1 = new NoopTask(null, "group1", "wiki", 0, 0, null);
final Task task2 = new NoopTask(null, "group2", "wiki", 0, 0, null);
lockbox.add(task1);
lockbox.add(task2);
@ -1155,7 +1155,7 @@ public class TaskLockboxTest
public void testGetLockedIntervals()
{
// Acquire locks for task1
final Task task1 = NoopTask.create("ds1");
final Task task1 = NoopTask.forDatasource("ds1");
lockbox.add(task1);
tryTimeChunkLock(
@ -1170,7 +1170,7 @@ public class TaskLockboxTest
);
// Acquire locks for task2
final Task task2 = NoopTask.create("ds2");
final Task task2 = NoopTask.forDatasource("ds2");
lockbox.add(task2);
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
@ -1204,7 +1204,7 @@ public class TaskLockboxTest
public void testGetLockedIntervalsForLowPriorityTask()
{
// Acquire lock for a low priority task
final Task lowPriorityTask = NoopTask.create(5);
final Task lowPriorityTask = NoopTask.ofPriority(5);
lockbox.add(lowPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
tryTimeChunkLock(
@ -1224,7 +1224,7 @@ public class TaskLockboxTest
public void testGetLockedIntervalsForEqualPriorityTask()
{
// Acquire lock for a low priority task
final Task task = NoopTask.create(5);
final Task task = NoopTask.ofPriority(5);
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
tryTimeChunkLock(
@ -1622,7 +1622,7 @@ public class TaskLockboxTest
public void testGetLockedIntervalsForRevokedLocks()
{
// Acquire lock for a low priority task
final Task lowPriorityTask = NoopTask.create(5);
final Task lowPriorityTask = NoopTask.ofPriority(5);
lockbox.add(lowPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
tryTimeChunkLock(
@ -1643,7 +1643,7 @@ public class TaskLockboxTest
);
// Revoke the lowPriorityTask
final Task highPriorityTask = NoopTask.create(10);
final Task highPriorityTask = NoopTask.ofPriority(10);
lockbox.add(highPriorityTask);
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
@ -1667,8 +1667,8 @@ public class TaskLockboxTest
{
// Tasks to be failed have a group id with the substring "FailingLockAcquisition"
// Please refer to NullLockPosseTaskLockbox
final Task taskWithFailingLockAcquisition0 = NoopTask.withGroupId("FailingLockAcquisition");
final Task taskWithFailingLockAcquisition1 = NoopTask.withGroupId("FailingLockAcquisition");
final Task taskWithFailingLockAcquisition0 = new NoopTask(null, "FailingLockAcquisition", null, 0, 0, null);
final Task taskWithFailingLockAcquisition1 = new NoopTask(null, "FailingLockAcquisition", null, 0, 0, null);
final Task taskWithSuccessfulLockAcquisition = NoopTask.create();
taskStorage.insert(taskWithFailingLockAcquisition0, TaskStatus.running(taskWithFailingLockAcquisition0.getId()));
taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId()));
@ -1787,7 +1787,7 @@ public class TaskLockboxTest
private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority)
{
final Task task = NoopTask.create(priority);
final Task task = NoopTask.ofPriority(priority);
tasks.add(task);
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));

View File

@ -144,7 +144,7 @@ public class TaskQueueScaleTest
// Add all tasks.
for (int i = 0; i < numTasks; i++) {
final TestTask testTask = new TestTask(i, 2000L /* runtime millis */);
final NoopTask testTask = createTestTask(2000L);
taskQueue.add(testTask);
}
@ -182,8 +182,7 @@ public class TaskQueueScaleTest
// Add all tasks.
final List<String> taskIds = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
final TestTask testTask = new TestTask(
i,
final NoopTask testTask = createTestTask(
Duration.standardHours(1).getMillis() /* very long runtime millis, so we can do a shutdown */
);
taskQueue.add(testTask);
@ -215,27 +214,9 @@ public class TaskQueueScaleTest
Assert.assertEquals("all tasks should have completed", numTasks, completed);
}
private static class TestTask extends NoopTask
private NoopTask createTestTask(long runtimeMillis)
{
private final int number;
private final long runtime;
public TestTask(int number, long runtime)
{
super(null, null, DATASOURCE, 0, 0, null, null, Collections.emptyMap());
this.number = number;
this.runtime = runtime;
}
public int getNumber()
{
return number;
}
public long getRuntimeMillis()
{
return runtime;
}
return new NoopTask(null, null, DATASOURCE, runtimeMillis, 0, Collections.emptyMap());
}
private static class TestTaskRunner implements TaskRunner
@ -289,7 +270,7 @@ public class TaskQueueScaleTest
log.error(e, "Error in scheduled executor");
}
},
((TestTask) task).getRuntimeMillis(),
((NoopTask) task).getRunTime(),
TimeUnit.MILLISECONDS
);
}

View File

@ -129,9 +129,7 @@ public class ZkWorkerTest
@Test
public void testCanReadIdFromAJacksonSerializedTaskAnnouncement() throws JsonProcessingException
{
final String expectedTaskId = "task01234";
Task task0 = NoopTask.create(expectedTaskId, 0);
Task task0 = NoopTask.create();
TaskAnnouncement taskAnnouncement = TaskAnnouncement.create(
task0,
TaskStatus.running(task0.getId()),
@ -145,6 +143,6 @@ public class ZkWorkerTest
ChildData zkNode = new ChildData("/a/b/c", new Stat(), serialized);
String actualExtractedTaskId = extract.apply(zkNode);
Assert.assertEquals(expectedTaskId, actualExtractedTaskId);
Assert.assertEquals(task0.getId(), actualExtractedTaskId);
}
}

View File

@ -137,7 +137,7 @@ public class HttpRemoteTaskRunnerTest
int numTasks = 8;
List<Future<TaskStatus>> futures = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
futures.add(taskRunner.run(NoopTask.create("task-id-" + i, 0)));
futures.add(taskRunner.run(NoopTask.create()));
}
for (Future<TaskStatus> future : futures) {
@ -284,9 +284,9 @@ public class HttpRemoteTaskRunnerTest
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
Task task1 = NoopTask.create("task-id-1", 0);
Task task2 = NoopTask.create("task-id-2", 0);
Task task3 = NoopTask.create("task-id-3", 0);
Task task1 = NoopTask.create();
Task task2 = NoopTask.create();
Task task3 = NoopTask.create();
HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
TestHelper.makeJsonMapper(),
@ -379,11 +379,11 @@ public class HttpRemoteTaskRunnerTest
ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>();
Task task1 = NoopTask.create("task-id-1", 0);
Task task2 = NoopTask.create("task-id-2", 0);
Task task3 = NoopTask.create("task-id-3", 0);
Task task4 = NoopTask.create("task-id-4", 0);
Task task5 = NoopTask.create("task-id-5", 0);
Task task1 = NoopTask.create();
Task task2 = NoopTask.create();
Task task3 = NoopTask.create();
Task task4 = NoopTask.create();
Task task5 = NoopTask.create();
TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class);
EasyMock.expect(taskStorageMock.getStatus(task1.getId())).andReturn(Optional.absent());
@ -585,8 +585,8 @@ public class HttpRemoteTaskRunnerTest
taskRunner.start();
Task task1 = NoopTask.create("task-id-1", 0);
Task task2 = NoopTask.create("task-id-2", 0);
Task task1 = NoopTask.create();
Task task2 = NoopTask.create();
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", false, 1234, null, true, false),
@ -761,8 +761,8 @@ public class HttpRemoteTaskRunnerTest
taskRunner.start();
Task task1 = NoopTask.create("task-id-1", 0);
Task task2 = NoopTask.create("task-id-2", 0);
Task task1 = NoopTask.create();
Task task2 = NoopTask.create();
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", false, 1234, null, true, false),
@ -904,8 +904,8 @@ public class HttpRemoteTaskRunnerTest
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
Task task1 = NoopTask.create("task-id-1", 0);
Task task2 = NoopTask.create("task-id-2", 0);
Task task1 = NoopTask.create();
Task task2 = NoopTask.create();
String additionalWorkerCategory = "category2";
ConcurrentMap<String, CustomFunction> workerHolders = new ConcurrentHashMap<>();
@ -1140,7 +1140,7 @@ public class HttpRemoteTaskRunnerTest
@Test
public void testTaskAddedOrUpdated1() throws Exception
{
Task task = NoopTask.create("task");
Task task = NoopTask.create();
List<Object> listenerNotificationsAccumulator = new ArrayList<>();
HttpRemoteTaskRunner taskRunner = createTaskRunnerForTestTaskAddedOrUpdated(
EasyMock.createStrictMock(TaskStorage.class),
@ -1269,7 +1269,7 @@ public class HttpRemoteTaskRunnerTest
@Test
public void testTaskAddedOrUpdated2() throws Exception
{
Task task = NoopTask.create("task");
Task task = NoopTask.create();
List<Object> listenerNotificationsAccumulator = new ArrayList<>();
HttpRemoteTaskRunner taskRunner = createTaskRunnerForTestTaskAddedOrUpdated(
EasyMock.createStrictMock(TaskStorage.class),
@ -1315,12 +1315,12 @@ public class HttpRemoteTaskRunnerTest
@Test
public void testTaskAddedOrUpdated3()
{
Task task1 = NoopTask.create("task1");
Task task2 = NoopTask.create("task2");
Task task3 = NoopTask.create("task3");
Task task4 = NoopTask.create("task4");
Task task5 = NoopTask.create("task5");
Task task6 = NoopTask.create("task6");
Task task1 = NoopTask.create();
Task task2 = NoopTask.create();
Task task3 = NoopTask.create();
Task task4 = NoopTask.create();
Task task5 = NoopTask.create();
Task task6 = NoopTask.create();
TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);
EasyMock.expect(taskStorage.getStatus(task1.getId())).andReturn(Optional.of(TaskStatus.running(task1.getId())));
@ -1501,7 +1501,7 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1));
Future<TaskStatus> future = taskRunner.run(NoopTask.create("task-id", 0));
Future<TaskStatus> future = taskRunner.run(NoopTask.create());
Assert.assertTrue(future.get().isFailure());
Assert.assertNotNull(future.get().getErrorMsg());
Assert.assertTrue(
@ -1613,7 +1613,7 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscovery.getListeners().get(0).nodesAdded(ImmutableList.of(druidNode1));
Future<TaskStatus> future = taskRunner.run(NoopTask.create("task-id", 0));
Future<TaskStatus> future = taskRunner.run(NoopTask.create());
Assert.assertTrue(future.get().isFailure());
Assert.assertNotNull(future.get().getErrorMsg());
Assert.assertTrue(
@ -1646,7 +1646,7 @@ public class HttpRemoteTaskRunnerTest
taskRunner.start();
Task pendingTask = NoopTask.create("pendingTask");
Task pendingTask = NoopTask.create();
taskRunner.run(pendingTask);
// Pending task is not cleaned up immediately
taskRunner.shutdown(pendingTask.getId(), "Forced shutdown");
@ -1657,7 +1657,7 @@ public class HttpRemoteTaskRunnerTest
.contains(pendingTask.getId())
);
Task completedTask = NoopTask.create("completedTask");
Task completedTask = NoopTask.create();
taskRunner.run(completedTask);
taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
completedTask,

View File

@ -49,10 +49,10 @@ public class WorkerHolderTest
{
List<TaskAnnouncement> updates = new ArrayList<>();
Task task0 = NoopTask.create("task0", 0);
Task task1 = NoopTask.create("task1", 0);
Task task2 = NoopTask.create("task2", 0);
Task task3 = NoopTask.create("task3", 0);
Task task0 = NoopTask.create();
Task task1 = NoopTask.create();
Task task2 = NoopTask.create();
Task task3 = NoopTask.create();
WorkerHolder workerHolder = new WorkerHolder(
TestHelper.makeJsonMapper(),

View File

@ -897,7 +897,7 @@ public class OverlordResourceTest
);
// Verify that taskPost fails for user who has only datasource read access
Task task = NoopTask.create(Datasources.WIKIPEDIA);
Task task = NoopTask.forDatasource(Datasources.WIKIPEDIA);
expectedException.expect(ForbiddenException.class);
expectedException.expect(ForbiddenException.class);
overlordResource.taskPost(task, req);
@ -941,7 +941,7 @@ public class OverlordResourceTest
// set authorization token properly, but isn't called in this test.
// This should be fixed in https://github.com/apache/druid/issues/6685.
// expectAuthorizationTokenCheck();
final NoopTask task = NoopTask.create("mydatasource");
final NoopTask task = NoopTask.create();
EasyMock.expect(taskStorageQueryAdapter.getTask("mytask"))
.andReturn(Optional.of(task));
@ -980,10 +980,11 @@ public class OverlordResourceTest
// set authorization token properly, but isn't called in this test.
// This should be fixed in https://github.com/apache/druid/issues/6685.
// expectAuthorizationTokenCheck();
final Task task = NoopTask.create("mytask", 0);
final TaskStatus status = TaskStatus.running("mytask");
final Task task = NoopTask.create();
final String taskId = task.getId();
final TaskStatus status = TaskStatus.running(taskId);
EasyMock.expect(taskStorageQueryAdapter.getTaskInfo("mytask"))
EasyMock.expect(taskStorageQueryAdapter.getTaskInfo(taskId))
.andReturn(new TaskInfo(
task.getId(),
DateTimes.of("2018-01-01"),
@ -1008,7 +1009,7 @@ public class OverlordResourceTest
authConfig
);
final Response response1 = overlordResource.getTaskStatus("mytask");
final Response response1 = overlordResource.getTaskStatus(taskId);
final TaskStatusResponse taskStatusResponse1 = TestHelper.makeJsonMapper().readValue(
TestHelper.makeJsonMapper().writeValueAsString(response1.getEntity()),
TaskStatusResponse.class
@ -1017,10 +1018,10 @@ public class OverlordResourceTest
Assert.assertEquals(tsp.getStatusCode(), tsp.getStatus());
Assert.assertEquals(
new TaskStatusResponse(
"mytask",
taskId,
new TaskStatusPlus(
"mytask",
"mytask",
task.getId(),
task.getGroupId(),
"noop",
DateTimes.of("2018-01-01"),
DateTimes.EPOCH,
@ -1155,14 +1156,14 @@ public class OverlordResourceTest
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"datasource",
NoopTask.create("id_1", 1)
NoopTask.create()
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"datasource",
NoopTask.create("id_2", 1)
NoopTask.create()
)
));
mockQueue.shutdown("id_1", "Shutdown request from user");

View File

@ -119,6 +119,11 @@ public class OverlordTest
private final String goodTaskId = "aaa";
private final String badTaskId = "zzz";
private Task task0;
private Task task1;
private String taskId0;
private String taskId1;
private void setupServerAndCurator() throws Exception
{
server = new TestingServer();
@ -163,12 +168,18 @@ public class OverlordTest
taskLockbox = new TaskLockbox(taskStorage, mdc);
task0 = NoopTask.create();
taskId0 = task0.getId();
task1 = NoopTask.create();
taskId1 = task1.getId();
runTaskCountDownLatches = new HashMap<>();
runTaskCountDownLatches.put("0", new CountDownLatch(1));
runTaskCountDownLatches.put("1", new CountDownLatch(1));
runTaskCountDownLatches.put(taskId0, new CountDownLatch(1));
runTaskCountDownLatches.put(taskId1, new CountDownLatch(1));
taskCompletionCountDownLatches = new HashMap<>();
taskCompletionCountDownLatches.put("0", new CountDownLatch(1));
taskCompletionCountDownLatches.put("1", new CountDownLatch(1));
taskCompletionCountDownLatches.put(taskId0, new CountDownLatch(1));
taskCompletionCountDownLatches.put(taskId1, new CountDownLatch(1));
announcementLatch = new CountDownLatch(1);
setupServerAndCurator();
curator.start();
@ -178,9 +189,9 @@ public class OverlordTest
// Add two tasks with conflicting locks
// The bad task (The one with a lexicographically larger name) must be failed
Task badTask = new NoopTask(badTaskId, badTaskId, "datasource", 10_000, 0, null, null, null);
Task badTask = new NoopTask(badTaskId, badTaskId, "datasource", 10_000, 0, null);
TaskLock badLock = new TimeChunkLock(null, badTaskId, "datasource", Intervals.ETERNITY, "version1", 50);
Task goodTask = new NoopTask(goodTaskId, goodTaskId, "datasource", 0, 0, null, null, null);
Task goodTask = new NoopTask(goodTaskId, goodTaskId, "datasource", 0, 0, null);
TaskLock goodLock = new TimeChunkLock(null, goodTaskId, "datasource", Intervals.ETERNITY, "version0", 50);
taskStorage.insert(goodTask, TaskStatus.running(goodTaskId));
taskStorage.insert(badTask, TaskStatus.running(badTaskId));
@ -271,57 +282,53 @@ public class OverlordTest
taskCompletionCountDownLatches.get(goodTaskId).countDown();
waitForTaskStatus(goodTaskId, TaskState.SUCCESS);
final String taskId_0 = "0";
NoopTask task_0 = NoopTask.create(taskId_0, 0);
response = overlordResource.taskPost(task_0, req);
response = overlordResource.taskPost(task0, req);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
Assert.assertEquals(ImmutableMap.of("task", taskId0), response.getEntity());
// Duplicate task - should fail
response = overlordResource.taskPost(task_0, req);
response = overlordResource.taskPost(task0, req);
Assert.assertEquals(400, response.getStatus());
// Task payload for task_0 should be present in taskStorage
response = overlordResource.getTaskPayload(taskId_0);
Assert.assertEquals(task_0, ((TaskPayloadResponse) response.getEntity()).getPayload());
response = overlordResource.getTaskPayload(taskId0);
Assert.assertEquals(task0, ((TaskPayloadResponse) response.getEntity()).getPayload());
// Task not present in taskStorage - should fail
response = overlordResource.getTaskPayload("whatever");
Assert.assertEquals(404, response.getStatus());
// Task status of the submitted task should be running
response = overlordResource.getTaskStatus(taskId_0);
Assert.assertEquals(taskId_0, ((TaskStatusResponse) response.getEntity()).getTask());
response = overlordResource.getTaskStatus(taskId0);
Assert.assertEquals(taskId0, ((TaskStatusResponse) response.getEntity()).getTask());
Assert.assertEquals(
TaskStatus.running(taskId_0).getStatusCode(),
TaskStatus.running(taskId0).getStatusCode(),
((TaskStatusResponse) response.getEntity()).getStatus().getStatusCode()
);
// Simulate completion of task_0
taskCompletionCountDownLatches.get(taskId_0).countDown();
taskCompletionCountDownLatches.get(taskId0).countDown();
// Wait for taskQueue to handle success status of task_0
waitForTaskStatus(taskId_0, TaskState.SUCCESS);
waitForTaskStatus(taskId0, TaskState.SUCCESS);
// Manually insert task in taskStorage
// Verifies sync from storage
final String taskId_1 = "1";
NoopTask task_1 = NoopTask.create(taskId_1, 0);
taskStorage.insert(task_1, TaskStatus.running(taskId_1));
taskStorage.insert(task1, TaskStatus.running(taskId1));
// Wait for task runner to run task_1
runTaskCountDownLatches.get(taskId_1).await();
runTaskCountDownLatches.get(taskId1).await();
response = overlordResource.getRunningTasks(null, req);
// 1 task that was manually inserted should be in running state
Assert.assertEquals(1, (((List) response.getEntity()).size()));
final TaskStatusPlus taskResponseObject = ((List<TaskStatusPlus>) response
.getEntity()).get(0);
Assert.assertEquals(taskId_1, taskResponseObject.getId());
Assert.assertEquals(taskId1, taskResponseObject.getId());
Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation());
// Simulate completion of task_1
taskCompletionCountDownLatches.get(taskId_1).countDown();
taskCompletionCountDownLatches.get(taskId1).countDown();
// Wait for taskQueue to handle success status of task_1
waitForTaskStatus(taskId_1, TaskState.SUCCESS);
waitForTaskStatus(taskId1, TaskState.SUCCESS);
// should return number of tasks which are not in running state
response = overlordResource.getCompleteTasks(null, req);

View File

@ -43,14 +43,7 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
new AffinityConfig(ImmutableMap.of("foo", ImmutableSet.of("localhost1", "localhost2", "localhost3")), false)
);
NoopTask noopTask = new NoopTask(null, null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
};
NoopTask noopTask = NoopTask.forDatasource("foo");
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
ImmutableMap.of(
@ -113,7 +106,7 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
NoopTask.create()
);
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -136,7 +129,7 @@ public class EqualDistributionWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
NoopTask.create()
);
Assert.assertNull(worker);
}

View File

@ -182,7 +182,7 @@ public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_TIER_TESTS,
new NoopTask(null, null, "ds1", 1, 0, null, null, null)
NoopTask.forDatasource("ds1")
);
return worker;

View File

@ -89,14 +89,7 @@ public class EqualDistributionWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
NoopTask.forDatasource("foo")
);
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -124,14 +117,7 @@ public class EqualDistributionWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
NoopTask.forDatasource("foo")
);
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
@ -160,14 +146,7 @@ public class EqualDistributionWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
NoopTask.forDatasource("foo")
);
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
@ -196,14 +175,7 @@ public class EqualDistributionWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
NoopTask.forDatasource("foo")
);
Assert.assertEquals("enableHost", worker.getWorker().getHost());
}
@ -282,13 +254,6 @@ public class EqualDistributionWorkerSelectStrategyTest
private static NoopTask createDummyTask(final String dataSource)
{
return new NoopTask(null, null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return dataSource;
}
};
return NoopTask.forDatasource(dataSource);
}
}

View File

@ -59,14 +59,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
{
@Override
public String getDataSource()
{
return "foo";
}
}
NoopTask.forDatasource("foo")
);
Assert.assertEquals("localhost", worker.getWorker().getHost());
}
@ -96,7 +89,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
NoopTask.create()
);
Assert.assertEquals("lhost", worker.getWorker().getHost());
}
@ -119,7 +112,7 @@ public class FillCapacityWithAffinityWorkerSelectStrategyTest
DateTimes.nowUtc()
)
),
new NoopTask(null, null, null, 1, 0, null, null, null)
NoopTask.create()
);
Assert.assertNull(worker);
}

View File

@ -182,7 +182,7 @@ public class FillCapacityWithCategorySpecWorkerSelectStrategyTest
ImmutableWorkerInfo worker = strategy.findWorkerForTask(
new RemoteTaskRunnerConfig(),
WORKERS_FOR_TIER_TESTS,
new NoopTask(null, null, "ds1", 1, 0, null, null, null)
NoopTask.forDatasource("ds1")
);
return worker;

View File

@ -295,7 +295,7 @@ public class WorkerTaskManagerTest
@Test(timeout = 30_000L)
public void testTaskStatusWhenTaskRunnerFutureThrowsException() throws Exception
{
Task task = new NoopTask("id", null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
Task task = new NoopTask("id", null, null, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0))
{
@Override
public TaskStatus runTask(TaskToolbox toolbox)
@ -444,7 +444,7 @@ public class WorkerTaskManagerTest
private NoopTask createNoopTask(String id)
{
return new NoopTask(id, null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
return new NoopTask(id, null, null, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
}
/**
@ -456,7 +456,7 @@ public class WorkerTaskManagerTest
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
EasyMock.replay(overlordClient);
final Task task = new NoopTask("id", null, null, 100, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
final Task task = new NoopTask("id", null, null, 100, 0, ImmutableMap.of(Tasks.PRIORITY_KEY, 0));
// Scheduled scheduleCompletedTasksCleanup will not run, because initialDelay is 1 minute, which is longer than
// the 30-second timeout of this test case.