Optimize overlord GET /tasks memory usage (#12404)

The web console (indirectly) calls the Overlord's GET /tasks API to fetch task summaries, which in turn queries the metadata tasks table. This query fetches several columns, including the payload, for all rows at once. That introduces significant memory overhead and can cause unresponsiveness or Overlord failure when the Ingestion tab is opened multiple times (since that triggers several parallel calls to this API).

Another thing to note is that the task table (the payload column in particular) can be very large. Extracting large payloads from such a table can be very slow, leading to a slow UI. While we are fixing the memory pressure in the Overlord, we can also fix the UI slowness caused by fetching large payloads from the table. Fetching large payloads also puts pressure on the metadata store, as reported by the community (Metadata store query performance degrades as the tasks in druid_tasks table grows · Issue #12318 · apache/druid).

The task summaries returned as the API response are several times smaller and fit comfortably in memory. So there is an opportunity here to fix the memory usage, the slow ingestion view, and the under-pressure metadata store by removing the need to handle large payloads in every layer we can. Of course, the solution becomes more complex as we try to fix more layers. With that in mind, this proposal captures two approaches; they vary in complexity and in the degree to which they fix the aforementioned problems.
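For a concrete sense of what the endpoint actually needs, the sketch below builds one task summary the way the tests in this change do. Every field shown is small and fixed-size; the payload column, by contrast, holds the entire serialized task spec. The ids, datasource, and duration here are invented for illustration.

TaskStatusPlus summary = new TaskStatusPlus(
    "index_wikipedia_2022",       // task id (hypothetical)
    "index_wikipedia_2022",       // group id (hypothetical)
    "index_parallel",             // task type
    DateTimes.nowUtc(),           // created time
    DateTimes.EPOCH,              // queue insertion time (not tracked for completed tasks)
    TaskState.SUCCESS,            // status code
    RunnerTaskState.NONE,         // completed tasks are no longer in the runner
    100L,                         // duration in millis (hypothetical)
    TaskLocation.unknown(),       // location
    "wikipedia",                  // datasource (hypothetical)
    null                          // error message
);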
Author: AmatyaAvadhanula (2022-06-16 22:30:37 +05:30), committed by GitHub
parent 602d95d865
commit f970757efc
22 changed files with 1227 additions and 541 deletions

File: TaskIdentifier.java (new file)

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Model class containing the id, type, and groupId of a task.
* These fields are extracted from the task payload for the new schema, and this model can be used for migration as well.
*/
public class TaskIdentifier
{
private final String id;
@Nullable
private final String type;
@Nullable
private final String groupId;
@JsonCreator
public TaskIdentifier(
@JsonProperty("id") String id,
@JsonProperty("groupId") @Nullable String groupId,
@JsonProperty("type") @Nullable String type // nullable for backward compatibility
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = groupId;
this.type = type;
}
@JsonProperty
public String getId()
{
return id;
}
@Nullable
@JsonProperty
public String getGroupId()
{
return groupId;
}
@Nullable
@JsonProperty
public String getType()
{
return type;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskIdentifier that = (TaskIdentifier) o;
return Objects.equals(getId(), that.getId()) &&
Objects.equals(getGroupId(), that.getGroupId()) &&
Objects.equals(getType(), that.getType());
}
@Override
public int hashCode()
{
return Objects.hash(
getId(),
getGroupId(),
getType()
);
}
@Override
public String toString()
{
return "TaskIdentifier{" +
"id='" + id + '\'' +
", groupId='" + groupId + '\'' +
", type='" + type + '\'' +
'}';
}
}
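As a usage sketch (the JSON literal here is hypothetical), a plain Jackson ObjectMapper can deserialize this model; rows written before the migration may lack a type, which is why the field is nullable:

ObjectMapper mapper = new ObjectMapper();
// "type" is absent, as it would be for a row written before the migration.
TaskIdentifier identifier = mapper.readValue(
    "{\"id\": \"index_wiki_2022\", \"groupId\": \"index_wiki_2022\"}",
    TaskIdentifier.class
);
// identifier.getType() returns null; identifier.getId() returns "index_wiki_2022"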

File: TaskStatusPlus.java

@ -22,6 +22,7 @@ package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.RE;
import org.joda.time.DateTime;
@ -31,6 +32,7 @@ import java.util.Objects;
public class TaskStatusPlus
{
private final String id;
@Nullable
private final String type;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
@ -252,4 +254,29 @@ public class TaskStatusPlus
", errorMsg='" + errorMsg + '\'' +
'}';
}
/**
* Converts a TaskInfo pair of TaskIdentifier and TaskStatus to a TaskStatusPlus.
* Applicable only to completed or waiting tasks, since a TaskInfo does not carry the exhaustive info for running tasks.
*
* @param taskIdentifierInfo TaskInfo pair
* @return corresponding TaskStatusPlus
*/
public static TaskStatusPlus fromTaskIdentifierInfo(TaskInfo<TaskIdentifier, TaskStatus> taskIdentifierInfo)
{
TaskStatus status = taskIdentifierInfo.getStatus();
return new TaskStatusPlus(
taskIdentifierInfo.getId(),
taskIdentifierInfo.getTask().getGroupId(),
taskIdentifierInfo.getTask().getType(),
taskIdentifierInfo.getCreatedTime(),
DateTimes.EPOCH,
status.getStatusCode(),
status.getStatusCode().isComplete() ? RunnerTaskState.NONE : RunnerTaskState.WAITING,
status.getDuration(),
status.getLocation(),
taskIdentifierInfo.getDataSource(),
status.getErrorMsg()
);
}
}
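A hedged usage sketch (ids and datasource invented): converting a completed task's identifier-level TaskInfo yields a TaskStatusPlus whose runner state is NONE and whose queue insertion time is DateTimes.EPOCH, since that detail is not tracked at this level.

TaskInfo<TaskIdentifier, TaskStatus> info = new TaskInfo<>(
    "task_1",                        // task id (hypothetical)
    DateTimes.nowUtc(),              // created time
    TaskStatus.success("task_1"),    // completed status
    "wikipedia",                     // datasource (hypothetical)
    new TaskIdentifier("task_1", "group_1", "index_parallel")
);
// statusCode is SUCCESS and the runner state is NONE for this completed task.
TaskStatusPlus statusPlus = TaskStatusPlus.fromTaskIdentifierInfo(info);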

File: MetadataStorageActionHandler.java

@ -20,6 +20,7 @@
package org.apache.druid.metadata;
import com.google.common.base.Optional;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;
@ -41,6 +42,8 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param entry object representing this entry
* @param active active or inactive flag
* @param status status object associated with this entry, can be null
* @param type entry type
* @param groupId entry group id
* @throws EntryExistsException
*/
void insert(
@ -49,10 +52,11 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
@NotNull String dataSource,
@NotNull EntryType entry,
boolean active,
@Nullable StatusType status
@Nullable StatusType status,
@NotNull String type,
@NotNull String groupId
) throws EntryExistsException;
/**
* Sets or updates the status for any active entry with the given id.
* Once an entry has been set inactive, its status cannot be updated anymore.
@ -99,6 +103,22 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
@Nullable String datasource
);
/**
* Returns the statuses of the specified tasks.
*
* If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
* If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
* store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
* All lookups should be processed atomically if more than one lookup is given.
*
* @param taskLookups task lookup type and filters.
* @param datasource datasource filter
*/
List<TaskInfo<TaskIdentifier, StatusType>> getTaskStatusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
);
default List<TaskInfo<EntryType, StatusType>> getTaskInfos(
TaskLookup taskLookup,
@Nullable String datasource
@ -173,4 +193,10 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
*/
@Nullable
Long getLockId(String entryId, LockType lock);
/**
* Utility to migrate existing tasks to the new schema by populating type and groupId asynchronously
*/
void populateTaskTypeAndGroupIdAsync();
}
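A sketch of how a caller might combine lookups in a single call (the handler instance, limit, and datasource are placeholder assumptions): this fetches all active tasks plus the 20 most recently created complete tasks of one datasource, without touching the payload column once the migration has finished.

Map<TaskLookupType, TaskLookup> lookups = ImmutableMap.of(
    TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(),
    TaskLookupType.COMPLETE, CompleteTaskLookup.of(20, (Duration) null)
);
// StatusType is TaskStatus for the task entry handler.
List<TaskInfo<TaskIdentifier, TaskStatus>> summaries =
    handler.getTaskStatusList(lookups, "wikipedia");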

File: SQLServerConnector.java

@ -40,6 +40,7 @@ import org.skife.jdbi.v2.util.StringMapper;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Pattern;
@ -212,6 +213,12 @@ public class SQLServerConnector extends SQLMetadataConnector
.isEmpty();
}
@Override
public String limitClause(int limit)
{
return String.format(Locale.ENGLISH, "FETCH NEXT %d ROWS ONLY", limit);
}
/**
*
* {@inheritDoc}

File: SQLServerConnectorTest.java

@ -64,4 +64,15 @@ public class SQLServerConnectorTest
Assert.assertFalse(connector.isTransientException(new Throwable("Throwable with reason only")));
}
@Test
public void testLimitClause()
{
SQLServerConnector connector = new SQLServerConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null)
)
);
Assert.assertEquals("FETCH NEXT 100 ROWS ONLY", connector.limitClause(100));
}
}

File: MySQLConnector.java

@ -37,6 +37,7 @@ import org.skife.jdbi.v2.util.StringMapper;
import javax.annotation.Nullable;
import java.io.File;
import java.sql.SQLException;
import java.util.Locale;
public class MySQLConnector extends SQLMetadataConnector
{
@ -177,6 +178,12 @@ public class MySQLConnector extends SQLMetadataConnector
return Integer.MIN_VALUE;
}
@Override
public String limitClause(int limit)
{
return String.format(Locale.ENGLISH, "LIMIT %d", limit);
}
@Override
public boolean tableExists(Handle handle, String tableName)
{

File: MySQLConnectorTest.java

@ -91,4 +91,16 @@ public class MySQLConnectorTest
connector.connectorIsTransientException(new SQLTransientConnectionException("transient"))
);
}
@Test
public void testLimitClause()
{
MySQLConnector connector = new MySQLConnector(
CONNECTOR_CONFIG_SUPPLIER,
TABLES_CONFIG_SUPPLIER,
new MySQLConnectorSslConfig(),
MYSQL_DRIVER_CONFIG
);
Assert.assertEquals("LIMIT 100", connector.limitClause(100));
}
}

File: PostgreSQLConnector.java

@ -40,6 +40,7 @@ import org.skife.jdbi.v2.util.StringMapper;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.Locale;
public class PostgreSQLConnector extends SQLMetadataConnector
{
@ -144,6 +145,12 @@ public class PostgreSQLConnector extends SQLMetadataConnector
return DEFAULT_STREAMING_RESULT_SIZE;
}
@Override
public String limitClause(int limit)
{
return String.format(Locale.ENGLISH, "LIMIT %d", limit);
}
protected boolean canUpsert(Handle handle) throws SQLException
{
if (canUpsert == null) {

File: PostgreSQLConnectorTest.java

@ -62,4 +62,18 @@ public class PostgreSQLConnectorTest
Assert.assertFalse(connector.isTransientException(new Exception("I'm not happy")));
Assert.assertFalse(connector.isTransientException(new Throwable("I give up")));
}
@Test
public void testLimitClause()
{
PostgreSQLConnector connector = new PostgreSQLConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null)
),
new PostgreSQLConnectorConfig(),
new PostgreSQLTablesConfig()
);
Assert.assertEquals("LIMIT 100", connector.limitClause(100));
}
}

File: Task.java

@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -241,4 +243,20 @@ public interface Task
final ContextValueType value = getContextValue(key);
return value == null ? defaultValue : value;
}
default TaskIdentifier getMetadata()
{
return new TaskIdentifier(this.getId(), this.getGroupId(), this.getType());
}
static TaskInfo<TaskIdentifier, TaskStatus> toTaskIdentifierInfo(TaskInfo<Task, TaskStatus> taskInfo)
{
return new TaskInfo<>(
taskInfo.getId(),
taskInfo.getCreatedTime(),
taskInfo.getStatus(),
taskInfo.getDataSource(),
taskInfo.getTask().getMetadata()
);
}
}

File: HeapMemoryTaskStorage.java

@ -30,6 +30,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
@ -233,6 +234,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
return tasks;
}
@Override
public List<TaskStatusPlus> getTaskStatusPlusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
)
{
return getTaskInfos(taskLookups, datasource).stream()
.map(Task::toTaskIdentifierInfo)
.map(TaskStatusPlus::fromTaskIdentifierInfo)
.collect(Collectors.toList());
}
private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
DateTime start,
@Nullable Integer n,

File: MetadataTaskStorage.java

@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
@ -58,6 +59,7 @@ import java.util.stream.Collectors;
public class MetadataTaskStorage implements TaskStorage
{
private static final MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock> TASK_TYPES = new MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock>()
{
@Override
@ -115,6 +117,8 @@ public class MetadataTaskStorage implements TaskStorage
public void start()
{
metadataStorageConnector.createTaskTables();
// begins migration of existing tasks to new schema
handler.populateTaskTypeAndGroupIdAsync();
}
@LifecycleStop
@ -144,7 +148,9 @@ public class MetadataTaskStorage implements TaskStorage
task.getDataSource(),
task,
status.isRunnable(),
status
status,
task.getType(),
task.getGroupId()
);
}
catch (Exception e) {
@ -226,21 +232,44 @@ public class MetadataTaskStorage implements TaskStorage
@Nullable String datasource
)
{
Map<TaskLookupType, TaskLookup> theTaskLookups = Maps.newHashMapWithExpectedSize(taskLookups.size());
Map<TaskLookupType, TaskLookup> theTaskLookups = processTaskLookups(taskLookups);
return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource));
}
@Override
public List<TaskStatusPlus> getTaskStatusPlusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
)
{
Map<TaskLookupType, TaskLookup> processedTaskLookups = processTaskLookups(taskLookups);
return Collections.unmodifiableList(
handler.getTaskStatusList(processedTaskLookups, datasource)
.stream()
.map(TaskStatusPlus::fromTaskIdentifierInfo)
.collect(Collectors.toList())
);
}
private Map<TaskLookupType, TaskLookup> processTaskLookups(
Map<TaskLookupType, TaskLookup> taskLookups
)
{
Map<TaskLookupType, TaskLookup> processedTaskLookups = Maps.newHashMapWithExpectedSize(taskLookups.size());
for (Entry<TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
if (entry.getKey() == TaskLookupType.COMPLETE) {
CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
theTaskLookups.put(
processedTaskLookups.put(
entry.getKey(),
completeTaskLookup.hasTaskCreatedTimeFilter()
? completeTaskLookup
: completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold())
);
} else {
theTaskLookups.put(entry.getKey(), entry.getValue());
processedTaskLookups.put(entry.getKey(), entry.getValue());
}
}
return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource));
return processedTaskLookups;
}
@Override

File: TaskStorage.java

@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord;
import com.google.common.base.Optional;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
@ -150,6 +151,21 @@ public interface TaskStorage
*/
List<Task> getActiveTasksByDatasource(String datasource);
/**
* Returns the status of tasks in metadata storage as TaskStatusPlus
* No particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.
*
* The returned list can contain active tasks and complete tasks depending on the {@code taskLookups} parameter.
* See {@link TaskLookup} for more details of active and complete tasks.
*
* @param taskLookups lookup types and filters
* @param datasource datasource filter
*/
List<TaskStatusPlus> getTaskStatusPlusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
);
/**
* Returns a list of tasks stored in the storage facility as {@link TaskInfo}. No
* particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.

File: TaskStorageQueryAdapter.java

@ -23,16 +23,15 @@ import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -85,21 +84,12 @@ public class TaskStorageQueryAdapter
);
}
public List<TaskInfo<Task, TaskStatus>> getCompletedTaskInfoByCreatedTimeDuration(
@Nullable Integer maxTaskStatuses,
@Nullable Duration duration,
@Nullable String dataSource
)
{
return storage.getTaskInfos(CompleteTaskLookup.of(maxTaskStatuses, duration), dataSource);
}
public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
public List<TaskStatusPlus> getTaskStatusPlusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String dataSource
)
{
return storage.getTaskInfos(taskLookups, dataSource);
return storage.getTaskStatusPlusList(taskLookups, dataSource);
}
public Optional<Task> getTask(final String taskid)

File: OverlordResource.java

@ -690,7 +690,7 @@ public class OverlordResource
taskMaster.getTaskRunner(),
taskRunner -> {
final List<TaskStatusPlus> authorizedList = securedTaskStatusPlus(
getTasks(
getTaskStatusPlusList(
taskRunner,
TaskStateLookup.fromString(state),
dataSource,
@ -706,7 +706,7 @@ public class OverlordResource
);
}
private List<TaskStatusPlus> getTasks(
private List<TaskStatusPlus> getTaskStatusPlusList(
TaskRunner taskRunner,
TaskStateLookup state,
@Nullable String dataSource,
@ -729,7 +729,7 @@ public class OverlordResource
// This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process
// and use the snapshot from taskRunner as a reference for potential task state updates happened
// after the first snapshotting.
Stream<TaskInfo<Task, TaskStatus>> taskInfoStreamFromTaskStorage = getTaskInfoStreamFromTaskStorage(
Stream<TaskStatusPlus> taskStatusPlusStream = getTaskStatusPlusList(
state,
dataSource,
createdTimeDuration,
@ -745,87 +745,57 @@ public class OverlordResource
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) {
// We are interested in only those tasks which are in taskRunner.
taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage
.filter(info -> runnerWorkItems.containsKey(info.getId()));
taskStatusPlusStream = taskStatusPlusStream
.filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId()));
}
final List<TaskInfo<Task, TaskStatus>> taskInfoFromTaskStorage = taskInfoStreamFromTaskStorage
.collect(Collectors.toList());
final List<TaskStatusPlus> taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList());
// Separate complete and active tasks from taskStorage.
// Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType.
final List<TaskInfo<Task, TaskStatus>> completeTaskInfoFromTaskStorage = new ArrayList<>();
final List<TaskInfo<Task, TaskStatus>> activeTaskInfoFromTaskStorage = new ArrayList<>();
for (TaskInfo<Task, TaskStatus> info : taskInfoFromTaskStorage) {
if (info.getStatus().isComplete()) {
completeTaskInfoFromTaskStorage.add(info);
final List<TaskStatusPlus> completeTaskStatusPlusList = new ArrayList<>();
final List<TaskStatusPlus> activeTaskStatusPlusList = new ArrayList<>();
for (TaskStatusPlus statusPlus : taskStatusPlusList) {
if (statusPlus.getStatusCode().isComplete()) {
completeTaskStatusPlusList.add(statusPlus);
} else {
activeTaskInfoFromTaskStorage.add(info);
activeTaskStatusPlusList.add(statusPlus);
}
}
final List<TaskStatusPlus> statuses = new ArrayList<>();
completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add(
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
DateTimes.EPOCH,
taskInfo.getStatus().getStatusCode(),
RunnerTaskState.NONE,
taskInfo.getStatus().getDuration(),
taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
)
));
final List<TaskStatusPlus> taskStatuses = new ArrayList<>(completeTaskStatusPlusList);
activeTaskInfoFromTaskStorage.forEach(taskInfo -> {
final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId());
activeTaskStatusPlusList.forEach(statusPlus -> {
final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId());
if (runnerWorkItem == null) {
// a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner.
if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) {
statuses.add(
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
DateTimes.EPOCH,
taskInfo.getStatus().getStatusCode(),
RunnerTaskState.WAITING,
taskInfo.getStatus().getDuration(),
taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
)
);
taskStatuses.add(statusPlus);
}
} else {
if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) {
statuses.add(
taskStatuses.add(
new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
statusPlus.getId(),
statusPlus.getGroupId(),
statusPlus.getType(),
runnerWorkItem.getCreatedTime(),
runnerWorkItem.getQueueInsertionTime(),
taskInfo.getStatus().getStatusCode(),
taskRunner.getRunnerTaskState(taskInfo.getId()), // this is racy for remoteTaskRunner
taskInfo.getStatus().getDuration(),
statusPlus.getStatusCode(),
taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner
statusPlus.getDuration(),
runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done.
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
statusPlus.getDataSource(),
statusPlus.getErrorMsg()
)
);
}
}
});
return statuses;
return taskStatuses;
}
private Stream<TaskInfo<Task, TaskStatus>> getTaskInfoStreamFromTaskStorage(
private Stream<TaskStatusPlus> getTaskStatusPlusList(
TaskStateLookup state,
@Nullable String dataSource,
Duration createdTimeDuration,
@ -861,16 +831,16 @@ public class OverlordResource
throw new IAE("Unknown state: [%s]", state);
}
final Stream<TaskInfo<Task, TaskStatus>> taskInfoStreamFromTaskStorage = taskStorageQueryAdapter.getTaskInfos(
final Stream<TaskStatusPlus> taskStatusPlusStream = taskStorageQueryAdapter.getTaskStatusPlusList(
taskLookups,
dataSource
).stream();
if (type != null) {
return taskInfoStreamFromTaskStorage.filter(
info -> type.equals(info.getTask() == null ? null : info.getTask().getType())
return taskStatusPlusStream.filter(
statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType())
);
} else {
return taskInfoStreamFromTaskStorage;
return taskStatusPlusStream;
}
}

File: OverlordResourceTest.java

@ -32,10 +32,6 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
@ -231,40 +227,16 @@ public class OverlordResourceTest
{
expectAuthorizationTokenCheck();
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_1"),
"allow",
getTaskWithIdAndDatasource("id_1", "allow")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_3"),
"deny",
getTaskWithIdAndDatasource("id_3", "deny")
),
new TaskInfo<>(
"id_4",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_4"),
"deny",
getTaskWithIdAndDatasource("id_4", "deny")
)
createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_3", TaskState.RUNNING, "deny"),
createTaskStatusPlus("id_4", TaskState.RUNNING, "deny")
)
);
@ -297,31 +269,13 @@ public class OverlordResourceTest
List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"deny",
getTaskWithIdAndDatasource("id_1", "deny")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
)
createTaskStatusPlus("id_1", TaskState.SUCCESS, "deny"),
createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow")
)
);
EasyMock.replay(
@ -352,26 +306,14 @@ public class OverlordResourceTest
)
);
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_1"),
"deny",
getTaskWithIdAndDatasource("id_1", "deny")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
)
createTaskStatusPlus("id_1", TaskState.RUNNING, "deny"),
createTaskStatusPlus("id_2", TaskState.RUNNING, "allow")
)
);
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.RUNNING);
@ -398,7 +340,7 @@ public class OverlordResourceTest
{
expectAuthorizationTokenCheck();
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance(),
@ -409,48 +351,12 @@ public class OverlordResourceTest
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_5",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_5"),
"deny",
getTaskWithIdAndDatasource("id_5", "deny")
),
new TaskInfo<>(
"id_6",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_6"),
"allow",
getTaskWithIdAndDatasource("id_6", "allow")
),
new TaskInfo<>(
"id_7",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_7"),
"allow",
getTaskWithIdAndDatasource("id_7", "allow")
),
new TaskInfo<>(
"id_5",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_5"),
"deny",
getTaskWithIdAndDatasource("id_5", "deny")
),
new TaskInfo<>(
"id_6",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_6"),
"allow",
getTaskWithIdAndDatasource("id_6", "allow")
),
new TaskInfo<>(
"id_7",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_7"),
"allow",
getTaskWithIdAndDatasource("id_7", "allow")
)
createTaskStatusPlus("id_5", TaskState.SUCCESS, "deny"),
createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_5", TaskState.SUCCESS, "deny"),
createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow")
)
);
@ -481,7 +387,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck();
//completed tasks
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null),
@ -492,55 +398,13 @@ public class OverlordResourceTest
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_5",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_5"),
"allow",
getTaskWithIdAndDatasource("id_5", "allow")
),
new TaskInfo<>(
"id_6",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_6"),
"allow",
getTaskWithIdAndDatasource("id_6", "allow")
),
new TaskInfo<>(
"id_7",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_7"),
"allow",
getTaskWithIdAndDatasource("id_7", "allow")
),
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_1"),
"allow",
getTaskWithIdAndDatasource("id_1", "allow")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
),
new TaskInfo<>(
"id_4",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_4"),
"allow",
getTaskWithIdAndDatasource("id_4", "allow")
)
createTaskStatusPlus("id_5", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_4", TaskState.SUCCESS, "allow")
)
);
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks()).andReturn(
@ -572,7 +436,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck();
//active tasks
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
@ -581,34 +445,10 @@ public class OverlordResourceTest
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_1"),
"allow",
getTaskWithIdAndDatasource("id_1", "allow")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_3"),
"deny",
getTaskWithIdAndDatasource("id_3", "deny")
),
new TaskInfo<>(
"id_4",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_4"),
"deny",
getTaskWithIdAndDatasource("id_4", "deny")
)
createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_3", TaskState.RUNNING, "deny"),
createTaskStatusPlus("id_4", TaskState.RUNNING, "deny")
)
);
@ -645,7 +485,7 @@ public class OverlordResourceTest
{
expectAuthorizationTokenCheck();
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(
TaskLookupType.ACTIVE,
ActiveTaskLookup.getInstance()
@ -654,34 +494,10 @@ public class OverlordResourceTest
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_1"),
"allow",
getTaskWithIdAndDatasource("id_1", "allow")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_3"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
),
new TaskInfo<>(
"id_4",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_4"),
"deny",
getTaskWithIdAndDatasource("id_4", "deny")
)
createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_3", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_4", TaskState.RUNNING, "deny")
)
);
@ -726,40 +542,16 @@ public class OverlordResourceTest
)
);
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()),
null
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_1"),
"deny",
getTaskWithIdAndDatasource("id_1", "deny")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_3"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
),
new TaskInfo<>(
"id_4",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.running("id_4"),
"deny",
getTaskWithIdAndDatasource("id_4", "deny")
)
createTaskStatusPlus("id_1", TaskState.RUNNING, "deny"),
createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_3", TaskState.RUNNING, "allow"),
createTaskStatusPlus("id_4", TaskState.RUNNING, "deny")
)
);
@ -791,33 +583,15 @@ public class OverlordResourceTest
{
expectAuthorizationTokenCheck();
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)),
null
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"allow",
getTaskWithIdAndDatasource("id_1", "allow")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"deny",
getTaskWithIdAndDatasource("id_2", "deny")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
)
createTaskStatusPlus("id_1", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_2", TaskState.SUCCESS, "deny"),
createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow")
)
);
EasyMock.replay(
@ -842,33 +616,15 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck();
Duration duration = new Period("PT86400S").toStandardDuration();
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
EasyMock.anyObject(),
EasyMock.anyObject()
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"deny",
getTaskWithIdAndDatasource("id_1", "deny")
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"allow",
getTaskWithIdAndDatasource("id_2", "allow")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
)
createTaskStatusPlus("id_1", TaskState.SUCCESS, "deny"),
createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow")
)
);
@ -886,7 +642,7 @@ public class OverlordResourceTest
.getEntity();
Assert.assertEquals(2, responseObjects.size());
Assert.assertEquals("id_2", responseObjects.get(0).getId());
Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource()));
Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
}
@Test
@ -898,7 +654,7 @@ public class OverlordResourceTest
// Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null),
@ -909,10 +665,10 @@ public class OverlordResourceTest
)
).andStubReturn(
ImmutableList.of(
createTaskInfo("id_5", Datasources.WIKIPEDIA),
createTaskInfo("id_6", Datasources.BUZZFEED),
createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "test"),
createTaskInfo("id_4", Datasources.BUZZFEED, TaskState.RUNNING, "test")
createTaskStatusPlus("id_5", TaskState.SUCCESS, Datasources.WIKIPEDIA),
createTaskStatusPlus("id_6", TaskState.SUCCESS, Datasources.BUZZFEED),
createTaskStatusPlus("id_1", TaskState.RUNNING, Datasources.WIKIPEDIA),
createTaskStatusPlus("id_4", TaskState.RUNNING, Datasources.BUZZFEED)
)
);
@ -955,7 +711,7 @@ public class OverlordResourceTest
// Setup mocks to return completed, active, known, pending and running tasks
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null),
@ -966,10 +722,10 @@ public class OverlordResourceTest
)
).andStubReturn(
ImmutableList.of(
createTaskInfo("id_5", Datasources.WIKIPEDIA),
createTaskInfo("id_6", Datasources.BUZZFEED),
createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "to-return"),
createTaskInfo("id_4", Datasources.WIKIPEDIA, TaskState.RUNNING, "test")
createTaskStatusPlus("id_5", TaskState.SUCCESS, Datasources.WIKIPEDIA),
createTaskStatusPlus("id_6", TaskState.SUCCESS, Datasources.BUZZFEED),
createTaskStatusPlus("id_1", TaskState.RUNNING, Datasources.WIKIPEDIA, "to-return"),
createTaskStatusPlus("id_4", TaskState.RUNNING, Datasources.BUZZFEED)
)
);
@ -1025,11 +781,11 @@ public class OverlordResourceTest
}
@Test
public void testGetNullCompleteTask()
public void testGetCompleteTasksOfAllDatasources()
{
expectAuthorizationTokenCheck();
EasyMock.expect(
taskStorageQueryAdapter.getTaskInfos(
taskStorageQueryAdapter.getTaskStatusPlusList(
ImmutableMap.of(
TaskLookupType.COMPLETE,
CompleteTaskLookup.of(null, null)
@ -1038,27 +794,9 @@ public class OverlordResourceTest
)
).andStubReturn(
ImmutableList.of(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"allow",
null
),
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"deny",
getTaskWithIdAndDatasource("id_2", "deny")
),
new TaskInfo<>(
"id_3",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_3"),
"allow",
getTaskWithIdAndDatasource("id_3", "allow")
)
createTaskStatusPlus("id_1", TaskState.SUCCESS, "allow"),
createTaskStatusPlus("id_2", TaskState.SUCCESS, "deny"),
createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow")
)
);
EasyMock.replay(
@ -1074,9 +812,7 @@ public class OverlordResourceTest
.getEntity();
Assert.assertEquals(2, responseObjects.size());
Assert.assertEquals("id_1", responseObjects.get(0).getId());
TaskStatusPlus tsp = responseObjects.get(0);
Assert.assertEquals(null, tsp.getType());
Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource()));
Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource());
}
@Test
@ -1379,19 +1115,19 @@ public class OverlordResourceTest
Optional.of(mockQueue)
).anyTimes();
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of(
new TaskInfo(
new TaskInfo<>(
"id_1",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_1"),
"datasource",
getTaskWithIdAndDatasource("id_1", "datasource")
NoopTask.create("id_1", 1)
),
new TaskInfo(
new TaskInfo<>(
"id_2",
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.success("id_2"),
"datasource",
getTaskWithIdAndDatasource("id_2", "datasource")
NoopTask.create("id_2", 1)
)
));
mockQueue.shutdown("id_1", "Shutdown request from user");
@ -1679,7 +1415,7 @@ public class OverlordResourceTest
Optional.of(workerTaskRunner)
).anyTimes();
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes();
AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
AutoScaler<?> autoScaler = EasyMock.createMock(AutoScaler.class);
EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler);
@ -1724,61 +1460,25 @@ public class OverlordResourceTest
EasyMock.expectLastCall().anyTimes();
}
private Task getTaskWithIdAndDatasource(String id, String datasource)
private TaskStatusPlus createTaskStatusPlus(String taskId, TaskState taskState, String datasource)
{
return getTaskWithIdAndDatasource(id, datasource, "test");
return createTaskStatusPlus(taskId, taskState, datasource, "test");
}
private Task getTaskWithIdAndDatasource(String id, String datasource, String taskType)
private TaskStatusPlus createTaskStatusPlus(String taskId, TaskState taskState, String datasource, String taskType)
{
return new AbstractTask(id, datasource, null)
{
@Override
public String getType()
{
return taskType;
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{
return false;
}
@Override
public void stopGracefully(TaskConfig taskConfig)
{
}
@Override
public TaskStatus run(TaskToolbox toolbox)
{
return null;
}
};
}
private TaskInfo<Task, TaskStatus> createTaskInfo(
String taskId,
String datasource
)
{
return createTaskInfo(taskId, datasource, TaskState.SUCCESS, "test");
}
private TaskInfo<Task, TaskStatus> createTaskInfo(
String taskId,
String datasource,
TaskState state,
String taskType
)
{
return new TaskInfo<>(
return new TaskStatusPlus(
taskId,
null,
taskType,
DateTime.now(ISOChronology.getInstanceUTC()),
TaskStatus.fromCode(taskId, state),
DateTimes.EPOCH,
taskState,
taskState.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.WAITING,
100L,
TaskLocation.unknown(),
datasource,
getTaskWithIdAndDatasource(taskId, datasource, taskType)
null
);
}

File: task summary JSON (test resource)

@ -1,8 +1,8 @@
[
{
"task_id": "index_auth_test_2030-04-30T01:13:31.893Z",
"type": null,
"group_id": null,
"group_id": "",
"type": "",
"datasource": "auth_test",
"created_time": "2030-04-30T01:13:31.893Z",
"queue_insertion_time": "1970-01-01T00:00:00.000Z",

File: SQLMetadataConnector.java

@ -44,6 +44,8 @@ import org.skife.jdbi.v2.util.IntegerMapper;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
@ -129,6 +131,8 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
public abstract boolean tableExists(Handle handle, String tableName);
public abstract String limitClause(int limit);
public <T> T retryWithHandle(
final HandleCallback<T> callback,
final Predicate<Throwable> myShouldRetry
@ -328,6 +332,29 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
);
}
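/**
 * Checks whether the given table already contains the given column, using JDBC DatabaseMetaData.
 * Returns false if the metadata lookup fails, in which case the ALTER in alterEntryTable is attempted anyway.
 */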
public boolean tableContainsColumn(Handle handle, String table, String column)
{
try {
DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData();
ResultSet columns = databaseMetaData.getColumns(
null,
null,
table,
column
);
return columns.next();
}
catch (SQLException e) {
return false;
}
}
public void prepareTaskEntryTable(final String tableName)
{
createEntryTable(tableName);
alterEntryTable(tableName);
}
public void createEntryTable(final String tableName)
{
createTable(
@ -350,6 +377,35 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
);
}
private void alterEntryTable(final String tableName)
{
try {
retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
final Batch batch = handle.createBatch();
if (!tableContainsColumn(handle, tableName, "type")) {
log.info("Adding column: type to table[%s]", tableName);
batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
}
if (!tableContainsColumn(handle, tableName, "group_id")) {
log.info("Adding column: group_id to table[%s]", tableName);
batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
}
batch.execute();
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception altering table");
}
}
public void createLogTable(final String tableName, final String entryTypeName)
{
createTable(
@ -578,7 +634,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
if (config.get().isCreateTables()) {
final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
final String entryType = tablesConfig.getTaskEntryType();
createEntryTable(tablesConfig.getEntryTable(entryType));
prepareTaskEntryTable(tablesConfig.getEntryTable(entryType));
createLogTable(tablesConfig.getLogTable(entryType), entryType);
createLockTable(tablesConfig.getLockTable(entryType), entryType);
}

File: SQLMetadataStorageActionHandler.java

@ -21,10 +21,14 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
@ -34,6 +38,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
@ -53,6 +58,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
implements MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -72,6 +80,11 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
private final String lockTable;
private final TaskInfoMapper<EntryType, StatusType> taskInfoMapper;
private final TaskStatusMapper taskStatusMapper;
private final TaskStatusMapperFromPayload taskStatusMapperFromPayload;
private final TaskIdentifierMapper taskIdentifierMapper;
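// Tracks the asynchronous backfill of the type and group_id columns; once it completes with true,
// task summaries can be served from the new columns instead of the serialized payload.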
private Future<Boolean> taskMigrationCompleteFuture;
@SuppressWarnings("PMD.UnnecessaryFullyQualifiedName")
public SQLMetadataStorageActionHandler(
@ -98,6 +111,9 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
this.logTable = logTable;
this.lockTable = lockTable;
this.taskInfoMapper = new TaskInfoMapper<>(jsonMapper, entryType, statusType);
this.taskStatusMapper = new TaskStatusMapper(jsonMapper);
this.taskStatusMapperFromPayload = new TaskStatusMapperFromPayload(jsonMapper);
this.taskIdentifierMapper = new TaskIdentifierMapper(jsonMapper);
}
protected SQLMetadataConnector getConnector()
@ -142,15 +158,17 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
final String dataSource,
final EntryType entry,
final boolean active,
final StatusType status
final StatusType status,
final String type,
final String groupId
) throws EntryExistsException
{
try {
getConnector().retryWithHandle(
(HandleCallback<Void>) handle -> {
final String sql = StringUtils.format(
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) "
+ "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
"INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) "
+ "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)",
getEntryTable()
);
handle.createStatement(sql)
@ -158,6 +176,8 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
.bind("created_date", timestamp.toString())
.bind("datasource", dataSource)
.bind("payload", jsonMapper.writeValueAsBytes(entry))
.bind("type", type)
.bind("group_id", groupId)
.bind("active", active)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
@ -284,7 +304,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
final Query<Map<String, Object>> query;
switch (entry.getKey()) {
case ACTIVE:
query = createActiveTaskInfoQuery(
query = createActiveTaskStreamingQuery(
handle,
dataSource
);
@ -292,7 +312,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
break;
case COMPLETE:
CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
query = createCompletedTaskInfoQuery(
query = createCompletedTaskStreamingQuery(
handle,
completeTaskLookup.getTasksCreatedPriorTo(),
completeTaskLookup.getMaxTaskStatuses(),
@ -311,7 +331,123 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
);
}
protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
@Override
public List<TaskInfo<TaskIdentifier, StatusType>> getTaskStatusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String dataSource
)
{
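// Read type and group_id from the payload until the async column backfill is known to have finished;
// afterwards the dedicated columns are authoritative and the large payload column can be skipped.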
boolean fetchPayload = true;
if (taskMigrationCompleteFuture != null && taskMigrationCompleteFuture.isDone()) {
try {
fetchPayload = !taskMigrationCompleteFuture.get();
}
catch (Exception e) {
log.info(e, "Exception getting task migration future");
}
}
return getTaskStatusList(taskLookups, dataSource, fetchPayload);
}
@VisibleForTesting
List<TaskInfo<TaskIdentifier, StatusType>> getTaskStatusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String dataSource,
boolean fetchPayload
)
{
ResultSetMapper<TaskInfo<TaskIdentifier, StatusType>> resultSetMapper =
fetchPayload ? taskStatusMapperFromPayload : taskStatusMapper;
return getConnector().retryTransaction(
(handle, status) -> {
final List<TaskInfo<TaskIdentifier, StatusType>> taskMetadataInfos = new ArrayList<>();
for (Entry<TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
final Query<Map<String, Object>> query;
switch (entry.getKey()) {
case ACTIVE:
query = fetchPayload
? createActiveTaskStreamingQuery(handle, dataSource)
: createActiveTaskSummaryStreamingQuery(handle, dataSource);
taskMetadataInfos.addAll(query.map(resultSetMapper).list());
break;
case COMPLETE:
CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
DateTime priorTo = completeTaskLookup.getTasksCreatedPriorTo();
Integer limit = completeTaskLookup.getMaxTaskStatuses();
query = fetchPayload
? createCompletedTaskStreamingQuery(handle, priorTo, limit, dataSource)
: createCompletedTaskSummaryStreamingQuery(handle, priorTo, limit, dataSource);
taskMetadataInfos.addAll(query.map(resultSetMapper).list());
break;
default:
throw new IAE("Unknown TaskLookupType: [%s]", entry.getKey());
}
}
return taskMetadataInfos;
},
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
}
/**
* Fetches the columns needed to build TaskStatusPlus for completed tasks
* Please note that this requires completion of data migration to avoid empty values for task type and groupId
* Recommended for GET /tasks API
* Uses streaming SQL query to avoid fetching too many rows at once into memory
* @param handle db handle
* @param dataSource datasource to which the tasks belong. null if we don't want to filter
* @return Query object for TaskStatusPlus for completed tasks of interest
*/
private Query<Map<String, Object>> createCompletedTaskSummaryStreamingQuery(
Handle handle,
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String dataSource
)
{
String sql = StringUtils.format(
"SELECT "
+ " id, "
+ " created_date, "
+ " datasource, "
+ " group_id, "
+ " type, "
+ " status_payload "
+ "FROM "
+ " %s "
+ "WHERE "
+ getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+ "ORDER BY created_date DESC",
getEntryTable()
);
if (maxNumStatuses != null) {
sql = decorateSqlWithLimit(sql);
}
Query<Map<String, Object>> query = handle.createQuery(sql)
.bind("start", timestamp.toString())
.setFetchSize(connector.getStreamingFetchSize());
if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses);
}
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
return query;
}
/**
* Fetches the columns needed to build a Task object with payload for completed tasks
* This requires the task payload which can be large. Please use only when necessary.
* For example, for the ingestion tasks view before the migration of the new columns has completed
* Uses streaming SQL query to avoid fetching too many rows at once into memory
* @param handle db handle
* @param dataSource datasource to which the tasks belong. null if we don't want to filter
* @return Query object for completed TaskInfos of interest
*/
private Query<Map<String, Object>> createCompletedTaskStreamingQuery(
Handle handle,
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@ -336,7 +472,9 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
if (maxNumStatuses != null) {
sql = decorateSqlWithLimit(sql);
}
Query<Map<String, Object>> query = handle.createQuery(sql).bind("start", timestamp.toString());
Query<Map<String, Object>> query = handle.createQuery(sql)
.bind("start", timestamp.toString())
.setFetchSize(connector.getStreamingFetchSize());
if (maxNumStatuses != null) {
query = query.bind("n", maxNumStatuses);
@ -358,7 +496,51 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
return sql;
}
private Query<Map<String, Object>> createActiveTaskInfoQuery(Handle handle, @Nullable String dataSource)
/**
* Fetches the columns needed to build TaskStatusPlus for active tasks
* Please note that this requires completion of data migration to avoid empty values for task type and groupId
* Recommended for GET /tasks API
* Uses streaming SQL query to avoid fetching too many rows at once into memory
* @param handle db handle
* @param dataSource datasource to which the tasks belong. null if we don't want to filter
* @return Query object for TaskStatusPlus for active tasks of interest
*/
private Query<Map<String, Object>> createActiveTaskSummaryStreamingQuery(Handle handle, @Nullable String dataSource)
{
String sql = StringUtils.format(
"SELECT "
+ " id, "
+ " status_payload, "
+ " group_id, "
+ " type, "
+ " datasource, "
+ " created_date "
+ "FROM "
+ " %s "
+ "WHERE "
+ getWhereClauseForActiveStatusesQuery(dataSource)
+ "ORDER BY created_date",
entryTable
);
Query<Map<String, Object>> query = handle.createQuery(sql)
.setFetchSize(connector.getStreamingFetchSize());
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
return query;
}
/**
 * Fetches the columns needed to build Task objects with payload for active tasks.
 * This requires the task payload, which can be large, so use it only when necessary,
 * e.g. for the ingestion tasks view before the new columns have been migrated.
 * Uses a streaming SQL query to avoid fetching too many rows into memory at once.
 *
 * @param handle db handle
 * @param dataSource datasource to which the tasks belong. null if we don't want to filter
 * @return Query object for active TaskInfos of interest
 */
private Query<Map<String, Object>> createActiveTaskStreamingQuery(Handle handle, @Nullable String dataSource)
{
String sql = StringUtils.format(
"SELECT "
@ -375,7 +557,8 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
entryTable
);
Query<Map<String, Object>> query = handle.createQuery(sql)
.setFetchSize(connector.getStreamingFetchSize());
if (dataSource != null) {
query = query.bind("ds", dataSource);
}
@ -391,6 +574,109 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
return sql;
}
private class TaskStatusMapperFromPayload implements ResultSetMapper<TaskInfo<TaskIdentifier, StatusType>>
{
private final ObjectMapper objectMapper;
TaskStatusMapperFromPayload(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
}
@Override
public TaskInfo<TaskIdentifier, StatusType> map(int index, ResultSet resultSet, StatementContext context)
throws SQLException
{
return toTaskIdentifierInfo(objectMapper, resultSet, true);
}
}
private class TaskStatusMapper implements ResultSetMapper<TaskInfo<TaskIdentifier, StatusType>>
{
private final ObjectMapper objectMapper;
TaskStatusMapper(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
}
@Override
public TaskInfo<TaskIdentifier, StatusType> map(int index, ResultSet resultSet, StatementContext context)
throws SQLException
{
return toTaskIdentifierInfo(objectMapper, resultSet, false);
}
}
private TaskInfo<TaskIdentifier, StatusType> toTaskIdentifierInfo(
    ObjectMapper objectMapper,
    ResultSet resultSet,
    boolean usePayload
) throws SQLException
{
String type;
String groupId;
if (usePayload) {
try {
ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
type = payload.get("type").asText();
groupId = payload.get("groupId").asText();
}
catch (IOException e) {
log.error(e, "Encountered exception while deserializing task payload");
throw new SQLException(e);
}
} else {
type = resultSet.getString("type");
groupId = resultSet.getString("group_id");
}
String id = resultSet.getString("id");
DateTime createdTime = DateTimes.of(resultSet.getString("created_date"));
StatusType status;
try {
status = objectMapper.readValue(resultSet.getBytes("status_payload"), statusType);
}
catch (IOException e) {
log.error(e, "Encountered exception while deserializing task status_payload");
throw new SQLException(e);
}
String datasource = resultSet.getString("datasource");
TaskIdentifier taskIdentifier = new TaskIdentifier(id, groupId, type);
return new TaskInfo<>(id, createdTime, status, datasource, taskIdentifier);
}
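For reference, the payload branch above touches only the top-level id, type and groupId fields of the stored task JSON. A hedged illustration of the minimal shape it relies on (field values are made up; real payloads carry many more fields):

// Illustrative only: the minimal payload fields the mappers above read.
// readValue throws IOException, which the callers above wrap in SQLException.
byte[] payloadBytes = StringUtils.toUtf8(
    "{\"id\":\"index_parallel_wiki_abc\","
    + "\"type\":\"index_parallel\","
    + "\"groupId\":\"index_parallel_wiki_abc\"}"
);
ObjectNode payload = objectMapper.readValue(payloadBytes, ObjectNode.class);
String type = payload.get("type").asText();       // "index_parallel"
String groupId = payload.get("groupId").asText(); // typically equals the task id for standalone tasks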
static class TaskIdentifierMapper implements ResultSetMapper<TaskIdentifier>
{
private final ObjectMapper objectMapper;
TaskIdentifierMapper(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
}
@Override
public TaskIdentifier map(int index, ResultSet resultSet, StatementContext context)
throws SQLException
{
try {
ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
// If field is absent (older task version), use blank string to avoid a loop of migration of such tasks.
JsonNode type = payload.get("type");
JsonNode groupId = payload.get("groupId");
return new TaskIdentifier(
resultSet.getString("id"),
groupId == null ? "" : groupId.asText(),
type == null ? "" : type.asText()
);
}
catch (IOException e) {
log.error(e, "Encountered exception while deserializing task payload");
throw new SQLException(e);
}
}
}
static class TaskInfoMapper<EntryType, StatusType> implements ResultSetMapper<TaskInfo<EntryType, StatusType>>
{
private final ObjectMapper objectMapper;
@ -679,4 +965,105 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
.findAny()
.orElse(null);
}
private List<TaskIdentifier> fetchTaskMetadatas(String tableName, String id, int limit)
{
List<TaskIdentifier> taskIdentifiers = new ArrayList<>();
connector.retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
String sql = StringUtils.format(
"SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s",
tableName,
id,
connector.limitClause(limit)
);
Query<Map<String, Object>> query = handle.createQuery(sql);
taskIdentifiers.addAll(query.map(taskIdentifierMapper).list());
return null;
}
}
);
return taskIdentifiers;
}
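The fetch above paginates by keyset rather than OFFSET: each batch selects ids strictly greater than the last id seen, so later batches stay as cheap as the first even on a large table. As a hedged illustration, one generated batch statement on Derby would look roughly like this (table name and id value are made up):

// Illustrative only: one generated batch query, assuming the Derby limitClause shown further below.
String sql = "SELECT * FROM druid_tasks WHERE id > 'index_kafka_wiki_123' AND type IS null "
             + "ORDER BY id FETCH NEXT 100 ROWS ONLY";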
private void updateTaskMetadatas(String tasksTable, List<TaskIdentifier> taskIdentifiers)
{
connector.retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
Batch batch = handle.createBatch();
String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'";
for (TaskIdentifier metadata : taskIdentifiers) {
batch.add(StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId())
);
}
batch.execute();
return null;
}
}
);
}
@Override
public void populateTaskTypeAndGroupIdAsync()
{
ExecutorService executorService = Executors.newSingleThreadExecutor();
taskMigrationCompleteFuture = executorService.submit(
() -> populateTaskTypeAndGroupId()
);
}
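The single-threaded executor keeps the backfill sequential and off the API threads. A hedged sketch of how a caller could block on the stored future, assuming taskMigrationCompleteFuture is a Future<Boolean> matching the return type of populateTaskTypeAndGroupId (not an exact call site from this patch):

// Sketch only: wait for the background migration to finish, e.g. in a test.
// Future.get throws InterruptedException/ExecutionException/TimeoutException; handle or propagate.
boolean migrated = taskMigrationCompleteFuture.get(5, TimeUnit.MINUTES);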
/**
* Utility to migrate existing tasks to the new schema by populating type and groupId synchronously
*
* @return true if successful
*/
@VisibleForTesting
boolean populateTaskTypeAndGroupId()
{
log.info("Populate fields task and group_id of task entry table [%s] from payload", entryTable);
String id = "";
int limit = 100;
int count = 0;
while (true) {
List<TaskIdentifier> taskIdentifiers;
try {
taskIdentifiers = fetchTaskMetadatas(entryTable, id, limit);
}
catch (Exception e) {
log.warn(e, "Task migration failed while reading entries from task table");
return false;
}
if (taskIdentifiers.isEmpty()) {
break;
}
try {
updateTaskMetadatas(entryTable, taskIdentifiers);
count += taskIdentifiers.size();
log.info("Successfully updated type and groupId for [%d] tasks", count);
}
catch (Exception e) {
log.warn(e, "Task migration failed while updating entries in task table");
return false;
}
id = taskIdentifiers.get(taskIdentifiers.size() - 1).getId();
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
log.info("Interrupted, exiting!");
Thread.currentThread().interrupt();
}
}
log.info("Task migration for table [%s] successful", entryTable);
return true;
}
}
View File
@ -35,6 +35,11 @@ import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Locale;
@ManageLifecycle
public class DerbyConnector extends SQLMetadataConnector
{
@ -83,6 +88,24 @@ public class DerbyConnector extends SQLMetadataConnector
.isEmpty();
}
@Override
public boolean tableContainsColumn(Handle handle, String table, String column)
{
try {
DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData();
ResultSet columns = databaseMetaData.getColumns(
null,
null,
table.toUpperCase(Locale.ENGLISH),
column.toUpperCase(Locale.ENGLISH)
);
return columns.next();
}
catch (SQLException e) {
return false;
}
}
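A hedged sketch of the intended use: schema preparation stays idempotent by adding the new columns only when the metadata reports them missing. The helper below is hypothetical, not verbatim from this patch:

// Hypothetical helper: add a task column only if absent, so re-running the DDL is safe.
private void addColumnIfAbsent(Handle handle, String table, String column)
{
  if (!tableContainsColumn(handle, table, column)) {
    handle.execute(StringUtils.format("ALTER TABLE %s ADD COLUMN %s VARCHAR(255)", table, column));
  }
}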
@Override
public String getSerialType()
{
@ -114,6 +137,12 @@ public class DerbyConnector extends SQLMetadataConnector
return "VALUES 1";
}
@Override
public String limitClause(int limit)
{
return String.format(Locale.ENGLISH, "FETCH NEXT %d ROWS ONLY", limit);
}
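Derby has no LIMIT keyword, so the connector emits the SQL:2008 fetch-first form. Appended after an ORDER BY, it yields, for instance (the table name is illustrative):

// limitClause(100) -> "FETCH NEXT 100 ROWS ONLY"
String sql = "SELECT * FROM druid_tasks ORDER BY id " + limitClause(100);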
@Override
public void exportTable(
String tableName,
View File
@ -32,7 +32,11 @@ import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -86,6 +90,14 @@ public class SQLMetadataConnectorTest
);
}
String taskTable = tablesConfig.getTasksTable();
for (String column : Arrays.asList("type", "group_id")) {
Assert.assertTrue(
StringUtils.format("Tasks table column %s was not created!", column),
connector.tableContainsColumn(handle, taskTable, column)
);
}
return null;
}
}
@ -170,6 +182,12 @@ public class SQLMetadataConnectorTest
return 0;
}
@Override
public String limitClause(int limit)
{
return "";
}
@Override
public String getQuoteString()
{
@ -242,4 +260,32 @@ public class SQLMetadataConnectorTest
Assert.assertEquals(dataSource.getMaxConnLifetimeMillis(), 1200000);
Assert.assertEquals((long) dataSource.getDefaultQueryTimeout(), 30000);
}
private boolean verifyTaskTypeAndGroupId(String table, String id, String type, String groupId)
{
try {
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws SQLException
{
Statement statement = handle.getConnection().createStatement();
ResultSet resultSet = statement.executeQuery(
StringUtils.format("SELECT * FROM %1$s WHERE id = '%2$s'", table, id)
);
resultSet.next();
boolean flag = type.equals(resultSet.getString("type"))
&& groupId.equals(resultSet.getString("group_id"));
statement.close();
return flag;
}
}
);
}
catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
View File
@ -25,7 +25,10 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
@ -33,15 +36,26 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
public class SQLMetadataStorageActionHandlerTest
@ -53,7 +67,13 @@ public class SQLMetadataStorageActionHandlerTest
public final ExpectedException thrown = ExpectedException.none();
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final Random RANDOM = new Random(1);
private SQLMetadataStorageActionHandler<Map<String, Object>, Map<String, Object>, Map<String, String>, Map<String, Object>> handler;
private final String entryTable = "entries";
@Before
public void setUp()
@ -61,31 +81,30 @@ public class SQLMetadataStorageActionHandlerTest
TestDerbyConnector connector = derbyConnectorRule.getConnector();
final String entryType = "entry";
final String entryTable = "entries";
final String logTable = "logs";
final String lockTable = "locks";
connector.prepareTaskEntryTable(entryTable);
connector.createLockTable(lockTable, entryType);
connector.createLogTable(logTable, entryType);
handler = new DerbyMetadataStorageActionHandler<>(
connector,
JSON_MAPPER,
new MetadataStorageActionHandlerTypes<Map<String, Object>, Map<String, Object>, Map<String, String>, Map<String, Object>>()
{
@Override
public TypeReference<Map<String, Object>> getEntryType()
{
return new TypeReference<Map<String, Object>>()
{
};
}
@Override
public TypeReference<Map<String, Object>> getStatusType()
{
return new TypeReference<Map<String, Object>>()
{
};
}
@ -97,9 +116,9 @@ public class SQLMetadataStorageActionHandlerTest
}
@Override
public TypeReference<Map<String, Object>> getLockType()
{
return new TypeReference<Map<String, Object>>()
{
};
}
@ -114,13 +133,13 @@ public class SQLMetadataStorageActionHandlerTest
@Test
public void testEntryAndStatus() throws Exception
{
Map<String, Integer> entry = ImmutableMap.of("numericId", 1234);
Map<String, Integer> status1 = ImmutableMap.of("count", 42);
Map<String, Integer> status2 = ImmutableMap.of("count", 42, "temp", 1);
Map<String, Object> entry = ImmutableMap.of("numericId", 1234);
Map<String, Object> status1 = ImmutableMap.of("count", 42);
Map<String, Object> status2 = ImmutableMap.of("count", 42, "temp", 1);
final String entryId = "1234";
handler.insert(entryId, DateTimes.of("2014-01-02T00:00:00.123"), "testDataSource", entry, true, null);
handler.insert(entryId, DateTimes.of("2014-01-02T00:00:00.123"), "testDataSource", entry, true, null, "type", "group");
Assert.assertEquals(
Optional.of(entry),
@ -195,13 +214,13 @@ public class SQLMetadataStorageActionHandlerTest
{
for (int i = 1; i < 11; i++) {
final String entryId = "abcd_" + i;
final Map<String, Integer> entry = ImmutableMap.of("a", i);
final Map<String, Integer> status = ImmutableMap.of("count", i * 10);
final Map<String, Object> entry = ImmutableMap.of("a", i);
final Map<String, Object> status = ImmutableMap.of("count", i * 10);
handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status, "type", "group");
}
final List<TaskInfo<Map<String, Object>, Map<String, Object>>> statuses = handler.getTaskInfos(
CompleteTaskLookup.withTasksCreatedPriorTo(
7,
DateTimes.of("2014-01-01")
@ -210,7 +229,7 @@ public class SQLMetadataStorageActionHandlerTest
);
Assert.assertEquals(7, statuses.size());
int i = 10;
for (TaskInfo<Map<String, Object>, Map<String, Object>> status : statuses) {
Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus());
}
}
@ -220,13 +239,13 @@ public class SQLMetadataStorageActionHandlerTest
{
for (int i = 1; i < 6; i++) {
final String entryId = "abcd_" + i;
final Map<String, Integer> entry = ImmutableMap.of("a", i);
final Map<String, Integer> status = ImmutableMap.of("count", i * 10);
final Map<String, Object> entry = ImmutableMap.of("a", i);
final Map<String, Object> status = ImmutableMap.of("count", i * 10);
handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status, "type", "group");
}
final List<TaskInfo<Map<String, Object>, Map<String, Object>>> statuses = handler.getTaskInfos(
CompleteTaskLookup.withTasksCreatedPriorTo(
10,
DateTimes.of("2014-01-01")
@ -235,7 +254,7 @@ public class SQLMetadataStorageActionHandlerTest
);
Assert.assertEquals(5, statuses.size());
int i = 5;
for (TaskInfo<Map<String, Object>, Map<String, Object>> status : statuses) {
Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus());
}
}
@ -244,23 +263,23 @@ public class SQLMetadataStorageActionHandlerTest
public void testRepeatInsert() throws Exception
{
final String entryId = "abcd";
Map<String, Integer> entry = ImmutableMap.of("a", 1);
Map<String, Integer> status = ImmutableMap.of("count", 42);
Map<String, Object> entry = ImmutableMap.of("a", 1);
Map<String, Object> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
thrown.expect(EntryExistsException.class);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
}
@Test
public void testLogs() throws Exception
{
final String entryId = "abcd";
Map<String, Integer> entry = ImmutableMap.of("a", 1);
Map<String, Integer> status = ImmutableMap.of("count", 42);
Map<String, Object> entry = ImmutableMap.of("a", 1);
Map<String, Object> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
Assert.assertEquals(
ImmutableList.of(),
@ -289,32 +308,32 @@ public class SQLMetadataStorageActionHandlerTest
public void testLocks() throws Exception
{
final String entryId = "ABC123";
Map<String, Integer> entry = ImmutableMap.of("a", 1);
Map<String, Integer> status = ImmutableMap.of("count", 42);
Map<String, Object> entry = ImmutableMap.of("a", 1);
Map<String, Object> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Object>>of(),
handler.getLocks("non_exist_entry")
);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Object>>of(),
handler.getLocks(entryId)
);
final ImmutableMap<String, Integer> lock1 = ImmutableMap.of("lock", 1);
final ImmutableMap<String, Integer> lock2 = ImmutableMap.of("lock", 2);
final ImmutableMap<String, Object> lock1 = ImmutableMap.of("lock", 1);
final ImmutableMap<String, Object> lock2 = ImmutableMap.of("lock", 2);
Assert.assertTrue(handler.addLock(entryId, lock1));
Assert.assertTrue(handler.addLock(entryId, lock2));
final Map<Long, Map<String, Object>> locks = handler.getLocks(entryId);
Assert.assertEquals(2, locks.size());
Assert.assertEquals(
ImmutableSet.<Map<String, Object>>of(lock1, lock2),
new HashSet<>(locks.values())
);
@ -322,7 +341,7 @@ public class SQLMetadataStorageActionHandlerTest
handler.removeLock(lockId);
locks.remove(lockId);
final Map<Long, Map<String, Object>> updated = handler.getLocks(entryId);
Assert.assertEquals(
new HashSet<>(locks.values()),
new HashSet<>(updated.values())
@ -334,23 +353,23 @@ public class SQLMetadataStorageActionHandlerTest
public void testReplaceLock() throws EntryExistsException
{
final String entryId = "ABC123";
Map<String, Integer> entry = ImmutableMap.of("a", 1);
Map<String, Integer> status = ImmutableMap.of("count", 42);
Map<String, Object> entry = ImmutableMap.of("a", 1);
Map<String, Object> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Object>>of(),
handler.getLocks("non_exist_entry")
);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Object>>of(),
handler.getLocks(entryId)
);
final ImmutableMap<String, Integer> lock1 = ImmutableMap.of("lock", 1);
final ImmutableMap<String, Integer> lock2 = ImmutableMap.of("lock", 2);
final ImmutableMap<String, Object> lock1 = ImmutableMap.of("lock", 1);
final ImmutableMap<String, Object> lock2 = ImmutableMap.of("lock", 2);
Assert.assertTrue(handler.addLock(entryId, lock1));
@ -364,23 +383,23 @@ public class SQLMetadataStorageActionHandlerTest
public void testGetLockId() throws EntryExistsException
{
final String entryId = "ABC123";
Map<String, Integer> entry = ImmutableMap.of("a", 1);
Map<String, Integer> status = ImmutableMap.of("count", 42);
Map<String, Object> entry = ImmutableMap.of("a", 1);
Map<String, Object> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Object>>of(),
handler.getLocks("non_exist_entry")
);
Assert.assertEquals(
ImmutableMap.<Long, Map<String, Object>>of(),
handler.getLocks(entryId)
);
final ImmutableMap<String, Integer> lock1 = ImmutableMap.of("lock", 1);
final ImmutableMap<String, Integer> lock2 = ImmutableMap.of("lock", 2);
final ImmutableMap<String, Object> lock1 = ImmutableMap.of("lock", 1);
final ImmutableMap<String, Object> lock2 = ImmutableMap.of("lock", 2);
Assert.assertTrue(handler.addLock(entryId, lock1));
@ -392,21 +411,21 @@ public class SQLMetadataStorageActionHandlerTest
public void testRemoveTasksOlderThan() throws Exception
{
final String entryId1 = "1234";
Map<String, Integer> entry1 = ImmutableMap.of("numericId", 1234);
Map<String, Integer> status1 = ImmutableMap.of("count", 42, "temp", 1);
handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1);
Map<String, Object> entry1 = ImmutableMap.of("numericId", 1234);
Map<String, Object> status1 = ImmutableMap.of("count", 42, "temp", 1);
handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1, "type", "group");
Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created")));
final String entryId2 = "ABC123";
Map<String, Integer> entry2 = ImmutableMap.of("a", 1);
Map<String, Integer> status2 = ImmutableMap.of("count", 42);
handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2);
Map<String, Object> entry2 = ImmutableMap.of("a", 1);
Map<String, Object> status2 = ImmutableMap.of("count", 42);
handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2, "type", "group");
Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created")));
final String entryId3 = "DEF5678";
Map<String, Integer> entry3 = ImmutableMap.of("numericId", 5678);
Map<String, Integer> status3 = ImmutableMap.of("count", 21, "temp", 2);
handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3);
Map<String, Object> entry3 = ImmutableMap.of("numericId", 5678);
Map<String, Object> status3 = ImmutableMap.of("count", 21, "temp", 2);
handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3, "type", "group");
Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created")));
Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1));
@ -450,4 +469,196 @@ public class SQLMetadataStorageActionHandlerTest
Assert.assertEquals(1, handler.getLogs(entryId2).size());
Assert.assertEquals(1, handler.getLogs(entryId3).size());
}
@Test
public void testMigration()
{
int active = 1234;
for (int i = 0; i < active; i++) {
insertTaskInfo(createRandomTaskInfo(true), false);
}
int completed = 2345;
for (int i = 0; i < completed; i++) {
insertTaskInfo(createRandomTaskInfo(false), false);
}
Assert.assertEquals(active + completed, getUnmigratedTaskCount().intValue());
handler.populateTaskTypeAndGroupId();
Assert.assertEquals(0, getUnmigratedTaskCount().intValue());
}
@Test
public void testGetTaskStatusPlusListInternal()
{
// SETUP
TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered = createRandomTaskInfo(true);
insertTaskInfo(activeUnaltered, false);
TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered = createRandomTaskInfo(false);
insertTaskInfo(completedUnaltered, false);
TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered = createRandomTaskInfo(true);
insertTaskInfo(activeAltered, true);
TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered = createRandomTaskInfo(false);
insertTaskInfo(completedAltered, true);
Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups = new HashMap<>();
taskLookups.put(TaskLookup.TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance());
taskLookups.put(TaskLookup.TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, Duration.millis(86400000)));
List<TaskInfo<TaskIdentifier, Map<String, Object>>> taskMetadataInfos;
// BEFORE MIGRATION
// Payload-based fetch: task type and groupId are populated from the payload
taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, true);
Assert.assertEquals(4, taskMetadataInfos.size());
verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false);
// New-columns-based fetch before migration is complete: type and groupId are null for rows inserted with altered = false
taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, false);
Assert.assertEquals(4, taskMetadataInfos.size());
verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, true);
verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, true);
verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false);
// MIGRATION
handler.populateTaskTypeAndGroupId();
// Payload-based fetch: task type and groupId are still populated from the payload
taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, true);
Assert.assertEquals(4, taskMetadataInfos.size());
verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false);
// New-columns-based fetch after migration is complete: type and group_id are now populated for every row
taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, false);
Assert.assertEquals(4, taskMetadataInfos.size());
verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false);
verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false);
}
private Integer getUnmigratedTaskCount()
{
return handler.getConnector().retryWithHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws SQLException
{
String sql = String.format(
    Locale.ENGLISH,
    "SELECT COUNT(*) FROM %s WHERE type IS NULL OR group_id IS NULL",
    entryTable
);
ResultSet resultSet = handle.getConnection().createStatement().executeQuery(sql);
resultSet.next();
return resultSet.getInt(1);
}
}
);
}
private TaskInfo<Map<String, Object>, Map<String, Object>> createRandomTaskInfo(boolean active)
{
String id = UUID.randomUUID().toString();
DateTime createdTime = DateTime.now(DateTimeZone.UTC);
String datasource = UUID.randomUUID().toString();
String type = UUID.randomUUID().toString();
String groupId = UUID.randomUUID().toString();
Map<String, Object> payload = new HashMap<>();
payload.put("id", id);
payload.put("type", type);
payload.put("groupId", groupId);
Map<String, Object> status = new HashMap<>();
status.put("id", id);
status.put("status", active ? TaskState.RUNNING : TaskState.SUCCESS);
status.put("duration", RANDOM.nextLong());
status.put("location", TaskLocation.create(UUID.randomUUID().toString(), 8080, 995));
status.put("errorMsg", UUID.randomUUID().toString());
return new TaskInfo<>(
id,
createdTime,
status,
datasource,
payload
);
}
private void insertTaskInfo(
    TaskInfo<Map<String, Object>, Map<String, Object>> taskInfo,
    boolean altered
)
{
try {
handler.insert(
taskInfo.getId(),
taskInfo.getCreatedTime(),
taskInfo.getDataSource(),
taskInfo.getTask(),
TaskState.RUNNING.equals(taskInfo.getStatus().get("status")),
taskInfo.getStatus(),
altered ? taskInfo.getTask().get("type").toString() : null,
altered ? taskInfo.getTask().get("groupId").toString() : null
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private void verifyTaskInfoToMetadataInfo(
    TaskInfo<Map<String, Object>, Map<String, Object>> taskInfo,
    List<TaskInfo<TaskIdentifier, Map<String, Object>>> taskMetadataInfos,
    boolean nullNewColumns
)
{
for (TaskInfo<TaskIdentifier, Map<String, Object>> taskMetadataInfo : taskMetadataInfos) {
  if (taskMetadataInfo.getId().equals(taskInfo.getId())) {
    verifyTaskInfoToMetadataInfo(taskInfo, taskMetadataInfo, nullNewColumns);
    return;
  }
}
Assert.fail();
}
private void verifyTaskInfoToMetadataInfo(
    TaskInfo<Map<String, Object>, Map<String, Object>> taskInfo,
    TaskInfo<TaskIdentifier, Map<String, Object>> taskMetadataInfo,
    boolean nullNewColumns
)
{
Assert.assertEquals(taskInfo.getId(), taskMetadataInfo.getId());
Assert.assertEquals(taskInfo.getCreatedTime(), taskMetadataInfo.getCreatedTime());
Assert.assertEquals(taskInfo.getDataSource(), taskMetadataInfo.getDataSource());
verifyTaskStatus(taskInfo.getStatus(), taskMetadataInfo.getStatus());
Map<String, Object> task = taskInfo.getTask();
TaskIdentifier taskIdentifier = taskMetadataInfo.getTask();
Assert.assertEquals(task.get("id"), taskIdentifier.getId());
if (nullNewColumns) {
Assert.assertNull(taskIdentifier.getGroupId());
Assert.assertNull(taskIdentifier.getType());
} else {
Assert.assertEquals(task.get("groupId"), taskIdentifier.getGroupId());
Assert.assertEquals(task.get("type"), taskIdentifier.getType());
}
}
private void verifyTaskStatus(Map<String, Object> expected, Map<String, Object> actual)
{
Assert.assertEquals(expected.get("id"), actual.get("id"));
Assert.assertEquals(expected.get("duration"), actual.get("duration"));
Assert.assertEquals(expected.get("errorMsg"), actual.get("errorMsg"));
Assert.assertEquals(expected.get("status").toString(), actual.get("status"));
Assert.assertEquals(expected.get("location"), JSON_MAPPER.convertValue(actual.get("location"), TaskLocation.class));
}
}