From 5e23674fe53339308ba1caaca48a80e7c9680da7 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 17 Mar 2022 10:47:45 +0900 Subject: [PATCH] Fix a race condition in the '/tasks' Overlord API (#12330) * finds complete and active tasks from the same snapshot * overlord resource * unit test * integration test * javadoc and cleanup * more cleanup * fix test and add more --- .../apache/druid/indexer/RunnerTaskState.java | 2 + .../org/apache/druid/indexer/TaskInfo.java | 1 - .../org/apache/druid/indexer/TaskStatus.java | 5 +- .../apache/druid/indexer/TaskStatusPlus.java | 2 +- .../MetadataStorageActionHandler.java | 32 +- .../org/apache/druid/metadata/TaskLookup.java | 186 ++++++ .../apache/druid/metadata/TaskLookupTest.java | 109 ++++ .../overlord/HeapMemoryTaskStorage.java | 82 ++- .../overlord/MetadataTaskStorage.java | 51 +- .../druid/indexing/overlord/TaskStorage.java | 38 +- .../overlord/TaskStorageQueryAdapter.java | 19 +- .../overlord/http/OverlordResource.java | 544 ++++++++---------- .../overlord/http/OverlordResourceTest.java | 541 +++++++++-------- .../clients/OverlordResourceTestClient.java | 5 + .../ITOverlordResourceNotFoundTest.java} | 4 +- .../tests/api/ITOverlordResourceTest.java | 72 +++ .../api/overlord-resource-test-task.json | 87 +++ .../DerbyMetadataStorageActionHandler.java | 50 +- .../MySQLMetadataStorageActionHandler.java | 50 +- ...ostgreSQLMetadataStorageActionHandler.java | 50 +- .../SQLMetadataStorageActionHandler.java | 107 +++- ...SQLServerMetadataStorageActionHandler.java | 48 +- .../SQLMetadataStorageActionHandlerTest.java | 38 +- 23 files changed, 1270 insertions(+), 853 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/metadata/TaskLookup.java create mode 100644 core/src/test/java/org/apache/druid/metadata/TaskLookupTest.java rename integration-tests/src/test/java/org/apache/druid/tests/{query/ITOverlordResourceTest.java => api/ITOverlordResourceNotFoundTest.java} (97%) create mode 100644 
integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceTest.java create mode 100644 integration-tests/src/test/resources/api/overlord-resource-test-task.json diff --git a/core/src/main/java/org/apache/druid/indexer/RunnerTaskState.java b/core/src/main/java/org/apache/druid/indexer/RunnerTaskState.java index b7a28cb2825..5345e918824 100644 --- a/core/src/main/java/org/apache/druid/indexer/RunnerTaskState.java +++ b/core/src/main/java/org/apache/druid/indexer/RunnerTaskState.java @@ -25,6 +25,8 @@ package org.apache.druid.indexer; */ public enum RunnerTaskState { + // Waiting tasks are not tracked. + // Instead, they are computed by (all tasks in metadata store - all tasks in taskRunner). WAITING, PENDING, RUNNING, diff --git a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java index 411d87af6a9..aa9aa1097db 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskInfo.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskInfo.java @@ -77,4 +77,3 @@ public class TaskInfo return task; } } - diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java index 91f3a7d5f3d..c6b17bc1e00 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java @@ -22,6 +22,7 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -66,10 +67,10 @@ public class TaskStatus } /** - * This method is deprecated because it does not handle the error message of failed task status properly. 
+ * This method is deprecated for production because it does not handle the error message of failed task status properly. * Use {@link #success(String)} or {@link #failure(String, String)} instead. */ - @Deprecated + @VisibleForTesting public static TaskStatus fromCode(String taskId, TaskState code) { return new TaskStatus(taskId, code, -1, null, null); diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 23d36a688c9..75bbefa0eaf 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -48,7 +48,7 @@ public class TaskStatusPlus public TaskStatusPlus( String id, @Nullable String groupId, - String type, // nullable for backward compatibility + @Nullable String type, // nullable for backward compatibility DateTime createdTime, DateTime queueInsertionTime, @Nullable TaskState statusCode, diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index d25829cd0bc..b4d50aaa3d2 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -21,10 +21,12 @@ package org.apache.druid.metadata; import com.google.common.base.Optional; import org.apache.druid.indexer.TaskInfo; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.DateTime; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -82,26 +84,28 @@ public interface MetadataStorageActionHandler getTaskInfo(String entryId); /** - * Return up to {@code maxNumStatuses} {@link TaskInfo} objects for all inactive entries - * created on or later than the given timestamp + * Returns 
a list of {@link TaskInfo} from the metadata store that match the given filters. * - * @param timestamp timestamp - * @param maxNumStatuses maxNumStatuses + * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store. + * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata + * store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied. + * All lookups should be processed atomically if more than one lookup is given. * - * @return list of {@link TaskInfo} + * @param taskLookups task lookup type and filters. + * @param datasource datasource filter */ - List> getCompletedTaskInfo( - DateTime timestamp, - @Nullable Integer maxNumStatuses, + List> getTaskInfos( + Map taskLookups, @Nullable String datasource ); - /** - * Return {@link TaskInfo} objects for all active entries - * - * @return list of {@link TaskInfo} - */ - List> getActiveTaskInfo(@Nullable String dataSource); + default List> getTaskInfos( + TaskLookup taskLookup, + @Nullable String datasource + ) + { + return getTaskInfos(Collections.singletonMap(taskLookup.getType(), taskLookup), datasource); + } /** * Add a lock to the given entry diff --git a/core/src/main/java/org/apache/druid/metadata/TaskLookup.java b/core/src/main/java/org/apache/druid/metadata/TaskLookup.java new file mode 100644 index 00000000000..cf81f78db61 --- /dev/null +++ b/core/src/main/java/org/apache/druid/metadata/TaskLookup.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.DateTimes; +import org.joda.time.DateTime; +import org.joda.time.Duration; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Lookup types and parameters for task lookups in the metadata store. + */ +public interface TaskLookup +{ + /** + * Task state in the metadata store. + * Complete tasks are the tasks that have been either succeeded or failed. + * Active tasks are the tasks that are not complete tasks. + */ + enum TaskLookupType + { + ACTIVE, + COMPLETE + } + + TaskLookupType getType(); + + /** + * Task lookup for complete tasks. It includes optional filters for task lookups. + * When the filters are given, the task lookup returns only the tasks that satisfy all filters. + */ + class CompleteTaskLookup implements TaskLookup + { + /** + * Limits the number of taskStatuses to return. + */ + @Nullable + private final Integer maxTaskStatuses; + + /** + * Returns only the tasks created prior to the given timestamp. + */ + @Nullable + private final DateTime tasksCreatedPriorTo; + + public static CompleteTaskLookup of( + @Nullable Integer maxTaskStatuses, + @Nullable Duration durationBeforeNow + ) + { + return new CompleteTaskLookup( + maxTaskStatuses, + durationBeforeNow == null ? 
null : computeTimestampPriorToNow(durationBeforeNow) + ); + } + + @VisibleForTesting + public static CompleteTaskLookup withTasksCreatedPriorTo( + @Nullable Integer maxTaskStatuses, + @Nullable DateTime tasksCreatedPriorTo + ) + { + return new CompleteTaskLookup(maxTaskStatuses, tasksCreatedPriorTo); + } + + private CompleteTaskLookup( + @Nullable Integer maxTaskStatuses, + @Nullable DateTime tasksCreatedPriorTo + ) + { + this.maxTaskStatuses = maxTaskStatuses; + this.tasksCreatedPriorTo = tasksCreatedPriorTo; + } + + public boolean hasTaskCreatedTimeFilter() + { + return tasksCreatedPriorTo != null; + } + + public CompleteTaskLookup withDurationBeforeNow(Duration durationBeforeNow) + { + return CompleteTaskLookup.of( + maxTaskStatuses, + Preconditions.checkNotNull(durationBeforeNow, "durationBeforeNow") + ); + } + + private static DateTime computeTimestampPriorToNow(Duration durationBeforeNow) + { + return DateTimes + .nowUtc() + .minus(durationBeforeNow); + } + + public DateTime getTasksCreatedPriorTo() + { + assert tasksCreatedPriorTo != null; + return tasksCreatedPriorTo; + } + + @Nullable + public Integer getMaxTaskStatuses() + { + return maxTaskStatuses; + } + + @Override + public TaskLookupType getType() + { + return TaskLookupType.COMPLETE; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompleteTaskLookup that = (CompleteTaskLookup) o; + return Objects.equals(maxTaskStatuses, that.maxTaskStatuses) + && Objects.equals(tasksCreatedPriorTo, that.tasksCreatedPriorTo); + } + + @Override + public int hashCode() + { + return Objects.hash(maxTaskStatuses, tasksCreatedPriorTo); + } + } + + class ActiveTaskLookup implements TaskLookup + { + private static final ActiveTaskLookup INSTANCE = new ActiveTaskLookup(); + + public static ActiveTaskLookup getInstance() + { + return INSTANCE; + } + + private ActiveTaskLookup() + { + } + + @Override + public 
TaskLookupType getType() + { + return TaskLookupType.ACTIVE; + } + + @Override + public int hashCode() + { + return 0; + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof ActiveTaskLookup; + } + } +} diff --git a/core/src/test/java/org/apache/druid/metadata/TaskLookupTest.java b/core/src/test/java/org/apache/druid/metadata/TaskLookupTest.java new file mode 100644 index 00000000000..2a2c5526926 --- /dev/null +++ b/core/src/test/java/org/apache/druid/metadata/TaskLookupTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; +import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; + +@RunWith(Enclosed.class) +public class TaskLookupTest +{ + public static class CompleteTaskLookupTest + { + @Test + public void testEquals() + { + EqualsVerifier.forClass(CompleteTaskLookup.class).usingGetClass().verify(); + } + + @Test + public void testGetType() + { + Assert.assertEquals(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null).getType()); + } + + @Test + public void testNullParams() + { + final CompleteTaskLookup lookup = CompleteTaskLookup.of(null, null); + Assert.assertNull(lookup.getMaxTaskStatuses()); + Assert.assertFalse(lookup.hasTaskCreatedTimeFilter()); + Assert.assertThrows(AssertionError.class, lookup::getTasksCreatedPriorTo); + } + + @Test + public void testWithDurationBeforeNow() + { + final Duration duration = new Period("P1D").toStandardDuration(); + final DateTime timestampBeforeLookupCreated = DateTimes.nowUtc().minus(duration); + final CompleteTaskLookup lookup = CompleteTaskLookup + .of(null, null) + .withDurationBeforeNow(duration); + Assert.assertNull(lookup.getMaxTaskStatuses()); + Assert.assertTrue( + timestampBeforeLookupCreated.isEqual(lookup.getTasksCreatedPriorTo()) + || timestampBeforeLookupCreated.isBefore(lookup.getTasksCreatedPriorTo()) + ); + } + + @Test + public void testNonNullParams() + { + final Duration duration = new Period("P1D").toStandardDuration(); + final DateTime timestampBeforeLookupCreated = DateTimes.nowUtc().minus(duration); + final 
CompleteTaskLookup lookup = CompleteTaskLookup.of(3, duration); + Assert.assertNotNull(lookup.getMaxTaskStatuses()); + Assert.assertEquals(3, lookup.getMaxTaskStatuses().intValue()); + Assert.assertTrue(lookup.hasTaskCreatedTimeFilter()); + Assert.assertTrue( + timestampBeforeLookupCreated.isEqual(lookup.getTasksCreatedPriorTo()) + || timestampBeforeLookupCreated.isBefore(lookup.getTasksCreatedPriorTo()) + ); + } + } + + public static class ActiveTaskLookupTest + { + @Test + public void testSingleton() + { + final ActiveTaskLookup lookup1 = ActiveTaskLookup.getInstance(); + final ActiveTaskLookup lookup2 = ActiveTaskLookup.getInstance(); + Assert.assertEquals(lookup1, lookup2); + Assert.assertSame(lookup1, lookup2); + } + + @Test + public void testGetType() + { + Assert.assertEquals(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance().getType()); + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index d949faa7638..de9ebb72812 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -37,15 +37,19 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.metadata.TaskLookup; +import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.DateTime; -import org.joda.time.Duration; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; 
+import java.util.stream.Stream; /** * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class @@ -166,23 +170,21 @@ public class HeapMemoryTaskStorage implements TaskStorage return listBuilder.build(); } - @Override public List> getActiveTaskInfo(@Nullable String dataSource) { final ImmutableList.Builder> listBuilder = ImmutableList.builder(); for (final TaskStuff taskStuff : tasks.values()) { if (taskStuff.getStatus().isRunnable()) { - TaskInfo t = TaskStuff.toTaskInfo(taskStuff); - listBuilder.add(t); + if (dataSource == null || dataSource.equals(taskStuff.getDataSource())) { + listBuilder.add(TaskStuff.toTaskInfo(taskStuff)); + } } } return listBuilder.build(); } - @Override public List> getRecentlyCreatedAlreadyFinishedTaskInfo( - @Nullable Integer maxTaskStatuses, - @Nullable Duration durationBeforeNow, + CompleteTaskLookup taskLookup, @Nullable String datasource ) { @@ -195,39 +197,57 @@ public class HeapMemoryTaskStorage implements TaskStorage } }.reverse(); - return maxTaskStatuses == null ? - getRecentlyCreatedAlreadyFinishedTaskInfoSince( - DateTimes.nowUtc() - .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), - createdDateDesc - ) : - getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc); + return getRecentlyCreatedAlreadyFinishedTaskInfoSince( + taskLookup.getTasksCreatedPriorTo(), + taskLookup.getMaxTaskStatuses(), + createdDateDesc + ); + } + + /** + * NOTE: This method is racy as it searches for complete tasks and active tasks separately outside a lock. + * This method should be used only for testing. 
+ */ + @Override + public List> getTaskInfos( + Map taskLookups, + @Nullable String datasource + ) + { + final List> tasks = new ArrayList<>(); + taskLookups.forEach((type, lookup) -> { + if (type == TaskLookupType.COMPLETE) { + CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) lookup; + tasks.addAll( + getRecentlyCreatedAlreadyFinishedTaskInfo( + completeTaskLookup.hasTaskCreatedTimeFilter() + ? completeTaskLookup + : completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()), + datasource + ) + ); + } else { + tasks.addAll(getActiveTaskInfo(datasource)); + } + }); + return tasks; } private List> getRecentlyCreatedAlreadyFinishedTaskInfoSince( DateTime start, + @Nullable Integer n, Ordering createdDateDesc ) { - List> list = tasks.values() + Stream stream = tasks + .values() .stream() .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().isAfter(start)) - .sorted(createdDateDesc) - .map(TaskStuff::toTaskInfo) - .collect(Collectors.toList()); - return Collections.unmodifiableList(list); - } - - private List> getNRecentlyCreatedAlreadyFinishedTaskInfo( - int n, - Ordering createdDateDesc - ) - { - List> list = tasks.values() - .stream() - .filter(taskStuff -> taskStuff.getStatus().isComplete()) - .sorted(createdDateDesc) - .limit(n) + .sorted(createdDateDesc); + if (n != null) { + stream = stream.limit(n); + } + List> list = stream .map(TaskStuff::toTaskInfo) .collect(Collectors.toList()); return Collections.unmodifiableList(list); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 08c1343f80b..86597d5cfd2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -25,6 +25,7 @@ import 
com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.inject.Inject; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; @@ -43,11 +44,16 @@ import org.apache.druid.metadata.MetadataStorageActionHandlerFactory; import org.apache.druid.metadata.MetadataStorageActionHandlerTypes; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; -import org.joda.time.Duration; +import org.apache.druid.metadata.TaskLookup; +import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; +import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.stream.Collectors; public class MetadataTaskStorage implements TaskStorage @@ -191,7 +197,7 @@ public class MetadataTaskStorage implements TaskStorage { // filter out taskInfo with a null 'task' which should only happen in practice if we are missing a jackson module // and don't know what to do with the payload, so we won't be able to make use of it anyway - return handler.getActiveTaskInfo(null) + return handler.getTaskInfos(Collections.singletonMap(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null) .stream() .filter(taskInfo -> taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) .map(TaskInfo::getTask) @@ -201,7 +207,10 @@ public class MetadataTaskStorage implements TaskStorage @Override public List getActiveTasksByDatasource(String datasource) { - List> activeTaskInfos = handler.getActiveTaskInfo(datasource); + List> activeTaskInfos = handler.getTaskInfos( + Collections.singletonMap(TaskLookupType.ACTIVE, 
ActiveTaskLookup.getInstance()), + datasource + ); ImmutableList.Builder tasksBuilder = ImmutableList.builder(); for (TaskInfo taskInfo : activeTaskInfos) { if (taskInfo.getStatus().isRunnable() && taskInfo.getTask() != null) { @@ -212,28 +221,26 @@ public class MetadataTaskStorage implements TaskStorage } @Override - public List> getActiveTaskInfo(@Nullable String dataSource) - { - return ImmutableList.copyOf( - handler.getActiveTaskInfo(dataSource) - ); - } - - @Override - public List> getRecentlyCreatedAlreadyFinishedTaskInfo( - @Nullable Integer maxTaskStatuses, - @Nullable Duration durationBeforeNow, + public List> getTaskInfos( + Map taskLookups, @Nullable String datasource ) { - return ImmutableList.copyOf( - handler.getCompletedTaskInfo( - DateTimes.nowUtc() - .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow), - maxTaskStatuses, - datasource - ) - ); + Map theTaskLookups = Maps.newHashMapWithExpectedSize(taskLookups.size()); + for (Entry entry : taskLookups.entrySet()) { + if (entry.getKey() == TaskLookupType.COMPLETE) { + CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue(); + theTaskLookups.put( + entry.getKey(), + completeTaskLookup.hasTaskCreatedTimeFilter() + ? 
completeTaskLookup + : completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()) + ); + } else { + theTaskLookups.put(entry.getKey(), entry.getValue()); + } + } + return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource)); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index 16810aa2529..cf858ebcc30 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -26,10 +26,13 @@ import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.metadata.EntryExistsException; -import org.joda.time.Duration; +import org.apache.druid.metadata.TaskLookup; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; +import java.util.Map; public interface TaskStorage { @@ -148,31 +151,28 @@ public interface TaskStorage List getActiveTasksByDatasource(String datasource); /** - * Returns a list of currently running or pending tasks as stored in the storage facility as {@link TaskInfo}. No + * Returns a list of tasks stored in the storage facility as {@link TaskInfo}. No * particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. * - * @return list of {@link TaskInfo} - */ - List> getActiveTaskInfo(@Nullable String dataSource); - - /** - * Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage - * facility. No particular order is guaranteed, but implementations are encouraged to return tasks in descending order - * of creation. 
No particular standard of "recent" is guaranteed, and in fact, this method is permitted to simply - * return nothing. + * The returned list can contain active tasks and complete tasks depending on the {@code taskLookups} parameter. + * See {@link TaskLookup} for more details of active and complete tasks. * - * @param maxTaskStatuses maxTaskStatuses - * @param durationBeforeNow duration - * @param datasource datasource - * - * @return list of {@link TaskInfo} + * @param taskLookups lookup types and filters + * @param datasource datasource filter */ - List> getRecentlyCreatedAlreadyFinishedTaskInfo( - @Nullable Integer maxTaskStatuses, - @Nullable Duration durationBeforeNow, + List> getTaskInfos( + Map taskLookups, @Nullable String datasource ); + default List> getTaskInfos( + TaskLookup taskLookup, + @Nullable String datasource + ) + { + return getTaskInfos(Collections.singletonMap(taskLookup.getType(), taskLookup), datasource); + } + /** * Returns a list of locks for a particular task. * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index 4ccd9251b43..f4338c90967 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -27,6 +27,10 @@ import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.metadata.TaskLookup; +import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; +import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; import 
org.apache.druid.timeline.DataSegment; import org.joda.time.Duration; import org.joda.time.Interval; @@ -75,7 +79,10 @@ public class TaskStorageQueryAdapter public List> getActiveTaskInfo(@Nullable String dataSource) { - return storage.getActiveTaskInfo(dataSource); + return storage.getTaskInfos( + ActiveTaskLookup.getInstance(), + dataSource + ); } public List> getCompletedTaskInfoByCreatedTimeDuration( @@ -84,7 +91,15 @@ public class TaskStorageQueryAdapter @Nullable String dataSource ) { - return storage.getRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, duration, dataSource); + return storage.getTaskInfos(CompleteTaskLookup.of(maxTaskStatuses, duration), dataSource); + } + + public List> getTaskInfos( + Map taskLookups, + @Nullable String dataSource + ) + { + return storage.getTaskInfos(taskLookups, dataSource); } public Optional getTask(final String taskid) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 32024fdf398..638fbf2df4e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -26,8 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteSource; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.audit.AuditEntry; @@ -39,7 +37,6 @@ import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; -import 
org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -60,10 +57,15 @@ import org.apache.druid.indexing.overlord.http.security.TaskResourceFilter; import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.metadata.TaskLookup; +import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; +import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.server.http.HttpMediaType; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.http.security.DatasourceResourceFilter; @@ -78,7 +80,6 @@ import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.tasklogs.TaskLogStreamer; import org.apache.druid.timeline.DataSegment; -import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; @@ -108,6 +109,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * @@ -130,6 +132,24 @@ public class OverlordResource private AtomicReference workerConfigRef = null; private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); + private enum TaskStateLookup + { + ALL, + WAITING, + PENDING, + RUNNING, + COMPLETE; + + private static 
TaskStateLookup fromString(@Nullable String state) + { + if (state == null) { + return ALL; + } else { + return TaskStateLookup.valueOf(StringUtils.toUpperCase(state)); + } + } + } + @Inject public OverlordResource( TaskMaster taskMaster, @@ -665,100 +685,229 @@ public class OverlordResource ); } } - List finalTaskList = new ArrayList<>(); - Function activeTaskTransformFunc = workItem -> new TaskStatusPlus( - workItem.getTaskId(), - workItem.getTaskGroupId(), - workItem.getTaskType(), - workItem.getCreatedTime(), - workItem.getQueueInsertionTime(), - workItem.getTaskState(), - workItem.getRunnerTaskState(), - null, - workItem.getLocation(), - workItem.getDataSource(), - null + + return asLeaderWith( + taskMaster.getTaskRunner(), + taskRunner -> { + final List authorizedList = securedTaskStatusPlus( + getTasks( + taskRunner, + TaskStateLookup.fromString(state), + dataSource, + createdTimeInterval, + maxCompletedTasks, + type + ), + dataSource, + req + ); + return Response.ok(authorizedList).build(); + } ); + } - Function, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), - taskInfo.getCreatedTime(), - // Would be nice to include the real queue insertion time, but the - // TaskStorage API doesn't yet allow it. - DateTimes.EPOCH, - taskInfo.getStatus().getStatusCode(), - RunnerTaskState.NONE, - taskInfo.getStatus().getDuration(), - taskInfo.getStatus().getLocation() == null ? 
TaskLocation.unknown() : taskInfo.getStatus().getLocation(), - taskInfo.getDataSource(), - taskInfo.getStatus().getErrorMsg() - ); - - //checking for complete tasks first to avoid querying active tasks if user only wants complete tasks - if (state == null || "complete".equals(StringUtils.toLowerCase(state))) { - Duration createdTimeDuration = null; - if (createdTimeInterval != null) { - final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); - createdTimeDuration = theInterval.toDuration(); - } - final List> taskInfoList = - taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(maxCompletedTasks, createdTimeDuration, dataSource); - final List completedTasks = taskInfoList.stream() - .map(completeTaskTransformFunc::apply) - .collect(Collectors.toList()); - finalTaskList.addAll(completedTasks); + private List getTasks( + TaskRunner taskRunner, + TaskStateLookup state, + @Nullable String dataSource, + @Nullable String createdTimeInterval, + @Nullable Integer maxCompletedTasks, + @Nullable String type + ) + { + final Duration createdTimeDuration; + if (createdTimeInterval != null) { + final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/")); + createdTimeDuration = theInterval.toDuration(); + } else { + createdTimeDuration = null; } - final List> allActiveTaskInfo; - final List allActiveTasks = new ArrayList<>(); - if (state == null || !"complete".equals(StringUtils.toLowerCase(state))) { - allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(dataSource); - for (final TaskInfo task : allActiveTaskInfo) { - allActiveTasks.add( - new AnyTask( - task.getId(), - task.getTask() == null ? null : task.getTask().getGroupId(), - task.getTask() == null ? 
null : task.getTask().getType(), - SettableFuture.create(), - task.getDataSource(), - null, - null, - task.getCreatedTime(), - DateTimes.EPOCH, - TaskLocation.unknown() - )); - } - } - if (state == null || "waiting".equals(StringUtils.toLowerCase(state))) { - final List waitingWorkItems = filterActiveTasks(RunnerTaskState.WAITING, allActiveTasks); - List transformedWaitingList = waitingWorkItems.stream() - .map(activeTaskTransformFunc::apply) - .collect(Collectors.toList()); - finalTaskList.addAll(transformedWaitingList); - } - if (state == null || "pending".equals(StringUtils.toLowerCase(state))) { - final List pendingWorkItems = filterActiveTasks(RunnerTaskState.PENDING, allActiveTasks); - List transformedPendingList = pendingWorkItems.stream() - .map(activeTaskTransformFunc::apply) - .collect(Collectors.toList()); - finalTaskList.addAll(transformedPendingList); - } - if (state == null || "running".equals(StringUtils.toLowerCase(state))) { - final List runningWorkItems = filterActiveTasks(RunnerTaskState.RUNNING, allActiveTasks); - List transformedRunningList = runningWorkItems.stream() - .map(activeTaskTransformFunc::apply) - .collect(Collectors.toList()); - finalTaskList.addAll(transformedRunningList); - } - final List authorizedList = securedTaskStatusPlus( - finalTaskList, + // Ideally, snapshotting in taskStorage and taskRunner should be done atomically, + // but there is no way to do it today. + // Instead, we first gets a snapshot from taskStorage and then one from taskRunner. + // This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process + // and use the snapshot from taskRunner as a reference for potential task state updates happened + // after the first snapshotting. 
+ Stream> taskInfoStreamFromTaskStorage = getTaskInfoStreamFromTaskStorage( + state, dataSource, - type, - req + createdTimeDuration, + maxCompletedTasks, + type ); - return Response.ok(authorizedList).build(); + final Map runnerWorkItems = getTaskRunnerWorkItems( + taskRunner, + state, + dataSource, + type + ); + + if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) { + // We are interested in only those tasks which are in taskRunner. + taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage + .filter(info -> runnerWorkItems.containsKey(info.getId())); + } + final List> taskInfoFromTaskStorage = taskInfoStreamFromTaskStorage + .collect(Collectors.toList()); + + // Separate complete and active tasks from taskStorage. + // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType. + final List> completeTaskInfoFromTaskStorage = new ArrayList<>(); + final List> activeTaskInfoFromTaskStorage = new ArrayList<>(); + for (TaskInfo info : taskInfoFromTaskStorage) { + if (info.getStatus().isComplete()) { + completeTaskInfoFromTaskStorage.add(info); + } else { + activeTaskInfoFromTaskStorage.add(info); + } + } + + final List statuses = new ArrayList<>(); + completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add( + new TaskStatusPlus( + taskInfo.getId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), + taskInfo.getTask() == null ? 
null : taskInfo.getTask().getType(), + taskInfo.getCreatedTime(), + DateTimes.EPOCH, + taskInfo.getStatus().getStatusCode(), + RunnerTaskState.NONE, + taskInfo.getStatus().getDuration(), + taskInfo.getStatus().getLocation(), + taskInfo.getDataSource(), + taskInfo.getStatus().getErrorMsg() + ) + )); + + activeTaskInfoFromTaskStorage.forEach(taskInfo -> { + final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId()); + if (runnerWorkItem == null) { + // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner. + if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) { + statuses.add( + new TaskStatusPlus( + taskInfo.getId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), + taskInfo.getCreatedTime(), + DateTimes.EPOCH, + taskInfo.getStatus().getStatusCode(), + RunnerTaskState.WAITING, + taskInfo.getStatus().getDuration(), + taskInfo.getStatus().getLocation(), + taskInfo.getDataSource(), + taskInfo.getStatus().getErrorMsg() + ) + ); + } + } else { + if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) { + statuses.add( + new TaskStatusPlus( + taskInfo.getId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), + taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), + runnerWorkItem.getCreatedTime(), + runnerWorkItem.getQueueInsertionTime(), + taskInfo.getStatus().getStatusCode(), + taskRunner.getRunnerTaskState(taskInfo.getId()), // this is racy for remoteTaskRunner + taskInfo.getStatus().getDuration(), + runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done. 
+ taskInfo.getDataSource(), + taskInfo.getStatus().getErrorMsg() + ) + ); + } + } + }); + + return statuses; + } + + private Stream> getTaskInfoStreamFromTaskStorage( + TaskStateLookup state, + @Nullable String dataSource, + Duration createdTimeDuration, + @Nullable Integer maxCompletedTasks, + @Nullable String type + ) + { + final Map taskLookups; + switch (state) { + case ALL: + taskLookups = ImmutableMap.of( + TaskLookupType.ACTIVE, + ActiveTaskLookup.getInstance(), + TaskLookupType.COMPLETE, + CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) + ); + break; + case COMPLETE: + taskLookups = ImmutableMap.of( + TaskLookupType.COMPLETE, + CompleteTaskLookup.of(maxCompletedTasks, createdTimeDuration) + ); + break; + case WAITING: + case PENDING: + case RUNNING: + taskLookups = ImmutableMap.of( + TaskLookupType.ACTIVE, + ActiveTaskLookup.getInstance() + ); + break; + default: + throw new IAE("Unknown state: [%s]", state); + } + + final Stream> taskInfoStreamFromTaskStorage = taskStorageQueryAdapter.getTaskInfos( + taskLookups, + dataSource + ).stream(); + if (type != null) { + return taskInfoStreamFromTaskStorage.filter( + info -> type.equals(info.getTask() == null ? 
null : info.getTask().getType()) + ); + } else { + return taskInfoStreamFromTaskStorage; + } + } + + private Map getTaskRunnerWorkItems( + TaskRunner taskRunner, + TaskStateLookup state, + @Nullable String dataSource, + @Nullable String type + ) + { + Stream runnerWorkItemsStream; + switch (state) { + case ALL: + case WAITING: + // waiting tasks can be found by (all tasks in taskStorage - all tasks in taskRunner) + runnerWorkItemsStream = taskRunner.getKnownTasks().stream(); + break; + case PENDING: + runnerWorkItemsStream = taskRunner.getPendingTasks().stream(); + break; + case RUNNING: + runnerWorkItemsStream = taskRunner.getRunningTasks().stream(); + break; + case COMPLETE: + runnerWorkItemsStream = Stream.empty(); + break; + default: + throw new IAE("Unknown state: [%s]", state); + } + if (dataSource != null) { + runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> dataSource.equals(item.getDataSource())); + } + if (type != null) { + runnerWorkItemsStream = runnerWorkItemsStream.filter(item -> type.equals(item.getTaskType())); + } + return runnerWorkItemsStream + .collect(Collectors.toMap(TaskRunnerWorkItem::getTaskId, item -> item)); } @DELETE @@ -944,106 +1093,9 @@ public class OverlordResource } } - private List filterActiveTasks( - RunnerTaskState state, - List allTasks - ) - { - //divide active tasks into 3 lists : running, pending, waiting - Optional taskRunnerOpt = taskMaster.getTaskRunner(); - if (!taskRunnerOpt.isPresent()) { - throw new WebApplicationException( - Response.serverError().entity("No task runner found").build() - ); - } - TaskRunner runner = taskRunnerOpt.get(); - // the order of tasks below is waiting, pending, running to prevent - // skipping a task, it's the order in which tasks will change state - // if they do while this is code is executing, so a task might be - // counted twice but never skipped - if (RunnerTaskState.WAITING.equals(state)) { - Collection runnersKnownTasks = runner.getKnownTasks(); - Set runnerKnownTaskIds = 
runnersKnownTasks - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toSet()); - final List waitingTasks = new ArrayList<>(); - for (TaskRunnerWorkItem task : allTasks) { - if (!runnerKnownTaskIds.contains(task.getTaskId())) { - waitingTasks.add(((AnyTask) task).withTaskState( - TaskState.RUNNING, - RunnerTaskState.WAITING, - task.getCreatedTime(), - task.getQueueInsertionTime(), - task.getLocation() - )); - } - } - return waitingTasks; - } - - if (RunnerTaskState.PENDING.equals(state)) { - Collection knownPendingTasks = runner.getPendingTasks(); - Set pendingTaskIds = knownPendingTasks - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toSet()); - Map workItemIdMap = knownPendingTasks - .stream() - .collect(Collectors.toMap( - TaskRunnerWorkItem::getTaskId, - java.util.function.Function.identity(), - (previousWorkItem, newWorkItem) -> newWorkItem - )); - final List pendingTasks = new ArrayList<>(); - for (TaskRunnerWorkItem task : allTasks) { - if (pendingTaskIds.contains(task.getTaskId())) { - pendingTasks.add(((AnyTask) task).withTaskState( - TaskState.RUNNING, - RunnerTaskState.PENDING, - workItemIdMap.get(task.getTaskId()).getCreatedTime(), - workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(), - workItemIdMap.get(task.getTaskId()).getLocation() - )); - } - } - return pendingTasks; - } - - if (RunnerTaskState.RUNNING.equals(state)) { - Collection knownRunningTasks = runner.getRunningTasks(); - Set runningTaskIds = knownRunningTasks - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toSet()); - Map workItemIdMap = knownRunningTasks - .stream() - .collect(Collectors.toMap( - TaskRunnerWorkItem::getTaskId, - java.util.function.Function.identity(), - (previousWorkItem, newWorkItem) -> newWorkItem - )); - final List runningTasks = new ArrayList<>(); - for (TaskRunnerWorkItem task : allTasks) { - if (runningTaskIds.contains(task.getTaskId())) { - runningTasks.add(((AnyTask) task).withTaskState( - 
TaskState.RUNNING, - RunnerTaskState.RUNNING, - workItemIdMap.get(task.getTaskId()).getCreatedTime(), - workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(), - workItemIdMap.get(task.getTaskId()).getLocation() - )); - } - } - return runningTasks; - } - return allTasks; - } - private List securedTaskStatusPlus( List collectionToFilter, @Nullable String dataSource, - @Nullable String type, HttpServletRequest req ) { @@ -1061,127 +1113,17 @@ public class OverlordResource new ResourceAction(new Resource(taskDatasource, ResourceType.DATASOURCE), Action.READ) ); }; - List optionalTypeFilteredList = collectionToFilter; - if (type != null) { - optionalTypeFilteredList = collectionToFilter - .stream() - .filter(task -> type.equals(task.getType())) - .collect(Collectors.toList()); - } if (dataSource != null) { //skip auth check here, as it's already done in getTasks - return optionalTypeFilteredList; + return collectionToFilter; } return Lists.newArrayList( AuthorizationUtils.filterAuthorizedResources( req, - optionalTypeFilteredList, + collectionToFilter, raGenerator, authorizerMapper ) ); } - - private static class AnyTask extends TaskRunnerWorkItem - { - private final String taskGroupId; - private final String taskType; - private final String dataSource; - private final TaskState taskState; - private final RunnerTaskState runnerTaskState; - private final DateTime createdTime; - private final DateTime queueInsertionTime; - private final TaskLocation taskLocation; - - AnyTask( - String taskId, - String taskGroupId, - String taskType, - ListenableFuture result, - String dataSource, - TaskState state, - RunnerTaskState runnerState, - DateTime createdTime, - DateTime queueInsertionTime, - TaskLocation taskLocation - ) - { - super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH); - this.taskGroupId = taskGroupId; - this.taskType = taskType; - this.dataSource = dataSource; - this.taskState = state; - this.runnerTaskState = runnerState; - this.createdTime = createdTime; - 
this.queueInsertionTime = queueInsertionTime; - this.taskLocation = taskLocation; - } - - @Override - public TaskLocation getLocation() - { - return taskLocation; - } - - @Override - public String getTaskType() - { - return taskType; - } - - @Override - public String getDataSource() - { - return dataSource; - } - - public String getTaskGroupId() - { - return taskGroupId; - } - - public TaskState getTaskState() - { - return taskState; - } - - public RunnerTaskState getRunnerTaskState() - { - return runnerTaskState; - } - - @Override - public DateTime getCreatedTime() - { - return createdTime; - } - - @Override - public DateTime getQueueInsertionTime() - { - return queueInsertionTime; - } - - public AnyTask withTaskState( - TaskState newTaskState, - RunnerTaskState runnerState, - DateTime createdTime, - DateTime queueInsertionTime, - TaskLocation taskLocation - ) - { - return new AnyTask( - getTaskId(), - getTaskGroupId(), - getTaskType(), - getResult(), - getDataSource(), - newTaskState, - runnerState, - createdTime, - queueInsertionTime, - taskLocation - ); - } - } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 638a33d829b..15a8619eb28 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -25,7 +25,6 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskInfo; @@ -57,6 +56,9 @@ import 
org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; +import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; +import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; @@ -228,33 +230,38 @@ public class OverlordResourceTest public void testSecuredGetWaitingTask() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), + null + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), + TaskStatus.running("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), + TaskStatus.running("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_3", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), + TaskStatus.running("id_3"), "deny", getTaskWithIdAndDatasource("id_3", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_4", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_4"), + TaskStatus.running("id_4"), "deny", getTaskWithIdAndDatasource("id_4", "deny") ) @@ -263,8 +270,8 @@ public class OverlordResourceTest EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( ImmutableList.of( - new MockTaskRunnerWorkItem("id_1", null), - new MockTaskRunnerWorkItem("id_4", null) + new MockTaskRunnerWorkItem("id_1"), + new 
MockTaskRunnerWorkItem("id_4") ) ); @@ -288,30 +295,27 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); - EasyMock.>expect(taskRunner.getRunningTasks()).andReturn( - ImmutableList.of( - new MockTaskRunnerWorkItem(tasksIds.get(0), null), - new MockTaskRunnerWorkItem(tasksIds.get(1), null), - new MockTaskRunnerWorkItem(tasksIds.get(2), null) - )); - EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), @@ -328,8 +332,6 @@ public class OverlordResourceTest req, workerTaskRunnerQueryAdapter ); - Assert.assertTrue(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null).size() == 3); - Assert.assertTrue(taskRunner.getRunningTasks().size() == 3); List responseObjects = (List) overlordResource .getCompleteTasks(null, req).getEntity(); @@ -345,28 +347,35 @@ public class OverlordResourceTest List tasksIds = ImmutableList.of("id_1", "id_2"); EasyMock.>expect(taskRunner.getRunningTasks()).andReturn( ImmutableList.of( - new MockTaskRunnerWorkItem(tasksIds.get(0), null), - new MockTaskRunnerWorkItem(tasksIds.get(1), null) + new MockTaskRunnerWorkItem(tasksIds.get(0)), + new MockTaskRunnerWorkItem(tasksIds.get(1)) ) ); - 
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), + null + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), + TaskStatus.running("id_1"), "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), + TaskStatus.running("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ) ) ); + EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.RUNNING); + EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andStubReturn(RunnerTaskState.RUNNING); EasyMock.replay( taskRunner, @@ -388,24 +397,54 @@ public class OverlordResourceTest public void testGetTasks() { expectAuthorizationTokenCheck(); - //completed tasks - EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of( + TaskLookupType.ACTIVE, + ActiveTaskLookup.getInstance(), + TaskLookupType.COMPLETE, + CompleteTaskLookup.of(null, null) + ), + null + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_5", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_5"), "deny", getTaskWithIdAndDatasource("id_5", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_6", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_6"), "allow", getTaskWithIdAndDatasource("id_6", "allow") ), - new TaskInfo( + new TaskInfo<>( + "id_7", + DateTime.now(ISOChronology.getInstanceUTC()), + TaskStatus.success("id_7"), + "allow", + getTaskWithIdAndDatasource("id_7", "allow") + ), + new TaskInfo<>( + "id_5", + 
DateTime.now(ISOChronology.getInstanceUTC()), + TaskStatus.success("id_5"), + "deny", + getTaskWithIdAndDatasource("id_5", "deny") + ), + new TaskInfo<>( + "id_6", + DateTime.now(ISOChronology.getInstanceUTC()), + TaskStatus.success("id_6"), + "allow", + getTaskWithIdAndDatasource("id_6", "allow") + ), + new TaskInfo<>( "id_7", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_7"), @@ -414,59 +453,14 @@ public class OverlordResourceTest ) ) ); - //active tasks - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( - ImmutableList.of( - new TaskInfo( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "deny", - getTaskWithIdAndDatasource("id_3", "deny") - ), - new TaskInfo( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) - ) - ); EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( ImmutableList.of( - new MockTaskRunnerWorkItem("id_1", null), - new MockTaskRunnerWorkItem("id_4", null) + new MockTaskRunnerWorkItem("id_1"), + new MockTaskRunnerWorkItem("id_4") ) ).atLeastOnce(); - EasyMock.>expect(taskRunner.getPendingTasks()).andReturn( - ImmutableList.of( - new MockTaskRunnerWorkItem("id_4", null) - ) - ); - - EasyMock.>expect(taskRunner.getRunningTasks()).andReturn( - ImmutableList.of( - new MockTaskRunnerWorkItem("id_1", null) - ) - ); - EasyMock.replay( taskRunner, taskMaster, @@ -486,82 +480,75 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); //completed tasks - 
EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, "allow")) - .andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of( + TaskLookupType.COMPLETE, + CompleteTaskLookup.of(null, null), + TaskLookupType.ACTIVE, + ActiveTaskLookup.getInstance() + ), + "allow" + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_5", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_5"), "allow", getTaskWithIdAndDatasource("id_5", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_6", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_6"), "allow", getTaskWithIdAndDatasource("id_6", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_7", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_7"), "allow", getTaskWithIdAndDatasource("id_7", "allow") + ), + new TaskInfo<>( + "id_1", + DateTime.now(ISOChronology.getInstanceUTC()), + TaskStatus.running("id_1"), + "allow", + getTaskWithIdAndDatasource("id_1", "allow") + ), + new TaskInfo<>( + "id_2", + DateTime.now(ISOChronology.getInstanceUTC()), + TaskStatus.success("id_2"), + "allow", + getTaskWithIdAndDatasource("id_2", "allow") + ), + new TaskInfo<>( + "id_3", + DateTime.now(ISOChronology.getInstanceUTC()), + TaskStatus.success("id_3"), + "allow", + getTaskWithIdAndDatasource("id_3", "allow") + ), + new TaskInfo<>( + "id_4", + DateTime.now(ISOChronology.getInstanceUTC()), + TaskStatus.success("id_4"), + "allow", + getTaskWithIdAndDatasource("id_4", "allow") ) ) - ); - //active tasks - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( - ImmutableList.of( - new TaskInfo( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "allow", - 
getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ), - new TaskInfo( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_4"), - "allow", - getTaskWithIdAndDatasource("id_4", "allow") - ) - ) ); EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( ImmutableList.of( - new MockTaskRunnerWorkItem("id_1", null), - new MockTaskRunnerWorkItem("id_4", null) + new MockTaskRunnerWorkItem("id_1"), + new MockTaskRunnerWorkItem("id_4") ) ).atLeastOnce(); - EasyMock.>expect(taskRunner.getPendingTasks()).andReturn( - ImmutableList.of( - new MockTaskRunnerWorkItem("id_4", null) - ) - ); - - EasyMock.>expect(taskRunner.getRunningTasks()).andReturn( - ImmutableList.of( - new MockTaskRunnerWorkItem("id_1", null) - ) - ); EasyMock.replay( taskRunner, taskMaster, @@ -576,7 +563,7 @@ public class OverlordResourceTest .getEntity(); Assert.assertEquals(7, responseObjects.size()); Assert.assertEquals("id_5", responseObjects.get(0).getId()); - Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource())); + Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource()); } @Test @@ -584,33 +571,41 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); //active tasks - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of( + TaskLookupType.ACTIVE, + ActiveTaskLookup.getInstance() + ), + null + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), + TaskStatus.running("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), - 
TaskStatus.success("id_2"), + TaskStatus.running("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_3", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), + TaskStatus.running("id_3"), "deny", getTaskWithIdAndDatasource("id_3", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_4", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_4"), + TaskStatus.running("id_4"), "deny", getTaskWithIdAndDatasource("id_4", "deny") ) @@ -619,8 +614,8 @@ public class OverlordResourceTest EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( ImmutableList.of( - new MockTaskRunnerWorkItem("id_1", null), - new MockTaskRunnerWorkItem("id_4", null) + new MockTaskRunnerWorkItem("id_1"), + new MockTaskRunnerWorkItem("id_4") ) ); @@ -649,33 +644,41 @@ public class OverlordResourceTest public void testGetTasksFilterRunningState() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("allow")).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of( + TaskLookupType.ACTIVE, + ActiveTaskLookup.getInstance() + ), + "allow" + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), + TaskStatus.running("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), + TaskStatus.running("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_3", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), + TaskStatus.running("id_3"), "allow", getTaskWithIdAndDatasource("id_3", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_4", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_4"), + TaskStatus.running("id_4"), "deny", 
getTaskWithIdAndDatasource("id_4", "deny") ) @@ -685,11 +688,12 @@ public class OverlordResourceTest List tasksIds = ImmutableList.of("id_1", "id_2"); EasyMock.>expect(taskRunner.getRunningTasks()).andReturn( ImmutableList.of( - new MockTaskRunnerWorkItem(tasksIds.get(0), null), - new MockTaskRunnerWorkItem(tasksIds.get(1), null) + new MockTaskRunnerWorkItem(tasksIds.get(0), "allow", "test"), + new MockTaskRunnerWorkItem(tasksIds.get(1), "allow", "test") ) ); - + EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING); + EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andReturn(RunnerTaskState.RUNNING); EasyMock.replay( taskRunner, @@ -706,8 +710,7 @@ public class OverlordResourceTest Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals(tasksIds.get(0), responseObjects.get(0).getId()); - String ds = responseObjects.get(0).getDataSource(); - Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource())); + Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource()); } @Test @@ -718,43 +721,52 @@ public class OverlordResourceTest List tasksIds = ImmutableList.of("id_1", "id_2"); EasyMock.>expect(taskRunner.getPendingTasks()).andReturn( ImmutableList.of( - new MockTaskRunnerWorkItem(tasksIds.get(0), null), - new MockTaskRunnerWorkItem(tasksIds.get(1), null) + new MockTaskRunnerWorkItem(tasksIds.get(0)), + new MockTaskRunnerWorkItem(tasksIds.get(1)) ) ); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), + null + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), + TaskStatus.running("id_1"), "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_2", 
DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), + TaskStatus.running("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_3", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), + TaskStatus.running("id_3"), "allow", getTaskWithIdAndDatasource("id_3", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_4", DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_4"), + TaskStatus.running("id_4"), "deny", getTaskWithIdAndDatasource("id_4", "deny") ) ) ); + EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.PENDING); + EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andStubReturn(RunnerTaskState.PENDING); + EasyMock.expect(taskRunner.getRunnerTaskState("id_3")).andStubReturn(RunnerTaskState.RUNNING); + EasyMock.expect(taskRunner.getRunnerTaskState("id_4")).andStubReturn(RunnerTaskState.RUNNING); EasyMock.replay( taskRunner, @@ -771,31 +783,35 @@ public class OverlordResourceTest Assert.assertEquals(1, responseObjects.size()); Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId()); - String ds = responseObjects.get(0).getDataSource(); - //Assert.assertTrue("DataSource Check", "ds_test".equals(responseObjects.get(0).getDataSource())); + Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource()); } @Test public void testGetTasksFilterCompleteState() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), + null + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", getTaskWithIdAndDatasource("id_1", "allow") ), - new 
TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "deny", getTaskWithIdAndDatasource("id_2", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), @@ -824,26 +840,29 @@ public class OverlordResourceTest public void testGetTasksFilterCompleteStateWithInterval() { expectAuthorizationTokenCheck(); - List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); Duration duration = new Period("PT86400S").toStandardDuration(); - EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, duration, null)) - .andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + EasyMock.anyObject(), + EasyMock.anyObject() + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "deny", getTaskWithIdAndDatasource("id_1", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "allow", getTaskWithIdAndDatasource("id_2", "allow") ), - new TaskInfo( + new TaskInfo<>( "id_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), @@ -851,7 +870,7 @@ public class OverlordResourceTest getTaskWithIdAndDatasource("id_3", "allow") ) ) - ); + ); EasyMock.replay( taskRunner, @@ -878,38 +897,34 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(Users.WIKI_READER); // Setup mocks to return completed, active, known, pending and running tasks - EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of( + TaskLookupType.COMPLETE, + CompleteTaskLookup.of(null, null), + TaskLookupType.ACTIVE, + ActiveTaskLookup.getInstance() + ), + null + ) + ).andStubReturn( ImmutableList.of( 
createTaskInfo("id_5", Datasources.WIKIPEDIA), - createTaskInfo("id_6", Datasources.BUZZFEED) - ) - ); - - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( - ImmutableList.of( - createTaskInfo("id_1", Datasources.WIKIPEDIA), - createTaskInfo("id_2", Datasources.BUZZFEED) + createTaskInfo("id_6", Datasources.BUZZFEED), + createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "test"), + createTaskInfo("id_4", Datasources.BUZZFEED, TaskState.RUNNING, "test") ) ); EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( ImmutableList.of( - new MockTaskRunnerWorkItem("id_1", null), - new MockTaskRunnerWorkItem("id_4", null) + new MockTaskRunnerWorkItem("id_1", Datasources.WIKIPEDIA, "test"), + new MockTaskRunnerWorkItem("id_4", Datasources.BUZZFEED, "test") ) ).atLeastOnce(); - EasyMock.>expect(taskRunner.getPendingTasks()).andReturn( - ImmutableList.of( - new MockTaskRunnerWorkItem("id_4", null) - ) - ); - - EasyMock.>expect(taskRunner.getRunningTasks()).andReturn( - ImmutableList.of( - new MockTaskRunnerWorkItem("id_1", null) - ) - ); + EasyMock.expect(taskRunner.getRunnerTaskState("id_4")).andReturn(RunnerTaskState.PENDING); + EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING); // Replay all mocks EasyMock.replay( @@ -932,27 +947,68 @@ public class OverlordResourceTest } @Test - public void testGetTasksFilterByDatasourceRequiresReadAccess() + public void testGetTasksFilterByTaskTypeRequiresDatasourceRead() { // Setup mocks for a user who has read access to "wikipedia" // and no access to "buzzfeed" expectAuthorizationTokenCheck(Users.WIKI_READER); // Setup mocks to return completed, active, known, pending and running tasks - EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( + EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of( + TaskLookupType.COMPLETE, + CompleteTaskLookup.of(null, null), + 
TaskLookupType.ACTIVE, + ActiveTaskLookup.getInstance() + ), + null + ) + ).andStubReturn( ImmutableList.of( createTaskInfo("id_5", Datasources.WIKIPEDIA), - createTaskInfo("id_6", Datasources.BUZZFEED) + createTaskInfo("id_6", Datasources.BUZZFEED), + createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "to-return"), + createTaskInfo("id_4", Datasources.WIKIPEDIA, TaskState.RUNNING, "test") ) ); - EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(null)).andStubReturn( + EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( ImmutableList.of( - createTaskInfo("id_1", Datasources.WIKIPEDIA), - createTaskInfo("id_2", Datasources.BUZZFEED) + new MockTaskRunnerWorkItem("id_1", Datasources.WIKIPEDIA, "to-return"), + new MockTaskRunnerWorkItem("id_4", Datasources.WIKIPEDIA, "test") ) + ).atLeastOnce(); + + EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING); + + // Replay all mocks + EasyMock.replay( + taskRunner, + taskMaster, + taskStorageQueryAdapter, + indexerMetadataStorageAdapter, + req, + workerTaskRunnerQueryAdapter ); + // Verify that only the tasks of read access datasource are returned + List responseObjects = (List) overlordResource + .getTasks(null, null, null, null, "to-return", req) + .getEntity(); + Assert.assertEquals(1, responseObjects.size()); + for (TaskStatusPlus taskStatus : responseObjects) { + Assert.assertEquals("to-return", taskStatus.getType()); + } + } + + @Test + public void testGetTasksFilterByDatasourceRequiresReadAccess() + { + // Setup mocks for a user who has read access to "wikipedia" + // and no access to "buzzfeed" + expectAuthorizationTokenCheck(Users.WIKI_READER); + // Replay all mocks EasyMock.replay( taskRunner, @@ -972,23 +1028,31 @@ public class OverlordResourceTest public void testGetNullCompleteTask() { expectAuthorizationTokenCheck(); - EasyMock.expect(taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDuration(null, null, null)).andStubReturn( + 
EasyMock.expect( + taskStorageQueryAdapter.getTaskInfos( + ImmutableMap.of( + TaskLookupType.COMPLETE, + CompleteTaskLookup.of(null, null) + ), + null + ) + ).andStubReturn( ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "allow", null ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "deny", getTaskWithIdAndDatasource("id_2", "deny") ), - new TaskInfo( + new TaskInfo<>( "id_3", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_3"), @@ -1661,13 +1725,18 @@ public class OverlordResourceTest } private Task getTaskWithIdAndDatasource(String id, String datasource) + { + return getTaskWithIdAndDatasource(id, datasource, "test"); + } + + private Task getTaskWithIdAndDatasource(String id, String datasource, String taskType) { return new AbstractTask(id, datasource, null) { @Override public String getType() { - return "test"; + return taskType; } @Override @@ -1689,14 +1758,27 @@ public class OverlordResourceTest }; } - private TaskInfo createTaskInfo(String taskId, String datasource) + private TaskInfo createTaskInfo( + String taskId, + String datasource + ) + { + return createTaskInfo(taskId, datasource, TaskState.SUCCESS, "test"); + } + + private TaskInfo createTaskInfo( + String taskId, + String datasource, + TaskState state, + String taskType + ) { return new TaskInfo<>( taskId, DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success(taskId), + TaskStatus.fromCode(taskId, state), datasource, - getTaskWithIdAndDatasource(taskId, datasource) + getTaskWithIdAndDatasource(taskId, datasource, taskType) ); } @@ -1721,12 +1803,23 @@ public class OverlordResourceTest private static class MockTaskRunnerWorkItem extends TaskRunnerWorkItem { + private final String dataSource; + private final String type; + + public MockTaskRunnerWorkItem(String taskId) + { + this(taskId, "ds_test", "test"); + } + 
public MockTaskRunnerWorkItem( String taskId, - ListenableFuture result + String dataSource, + String type ) { - super(taskId, result); + super(taskId, null); + this.dataSource = dataSource; + this.type = type; } @Override @@ -1738,13 +1831,13 @@ public class OverlordResourceTest @Override public String getTaskType() { - return "test"; + return type; } @Override public String getDataSource() { - return "ds_test"; + return dataSource; } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 271ac3d3e55..ea9481b3850 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -144,6 +144,11 @@ public class OverlordResourceTestClient } } + public List getAllTasks() + { + return getTasks("tasks"); + } + public List getRunningTasks() { return getTasks("runningTasks"); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITOverlordResourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceNotFoundTest.java similarity index 97% rename from integration-tests/src/test/java/org/apache/druid/tests/query/ITOverlordResourceTest.java rename to integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceNotFoundTest.java index 5db3749fe8b..8959c7744fe 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITOverlordResourceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceNotFoundTest.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.druid.tests.query; +package org.apache.druid.tests.api; import com.google.inject.Inject; import org.apache.druid.java.util.common.ISE; @@ -32,7 +32,7 @@ import java.util.function.Consumer; @Test(groups = TestNGGroup.HTTP_ENDPOINT) @Guice(moduleFactory = DruidTestModuleFactory.class) -public class ITOverlordResourceTest +public class ITOverlordResourceNotFoundTest { @Inject protected OverlordResourceTestClient indexer; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceTest.java new file mode 100644 index 00000000000..eb59afee0f7 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/api/ITOverlordResourceTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.tests.api; + +import com.google.inject.Inject; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.testing.clients.OverlordResourceTestClient; +import org.apache.druid.testing.clients.TaskResponseObject; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractIndexerTest; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.List; + +@Test(groups = TestNGGroup.HTTP_ENDPOINT) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITOverlordResourceTest +{ + private static final String INGESTION_SPEC = "/api/overlord-resource-test-task.json"; + + @Inject + protected OverlordResourceTestClient indexer; + + @Test + public void testGetAllTasks() throws IOException + { + final String taskSpec = AbstractIndexerTest.getResourceAsString(INGESTION_SPEC); + final String taskId = indexer.submitTask(taskSpec); + + ITRetryUtil.retryUntil( + () -> { + final List tasks = indexer.getAllTasks(); + final TaskResponseObject taskStatus = tasks + .stream() + .filter(task -> taskId.equals(task.getId())) + .findAny() + .orElseThrow(() -> new ISE("Cannot find task[%s]", taskId)); + TaskState status = taskStatus.getStatus(); + if (status == TaskState.FAILED) { + throw new ISE("Task[%s] FAILED", taskId); + } + return status == TaskState.SUCCESS; + }, + true, + ITRetryUtil.DEFAULT_RETRY_SLEEP, + ITRetryUtil.DEFAULT_RETRY_COUNT, + taskId + ); + } +} diff --git a/integration-tests/src/test/resources/api/overlord-resource-test-task.json b/integration-tests/src/test/resources/api/overlord-resource-test-task.json new file mode 100644 index 00000000000..a3f99ad6254 --- /dev/null +++ b/integration-tests/src/test/resources/api/overlord-resource-test-task.json @@ -0,0 +1,87 @@ 
+{ + "type": "index_parallel", + "spec": { + "dataSchema": { + "dataSource": "it-overlord-resource-test", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + {"type": "string", "name": "language", "createBitmapIndex": false}, + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + }, + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "name": "thetaSketch", + "type": "thetaSketch", + "fieldName": "user" + }, + { + "name": "quantilesDoublesSketch", + "type": "quantilesDoublesSketch", + "fieldName": "delta" + }, + { + "name": "HLLSketchBuild", + "type": "HLLSketchBuild", + "fieldName": "user" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "second", + "intervals" : [ "2013-08-31/2013-09-02" ] + } + }, + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "local", + "filter" : "*.json", + "baseDir": "/resources/data/batch_index/json" + }, + "inputFormat": { + "type": "json" + } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumConcurrentSubTasks": 1, + "splitHintSpec": { + "type": "maxSize", + "maxNumFiles": 1 + } + } + } +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java index bff95785f8f..fa39d506b4c 100644 --- a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java @@ -22,12 +22,6 @@ package org.apache.druid.metadata; import 
com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import org.apache.druid.java.util.common.StringUtils; -import org.joda.time.DateTime; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.Query; - -import javax.annotation.Nullable; -import java.util.Map; public class DerbyMetadataStorageActionHandler extends SQLMetadataStorageActionHandler @@ -47,49 +41,9 @@ public class DerbyMetadataStorageActionHandler> createCompletedTaskInfoQuery( - Handle handle, - DateTime timestamp, - @Nullable Integer maxNumStatuses, - @Nullable String dataSource - ) + protected String decorateSqlWithLimit(String sql) { - String sql = StringUtils.format( - "SELECT " - + " id, " - + " status_payload, " - + " created_date, " - + " datasource, " - + " payload " - + "FROM " - + " %s " - + "WHERE " - + getWhereClauseForInactiveStatusesSinceQuery(dataSource) - + "ORDER BY created_date DESC", - getEntryTable() - ); - - if (maxNumStatuses != null) { - sql += " FETCH FIRST :n ROWS ONLY"; - } - Query> query = handle.createQuery(sql).bind("start", timestamp.toString()); - - if (maxNumStatuses != null) { - query = query.bind("n", maxNumStatuses); - } - if (dataSource != null) { - query = query.bind("ds", dataSource); - } - return query; - } - - private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource) - { - String sql = StringUtils.format("active = FALSE AND created_date >= :start "); - if (datasource != null) { - sql += " AND datasource = :ds "; - } - return sql; + return sql + " FETCH FIRST :n ROWS ONLY"; } @Deprecated diff --git a/server/src/main/java/org/apache/druid/metadata/MySQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/MySQLMetadataStorageActionHandler.java index 9765eae73af..79c028202f4 100644 --- a/server/src/main/java/org/apache/druid/metadata/MySQLMetadataStorageActionHandler.java +++ 
b/server/src/main/java/org/apache/druid/metadata/MySQLMetadataStorageActionHandler.java @@ -20,13 +20,6 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.java.util.common.StringUtils; -import org.joda.time.DateTime; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.Query; - -import javax.annotation.Nullable; -import java.util.Map; public class MySQLMetadataStorageActionHandler extends SQLMetadataStorageActionHandler @@ -45,47 +38,8 @@ public class MySQLMetadataStorageActionHandler> createCompletedTaskInfoQuery( - Handle handle, - DateTime timestamp, - @Nullable Integer maxNumStatuses, - @Nullable String dataSource - ) + protected String decorateSqlWithLimit(String sql) { - String sql = StringUtils.format( - "SELECT " - + " id, " - + " status_payload, " - + " created_date, " - + " datasource, " - + " payload " - + "FROM " - + " %s " - + "WHERE " - + getWhereClauseForInactiveStatusesSinceQuery(dataSource) - + "ORDER BY created_date DESC", - getEntryTable() - ); - - if (maxNumStatuses != null) { - sql += " LIMIT :n"; - } - Query> query = handle.createQuery(sql).bind("start", timestamp.toString()); - - if (maxNumStatuses != null) { - query = query.bind("n", maxNumStatuses); - } - if (dataSource != null) { - query = query.bind("ds", dataSource); - } - return query; - } - private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource) - { - String sql = StringUtils.format("active = FALSE AND created_date >= :start "); - if (datasource != null) { - sql += " AND datasource = :ds "; - } - return sql; + return sql + " LIMIT :n"; } } diff --git a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java index e967fb8f09e..b91427bcd66 100644 --- a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java +++ 
b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java @@ -21,12 +21,6 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.StringUtils; -import org.joda.time.DateTime; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.Query; - -import javax.annotation.Nullable; -import java.util.Map; public class PostgreSQLMetadataStorageActionHandler extends SQLMetadataStorageActionHandler @@ -45,49 +39,9 @@ public class PostgreSQLMetadataStorageActionHandler> createCompletedTaskInfoQuery( - Handle handle, - DateTime timestamp, - @Nullable Integer maxNumStatuses, - @Nullable String dataSource - ) + protected String decorateSqlWithLimit(String sql) { - String sql = StringUtils.format( - "SELECT " - + " id, " - + " status_payload, " - + " created_date, " - + " datasource, " - + " payload " - + "FROM " - + " %s " - + "WHERE " - + getWhereClauseForInactiveStatusesSinceQuery(dataSource) - + "ORDER BY created_date DESC", - getEntryTable() - ); - - if (maxNumStatuses != null) { - sql += " LIMIT :n"; - } - - Query> query = handle.createQuery(sql).bind("start", timestamp.toString()); - - if (maxNumStatuses != null) { - query = query.bind("n", maxNumStatuses); - } - if (dataSource != null) { - query = query.bind("ds", dataSource); - } - return query; - } - private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource) - { - String sql = StringUtils.format("active = FALSE AND created_date >= :start "); - if (datasource != null) { - sql += " AND datasource = :ds "; - } - return sql; + return sql + " LIMIT :n"; } @Deprecated diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 7f30d0405bb..0b8837c640d 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -27,9 +27,12 @@ import com.google.common.base.Throwables; import com.google.common.collect.Maps; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; +import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.DateTime; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; @@ -269,37 +272,90 @@ public abstract class SQLMetadataStorageActionHandler> getCompletedTaskInfo( + public List> getTaskInfos( + Map taskLookups, + @Nullable String dataSource + ) + { + return getConnector().retryTransaction( + (handle, status) -> { + final List> tasks = new ArrayList<>(); + for (Entry entry : taskLookups.entrySet()) { + final Query> query; + switch (entry.getKey()) { + case ACTIVE: + query = createActiveTaskInfoQuery( + handle, + dataSource + ); + tasks.addAll(query.map(taskInfoMapper).list()); + break; + case COMPLETE: + CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue(); + query = createCompletedTaskInfoQuery( + handle, + completeTaskLookup.getTasksCreatedPriorTo(), + completeTaskLookup.getMaxTaskStatuses(), + dataSource + ); + tasks.addAll(query.map(taskInfoMapper).list()); + break; + default: + throw new IAE("Unknown TaskLookupType: [%s]", entry.getKey()); + } + } + return tasks; + }, + 3, + SQLMetadataConnector.DEFAULT_MAX_TRIES + ); + } + + protected Query> createCompletedTaskInfoQuery( + Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource ) { - return getConnector().retryWithHandle( - handle -> { - final Query> query = createCompletedTaskInfoQuery( - handle, - 
timestamp, - maxNumStatuses, - dataSource - ); - return query.map(taskInfoMapper).list(); - } + String sql = StringUtils.format( + "SELECT " + + " id, " + + " status_payload, " + + " created_date, " + + " datasource, " + + " payload " + + "FROM " + + " %s " + + "WHERE " + + getWhereClauseForInactiveStatusesSinceQuery(dataSource) + + "ORDER BY created_date DESC", + getEntryTable() ); + + if (maxNumStatuses != null) { + sql = decorateSqlWithLimit(sql); + } + Query> query = handle.createQuery(sql).bind("start", timestamp.toString()); + + if (maxNumStatuses != null) { + query = query.bind("n", maxNumStatuses); + } + if (dataSource != null) { + query = query.bind("ds", dataSource); + } + return query; } - @Override - public List> getActiveTaskInfo(@Nullable String dataSource) + protected abstract String decorateSqlWithLimit(String sql); + + private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource) { - return getConnector().retryWithHandle( - handle -> { - final Query> query = createActiveTaskInfoQuery( - handle, - dataSource - ); - return query.map(taskInfoMapper).list(); - } - ); + String sql = StringUtils.format("active = FALSE AND created_date >= :start "); + if (datasource != null) { + sql += " AND datasource = :ds "; + } + return sql; } private Query> createActiveTaskInfoQuery(Handle handle, @Nullable String dataSource) @@ -380,13 +436,6 @@ public abstract class SQLMetadataStorageActionHandler> createCompletedTaskInfoQuery( - Handle handle, - DateTime timestamp, - @Nullable Integer maxNumStatuses, - @Nullable String dataSource - ); - @Override public boolean addLock(final String entryId, final LockType lock) { diff --git a/server/src/main/java/org/apache/druid/metadata/SQLServerMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLServerMetadataStorageActionHandler.java index 4845fc91643..2c4e93820d2 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLServerMetadataStorageActionHandler.java 
+++ b/server/src/main/java/org/apache/druid/metadata/SQLServerMetadataStorageActionHandler.java @@ -20,13 +20,6 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.java.util.common.StringUtils; -import org.joda.time.DateTime; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.Query; - -import javax.annotation.Nullable; -import java.util.Map; public class SQLServerMetadataStorageActionHandler extends SQLMetadataStorageActionHandler @@ -45,45 +38,8 @@ public class SQLServerMetadataStorageActionHandler> createCompletedTaskInfoQuery( - Handle handle, - DateTime timestamp, - @Nullable Integer maxNumStatuses, - @Nullable String dataSource - ) + protected String decorateSqlWithLimit(String sql) { - String sql = maxNumStatuses == null ? "SELECT " : "SELECT TOP (:n) "; - - sql += StringUtils.format( - " id, " - + " status_payload, " - + " created_date, " - + " datasource, " - + " payload " - + "FROM " - + " %s " - + "WHERE " - + getWhereClauseForInactiveStatusesSinceQuery(dataSource) - + "ORDER BY created_date DESC", - getEntryTable() - ); - - Query> query = handle.createQuery(sql).bind("start", timestamp.toString()); - - if (maxNumStatuses != null) { - query = query.bind("n", maxNumStatuses); - } - if (dataSource != null) { - query = query.bind("ds", dataSource); - } - return query; - } - private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource) - { - String sql = StringUtils.format("active = FALSE AND created_date >= :start "); - if (datasource != null) { - sql += " AND datasource = :ds "; - } - return sql; + return "SELECT TOP (:n)" + sql.substring("SELECT".length()); } } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index 11ccc19f707..2e85e5a5d49 100644 --- 
a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -31,6 +31,8 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; +import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -135,7 +137,7 @@ public class SQLMetadataStorageActionHandlerTest Assert.assertEquals( ImmutableList.of(Pair.of(entry, status1)), - handler.getActiveTaskInfo(null).stream() + handler.getTaskInfos(ActiveTaskLookup.getInstance(), null).stream() .map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus())) .collect(Collectors.toList()) ); @@ -144,14 +146,14 @@ public class SQLMetadataStorageActionHandlerTest Assert.assertEquals( ImmutableList.of(Pair.of(entry, status2)), - handler.getActiveTaskInfo(null).stream() + handler.getTaskInfos(ActiveTaskLookup.getInstance(), null).stream() .map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus())) .collect(Collectors.toList()) ); Assert.assertEquals( ImmutableList.of(), - handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null) + handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-01")), null) ); Assert.assertTrue(handler.setStatus(entryId, false, status1)); @@ -176,12 +178,12 @@ public class SQLMetadataStorageActionHandlerTest Assert.assertEquals( ImmutableList.of(), - handler.getCompletedTaskInfo(DateTimes.of("2014-01-03"), null, null) + handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-03")), null) ); Assert.assertEquals( ImmutableList.of(status1), - handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), 
null, null) + handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-01")), null) .stream() .map(TaskInfo::getStatus) .collect(Collectors.toList()) @@ -199,9 +201,11 @@ public class SQLMetadataStorageActionHandlerTest handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status); } - final List, Map>> statuses = handler.getCompletedTaskInfo( - DateTimes.of("2014-01-01"), - 7, + final List, Map>> statuses = handler.getTaskInfos( + CompleteTaskLookup.withTasksCreatedPriorTo( + 7, + DateTimes.of("2014-01-01") + ), null ); Assert.assertEquals(7, statuses.size()); @@ -222,9 +226,11 @@ public class SQLMetadataStorageActionHandlerTest handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status); } - final List, Map>> statuses = handler.getCompletedTaskInfo( - DateTimes.of("2014-01-01"), - 10, + final List, Map>> statuses = handler.getTaskInfos( + CompleteTaskLookup.withTasksCreatedPriorTo( + 10, + DateTimes.of("2014-01-01") + ), null ); Assert.assertEquals(5, statuses.size()); @@ -409,14 +415,15 @@ public class SQLMetadataStorageActionHandlerTest Assert.assertEquals( ImmutableList.of(entryId2), - handler.getActiveTaskInfo(null).stream() + handler.getTaskInfos(ActiveTaskLookup.getInstance(), null).stream() .map(taskInfo -> taskInfo.getId()) .collect(Collectors.toList()) ); Assert.assertEquals( ImmutableList.of(entryId3, entryId1), - handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream() + handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-01")), null) + .stream() .map(taskInfo -> taskInfo.getId()) .collect(Collectors.toList()) @@ -426,13 +433,14 @@ public class SQLMetadataStorageActionHandlerTest // active task not removed. 
Assert.assertEquals( ImmutableList.of(entryId2), - handler.getActiveTaskInfo(null).stream() + handler.getTaskInfos(ActiveTaskLookup.getInstance(), null).stream() .map(taskInfo -> taskInfo.getId()) .collect(Collectors.toList()) ); Assert.assertEquals( ImmutableList.of(entryId3), - handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream() + handler.getTaskInfos(CompleteTaskLookup.withTasksCreatedPriorTo(null, DateTimes.of("2014-01-01")), null) + .stream() .map(taskInfo -> taskInfo.getId()) .collect(Collectors.toList())