Remove some unnecessary task storage internal APIs. (#6058)

* Remove some unnecessary task storage internal APIs.

- Remove MetadataStorageActionHandler's getInactiveStatusesSince and getActiveEntriesWithStatus.
- Remove TaskStorage's getCreatedDateTimeAndDataSource.
- Remove TaskStorageQueryAdapter's getCreatedTime, and getCreatedDateAndDataSource.
- Migrated all callers to getActiveTaskInfo and getCompletedTaskInfo.

This has one side effect: since getActiveTaskInfo (new) warns and continues when it
sees unreadable tasks, but getActiveEntriesWithStatus threw an exception when it
encountered those, it means that after this patch bad tasks will be ignored when
syncing from metadata storage rather than causing an exception to be thrown.

IMO, this is an improvement, since the most likely reason for bad tasks is either:

- A new version introduced an additional validation, and a pre-existing task doesn't
  pass it.
- You are rolling back from a newer version to an older version.

In both cases, I believe you would want to skip tasks that can't be deserialized,
rather than blocking overlord startup.

* Remove unused import.

* Fix formatting.

* Fix formatting.
This commit is contained in:
Gian Merlino 2018-07-30 18:35:06 -07:00 committed by Jihoon Son
parent f3595c93d9
commit 3aa7017975
16 changed files with 123 additions and 291 deletions

View File

@ -18,6 +18,7 @@
*/
package io.druid.indexer;
import com.google.common.base.Preconditions;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -25,11 +26,11 @@ import javax.annotation.Nullable;
/**
* This class is used to store task info from runner query and cache in OverlordResource
*/
public class TaskInfo<EntryType>
public class TaskInfo<EntryType, StatusType>
{
private final String id;
private final DateTime createdTime;
private final TaskStatus status;
private final StatusType status;
private final String dataSource;
@Nullable
private final EntryType task;
@ -37,15 +38,15 @@ public class TaskInfo<EntryType>
public TaskInfo(
String id,
DateTime createdTime,
TaskStatus status,
StatusType status,
String dataSource,
@Nullable EntryType task
)
{
this.id = id;
this.createdTime = createdTime;
this.status = status;
this.dataSource = dataSource;
this.id = Preconditions.checkNotNull(id, "id");
this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
this.status = Preconditions.checkNotNull(status, "status");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.task = task;
}
@ -59,7 +60,7 @@ public class TaskInfo<EntryType>
return createdTime;
}
public TaskStatus getStatus()
public StatusType getStatus()
{
return status;
}

View File

@ -21,7 +21,6 @@ package io.druid.metadata;
import com.google.common.base.Optional;
import io.druid.indexer.TaskInfo;
import io.druid.java.util.common.Pair;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@ -79,27 +78,6 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
*/
Optional<StatusType> getStatus(String entryId);
/**
* Return all active entries with their respective status
*
* @return list of (entry, status) pairs
*/
List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus();
default List<StatusType> getInactiveStatusesSince(DateTime timestamp)
{
return getInactiveStatusesSince(timestamp, null);
}
/**
* Return up to {@code maxNumStatuses} statuses for inactive entries created on or later than the given timestamp
*
* @param timestamp timestamp
* @param maxNumStatuses maxNumStatuses
* @return list of statuses
*/
List<StatusType> getInactiveStatusesSince(DateTime timestamp, @Nullable Integer maxNumStatuses);
/**
* Return up to {@code maxNumStatuses} {@link TaskInfo} objects for all inactive entries
* created on or later than the given timestamp
@ -109,7 +87,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
*
* @return list of {@link TaskInfo}
*/
List<TaskInfo<EntryType>> getCompletedTaskInfo(
List<TaskInfo<EntryType, StatusType>> getCompletedTaskInfo(
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String datasource
@ -120,15 +98,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
*
* @return list of {@link TaskInfo}
*/
List<TaskInfo<EntryType>> getActiveTaskInfo(@Nullable String dataSource);
/**
* Return createdDate and dataSource for the given id
*
* @return a pair of createdDate and dataSource or null if an entry for the given id is not found
*/
@Nullable
Pair<DateTime, String> getCreatedDateAndDataSource(String entryId);
List<TaskInfo<EntryType, StatusType>> getActiveTaskInfo(@Nullable String dataSource);
/**
* Add a lock to the given entry

View File

@ -139,19 +139,4 @@ public class CustomStatementRewriterTest
rewrite("UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = TRUE"));
}
/**
*
* @see io.druid.metadata.SQLMetadataStorageActionHandler#getInactiveStatusesSince(org.joda.time.DateTime)
*
*/
@Test
public void testSQLMetadataStorageActionHandlerGetInactiveStatusesSince()
{
Assert.assertEquals(
"SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= ? ORDER BY created_date DESC",
rewrite(
"SELECT id, status_payload FROM %s WHERE active = FALSE AND created_date >= :start ORDER BY created_date DESC"));
}
}

View File

@ -35,7 +35,6 @@ import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.EntryExistsException;
import org.joda.time.DateTime;
@ -170,12 +169,12 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{
giant.lock();
try {
final ImmutableList.Builder<TaskInfo<Task>> listBuilder = ImmutableList.builder();
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : tasks.values()) {
if (taskStuff.getStatus().isRunnable()) {
TaskInfo t = new TaskInfo(
@ -196,7 +195,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
public List<TaskInfo<Task>> getRecentlyFinishedTaskInfo(
public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
@Nullable Integer maxTaskStatuses, @Nullable Duration duration, @Nullable String datasource
)
{
@ -224,7 +223,10 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}
private List<TaskInfo<Task>> getRecentlyFinishedTaskInfoSince(DateTime start, Ordering<TaskStuff> createdDateDesc)
private List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfoSince(
DateTime start,
Ordering<TaskStuff> createdDateDesc
)
{
giant.lock();
@ -234,7 +236,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
.stream()
.filter(taskStuff -> taskStuff.getStatus().isComplete())
.collect(Collectors.toList());
final ImmutableList.Builder<TaskInfo<Task>> listBuilder = ImmutableList.builder();
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : list) {
String id = taskStuff.getTask().getId();
TaskInfo t = new TaskInfo(
@ -253,7 +255,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}
private List<TaskInfo<Task>> getNRecentlyFinishedTaskInfo(int n, Ordering<TaskStuff> createdDateDesc)
private List<TaskInfo<Task, TaskStatus>> getNRecentlyFinishedTaskInfo(int n, Ordering<TaskStuff> createdDateDesc)
{
giant.lock();
@ -263,10 +265,10 @@ public class HeapMemoryTaskStorage implements TaskStorage
.stream()
.limit(n)
.collect(Collectors.toList());
final ImmutableList.Builder<TaskInfo<Task>> listBuilder = ImmutableList.builder();
final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
for (final TaskStuff taskStuff : list) {
String id = taskStuff.getTask().getId();
TaskInfo t = new TaskInfo(
TaskInfo t = new TaskInfo<>(
id,
taskStuff.getCreatedDate(),
taskStuff.getStatus(),
@ -282,21 +284,6 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}
@Nullable
@Override
public Pair<DateTime, String> getCreatedDateTimeAndDataSource(String taskId)
{
giant.lock();
try {
final TaskStuff taskStuff = tasks.get(taskId);
return taskStuff == null ? null : Pair.of(taskStuff.getCreatedDate(), taskStuff.getDataSource());
}
finally {
giant.unlock();
}
}
@Override
public void addLock(final String taskid, final TaskLock taskLock)
{

View File

@ -21,6 +21,7 @@ package io.druid.indexing.overlord;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.druid.indexer.TaskInfo;
import io.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -47,13 +48,9 @@ public class IndexerMetadataStorageAdapter
{
// Check the given interval overlaps the interval(minCreatedDateOfActiveTasks, MAX)
final Optional<DateTime> minCreatedDateOfActiveTasks = taskStorageQueryAdapter
.getActiveTasks()
.getActiveTaskInfo(dataSource)
.stream()
.map(task -> Preconditions.checkNotNull(
taskStorageQueryAdapter.getCreatedTime(task.getId()),
"Can't find the createdTime for task[%s]",
task.getId()
))
.map(TaskInfo::getCreatedTime)
.min(Comparator.naturalOrder());
final Interval activeTaskInterval = new Interval(

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@ -36,7 +35,6 @@ import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
@ -46,12 +44,12 @@ import io.druid.metadata.MetadataStorageActionHandlerFactory;
import io.druid.metadata.MetadataStorageActionHandlerTypes;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageTablesConfig;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class MetadataTaskStorage implements TaskStorage
{
@ -185,36 +183,15 @@ public class MetadataTaskStorage implements TaskStorage
@Override
public List<Task> getActiveTasks()
{
return ImmutableList.copyOf(
Iterables.transform(
Iterables.filter(
handler.getActiveEntriesWithStatus(),
new Predicate<Pair<Task, TaskStatus>>()
{
@Override
public boolean apply(
@Nullable Pair<Task, TaskStatus> input
)
{
return input.rhs.isRunnable();
}
}
),
new Function<Pair<Task, TaskStatus>, Task>()
{
@Nullable
@Override
public Task apply(@Nullable Pair<Task, TaskStatus> input)
{
return input.lhs;
}
}
)
);
return handler.getActiveTaskInfo(null)
.stream()
.filter(taskInfo -> taskInfo.getStatus().isRunnable())
.map(TaskInfo::getTask)
.collect(Collectors.toList());
}
@Override
public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{
return ImmutableList.copyOf(
handler.getActiveTaskInfo(dataSource)
@ -222,7 +199,7 @@ public class MetadataTaskStorage implements TaskStorage
}
@Override
public List<TaskInfo<Task>> getRecentlyFinishedTaskInfo(
public List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
@Nullable Integer maxTaskStatuses,
@Nullable Duration duration,
@Nullable String datasource
@ -237,13 +214,6 @@ public class MetadataTaskStorage implements TaskStorage
);
}
@Nullable
@Override
public Pair<DateTime, String> getCreatedDateTimeAndDataSource(String taskId)
{
return handler.getCreatedDateAndDataSource(taskId);
}
@Override
public void addLock(final String taskid, final TaskLock taskLock)
{

View File

@ -25,9 +25,7 @@ import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.Pair;
import io.druid.metadata.EntryExistsException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
@ -131,7 +129,7 @@ public interface TaskStorage
*
* @return list of {@link TaskInfo}
*/
List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource);
List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource);
/**
* Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage facility. No
@ -144,15 +142,12 @@ public interface TaskStorage
*
* @return list of {@link TaskInfo}
*/
List<TaskInfo<Task>> getRecentlyFinishedTaskInfo(
List<TaskInfo<Task, TaskStatus>> getRecentlyFinishedTaskInfo(
@Nullable Integer maxTaskStatuses,
@Nullable Duration duration,
@Nullable String datasource
);
@Nullable
Pair<DateTime, String> getCreatedDateTimeAndDataSource(String taskId);
/**
* Returns a list of locks for a particular task.
*

View File

@ -28,9 +28,7 @@ import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.Pair;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
@ -55,12 +53,12 @@ public class TaskStorageQueryAdapter
return storage.getActiveTasks();
}
public List<TaskInfo<Task>> getActiveTaskInfo(@Nullable String dataSource)
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{
return storage.getActiveTaskInfo(dataSource);
}
public List<TaskInfo<Task>> getRecentlyCompletedTaskInfo(
public List<TaskInfo<Task, TaskStatus>> getRecentlyCompletedTaskInfo(
@Nullable Integer maxTaskStatuses,
@Nullable Duration duration,
@Nullable String dataSource
@ -69,13 +67,6 @@ public class TaskStorageQueryAdapter
return storage.getRecentlyFinishedTaskInfo(maxTaskStatuses, duration, dataSource);
}
@Nullable
public DateTime getCreatedTime(String taskId)
{
final Pair<DateTime, String> pair = storage.getCreatedDateTimeAndDataSource(taskId);
return pair == null ? null : pair.lhs;
}
public Optional<Task> getTask(final String taskid)
{
return storage.getTask(taskid);
@ -108,9 +99,4 @@ public class TaskStorageQueryAdapter
}
return segments;
}
public Pair<DateTime, String> getCreatedDateAndDataSource(String taskId)
{
return storage.getCreatedDateTimeAndDataSource(taskId);
}
}

View File

@ -606,7 +606,7 @@ public class OverlordResource
null
);
Function<TaskInfo<Task>, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus(
Function<TaskInfo<Task, TaskStatus>, TaskStatusPlus> completeTaskTransformFunc = taskInfo -> new TaskStatusPlus(
taskInfo.getId(),
taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
taskInfo.getCreatedTime(),
@ -628,18 +628,18 @@ public class OverlordResource
final Interval theInterval = Intervals.of(interval.replace("_", "/"));
duration = theInterval.toDuration();
}
final List<TaskInfo<Task>> taskInfoList = taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(
final List<TaskInfo<Task, TaskStatus>> taskInfoList = taskStorageQueryAdapter.getRecentlyCompletedTaskInfo(
maxCompletedTasks, duration, dataSource
);
final List<TaskStatusPlus> completedTasks = Lists.transform(taskInfoList, completeTaskTransformFunc);
finalTaskList.addAll(completedTasks);
}
final List<TaskInfo<Task>> allActiveTaskInfo;
final List<TaskInfo<Task, TaskStatus>> allActiveTaskInfo;
final List<AnyTask> allActiveTasks = Lists.newArrayList();
if (state == null || !"complete".equals(StringUtils.toLowerCase(state))) {
allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfo(dataSource);
for (final TaskInfo<Task> task : allActiveTaskInfo) {
for (final TaskInfo<Task, TaskStatus> task : allActiveTaskInfo) {
allActiveTasks.add(
new AnyTask(
task.getId(),

View File

@ -20,7 +20,10 @@
package io.druid.indexing.overlord;
import com.google.common.collect.ImmutableList;
import io.druid.indexer.TaskInfo;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import org.easymock.EasyMock;
@ -32,6 +35,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List;
public class IndexerMetadataStorageAdapterTest
{
@Rule
@ -55,12 +60,23 @@ public class IndexerMetadataStorageAdapterTest
@Test
public void testDeletePendingSegments()
{
EasyMock.expect(taskStorageQueryAdapter.getActiveTasks())
.andReturn(ImmutableList.of(NoopTask.create("id1", 0), NoopTask.create("id2", 0)));
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.eq("id1")))
.andReturn(DateTimes.of("2017-12-01"));
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.eq("id2")))
.andReturn(DateTimes.of("2017-12-02"));
final List<TaskInfo<Task, TaskStatus>> taskInfos = ImmutableList.of(
new TaskInfo<>(
"id1",
DateTimes.of("2017-12-01"),
TaskStatus.running("id1"),
"dataSource",
NoopTask.create("id1", 0)
),
new TaskInfo<>(
"id1",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
NoopTask.create("id2", 0)
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
EasyMock
@ -76,12 +92,24 @@ public class IndexerMetadataStorageAdapterTest
@Test
public void testDeletePendingSegmentsOfRunningTasks()
{
EasyMock.expect(taskStorageQueryAdapter.getActiveTasks())
.andReturn(ImmutableList.of(NoopTask.create("id1", 0), NoopTask.create("id2", 0)));
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.eq("id1")))
.andReturn(DateTimes.of("2017-11-01"));
EasyMock.expect(taskStorageQueryAdapter.getCreatedTime(EasyMock.eq("id2")))
.andReturn(DateTimes.of("2017-12-02"));
final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos = ImmutableList.of(
new TaskInfo<>(
"id1",
DateTimes.of("2017-11-01"),
TaskStatus.running("id1"),
"dataSource",
NoopTask.create("id1", 0)
),
new TaskInfo<>(
"id1",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
NoopTask.create("id2", 0)
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2017-12-01");
EasyMock

View File

@ -47,7 +47,7 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
}
@Override
protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
)
{

View File

@ -45,7 +45,7 @@ public class MySQLMetadataStorageActionHandler<EntryType, StatusType, LogType, L
}
@Override
protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
)
{

View File

@ -45,7 +45,7 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
}
@Override
protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
)
{

View File

@ -27,7 +27,6 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.indexer.TaskInfo;
import io.druid.indexer.TaskStatus;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
@ -237,88 +236,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
}
@Override
public List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus()
{
return connector.retryWithHandle(
new HandleCallback<List<Pair<EntryType, StatusType>>>()
{
@Override
public List<Pair<EntryType, StatusType>> withHandle(Handle handle)
{
return handle
.createQuery(
StringUtils.format(
"SELECT id, payload, status_payload FROM %s WHERE active = TRUE ORDER BY created_date",
entryTable
)
)
.map(
new ResultSetMapper<Pair<EntryType, StatusType>>()
{
@Override
public Pair<EntryType, StatusType> map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return Pair.of(
jsonMapper.readValue(
r.getBytes("payload"),
entryType
),
jsonMapper.readValue(
r.getBytes("status_payload"),
statusType
)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to parse entry payload").addData("entry", r.getString("id")).emit();
throw new SQLException(e);
}
}
}
).list();
}
}
);
}
@Override
public List<StatusType> getInactiveStatusesSince(DateTime timestamp, @Nullable Integer maxNumStatuses)
{
return getConnector().retryWithHandle(
handle -> {
final Query<Map<String, Object>> query = createInactiveStatusesSinceQuery(
handle,
timestamp,
maxNumStatuses,
null
);
return query
.map(
(ResultSetMapper<StatusType>) (index, r, ctx) -> {
try {
return getJsonMapper().readValue(
r.getBytes("status_payload"),
getStatusType()
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to parse status payload")
.addData("entry", r.getString("id"))
.emit();
throw new SQLException(e);
}
}
).list();
}
);
}
@Override
public List<TaskInfo<EntryType>> getCompletedTaskInfo(
public List<TaskInfo<EntryType, StatusType>> getCompletedTaskInfo(
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String dataSource
@ -326,7 +244,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
{
return getConnector().retryWithHandle(
handle -> {
final Query<Map<String, Object>> query = createInactiveStatusesSinceQuery(
final Query<Map<String, Object>> query = createCompletedTaskInfoQuery(
handle,
timestamp,
maxNumStatuses,
@ -338,11 +256,11 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
}
@Override
public List<TaskInfo<EntryType>> getActiveTaskInfo(@Nullable String dataSource)
public List<TaskInfo<EntryType, StatusType>> getActiveTaskInfo(@Nullable String dataSource)
{
return getConnector().retryWithHandle(
handle -> {
final Query<Map<String, Object>> query = createActiveStatusesQuery(
final Query<Map<String, Object>> query = createActiveTaskInfoQuery(
handle,
dataSource
);
@ -351,7 +269,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
);
}
private Query<Map<String, Object>> createActiveStatusesQuery(Handle handle, @Nullable String dataSource)
private Query<Map<String, Object>> createActiveTaskInfoQuery(Handle handle, @Nullable String dataSource)
{
String sql = StringUtils.format(
"SELECT "
@ -384,14 +302,14 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
return sql;
}
class TaskInfoMapper implements ResultSetMapper<TaskInfo<EntryType>>
class TaskInfoMapper implements ResultSetMapper<TaskInfo<EntryType, StatusType>>
{
@Override
public TaskInfo<EntryType> map(int index, ResultSet resultSet, StatementContext context) throws SQLException
public TaskInfo<EntryType, StatusType> map(int index, ResultSet resultSet, StatementContext context) throws SQLException
{
final TaskInfo<EntryType> taskInfo;
final TaskInfo<EntryType, StatusType> taskInfo;
EntryType task;
TaskStatus status;
StatusType status;
try {
task = getJsonMapper().readValue(resultSet.getBytes("payload"), getEntryType());
}
@ -417,35 +335,13 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
}
}
protected abstract Query<Map<String, Object>> createInactiveStatusesSinceQuery(
protected abstract Query<Map<String, Object>> createCompletedTaskInfoQuery(
Handle handle,
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String dataSource
);
@Override
@Nullable
public Pair<DateTime, String> getCreatedDateAndDataSource(String entryId)
{
return connector.retryWithHandle(
handle -> handle
.createQuery(
StringUtils.format(
"SELECT created_date, datasource FROM %s WHERE id = :entryId",
entryTable
)
)
.bind("entryId", entryId)
.map(
(index, resultSet, ctx) -> Pair.of(
DateTimes.of(resultSet.getString("created_date")), resultSet.getString("datasource")
)
)
.first()
);
}
@Override
public boolean addLock(final String entryId, final LockType lock)
{

View File

@ -45,7 +45,7 @@ public class SQLServerMetadataStorageActionHandler<EntryType, StatusType, LogTyp
}
@Override
protected Query<Map<String, Object>> createInactiveStatusesSinceQuery(
protected Query<Map<String, Object>> createCompletedTaskInfoQuery(
Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @Nullable String dataSource
)
{

View File

@ -25,6 +25,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.indexer.TaskInfo;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Pair;
@ -39,6 +40,7 @@ import org.junit.rules.ExpectedException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class SQLMetadataStorageActionHandlerTest
{
@ -133,19 +135,23 @@ public class SQLMetadataStorageActionHandlerTest
Assert.assertEquals(
ImmutableList.of(Pair.of(entry, status1)),
handler.getActiveEntriesWithStatus()
handler.getActiveTaskInfo(null).stream()
.map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus()))
.collect(Collectors.toList())
);
Assert.assertTrue(handler.setStatus(entryId, true, status2));
Assert.assertEquals(
ImmutableList.of(Pair.of(entry, status2)),
handler.getActiveEntriesWithStatus()
handler.getActiveTaskInfo(null).stream()
.map(taskInfo -> Pair.of(taskInfo.getTask(), taskInfo.getStatus()))
.collect(Collectors.toList())
);
Assert.assertEquals(
ImmutableList.of(),
handler.getInactiveStatusesSince(DateTimes.of("2014-01-01"))
handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null)
);
Assert.assertTrue(handler.setStatus(entryId, false, status1));
@ -170,12 +176,15 @@ public class SQLMetadataStorageActionHandlerTest
Assert.assertEquals(
ImmutableList.of(),
handler.getInactiveStatusesSince(DateTimes.of("2014-01-03"))
handler.getCompletedTaskInfo(DateTimes.of("2014-01-03"), null, null)
);
Assert.assertEquals(
ImmutableList.of(status1),
handler.getInactiveStatusesSince(DateTimes.of("2014-01-01"))
handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null)
.stream()
.map(TaskInfo::getStatus)
.collect(Collectors.toList())
);
}
@ -190,11 +199,15 @@ public class SQLMetadataStorageActionHandlerTest
handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
}
final List<Map<String, Integer>> statuses = handler.getInactiveStatusesSince(DateTimes.of("2014-01-01"), 7);
final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getCompletedTaskInfo(
DateTimes.of("2014-01-01"),
7,
null
);
Assert.assertEquals(7, statuses.size());
int i = 10;
for (Map<String, Integer> status : statuses) {
Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status);
for (TaskInfo<Map<String, Integer>, Map<String, Integer>> status : statuses) {
Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus());
}
}
@ -209,11 +222,15 @@ public class SQLMetadataStorageActionHandlerTest
handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status);
}
final List<Map<String, Integer>> statuses = handler.getInactiveStatusesSince(DateTimes.of("2014-01-01"), 10);
final List<TaskInfo<Map<String, Integer>, Map<String, Integer>>> statuses = handler.getCompletedTaskInfo(
DateTimes.of("2014-01-01"),
10,
null
);
Assert.assertEquals(5, statuses.size());
int i = 5;
for (Map<String, Integer> status : statuses) {
Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status);
for (TaskInfo<Map<String, Integer>, Map<String, Integer>> status : statuses) {
Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus());
}
}