From f970757efc3413ce97b50fd06c57ac6f5ec39c46 Mon Sep 17 00:00:00 2001
From: AmatyaAvadhanula
Date: Thu, 16 Jun 2022 22:30:37 +0530
Subject: [PATCH] Optimize overlord GET /tasks memory usage (#12404)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The web console (indirectly) calls the Overlord's GET /tasks API to fetch task summaries, which in turn queries the metadata tasks table. This query tries to fetch several columns, including payload, of all the rows at once. This introduces significant memory overhead and can cause unresponsiveness or Overlord failure when the ingestion tab is opened multiple times (due to several parallel calls to this API).

Another thing to note is that the task table (the payload column in particular) can be very large. Extracting large payloads from such tables can be very slow, leading to a slow UI. While we are fixing the memory pressure in the Overlord, we can also fix the UI slowness caused by fetching large payloads from the table. Fetching large payloads also puts pressure on the metadata store, as reported in the community (apache/druid issue #12318: "Metadata store query performance degrades as the tasks in druid_tasks table grows").

The task summaries returned as a response for the API are several times smaller and can fit comfortably in memory. So, there is an opportunity here to fix the memory usage, the slow UI, and the under-pressure metadata store by removing the need to handle large payloads in every layer we can. Of course, the solution becomes more complex as we try to fix more layers. With that in mind, this description captures two approaches. They vary in complexity and in the degree to which they fix the aforementioned problems.
---
 .../apache/druid/indexer/TaskIdentifier.java  | 110 ++++
 .../apache/druid/indexer/TaskStatusPlus.java  |  27 +
 .../MetadataStorageActionHandler.java         |  30 +-
 .../storage/sqlserver/SQLServerConnector.java |   7 +
 .../sqlserver/SQLServerConnectorTest.java     |  11 +
 .../storage/mysql/MySQLConnector.java         |   7 +
 .../storage/mysql/MySQLConnectorTest.java     |  12 +
 .../postgresql/PostgreSQLConnector.java       |   7 +
 .../postgresql/PostgreSQLConnectorTest.java   |  14 +
 .../druid/indexing/common/task/Task.java      |  18 +
 .../overlord/HeapMemoryTaskStorage.java       |  13 +
 .../overlord/MetadataTaskStorage.java         |  39 +-
 .../druid/indexing/overlord/TaskStorage.java  |  16 +
 .../overlord/TaskStorageQueryAdapter.java     |  16 +-
 .../overlord/http/OverlordResource.java       |  92 ++--
 .../overlord/http/OverlordResourceTest.java   | 468 ++++--------------
 .../results/auth_test_sys_schema_tasks.json   |   4 +-
 .../druid/metadata/SQLMetadataConnector.java  |  58 ++-
 .../SQLMetadataStorageActionHandler.java      | 405 ++++++++++++++-
 .../storage/derby/DerbyConnector.java         |  29 ++
 .../metadata/SQLMetadataConnectorTest.java    |  46 ++
 .../SQLMetadataStorageActionHandlerTest.java  | 339 ++++++++++---
 22 files changed, 1227 insertions(+), 541 deletions(-)
 create mode 100644 core/src/main/java/org/apache/druid/indexer/TaskIdentifier.java

diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdentifier.java b/core/src/main/java/org/apache/druid/indexer/TaskIdentifier.java
new file mode 100644
index 00000000000..753de56328b
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/TaskIdentifier.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Model class containing the id, type and groupId of a task + * These fields are extracted from the task payload for the new schema and this model can be used for migration as well. + */ +public class TaskIdentifier +{ + + private final String id; + + @Nullable + private final String type; + + @Nullable + private final String groupId; + + @JsonCreator + public TaskIdentifier( + @JsonProperty("id") String id, + @JsonProperty("groupId") @Nullable String groupId, + @JsonProperty("type") @Nullable String type // nullable for backward compatibility + ) + { + this.id = Preconditions.checkNotNull(id, "id"); + this.groupId = groupId; + this.type = type; + } + + @JsonProperty + public String getId() + { + return id; + } + + @Nullable + @JsonProperty + public String getGroupId() + { + return groupId; + } + + @Nullable + @JsonProperty + public String getType() + { + return type; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskIdentifier that = (TaskIdentifier) o; + return Objects.equals(getId(), that.getId()) && + Objects.equals(getGroupId(), that.getGroupId()) && + Objects.equals(getType(), that.getType()); + } + + @Override + public int hashCode() + { + return Objects.hash( + getId(), + getGroupId(), + getType() + ); + } + + @Override + public String toString() + { + return "TaskIdentifier{" + + "id='" + id + '\'' + + ", groupId='" + groupId + '\'' + + ", type='" + type + '\'' + + '}'; + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 75bbefa0eaf..af04db21113 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -22,6 +22,7 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.RE; import org.joda.time.DateTime; @@ -31,6 +32,7 @@ import java.util.Objects; public class TaskStatusPlus { private final String id; + @Nullable private final String type; private final DateTime createdTime; private final DateTime queueInsertionTime; @@ -252,4 +254,29 @@ public class TaskStatusPlus ", errorMsg='" + errorMsg + '\'' + '}'; } + + /** + * Convert a TaskInfo class of TaskIdentifier and TaskStatus to a TaskStatusPlus + * Applicable only for completed or waiting tasks since a TaskInfo doesn't have the exhaustive info for 
running tasks + * + * @param taskIdentifierInfo TaskInfo pair + * @return corresponding TaskStatusPlus + */ + public static TaskStatusPlus fromTaskIdentifierInfo(TaskInfo taskIdentifierInfo) + { + TaskStatus status = taskIdentifierInfo.getStatus(); + return new TaskStatusPlus( + taskIdentifierInfo.getId(), + taskIdentifierInfo.getTask().getGroupId(), + taskIdentifierInfo.getTask().getType(), + taskIdentifierInfo.getCreatedTime(), + DateTimes.EPOCH, + status.getStatusCode(), + status.getStatusCode().isComplete() ? RunnerTaskState.NONE : RunnerTaskState.WAITING, + status.getDuration(), + status.getLocation(), + taskIdentifierInfo.getDataSource(), + status.getErrorMsg() + ); + } } diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index b4d50aaa3d2..ca6ede6d32f 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.google.common.base.Optional; +import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.DateTime; @@ -41,6 +42,8 @@ public interface MetadataStorageActionHandler> getTaskStatusList( + Map taskLookups, + @Nullable String datasource + ); + default List> getTaskInfos( TaskLookup taskLookup, @Nullable String datasource @@ -173,4 +193,10 @@ public interface MetadataStorageActionHandler toTaskIdentifierInfo(TaskInfo taskInfo) + { + return new TaskInfo<>( + taskInfo.getId(), + taskInfo.getCreatedTime(), + taskInfo.getStatus(), + taskInfo.getDataSource(), + taskInfo.getTask().getMetadata() + ); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index de9ebb72812..8ef5970982a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -30,6 +30,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.config.TaskStorageConfig; @@ -233,6 +234,18 @@ public class HeapMemoryTaskStorage implements TaskStorage return tasks; } + @Override + public List getTaskStatusPlusList( + Map taskLookups, + @Nullable String datasource + ) + { + return getTaskInfos(taskLookups, datasource).stream() + .map(Task::toTaskIdentifierInfo) + .map(TaskStatusPlus::fromTaskIdentifierInfo) + .collect(Collectors.toList()); + } + private List> getRecentlyCreatedAlreadyFinishedTaskInfoSince( DateTime start, @Nullable Integer n, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 86597d5cfd2..7c7fbd3f53c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -29,6 +29,7 @@ import com.google.common.collect.Maps; import com.google.inject.Inject; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.config.TaskStorageConfig; @@ -58,6 +59,7 @@ import java.util.stream.Collectors; public class MetadataTaskStorage implements TaskStorage { + private static final MetadataStorageActionHandlerTypes TASK_TYPES = new MetadataStorageActionHandlerTypes() { @Override @@ -115,6 +117,8 @@ public class MetadataTaskStorage implements TaskStorage public void start() { metadataStorageConnector.createTaskTables(); + // begins migration of existing tasks to new schema + handler.populateTaskTypeAndGroupIdAsync(); } @LifecycleStop @@ -144,7 +148,9 @@ public class MetadataTaskStorage implements TaskStorage task.getDataSource(), task, status.isRunnable(), - status + status, + task.getType(), + task.getGroupId() ); } catch (Exception e) { @@ -226,21 +232,44 @@ public class MetadataTaskStorage implements TaskStorage @Nullable String datasource ) { - Map theTaskLookups = Maps.newHashMapWithExpectedSize(taskLookups.size()); + Map theTaskLookups = processTaskLookups(taskLookups); + return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource)); + } + + @Override + public List getTaskStatusPlusList( + Map taskLookups, + @Nullable String datasource + ) + { + Map processedTaskLookups = processTaskLookups(taskLookups); + return Collections.unmodifiableList( + handler.getTaskStatusList(processedTaskLookups, datasource) + .stream() + .map(TaskStatusPlus::fromTaskIdentifierInfo) + .collect(Collectors.toList()) + ); + } + + private Map processTaskLookups( + Map taskLookups + ) + { + Map processedTaskLookups = Maps.newHashMapWithExpectedSize(taskLookups.size()); for (Entry entry : taskLookups.entrySet()) { if (entry.getKey() == TaskLookupType.COMPLETE) { CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue(); - theTaskLookups.put( + processedTaskLookups.put( entry.getKey(), completeTaskLookup.hasTaskCreatedTimeFilter() ? 
completeTaskLookup : completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()) ); } else { - theTaskLookups.put(entry.getKey(), entry.getValue()); + processedTaskLookups.put(entry.getKey(), entry.getValue()); } } - return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource)); + return processedTaskLookups; } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index cf858ebcc30..e9b0af24057 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord; import com.google.common.base.Optional; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; @@ -150,6 +151,21 @@ public interface TaskStorage */ List getActiveTasksByDatasource(String datasource); + /** + * Returns the status of tasks in metadata storage as TaskStatusPlus + * No particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. + * + * The returned list can contain active tasks and complete tasks depending on the {@code taskLookups} parameter. + * See {@link TaskLookup} for more details of active and complete tasks. + * + * @param taskLookups lookup types and filters + * @param datasource datasource filter + */ + List getTaskStatusPlusList( + Map taskLookups, + @Nullable String datasource + ); + /** * Returns a list of tasks stored in the storage facility as {@link TaskInfo}. No * particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. 
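Aside, for readers of this patch: the sketch below shows how a caller might exercise the new summary path introduced above. It is illustrative only and not part of the diff. The class name TaskSummaryExample and the particular lookups chosen are assumptions; TaskStorage#getTaskStatusPlusList, ActiveTaskLookup, CompleteTaskLookup, and the TaskStatusPlus accessors are the APIs added or used elsewhere in this change.

// Hypothetical caller, for illustration only; not part of this patch.
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.Duration;

import java.util.List;
import java.util.Map;

public class TaskSummaryExample
{
  public static void printTaskSummaries(TaskStorage taskStorage)
  {
    // Ask for all active tasks plus the 20 most recently completed ones,
    // across all datasources (null datasource filter).
    final Map<TaskLookupType, TaskLookup> lookups = ImmutableMap.of(
        TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(),
        TaskLookupType.COMPLETE, CompleteTaskLookup.of(20, (Duration) null)
    );

    // Unlike getTaskInfos(), this path returns lightweight summaries and
    // never materializes full task payloads in memory.
    final List<TaskStatusPlus> statuses = taskStorage.getTaskStatusPlusList(lookups, null);

    for (TaskStatusPlus status : statuses) {
      System.out.println(status.getId() + " [" + status.getType() + "] " + status.getStatusCode());
    }
  }
}

The design point, visible in the SQLMetadataStorageActionHandler changes further below, is that the summary queries select only id, created_date, datasource, group_id, type, and status_payload (never the payload column), so the response size stays proportional to the summary data rather than to the stored task payloads.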
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index f4338c90967..3fa570ccb32 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -23,16 +23,15 @@ import com.google.common.base.Optional; import com.google.inject.Inject; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; -import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.timeline.DataSegment; -import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -85,21 +84,12 @@ public class TaskStorageQueryAdapter ); } - public List> getCompletedTaskInfoByCreatedTimeDuration( - @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, - @Nullable String dataSource - ) - { - return storage.getTaskInfos(CompleteTaskLookup.of(maxTaskStatuses, duration), dataSource); - } - - public List> getTaskInfos( + public List getTaskStatusPlusList( Map taskLookups, @Nullable String dataSource ) { - return storage.getTaskInfos(taskLookups, dataSource); + return storage.getTaskStatusPlusList(taskLookups, dataSource); } public Optional getTask(final String taskid) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 638fbf2df4e..a111ebe787a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -690,7 +690,7 @@ public class OverlordResource taskMaster.getTaskRunner(), taskRunner -> { final List authorizedList = securedTaskStatusPlus( - getTasks( + getTaskStatusPlusList( taskRunner, TaskStateLookup.fromString(state), dataSource, @@ -706,7 +706,7 @@ public class OverlordResource ); } - private List getTasks( + private List getTaskStatusPlusList( TaskRunner taskRunner, TaskStateLookup state, @Nullable String dataSource, @@ -729,7 +729,7 @@ public class OverlordResource // This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process // and use the snapshot from taskRunner as a reference for potential task state updates happened // after the first snapshotting. - Stream> taskInfoStreamFromTaskStorage = getTaskInfoStreamFromTaskStorage( + Stream taskStatusPlusStream = getTaskStatusPlusList( state, dataSource, createdTimeDuration, @@ -745,87 +745,57 @@ public class OverlordResource if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) { // We are interested in only those tasks which are in taskRunner. 
- taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage - .filter(info -> runnerWorkItems.containsKey(info.getId())); + taskStatusPlusStream = taskStatusPlusStream + .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId())); } - final List> taskInfoFromTaskStorage = taskInfoStreamFromTaskStorage - .collect(Collectors.toList()); + final List taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList()); // Separate complete and active tasks from taskStorage. // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType. - final List> completeTaskInfoFromTaskStorage = new ArrayList<>(); - final List> activeTaskInfoFromTaskStorage = new ArrayList<>(); - for (TaskInfo info : taskInfoFromTaskStorage) { - if (info.getStatus().isComplete()) { - completeTaskInfoFromTaskStorage.add(info); + final List completeTaskStatusPlusList = new ArrayList<>(); + final List activeTaskStatusPlusList = new ArrayList<>(); + for (TaskStatusPlus statusPlus : taskStatusPlusList) { + if (statusPlus.getStatusCode().isComplete()) { + completeTaskStatusPlusList.add(statusPlus); } else { - activeTaskInfoFromTaskStorage.add(info); + activeTaskStatusPlusList.add(statusPlus); } } - final List statuses = new ArrayList<>(); - completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add( - new TaskStatusPlus( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), - taskInfo.getCreatedTime(), - DateTimes.EPOCH, - taskInfo.getStatus().getStatusCode(), - RunnerTaskState.NONE, - taskInfo.getStatus().getDuration(), - taskInfo.getStatus().getLocation(), - taskInfo.getDataSource(), - taskInfo.getStatus().getErrorMsg() - ) - )); + final List taskStatuses = new ArrayList<>(completeTaskStatusPlusList); - activeTaskInfoFromTaskStorage.forEach(taskInfo -> { - final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId()); + activeTaskStatusPlusList.forEach(statusPlus -> { + final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId()); if (runnerWorkItem == null) { // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner. if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) { - statuses.add( - new TaskStatusPlus( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), - taskInfo.getCreatedTime(), - DateTimes.EPOCH, - taskInfo.getStatus().getStatusCode(), - RunnerTaskState.WAITING, - taskInfo.getStatus().getDuration(), - taskInfo.getStatus().getLocation(), - taskInfo.getDataSource(), - taskInfo.getStatus().getErrorMsg() - ) - ); + taskStatuses.add(statusPlus); } } else { if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) { - statuses.add( + taskStatuses.add( new TaskStatusPlus( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), - taskInfo.getTask() == null ? 
null : taskInfo.getTask().getType(), + statusPlus.getId(), + statusPlus.getGroupId(), + statusPlus.getType(), runnerWorkItem.getCreatedTime(), runnerWorkItem.getQueueInsertionTime(), - taskInfo.getStatus().getStatusCode(), - taskRunner.getRunnerTaskState(taskInfo.getId()), // this is racy for remoteTaskRunner - taskInfo.getStatus().getDuration(), + statusPlus.getStatusCode(), + taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner + statusPlus.getDuration(), runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done. - taskInfo.getDataSource(), - taskInfo.getStatus().getErrorMsg() + statusPlus.getDataSource(), + statusPlus.getErrorMsg() ) ); } } }); - return statuses; + return taskStatuses; } - private Stream> getTaskInfoStreamFromTaskStorage( + private Stream getTaskStatusPlusList( TaskStateLookup state, @Nullable String dataSource, Duration createdTimeDuration, @@ -861,16 +831,16 @@ public class OverlordResource throw new IAE("Unknown state: [%s]", state); } - final Stream> taskInfoStreamFromTaskStorage = taskStorageQueryAdapter.getTaskInfos( + final Stream taskStatusPlusStream = taskStorageQueryAdapter.getTaskStatusPlusList( taskLookups, dataSource ).stream(); if (type != null) { - return taskInfoStreamFromTaskStorage.filter( - info -> type.equals(info.getTask() == null ? null : info.getTask().getType()) + return taskStatusPlusStream.filter( + statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType()) ); } else { - return taskInfoStreamFromTaskStorage; + return taskStatusPlusStream; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 15a8619eb28..f01668fd37a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -32,10 +32,6 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.config.TaskConfig; -import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; @@ -231,40 +227,16 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_3"), - "deny", - getTaskWithIdAndDatasource("id_3", "deny") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - 
TaskStatus.running("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_3", TaskState.RUNNING, "deny"), + createTaskStatusPlus("id_4", TaskState.RUNNING, "deny") ) ); @@ -297,31 +269,13 @@ public class OverlordResourceTest List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "deny", - getTaskWithIdAndDatasource("id_1", "deny") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ) + createTaskStatusPlus("id_1", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); EasyMock.replay( @@ -352,26 +306,14 @@ public class OverlordResourceTest ) ); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "deny", - getTaskWithIdAndDatasource("id_1", "deny") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "deny"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow") ) ); EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.RUNNING); @@ -398,7 +340,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(), @@ -409,48 +351,12 @@ public class OverlordResourceTest ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_5", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_5"), - "deny", - getTaskWithIdAndDatasource("id_5", "deny") - ), - new TaskInfo<>( - "id_6", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_6"), - "allow", - getTaskWithIdAndDatasource("id_6", "allow") - ), - new TaskInfo<>( - "id_7", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_7"), - "allow", - getTaskWithIdAndDatasource("id_7", "allow") - ), - new TaskInfo<>( - "id_5", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_5"), - "deny", - getTaskWithIdAndDatasource("id_5", "deny") - ), - new TaskInfo<>( - "id_6", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_6"), - "allow", - getTaskWithIdAndDatasource("id_6", "allow") - ), - new TaskInfo<>( - "id_7", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_7"), - "allow", - 
getTaskWithIdAndDatasource("id_7", "allow") - ) + createTaskStatusPlus("id_5", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_5", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow") ) ); @@ -481,7 +387,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); //completed tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -492,55 +398,13 @@ public class OverlordResourceTest ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_5", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_5"), - "allow", - getTaskWithIdAndDatasource("id_5", "allow") - ), - new TaskInfo<>( - "id_6", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_6"), - "allow", - getTaskWithIdAndDatasource("id_6", "allow") - ), - new TaskInfo<>( - "id_7", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_7"), - "allow", - getTaskWithIdAndDatasource("id_7", "allow") - ), - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_4"), - "allow", - getTaskWithIdAndDatasource("id_4", "allow") - ) + createTaskStatusPlus("id_5", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_4", TaskState.SUCCESS, "allow") ) ); EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( @@ -572,7 +436,7 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); //active tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -581,34 +445,10 @@ public class OverlordResourceTest ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_3"), - "deny", - getTaskWithIdAndDatasource("id_3", "deny") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"), + 
createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_3", TaskState.RUNNING, "deny"), + createTaskStatusPlus("id_4", TaskState.RUNNING, "deny") ) ); @@ -645,7 +485,7 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -654,34 +494,10 @@ public class OverlordResourceTest ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_3", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_4", TaskState.RUNNING, "deny") ) ); @@ -726,40 +542,16 @@ public class OverlordResourceTest ) ); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "deny", - getTaskWithIdAndDatasource("id_1", "deny") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "deny"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_3", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_4", TaskState.RUNNING, "deny") ) ); @@ -791,33 +583,15 @@ public class OverlordResourceTest { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "deny", - getTaskWithIdAndDatasource("id_2", "deny") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ) + createTaskStatusPlus("id_1", TaskState.SUCCESS, "allow"), + 
createTaskStatusPlus("id_2", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); EasyMock.replay( @@ -842,33 +616,15 @@ public class OverlordResourceTest expectAuthorizationTokenCheck(); Duration duration = new Period("PT86400S").toStandardDuration(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( EasyMock.anyObject(), EasyMock.anyObject() ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "deny", - getTaskWithIdAndDatasource("id_1", "deny") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ) + createTaskStatusPlus("id_1", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); @@ -886,7 +642,7 @@ public class OverlordResourceTest .getEntity(); Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals("id_2", responseObjects.get(0).getId()); - Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource())); + Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource()); } @Test @@ -898,7 +654,7 @@ public class OverlordResourceTest // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -909,10 +665,10 @@ public class OverlordResourceTest ) ).andStubReturn( ImmutableList.of( - createTaskInfo("id_5", Datasources.WIKIPEDIA), - createTaskInfo("id_6", Datasources.BUZZFEED), - createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "test"), - createTaskInfo("id_4", Datasources.BUZZFEED, TaskState.RUNNING, "test") + createTaskStatusPlus("id_5", TaskState.SUCCESS, Datasources.WIKIPEDIA), + createTaskStatusPlus("id_6", TaskState.SUCCESS, Datasources.BUZZFEED), + createTaskStatusPlus("id_1", TaskState.RUNNING, Datasources.WIKIPEDIA), + createTaskStatusPlus("id_4", TaskState.RUNNING, Datasources.BUZZFEED) ) ); @@ -955,7 +711,7 @@ public class OverlordResourceTest // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -966,10 +722,10 @@ public class OverlordResourceTest ) ).andStubReturn( ImmutableList.of( - createTaskInfo("id_5", Datasources.WIKIPEDIA), - createTaskInfo("id_6", Datasources.BUZZFEED), - createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "to-return"), - createTaskInfo("id_4", Datasources.WIKIPEDIA, TaskState.RUNNING, "test") + createTaskStatusPlus("id_5", TaskState.SUCCESS, Datasources.WIKIPEDIA), + createTaskStatusPlus("id_6", TaskState.SUCCESS, Datasources.BUZZFEED), + createTaskStatusPlus("id_1", TaskState.RUNNING, Datasources.WIKIPEDIA, "to-return"), + createTaskStatusPlus("id_4", TaskState.RUNNING, Datasources.BUZZFEED) ) ); @@ -1025,11 +781,11 @@ public class OverlordResourceTest } @Test - public void 
testGetNullCompleteTask() + public void testGetCompleteTasksOfAllDatasources() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null) @@ -1038,27 +794,9 @@ public class OverlordResourceTest ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "allow", - null - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "deny", - getTaskWithIdAndDatasource("id_2", "deny") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ) + createTaskStatusPlus("id_1", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); EasyMock.replay( @@ -1074,9 +812,7 @@ public class OverlordResourceTest .getEntity(); Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals("id_1", responseObjects.get(0).getId()); - TaskStatusPlus tsp = responseObjects.get(0); - Assert.assertEquals(null, tsp.getType()); - Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource())); + Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource()); } @Test @@ -1379,19 +1115,19 @@ public class OverlordResourceTest Optional.of(mockQueue) ).anyTimes(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "datasource", - getTaskWithIdAndDatasource("id_1", "datasource") + NoopTask.create("id_1", 1) ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "datasource", - getTaskWithIdAndDatasource("id_2", "datasource") + NoopTask.create("id_2", 1) ) )); mockQueue.shutdown("id_1", "Shutdown request from user"); @@ -1679,7 +1415,7 @@ public class OverlordResourceTest Optional.of(workerTaskRunner) ).anyTimes(); EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes(); - AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); + AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers); DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler); @@ -1724,61 +1460,25 @@ public class OverlordResourceTest EasyMock.expectLastCall().anyTimes(); } - private Task getTaskWithIdAndDatasource(String id, String datasource) + private TaskStatusPlus createTaskStatusPlus(String taskId, TaskState taskState, String datasource) { - return getTaskWithIdAndDatasource(id, datasource, "test"); + return createTaskStatusPlus(taskId, taskState, datasource, "test"); } - private Task getTaskWithIdAndDatasource(String id, String datasource, String taskType) + private TaskStatusPlus createTaskStatusPlus(String taskId, TaskState taskState, String datasource, String taskType) { - return new AbstractTask(id, datasource, null) - { - @Override - public String getType() - { - return taskType; - } - - @Override - public 
boolean isReady(TaskActionClient taskActionClient) - { - return false; - } - - @Override - public void stopGracefully(TaskConfig taskConfig) - { - } - - @Override - public TaskStatus run(TaskToolbox toolbox) - { - return null; - } - }; - } - - private TaskInfo createTaskInfo( - String taskId, - String datasource - ) - { - return createTaskInfo(taskId, datasource, TaskState.SUCCESS, "test"); - } - - private TaskInfo createTaskInfo( - String taskId, - String datasource, - TaskState state, - String taskType - ) - { - return new TaskInfo<>( + return new TaskStatusPlus( taskId, + null, + taskType, DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.fromCode(taskId, state), + DateTimes.EPOCH, + taskState, + taskState.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.WAITING, + 100L, + TaskLocation.unknown(), datasource, - getTaskWithIdAndDatasource(taskId, datasource, taskType) + null ); } diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json index 3f0e48fca71..fe09abad5c8 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json @@ -1,8 +1,8 @@ [ { "task_id": "index_auth_test_2030-04-30T01:13:31.893Z", - "type": null, - "group_id": null, + "group_id": "", + "type": "", "datasource": "auth_test", "created_time": "2030-04-30T01:13:31.893Z", "queue_insertion_time": "1970-01-01T00:00:00.000Z", diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 9fe81dcdffb..781a4b99c8f 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -44,6 +44,8 @@ import org.skife.jdbi.v2.util.IntegerMapper; import javax.annotation.Nullable; import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLRecoverableException; import java.sql.SQLTransientException; @@ -129,6 +131,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector public abstract boolean tableExists(Handle handle, String tableName); + public abstract String limitClause(int limit); + public T retryWithHandle( final HandleCallback callback, final Predicate myShouldRetry @@ -327,6 +331,29 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector ) ); } + + public boolean tableContainsColumn(Handle handle, String table, String column) + { + try { + DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData(); + ResultSet columns = databaseMetaData.getColumns( + null, + null, + table, + column + ); + return columns.next(); + } + catch (SQLException e) { + return false; + } + } + + public void prepareTaskEntryTable(final String tableName) + { + createEntryTable(tableName); + alterEntryTable(tableName); + } public void createEntryTable(final String tableName) { @@ -350,6 +377,35 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector ); } + private void alterEntryTable(final String tableName) + { + try { + retryWithHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + final Batch batch = handle.createBatch(); + if (!tableContainsColumn(handle, tableName, "type")) { + log.info("Adding column: type to 
table[%s]", tableName); + batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName)); + } + if (!tableContainsColumn(handle, tableName, "group_id")) { + log.info("Adding column: group_id to table[%s]", tableName); + batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName)); + } + batch.execute(); + return null; + } + } + ); + } + catch (Exception e) { + log.warn(e, "Exception altering table"); + } + } + public void createLogTable(final String tableName, final String entryTypeName) { createTable( @@ -578,7 +634,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector if (config.get().isCreateTables()) { final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get(); final String entryType = tablesConfig.getTaskEntryType(); - createEntryTable(tablesConfig.getEntryTable(entryType)); + prepareTaskEntryTable(tablesConfig.getEntryTable(entryType)); createLogTable(tablesConfig.getLogTable(entryType), entryType); createLockTable(tablesConfig.getLockTable(entryType), entryType); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 0b8837c640d..436af7ad694 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -21,10 +21,14 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -34,6 +38,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.DateTime; +import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -53,6 +58,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public abstract class SQLMetadataStorageActionHandler implements MetadataStorageActionHandler @@ -72,6 +80,11 @@ public abstract class SQLMetadataStorageActionHandler taskInfoMapper; + private final TaskStatusMapper taskStatusMapper; + private final TaskStatusMapperFromPayload taskStatusMapperFromPayload; + private final TaskIdentifierMapper taskIdentifierMapper; + + private Future taskMigrationCompleteFuture; @SuppressWarnings("PMD.UnnecessaryFullyQualifiedName") public SQLMetadataStorageActionHandler( @@ -98,6 +111,9 @@ public abstract class SQLMetadataStorageActionHandler(jsonMapper, entryType, statusType); + this.taskStatusMapper = new TaskStatusMapper(jsonMapper); + this.taskStatusMapperFromPayload = new 
TaskStatusMapperFromPayload(jsonMapper); + this.taskIdentifierMapper = new TaskIdentifierMapper(jsonMapper); } protected SQLMetadataConnector getConnector() @@ -142,15 +158,17 @@ public abstract class SQLMetadataStorageActionHandler) handle -> { final String sql = StringUtils.format( - "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) " - + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", + "INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) " + + "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)", getEntryTable() ); handle.createStatement(sql) @@ -158,6 +176,8 @@ public abstract class SQLMetadataStorageActionHandler> query; switch (entry.getKey()) { case ACTIVE: - query = createActiveTaskInfoQuery( + query = createActiveTaskStreamingQuery( handle, dataSource ); @@ -292,7 +312,7 @@ public abstract class SQLMetadataStorageActionHandler> createCompletedTaskInfoQuery( + @Override + public List> getTaskStatusList( + Map taskLookups, + @Nullable String dataSource + ) + { + boolean fetchPayload = true; + if (taskMigrationCompleteFuture != null && taskMigrationCompleteFuture.isDone()) { + try { + fetchPayload = !taskMigrationCompleteFuture.get(); + } + catch (Exception e) { + log.info(e, "Exception getting task migration future"); + } + } + return getTaskStatusList(taskLookups, dataSource, fetchPayload); + } + + @VisibleForTesting + List> getTaskStatusList( + Map taskLookups, + @Nullable String dataSource, + boolean fetchPayload + ) + { + ResultSetMapper> resultSetMapper = + fetchPayload ? taskStatusMapperFromPayload : taskStatusMapper; + return getConnector().retryTransaction( + (handle, status) -> { + final List> taskMetadataInfos = new ArrayList<>(); + for (Entry entry : taskLookups.entrySet()) { + final Query> query; + switch (entry.getKey()) { + case ACTIVE: + query = fetchPayload + ? createActiveTaskStreamingQuery(handle, dataSource) + : createActiveTaskSummaryStreamingQuery(handle, dataSource); + taskMetadataInfos.addAll(query.map(resultSetMapper).list()); + break; + case COMPLETE: + CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue(); + DateTime priorTo = completeTaskLookup.getTasksCreatedPriorTo(); + Integer limit = completeTaskLookup.getMaxTaskStatuses(); + query = fetchPayload + ? createCompletedTaskStreamingQuery(handle, priorTo, limit, dataSource) + : createCompletedTaskSummaryStreamingQuery(handle, priorTo, limit, dataSource); + taskMetadataInfos.addAll(query.map(resultSetMapper).list()); + break; + default: + throw new IAE("Unknown TaskLookupType: [%s]", entry.getKey()); + } + } + return taskMetadataInfos; + }, + 3, + SQLMetadataConnector.DEFAULT_MAX_TRIES + ); + } + + /** + * Fetches the columns needed to build TaskStatusPlus for completed tasks + * Please note that this requires completion of data migration to avoid empty values for task type and groupId + * Recommended for GET /tasks API + * Uses streaming SQL query to avoid fetching too many rows at once into memory + * @param handle db handle + * @param dataSource datasource to which the tasks belong. 
null if we don't want to filter + * @return Query object for TaskStatusPlus for completed tasks of interest + */ + private Query> createCompletedTaskSummaryStreamingQuery( + Handle handle, + DateTime timestamp, + @Nullable Integer maxNumStatuses, + @Nullable String dataSource + ) + { + String sql = StringUtils.format( + "SELECT " + + " id, " + + " created_date, " + + " datasource, " + + " group_id, " + + " type, " + + " status_payload " + + "FROM " + + " %s " + + "WHERE " + + getWhereClauseForInactiveStatusesSinceQuery(dataSource) + + "ORDER BY created_date DESC", + getEntryTable() + ); + + if (maxNumStatuses != null) { + sql = decorateSqlWithLimit(sql); + } + Query> query = handle.createQuery(sql) + .bind("start", timestamp.toString()) + .setFetchSize(connector.getStreamingFetchSize()); + + if (maxNumStatuses != null) { + query = query.bind("n", maxNumStatuses); + } + if (dataSource != null) { + query = query.bind("ds", dataSource); + } + return query; + } + + /** + * Fetches the columns needed to build a Task object with payload for completed tasks + * This requires the task payload which can be large. Please use only when necessary. + * For example for ingestion tasks view before migration of the new columns + * Uses streaming SQL query to avoid fetching too many rows at once into memory + * @param handle db handle + * @param dataSource datasource to which the tasks belong. null if we don't want to filter + * @return Query object for completed TaskInfos of interest + */ + private Query> createCompletedTaskStreamingQuery( Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @@ -336,7 +472,9 @@ public abstract class SQLMetadataStorageActionHandler> query = handle.createQuery(sql).bind("start", timestamp.toString()); + Query> query = handle.createQuery(sql) + .bind("start", timestamp.toString()) + .setFetchSize(connector.getStreamingFetchSize()); if (maxNumStatuses != null) { query = query.bind("n", maxNumStatuses); @@ -358,7 +496,51 @@ public abstract class SQLMetadataStorageActionHandler> createActiveTaskInfoQuery(Handle handle, @Nullable String dataSource) + /** + * Fetches the columns needed to build TaskStatusPlus for active tasks + * Please note that this requires completion of data migration to avoid empty values for task type and groupId + * Recommended for GET /tasks API + * Uses streaming SQL query to avoid fetching too many rows at once into memory + * @param handle db handle + * @param dataSource datasource to which the tasks belong. null if we don't want to filter + * @return Query object for TaskStatusPlus for active tasks of interest + */ + private Query> createActiveTaskSummaryStreamingQuery(Handle handle, @Nullable String dataSource) + { + String sql = StringUtils.format( + "SELECT " + + " id, " + + " status_payload, " + + " group_id, " + + " type, " + + " datasource, " + + " created_date " + + "FROM " + + " %s " + + "WHERE " + + getWhereClauseForActiveStatusesQuery(dataSource) + + "ORDER BY created_date", + entryTable + ); + + Query> query = handle.createQuery(sql) + .setFetchSize(connector.getStreamingFetchSize()); + if (dataSource != null) { + query = query.bind("ds", dataSource); + } + return query; + } + + /** + * Fetches the columns needed to build Task objects with payload for active tasks + * This requires the task payload which can be large. Please use only when necessary. 
+ * For example for ingestion tasks view before migration of the new columns + * Uses streaming SQL query to avoid fetching too many rows at once into memory + * @param handle db handle + * @param dataSource datasource to which the tasks belong. null if we don't want to filter + * @return Query object for active TaskInfos of interest + */ + private Query> createActiveTaskStreamingQuery(Handle handle, @Nullable String dataSource) { String sql = StringUtils.format( "SELECT " @@ -375,7 +557,8 @@ public abstract class SQLMetadataStorageActionHandler> query = handle.createQuery(sql); + Query> query = handle.createQuery(sql) + .setFetchSize(connector.getStreamingFetchSize()); if (dataSource != null) { query = query.bind("ds", dataSource); } @@ -391,6 +574,109 @@ public abstract class SQLMetadataStorageActionHandler> + { + private final ObjectMapper objectMapper; + + TaskStatusMapperFromPayload(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public TaskInfo map(int index, ResultSet resultSet, StatementContext context) + throws SQLException + { + return toTaskIdentifierInfo(objectMapper, resultSet, true); + } + } + + private class TaskStatusMapper implements ResultSetMapper> + { + private final ObjectMapper objectMapper; + + TaskStatusMapper(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public TaskInfo map(int index, ResultSet resultSet, StatementContext context) + throws SQLException + { + return toTaskIdentifierInfo(objectMapper, resultSet, false); + } + } + + private TaskInfo toTaskIdentifierInfo(ObjectMapper objectMapper, + ResultSet resultSet, + boolean usePayload + ) throws SQLException + { + String type; + String groupId; + if (usePayload) { + try { + ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class); + type = payload.get("type").asText(); + groupId = payload.get("groupId").asText(); + } + catch (IOException e) { + log.error(e, "Encountered exception while deserializing task payload"); + throw new SQLException(e); + } + } else { + type = resultSet.getString("type"); + groupId = resultSet.getString("group_id"); + } + + String id = resultSet.getString("id"); + DateTime createdTime = DateTimes.of(resultSet.getString("created_date")); + StatusType status; + try { + status = objectMapper.readValue(resultSet.getBytes("status_payload"), statusType); + } + catch (IOException e) { + log.error(e, "Encountered exception while deserializing task status_payload"); + throw new SQLException(e); + } + String datasource = resultSet.getString("datasource"); + TaskIdentifier taskIdentifier = new TaskIdentifier(id, groupId, type); + + return new TaskInfo<>(id, createdTime, status, datasource, taskIdentifier); + } + + static class TaskIdentifierMapper implements ResultSetMapper + { + private final ObjectMapper objectMapper; + + TaskIdentifierMapper(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public TaskIdentifier map(int index, ResultSet resultSet, StatementContext context) + throws SQLException + { + try { + ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class); + // If field is absent (older task version), use blank string to avoid a loop of migration of such tasks. + JsonNode type = payload.get("type"); + JsonNode groupId = payload.get("groupId"); + return new TaskIdentifier( + resultSet.getString("id"), + groupId == null ? "" : groupId.asText(), + type == null ? 
"" : type.asText() + ); + } + catch (IOException e) { + log.error(e, "Encountered exception while deserializing task payload"); + throw new SQLException(e); + } + } + } + static class TaskInfoMapper implements ResultSetMapper> { private final ObjectMapper objectMapper; @@ -679,4 +965,105 @@ public abstract class SQLMetadataStorageActionHandler fetchTaskMetadatas(String tableName, String id, int limit) + { + List taskIdentifiers = new ArrayList<>(); + connector.retryWithHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + String sql = StringUtils.format( + "SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s", + tableName, + id, + connector.limitClause(limit) + ); + Query> query = handle.createQuery(sql); + taskIdentifiers.addAll(query.map(taskIdentifierMapper).list()); + return null; + } + } + ); + return taskIdentifiers; + } + + private void updateTaskMetadatas(String tasksTable, List taskIdentifiers) + { + connector.retryWithHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + Batch batch = handle.createBatch(); + String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'"; + for (TaskIdentifier metadata : taskIdentifiers) { + batch.add(StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId()) + ); + } + batch.execute(); + return null; + } + } + ); + } + + @Override + public void populateTaskTypeAndGroupIdAsync() + { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + taskMigrationCompleteFuture = executorService.submit( + () -> populateTaskTypeAndGroupId() + ); + } + + /** + * Utility to migrate existing tasks to the new schema by populating type and groupId synchronously + * + * @return true if successful + */ + @VisibleForTesting + boolean populateTaskTypeAndGroupId() + { + log.info("Populate fields task and group_id of task entry table [%s] from payload", entryTable); + String id = ""; + int limit = 100; + int count = 0; + while (true) { + List taskIdentifiers; + try { + taskIdentifiers = fetchTaskMetadatas(entryTable, id, limit); + } + catch (Exception e) { + log.warn(e, "Task migration failed while reading entries from task table"); + return false; + } + if (taskIdentifiers.isEmpty()) { + break; + } + try { + updateTaskMetadatas(entryTable, taskIdentifiers); + count += taskIdentifiers.size(); + log.info("Successfully updated type and groupId for [%d] tasks", count); + } + catch (Exception e) { + log.warn(e, "Task migration failed while updating entries in task table"); + return false; + } + id = taskIdentifiers.get(taskIdentifiers.size() - 1).getId(); + + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + log.info("Interrupted, exiting!"); + Thread.currentThread().interrupt(); + } + } + log.info("Task migration for table [%s] successful", entryTable); + return true; + } } diff --git a/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java b/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java index 989d0decf3d..19d0c6b04f1 100644 --- a/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java @@ -35,6 +35,11 @@ import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import 
   static class TaskInfoMapper implements ResultSetMapper<TaskInfo<EntryType, StatusType>>
   {
     private final ObjectMapper objectMapper;
@@ -679,4 +965,105 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
+
+  public List<TaskIdentifier> fetchTaskMetadatas(String tableName, String id, int limit)
+  {
+    List<TaskIdentifier> taskIdentifiers = new ArrayList<>();
+    connector.retryWithHandle(
+        new HandleCallback<Void>()
+        {
+          @Override
+          public Void withHandle(Handle handle)
+          {
+            String sql = StringUtils.format(
+                "SELECT * FROM %1$s WHERE id > '%2$s' AND type IS NULL ORDER BY id %3$s",
+                tableName,
+                id,
+                connector.limitClause(limit)
+            );
+            Query<Map<String, Object>> query = handle.createQuery(sql);
+            taskIdentifiers.addAll(query.map(taskIdentifierMapper).list());
+            return null;
+          }
+        }
+    );
+    return taskIdentifiers;
+  }
+
+  private void updateTaskMetadatas(String tasksTable, List<TaskIdentifier> taskIdentifiers)
+  {
+    connector.retryWithHandle(
+        new HandleCallback<Void>()
+        {
+          @Override
+          public Void withHandle(Handle handle)
+          {
+            Batch batch = handle.createBatch();
+            String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'";
+            for (TaskIdentifier metadata : taskIdentifiers) {
+              batch.add(
+                  StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId())
+              );
+            }
+            batch.execute();
+            return null;
+          }
+        }
+    );
+  }
+
+  @Override
+  public void populateTaskTypeAndGroupIdAsync()
+  {
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    taskMigrationCompleteFuture = executorService.submit(
+        () -> populateTaskTypeAndGroupId()
+    );
+  }
+
+  /**
+   * Utility to migrate existing tasks to the new schema by populating type and groupId synchronously
+   *
+   * @return true if successful
+   */
+  @VisibleForTesting
+  boolean populateTaskTypeAndGroupId()
+  {
+    log.info("Populating columns type and group_id of task entry table [%s] from payload", entryTable);
+    String id = "";
+    int limit = 100;
+    int count = 0;
+    while (true) {
+      List<TaskIdentifier> taskIdentifiers;
+      try {
+        taskIdentifiers = fetchTaskMetadatas(entryTable, id, limit);
+      }
+      catch (Exception e) {
+        log.warn(e, "Task migration failed while reading entries from task table");
+        return false;
+      }
+      if (taskIdentifiers.isEmpty()) {
+        break;
+      }
+      try {
+        updateTaskMetadatas(entryTable, taskIdentifiers);
+        count += taskIdentifiers.size();
+        log.info("Successfully updated type and groupId for [%d] tasks", count);
+      }
+      catch (Exception e) {
+        log.warn(e, "Task migration failed while updating entries in task table");
+        return false;
+      }
+      id = taskIdentifiers.get(taskIdentifiers.size() - 1).getId();
+
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        log.info("Interrupted, exiting!");
+        Thread.currentThread().interrupt();
+      }
+    }
+    log.info("Task migration for table [%s] successful", entryTable);
+    return true;
+  }
 }
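Aside: the migration above pages through the task table with keyset pagination (id strictly greater than the last-seen id, plus a dialect-specific limit clause) rather than OFFSET, so each batch is a cheap index range scan. A generic, self-contained sketch of the pattern (table and column names as above; the JDBC wiring is illustrative):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Locale;

    public class KeysetPaginationExample
    {
      // Fetches the ids of up to `limit` unmigrated rows strictly after `lastId`.
      // Re-invoking with the last id of each batch walks the whole table without
      // OFFSET, so earlier batches never have to be re-scanned.
      static List<String> nextBatch(Connection connection, String lastId, int limit) throws SQLException
      {
        String sql = String.format(
            Locale.ENGLISH,
            "SELECT id FROM druid_tasks WHERE id > ? AND type IS NULL ORDER BY id FETCH NEXT %d ROWS ONLY",
            limit
        );
        List<String> ids = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
          statement.setString(1, lastId);
          try (ResultSet resultSet = statement.executeQuery()) {
            while (resultSet.next()) {
              ids.add(resultSet.getString("id"));
            }
          }
        }
        return ids;
      }
    }
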
diff --git a/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java b/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java
index 989d0decf3d..19d0c6b04f1 100644
--- a/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java
@@ -35,6 +35,11 @@
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Locale;
+
 @ManageLifecycle
 public class DerbyConnector extends SQLMetadataConnector
 {
@@ -83,6 +88,24 @@ public class DerbyConnector extends SQLMetadataConnector
         .isEmpty();
   }
 
+  @Override
+  public boolean tableContainsColumn(Handle handle, String table, String column)
+  {
+    try {
+      DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData();
+      ResultSet columns = databaseMetaData.getColumns(
+          null,
+          null,
+          table.toUpperCase(Locale.ENGLISH),
+          column.toUpperCase(Locale.ENGLISH)
+      );
+      return columns.next();
+    }
+    catch (SQLException e) {
+      return false;
+    }
+  }
+
   @Override
   public String getSerialType()
   {
@@ -114,6 +137,12 @@ public class DerbyConnector extends SQLMetadataConnector
     return "VALUES 1";
   }
 
+  @Override
+  public String limitClause(int limit)
+  {
+    return String.format(Locale.ENGLISH, "FETCH NEXT %d ROWS ONLY", limit);
+  }
+
   @Override
   public void exportTable(
       String tableName,
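Aside: Derby has no LIMIT keyword, hence the SQL-standard FETCH NEXT form above. HEAD of this patch also touches MySQLConnector, PostgreSQLConnector, and SQLServerConnector, whose overrides are not shown in this excerpt; the LIMIT form below is an assumption about what those look like, not a quote of them:

    import java.util.Locale;

    public class LimitClauseExample
    {
      // Derby / SQL-standard form, as in DerbyConnector.limitClause above.
      static String derbyLimitClause(int limit)
      {
        return String.format(Locale.ENGLISH, "FETCH NEXT %d ROWS ONLY", limit);
      }

      // LIMIT form as used by MySQL and PostgreSQL (assumed here; the actual
      // overrides live in MySQLConnector and PostgreSQLConnector).
      static String limitClause(int limit)
      {
        return String.format(Locale.ENGLISH, "LIMIT %d", limit);
      }

      public static void main(String[] args)
      {
        System.out.println("SELECT * FROM druid_tasks ORDER BY id " + derbyLimitClause(100));
        System.out.println("SELECT * FROM druid_tasks ORDER BY id " + limitClause(100));
      }
    }
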
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java
index 9c55a252b92..1c192da475d 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java
@@ -32,7 +32,11 @@
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
@@ -86,6 +90,14 @@ public class SQLMetadataConnectorTest
           );
         }
 
+        String taskTable = tablesConfig.getTasksTable();
+        for (String column : Arrays.asList("type", "group_id")) {
+          Assert.assertTrue(
+              StringUtils.format("Tasks table column %s was not created!", column),
+              connector.tableContainsColumn(handle, taskTable, column)
+          );
+        }
+
         return null;
       }
     }
@@ -170,6 +182,12 @@ public class SQLMetadataConnectorTest
       return 0;
     }
 
+    @Override
+    public String limitClause(int limit)
+    {
+      return "";
+    }
+
     @Override
     public String getQuoteString()
     {
@@ -242,4 +260,32 @@ public class SQLMetadataConnectorTest
     Assert.assertEquals(dataSource.getMaxConnLifetimeMillis(), 1200000);
     Assert.assertEquals((long) dataSource.getDefaultQueryTimeout(), 30000);
   }
+
+  private boolean verifyTaskTypeAndGroupId(String table, String id, String type, String groupId)
+  {
+    try {
+      return connector.retryWithHandle(
+          new HandleCallback<Boolean>()
+          {
+            @Override
+            public Boolean withHandle(Handle handle) throws SQLException
+            {
+              Statement statement = handle.getConnection().createStatement();
+              ResultSet resultSet = statement.executeQuery(
+                  StringUtils.format("SELECT * FROM %1$s WHERE id = '%2$s'", table, id)
+              );
+              resultSet.next();
+              boolean flag = type.equals(resultSet.getString("type"))
+                             && groupId.equals(resultSet.getString("group_id"));
+              statement.close();
+              return flag;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      e.printStackTrace();
+      return false;
+    }
+  }
 }
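Aside: the tableContainsColumn checks exercised by this test go through the standard JDBC DatabaseMetaData API. A standalone sketch of that probe; Derby folds unquoted identifiers to upper case, which is why the DerbyConnector implementation upper-cases both names before asking:

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Locale;

    public class ColumnProbeExample
    {
      static boolean tableContainsColumn(Connection connection, String table, String column) throws SQLException
      {
        DatabaseMetaData metaData = connection.getMetaData();
        // getColumns returns a row per matching column; a non-empty result means
        // the column exists. Names are upper-cased to match Derby's catalog.
        try (ResultSet columns = metaData.getColumns(
            null,
            null,
            table.toUpperCase(Locale.ENGLISH),
            column.toUpperCase(Locale.ENGLISH)
        )) {
          return columns.next();
        }
      }
    }
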
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
index 2e85e5a5d49..f89ae9b0ebd 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java
@@ -25,7 +25,10 @@
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexer.TaskIdentifier;
 import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Pair;
@@ -33,15 +36,26 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
 import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.tweak.HandleCallback;
 
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -53,7 +67,13 @@ public class SQLMetadataStorageActionHandlerTest
   public final ExpectedException thrown = ExpectedException.none();
 
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
-  private SQLMetadataStorageActionHandler<Map<String, Integer>, Map<String, Integer>, Map<String, String>, Map<String, Integer>> handler;
+
+  private static final Random RANDOM = new Random(1);
+
+  private SQLMetadataStorageActionHandler<Map<String, Object>, Map<String, Object>, Map<String, String>, Map<String, Object>> handler;
+
+  private final String entryTable = "entries";
 
   @Before
   public void setUp()
@@ -61,31 +81,30 @@ public class SQLMetadataStorageActionHandlerTest
     TestDerbyConnector connector = derbyConnectorRule.getConnector();
 
     final String entryType = "entry";
-    final String entryTable = "entries";
     final String logTable = "logs";
     final String lockTable = "locks";
 
-    connector.createEntryTable(entryTable);
+    connector.prepareTaskEntryTable(entryTable);
     connector.createLockTable(lockTable, entryType);
     connector.createLogTable(logTable, entryType);
 
     handler = new DerbyMetadataStorageActionHandler<>(
         connector,
         JSON_MAPPER,
-        new MetadataStorageActionHandlerTypes<Map<String, Integer>, Map<String, Integer>, Map<String, String>, Map<String, Integer>>()
+        new MetadataStorageActionHandlerTypes<Map<String, Object>, Map<String, Object>, Map<String, String>, Map<String, Object>>()
         {
           @Override
-          public TypeReference<Map<String, Integer>> getEntryType()
+          public TypeReference<Map<String, Object>> getEntryType()
           {
-            return new TypeReference<Map<String, Integer>>()
+            return new TypeReference<Map<String, Object>>()
             {
             };
           }
 
           @Override
-          public TypeReference<Map<String, Integer>> getStatusType()
+          public TypeReference<Map<String, Object>> getStatusType()
           {
-            return new TypeReference<Map<String, Integer>>()
+            return new TypeReference<Map<String, Object>>()
             {
             };
           }
@@ -97,9 +116,9 @@ public class SQLMetadataStorageActionHandlerTest
           }
 
           @Override
-          public TypeReference<Map<String, Integer>> getLockType()
+          public TypeReference<Map<String, Object>> getLockType()
           {
-            return new TypeReference<Map<String, Integer>>()
+            return new TypeReference<Map<String, Object>>()
             {
             };
           }
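Aside: a small standalone sketch (illustrative only) of why these handler types are declared through TypeReference rather than a bare Map.class, which is the shape the hunks above keep while widening Integer to Object:

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.Map;

    public class TypeReferenceExample
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        // The anonymous subclass captures the full generic type Map<String, Object>,
        // which plain Map.class would lose to erasure.
        Map<String, Object> status = mapper.readValue(
            "{\"count\": 42, \"temp\": 1}",
            new TypeReference<Map<String, Object>>() {}
        );
        System.out.println(status.get("count"));
      }
    }
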
@@ -114,13 +133,13 @@ public class SQLMetadataStorageActionHandlerTest
   @Test
   public void testEntryAndStatus() throws Exception
   {
-    Map<String, Integer> entry = ImmutableMap.of("numericId", 1234);
-    Map<String, Integer> status1 = ImmutableMap.of("count", 42);
-    Map<String, Integer> status2 = ImmutableMap.of("count", 42, "temp", 1);
+    Map<String, Object> entry = ImmutableMap.of("numericId", 1234);
+    Map<String, Object> status1 = ImmutableMap.of("count", 42);
+    Map<String, Object> status2 = ImmutableMap.of("count", 42, "temp", 1);
 
     final String entryId = "1234";
 
-    handler.insert(entryId, DateTimes.of("2014-01-02T00:00:00.123"), "testDataSource", entry, true, null);
+    handler.insert(entryId, DateTimes.of("2014-01-02T00:00:00.123"), "testDataSource", entry, true, null, "type", "group");
 
     Assert.assertEquals(
         Optional.of(entry),
@@ -195,13 +214,13 @@ public class SQLMetadataStorageActionHandlerTest
   {
     for (int i = 1; i < 11; i++) {
       final String entryId = "abcd_" + i;
-      final Map<String, Integer> entry = ImmutableMap.of("a", i);
-      final Map<String, Integer> status = ImmutableMap.of("count", i * 10);
+      final Map<String, Object> entry = ImmutableMap.of("a", i);
+      final Map<String, Object> status = ImmutableMap.of("count", i * 10);
 
-      handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
+      handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status, "type", "group");
     }
 
-    final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getTaskInfos(
+    final List<TaskInfo<Map<String, Object>, Map<String, Object>>> statuses = handler.getTaskInfos(
         CompleteTaskLookup.withTasksCreatedPriorTo(
             7,
             DateTimes.of("2014-01-01")
@@ -210,7 +229,7 @@ public class SQLMetadataStorageActionHandlerTest
     );
     Assert.assertEquals(7, statuses.size());
     int i = 10;
-    for (TaskInfo<Map<String, Integer>, Map<String, Integer>> status : statuses) {
+    for (TaskInfo<Map<String, Object>, Map<String, Object>> status : statuses) {
       Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus());
     }
   }
@@ -220,13 +239,13 @@ public class SQLMetadataStorageActionHandlerTest
   {
     for (int i = 1; i < 6; i++) {
       final String entryId = "abcd_" + i;
-      final Map<String, Integer> entry = ImmutableMap.of("a", i);
-      final Map<String, Integer> status = ImmutableMap.of("count", i * 10);
+      final Map<String, Object> entry = ImmutableMap.of("a", i);
+      final Map<String, Object> status = ImmutableMap.of("count", i * 10);
 
-      handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
+      handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status, "type", "group");
     }
 
-    final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getTaskInfos(
+    final List<TaskInfo<Map<String, Object>, Map<String, Object>>> statuses = handler.getTaskInfos(
         CompleteTaskLookup.withTasksCreatedPriorTo(
             10,
             DateTimes.of("2014-01-01")
@@ -235,7 +254,7 @@ public class SQLMetadataStorageActionHandlerTest
     );
     Assert.assertEquals(5, statuses.size());
     int i = 5;
-    for (TaskInfo<Map<String, Integer>, Map<String, Integer>> status : statuses) {
+    for (TaskInfo<Map<String, Object>, Map<String, Object>> status : statuses) {
       Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus());
     }
   }
@@ -244,23 +263,23 @@ public class SQLMetadataStorageActionHandlerTest
   public void testRepeatInsert() throws Exception
   {
     final String entryId = "abcd";
-    Map<String, Integer> entry = ImmutableMap.of("a", 1);
-    Map<String, Integer> status = ImmutableMap.of("count", 42);
+    Map<String, Object> entry = ImmutableMap.of("a", 1);
+    Map<String, Object> status = ImmutableMap.of("count", 42);
 
-    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
+    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
 
     thrown.expect(EntryExistsException.class);
-    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
+    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
   }
 
   @Test
   public void testLogs() throws Exception
   {
     final String entryId = "abcd";
-    Map<String, Integer> entry = ImmutableMap.of("a", 1);
-    Map<String, Integer> status = ImmutableMap.of("count", 42);
+    Map<String, Object> entry = ImmutableMap.of("a", 1);
+    Map<String, Object> status = ImmutableMap.of("count", 42);
 
-    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
+    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
 
     Assert.assertEquals(
         ImmutableList.of(),
@@ -289,32 +308,32 @@ public class SQLMetadataStorageActionHandlerTest
   public void testLocks() throws Exception
   {
     final String entryId = "ABC123";
-    Map<String, Integer> entry = ImmutableMap.of("a", 1);
-    Map<String, Integer> status = ImmutableMap.of("count", 42);
+    Map<String, Object> entry = ImmutableMap.of("a", 1);
+    Map<String, Object> status = ImmutableMap.of("count", 42);
 
-    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
+    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
 
     Assert.assertEquals(
-        ImmutableMap.<Long, Map<String, Integer>>of(),
+        ImmutableMap.<Long, Map<String, Object>>of(),
         handler.getLocks("non_exist_entry")
     );
 
     Assert.assertEquals(
-        ImmutableMap.<Long, Map<String, Integer>>of(),
+        ImmutableMap.<Long, Map<String, Object>>of(),
         handler.getLocks(entryId)
     );
 
-    final ImmutableMap<String, Integer> lock1 = ImmutableMap.of("lock", 1);
-    final ImmutableMap<String, Integer> lock2 = ImmutableMap.of("lock", 2);
+    final ImmutableMap<String, Object> lock1 = ImmutableMap.of("lock", 1);
+    final ImmutableMap<String, Object> lock2 = ImmutableMap.of("lock", 2);
 
     Assert.assertTrue(handler.addLock(entryId, lock1));
     Assert.assertTrue(handler.addLock(entryId, lock2));
 
-    final Map<Long, Map<String, Integer>> locks = handler.getLocks(entryId);
+    final Map<Long, Map<String, Object>> locks = handler.getLocks(entryId);
     Assert.assertEquals(2, locks.size());
 
     Assert.assertEquals(
-        ImmutableSet.<Map<String, Integer>>of(lock1, lock2),
+        ImmutableSet.<Map<String, Object>>of(lock1, lock2),
         new HashSet<>(locks.values())
     );
@@ -322,7 +341,7 @@ public class SQLMetadataStorageActionHandlerTest
     handler.removeLock(lockId);
     locks.remove(lockId);
 
-    final Map<Long, Map<String, Integer>> updated = handler.getLocks(entryId);
+    final Map<Long, Map<String, Object>> updated = handler.getLocks(entryId);
     Assert.assertEquals(
         new HashSet<>(locks.values()),
         new HashSet<>(updated.values())
@@ -334,23 +353,23 @@ public class SQLMetadataStorageActionHandlerTest
   public void testReplaceLock() throws EntryExistsException
   {
     final String entryId = "ABC123";
-    Map<String, Integer> entry = ImmutableMap.of("a", 1);
-    Map<String, Integer> status = ImmutableMap.of("count", 42);
+    Map<String, Object> entry = ImmutableMap.of("a", 1);
+    Map<String, Object> status = ImmutableMap.of("count", 42);
 
-    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
+    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
 
     Assert.assertEquals(
-        ImmutableMap.<Long, Map<String, Integer>>of(),
+        ImmutableMap.<Long, Map<String, Object>>of(),
         handler.getLocks("non_exist_entry")
     );
 
     Assert.assertEquals(
-        ImmutableMap.<Long, Map<String, Integer>>of(),
+        ImmutableMap.<Long, Map<String, Object>>of(),
         handler.getLocks(entryId)
     );
 
-    final ImmutableMap<String, Integer> lock1 = ImmutableMap.of("lock", 1);
-    final ImmutableMap<String, Integer> lock2 = ImmutableMap.of("lock", 2);
+    final ImmutableMap<String, Object> lock1 = ImmutableMap.of("lock", 1);
+    final ImmutableMap<String, Object> lock2 = ImmutableMap.of("lock", 2);
 
     Assert.assertTrue(handler.addLock(entryId, lock1));
@@ -364,23 +383,23 @@ public class SQLMetadataStorageActionHandlerTest
   public void testGetLockId() throws EntryExistsException
   {
     final String entryId = "ABC123";
-    Map<String, Integer> entry = ImmutableMap.of("a", 1);
-    Map<String, Integer> status = ImmutableMap.of("count", 42);
+    Map<String, Object> entry = ImmutableMap.of("a", 1);
+    Map<String, Object> status = ImmutableMap.of("count", 42);
 
-    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
+    handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
 
     Assert.assertEquals(
-        ImmutableMap.<Long, Map<String, Integer>>of(),
+        ImmutableMap.<Long, Map<String, Object>>of(),
         handler.getLocks("non_exist_entry")
     );
 
     Assert.assertEquals(
-        ImmutableMap.<Long, Map<String, Integer>>of(),
+        ImmutableMap.<Long, Map<String, Object>>of(),
         handler.getLocks(entryId)
     );
 
-    final ImmutableMap<String, Integer> lock1 = ImmutableMap.of("lock", 1);
-    final ImmutableMap<String, Integer> lock2 = ImmutableMap.of("lock", 2);
+    final ImmutableMap<String, Object> lock1 = ImmutableMap.of("lock", 1);
+    final ImmutableMap<String, Object> lock2 = ImmutableMap.of("lock", 2);
 
     Assert.assertTrue(handler.addLock(entryId, lock1));
@@ -392,21 +411,21 @@ public class SQLMetadataStorageActionHandlerTest
   public void testRemoveTasksOlderThan() throws Exception
   {
     final String entryId1 = "1234";
-    Map<String, Integer> entry1 = ImmutableMap.of("numericId", 1234);
-    Map<String, Integer> status1 = ImmutableMap.of("count", 42, "temp", 1);
-    handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1);
+    Map<String, Object> entry1 = ImmutableMap.of("numericId", 1234);
+    Map<String, Object> status1 = ImmutableMap.of("count", 42, "temp", 1);
+    handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1, "type", "group");
     Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created")));
 
     final String entryId2 = "ABC123";
-    Map<String, Integer> entry2 = ImmutableMap.of("a", 1);
-    Map<String, Integer> status2 = ImmutableMap.of("count", 42);
-    handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2);
+    Map<String, Object> entry2 = ImmutableMap.of("a", 1);
+    Map<String, Object> status2 = ImmutableMap.of("count", 42);
+    handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2, "type", "group");
     Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created")));
 
     final String entryId3 = "DEF5678";
-    Map<String, Integer> entry3 = ImmutableMap.of("numericId", 5678);
-    Map<String, Integer> status3 = ImmutableMap.of("count", 21, "temp", 2);
-    handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3);
+    Map<String, Object> entry3 = ImmutableMap.of("numericId", 5678);
+    Map<String, Object> status3 = ImmutableMap.of("count", 21, "temp", 2);
+    handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3, "type", "group");
     Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created")));
 
     Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1));
@@ -450,4 +469,196 @@ public class SQLMetadataStorageActionHandlerTest
     Assert.assertEquals(1, handler.getLogs(entryId2).size());
     Assert.assertEquals(1, handler.getLogs(entryId3).size());
   }
+
+  @Test
+  public void testMigration()
+  {
+    int active = 1234;
+    for (int i = 0; i < active; i++) {
+      insertTaskInfo(createRandomTaskInfo(true), false);
+    }
+
+    int completed = 2345;
+    for (int i = 0; i < completed; i++) {
+      insertTaskInfo(createRandomTaskInfo(false), false);
+    }
+
+    Assert.assertEquals(active + completed, getUnmigratedTaskCount().intValue());
+
+    handler.populateTaskTypeAndGroupId();
+
+    Assert.assertEquals(0, getUnmigratedTaskCount().intValue());
+  }
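Aside: this test drives the migration synchronously; in production the handler's populateTaskTypeAndGroupIdAsync submits the same work to a single-thread executor and keeps a future around. A minimal sketch of that pattern (the caller shown is hypothetical; only the executor/future shape comes from the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class AsyncMigrationExample
    {
      public static void main(String[] args) throws Exception
      {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // Stand-in for populateTaskTypeAndGroupId(): returns true once all rows are migrated.
        Future<Boolean> taskMigrationCompleteFuture = executorService.submit(() -> {
          // ... page through the task table and backfill type/group_id ...
          return true;
        });

        // Callers can poll isDone() instead of blocking, e.g. to decide whether
        // the new-columns query path is safe to use yet.
        System.out.println("Migration complete: " + taskMigrationCompleteFuture.get());
        executorService.shutdown();
      }
    }
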
+
+  @Test
+  public void testGetTaskStatusPlusListInternal()
+  {
+    // SETUP
+    TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered = createRandomTaskInfo(true);
+    insertTaskInfo(activeUnaltered, false);
+
+    TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered = createRandomTaskInfo(false);
+    insertTaskInfo(completedUnaltered, false);
+
+    TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered = createRandomTaskInfo(true);
+    insertTaskInfo(activeAltered, true);
+
+    TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered = createRandomTaskInfo(false);
+    insertTaskInfo(completedAltered, true);
+
+    Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups = new HashMap<>();
+    taskLookups.put(TaskLookup.TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance());
+    taskLookups.put(TaskLookup.TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, Duration.millis(86400000)));
+
+    List<TaskInfo<TaskIdentifier, Map<String, Object>>> taskMetadataInfos;
+
+    // BEFORE MIGRATION
+
+    // Payload-based fetch: task type and groupId will be populated
+    taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, true);
+    Assert.assertEquals(4, taskMetadataInfos.size());
+    verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false);
+
+    // New-columns-based fetch before migration is complete: type and groupId are null when altered = false
+    taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, false);
+    Assert.assertEquals(4, taskMetadataInfos.size());
+    verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, true);
+    verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, true);
+    verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false);
+
+    // MIGRATION
+    handler.populateTaskTypeAndGroupId();
+
+    // Payload-based fetch: task type and groupId will still be populated
+    taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, true);
+    Assert.assertEquals(4, taskMetadataInfos.size());
+    verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false);
+
+    // New-columns-based fetch after migration is complete: all data must be populated in the tasks table
+    taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, false);
+    Assert.assertEquals(4, taskMetadataInfos.size());
+    verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false);
+    verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false);
+  }
+
+  private Integer getUnmigratedTaskCount()
+  {
+    return handler.getConnector().retryWithHandle(
+        new HandleCallback<Integer>()
+        {
+          @Override
+          public Integer withHandle(Handle handle) throws SQLException
+          {
+            String sql = String.format(
+                Locale.ENGLISH,
+                "SELECT COUNT(*) FROM %s WHERE type IS NULL OR group_id IS NULL",
+                entryTable
+            );
+            ResultSet resultSet = handle.getConnection().createStatement().executeQuery(sql);
+            resultSet.next();
+            return resultSet.getInt(1);
+          }
+        }
+    );
+  }
+
+  private TaskInfo<Map<String, Object>, Map<String, Object>> createRandomTaskInfo(boolean active)
+  {
+    String id = UUID.randomUUID().toString();
+    DateTime createdTime = DateTime.now(DateTimeZone.UTC);
+    String datasource = UUID.randomUUID().toString();
+    String type = UUID.randomUUID().toString();
+    String groupId = UUID.randomUUID().toString();
+
+    Map<String, Object> payload = new HashMap<>();
+    payload.put("id", id);
+    payload.put("type", type);
+    payload.put("groupId", groupId);
+
+    Map<String, Object> status = new HashMap<>();
+    status.put("id", id);
+    status.put("status", active ? TaskState.RUNNING : TaskState.SUCCESS);
+    status.put("duration", RANDOM.nextLong());
+    status.put("location", TaskLocation.create(UUID.randomUUID().toString(), 8080, 995));
+    status.put("errorMsg", UUID.randomUUID().toString());
+
+    return new TaskInfo<>(
+        id,
+        createdTime,
+        status,
+        datasource,
+        payload
+    );
+  }
+
+  private void insertTaskInfo(
+      TaskInfo<Map<String, Object>, Map<String, Object>> taskInfo,
+      boolean altered
+  )
+  {
+    try {
+      handler.insert(
+          taskInfo.getId(),
+          taskInfo.getCreatedTime(),
+          taskInfo.getDataSource(),
+          taskInfo.getTask(),
+          TaskState.RUNNING.equals(taskInfo.getStatus().get("status")),
+          taskInfo.getStatus(),
+          altered ? taskInfo.getTask().get("type").toString() : null,
+          altered ? taskInfo.getTask().get("groupId").toString() : null
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void verifyTaskInfoToMetadataInfo(
+      TaskInfo<Map<String, Object>, Map<String, Object>> taskInfo,
+      List<TaskInfo<TaskIdentifier, Map<String, Object>>> taskMetadataInfos,
+      boolean nullNewColumns
+  )
+  {
+    for (TaskInfo<TaskIdentifier, Map<String, Object>> taskMetadataInfo : taskMetadataInfos) {
+      if (taskMetadataInfo.getId().equals(taskInfo.getId())) {
+        verifyTaskInfoToMetadataInfo(taskInfo, taskMetadataInfo, nullNewColumns);
+        // Return only once the matching entry has been verified; returning
+        // unconditionally here would skip all but the first entry in the list.
+        return;
+      }
+    }
+    Assert.fail();
+  }
+
+  private void verifyTaskInfoToMetadataInfo(
+      TaskInfo<Map<String, Object>, Map<String, Object>> taskInfo,
+      TaskInfo<TaskIdentifier, Map<String, Object>> taskMetadataInfo,
+      boolean nullNewColumns
+  )
+  {
+    Assert.assertEquals(taskInfo.getId(), taskMetadataInfo.getId());
+    Assert.assertEquals(taskInfo.getCreatedTime(), taskMetadataInfo.getCreatedTime());
+    Assert.assertEquals(taskInfo.getDataSource(), taskMetadataInfo.getDataSource());
+
+    verifyTaskStatus(taskInfo.getStatus(), taskMetadataInfo.getStatus());
+
+    Map<String, Object> task = taskInfo.getTask();
+    TaskIdentifier taskIdentifier = taskMetadataInfo.getTask();
+    Assert.assertEquals(task.get("id"), taskIdentifier.getId());
+    if (nullNewColumns) {
+      Assert.assertNull(taskIdentifier.getGroupId());
+      Assert.assertNull(taskIdentifier.getType());
+    } else {
+      Assert.assertEquals(task.get("groupId"), taskIdentifier.getGroupId());
+      Assert.assertEquals(task.get("type"), taskIdentifier.getType());
+    }
+  }
+
+  private void verifyTaskStatus(Map<String, Object> expected, Map<String, Object> actual)
+  {
+    Assert.assertEquals(expected.get("id"), actual.get("id"));
+    Assert.assertEquals(expected.get("duration"), actual.get("duration"));
+    Assert.assertEquals(expected.get("errorMsg"), actual.get("errorMsg"));
+    Assert.assertEquals(expected.get("status").toString(), actual.get("status"));
+    Assert.assertEquals(expected.get("location"), JSON_MAPPER.convertValue(actual.get("location"), TaskLocation.class));
+  }
 }