Fix IllegalArgumentException in TaskLockBox.syncFromStorage() (#6050)

This commit is contained in:
Jihoon Son 2018-07-27 10:43:32 -07:00 committed by GitHub
parent 94d6c9a0a5
commit 1524af703d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 111 additions and 74 deletions

View File

@ -165,9 +165,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
} }
@Override @Override
public int getPriority() public int getDefaultPriority()
{ {
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
} }
@Override @Override

View File

@ -36,6 +36,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -64,17 +65,17 @@ public abstract class AbstractTask implements Task
protected AbstractTask( protected AbstractTask(
String id, String id,
String groupId, @Nullable String groupId,
TaskResource taskResource, @Nullable TaskResource taskResource,
String dataSource, String dataSource,
Map<String, Object> context @Nullable Map<String, Object> context
) )
{ {
this.id = Preconditions.checkNotNull(id, "id"); this.id = Preconditions.checkNotNull(id, "id");
this.groupId = groupId == null ? id : groupId; this.groupId = groupId == null ? id : groupId;
this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource; this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource;
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.context = context; this.context = context == null ? new HashMap<>() : context;
} }
static String getOrMakeId(String id, final String typeName, String dataSource) static String getOrMakeId(String id, final String typeName, String dataSource)
@ -162,11 +163,13 @@ public abstract class AbstractTask implements Task
@Override @Override
public String toString() public String toString()
{ {
return Objects.toStringHelper(this) return "AbstractTask{" +
.add("id", id) "id='" + id + '\'' +
.add("type", getType()) ", groupId='" + groupId + '\'' +
.add("dataSource", dataSource) ", taskResource=" + taskResource +
.toString(); ", dataSource='" + dataSource + '\'' +
", context=" + context +
'}';
} }
/** /**
@ -207,13 +210,21 @@ public abstract class AbstractTask implements Task
return false; return false;
} }
return true; if (!groupId.equals(that.groupId)) {
return false;
}
if (!dataSource.equals(that.dataSource)) {
return false;
}
return context.equals(that.context);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return id.hashCode(); return Objects.hashCode(id, groupId, dataSource, context);
} }
static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException

View File

@ -39,13 +39,13 @@ import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService; import io.druid.discovery.LookupNodeService;
import io.druid.indexer.IngestionState; import io.druid.indexer.IngestionState;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import io.druid.indexing.common.TaskReport; import io.druid.indexing.common.TaskReport;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
@ -202,9 +202,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
} }
@Override @Override
public int getPriority() public int getDefaultPriority()
{ {
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
} }
@Override @Override

View File

@ -178,9 +178,9 @@ public class CompactionTask extends AbstractTask
} }
@Override @Override
public int getPriority() public int getDefaultPriority()
{ {
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
} }
@VisibleForTesting @VisibleForTesting

View File

@ -39,12 +39,12 @@ import io.druid.indexer.IngestionState;
import io.druid.indexer.MetadataStorageUpdaterJobHandler; import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.TaskMetricsGetter; import io.druid.indexer.TaskMetricsGetter;
import io.druid.indexer.TaskMetricsUtils; import io.druid.indexer.TaskMetricsUtils;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskReport; import io.druid.indexing.common.TaskReport;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.LockTryAcquireAction;
@ -171,9 +171,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
} }
@Override @Override
public int getPriority() public int getDefaultPriority()
{ {
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
} }
@Override @Override

View File

@ -41,6 +41,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows; import io.druid.data.input.Rows;
import io.druid.hll.HyperLogLogCollector; import io.druid.hll.HyperLogLogCollector;
import io.druid.indexer.IngestionState; import io.druid.indexer.IngestionState;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@ -48,7 +49,6 @@ import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import io.druid.indexing.common.TaskReport; import io.druid.indexing.common.TaskReport;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
@ -240,9 +240,9 @@ public class IndexTask extends AbstractTask implements ChatHandler
} }
@Override @Override
public int getPriority() public int getDefaultPriority()
{ {
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
} }
@Override @Override

View File

@ -33,8 +33,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import io.druid.indexing.common.TaskLock;
import io.druid.indexer.TaskStatus; import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
@ -133,9 +133,9 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
} }
@Override @Override
public int getPriority() public int getDefaultPriority()
{ {
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
} }
@Override @Override

View File

@ -151,6 +151,12 @@ public class NoopTask extends AbstractTask
} }
} }
@Override
public int getDefaultPriority()
{
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
}
public static NoopTask create() public static NoopTask create()
{ {
return new NoopTask(null, null, 0, 0, null, null, null); return new NoopTask(null, null, 0, 0, null, null, null);

View File

@ -33,10 +33,10 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService; import io.druid.discovery.LookupNodeService;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockReleaseAction; import io.druid.indexing.common.actions.LockReleaseAction;
@ -161,9 +161,9 @@ public class RealtimeIndexTask extends AbstractTask
} }
@Override @Override
public int getPriority() public int getDefaultPriority()
{ {
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
} }
@Override @Override

View File

@ -27,7 +27,6 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryRunner; import io.druid.query.QueryRunner;
import javax.annotation.Nullable;
import java.util.Map; import java.util.Map;
/** /**
@ -85,15 +84,28 @@ public interface Task
* Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can * Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
* be used for task scheduling, cluster resource management, etc. * be used for task scheduling, cluster resource management, etc.
* *
* The task priority must be in taskContext if the task is submitted to the proper Overlord endpoint.
*
* It might not be in taskContext in rolling update. This returns {@link Tasks#DEFAULT_TASK_PRIORITY} in this case.
*
* @return task priority * @return task priority
* *
* @see Tasks for default task priorities * @see Tasks for default task priorities
* @see io.druid.indexing.overlord.http.OverlordResource#taskPost
*/ */
default int getPriority() default int getPriority()
{ {
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY); return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
} }
/**
* Returns the default task priority. It can vary depending on the task type.
*/
default int getDefaultPriority()
{
return Tasks.DEFAULT_TASK_PRIORITY;
}
/** /**
* Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
* require. * require.
@ -180,13 +192,17 @@ public interface Task
*/ */
TaskStatus run(TaskToolbox toolbox) throws Exception; TaskStatus run(TaskToolbox toolbox) throws Exception;
@Nullable default Map<String, Object> addToContext(String key, Object val)
{
getContext().put(key, val);
return getContext();
}
Map<String, Object> getContext(); Map<String, Object> getContext();
@Nullable
default <ContextValueType> ContextValueType getContextValue(String key) default <ContextValueType> ContextValueType getContextValue(String key)
{ {
return getContext() == null ? null : (ContextValueType) getContext().get(key); return (ContextValueType) getContext().get(key);
} }
default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue) default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)

View File

@ -44,6 +44,7 @@ import io.druid.indexer.TaskStatusPlus;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue; import io.druid.indexing.overlord.TaskQueue;
@ -174,6 +175,11 @@ public class OverlordResource
public Response apply(TaskQueue taskQueue) public Response apply(TaskQueue taskQueue)
{ {
try { try {
// Set default priority if needed
final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
if (priority == null) {
task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
}
taskQueue.add(task); taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build(); return Response.ok(ImmutableMap.of("task", task.getId())).build();
} }

View File

@ -21,9 +21,9 @@ package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskLockType;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;

View File

@ -37,6 +37,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskLockbox;
@ -80,6 +81,7 @@ import javax.ws.rs.core.Response;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -231,6 +233,12 @@ public class OverlordTest
Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity()); Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
final Map<String, Object> context = task_0.getContext();
Assert.assertEquals(1, context.size());
final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
Assert.assertNotNull(priority);
Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());
// Duplicate task - should fail // Duplicate task - should fail
response = overlordResource.taskPost(task_0, req); response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(400, response.getStatus()); Assert.assertEquals(400, response.getStatus());

View File

@ -20,6 +20,7 @@
package io.druid.metadata; package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
@ -31,6 +32,7 @@ import java.util.Map;
public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
{ {
@VisibleForTesting
DerbyMetadataStorageActionHandler( DerbyMetadataStorageActionHandler(
SQLMetadataConnector connector, SQLMetadataConnector connector,
ObjectMapper jsonMapper, ObjectMapper jsonMapper,

View File

@ -22,9 +22,8 @@ package io.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import io.druid.indexer.TaskInfo; import io.druid.indexer.TaskInfo;
@ -128,53 +127,42 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
) throws EntryExistsException ) throws EntryExistsException
{ {
try { try {
connector.retryWithHandle( getConnector().retryWithHandle(
new HandleCallback<Void>() (HandleCallback<Void>) handle -> {
{ final String sql = StringUtils.format(
@Override "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) "
public Void withHandle(Handle handle) throws Exception + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
{ getEntryTable()
handle.createStatement( );
StringUtils.format( handle.createStatement(sql)
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", .bind("id", id)
entryTable .bind("created_date", timestamp.toString())
) .bind("datasource", dataSource)
) .bind("payload", jsonMapper.writeValueAsBytes(entry))
.bind("id", id) .bind("active", active)
.bind("created_date", timestamp.toString()) .bind("status_payload", jsonMapper.writeValueAsBytes(status))
.bind("datasource", dataSource) .execute();
.bind("payload", jsonMapper.writeValueAsBytes(entry)) return null;
.bind("active", active)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
return null;
}
}, },
new Predicate<Throwable>() e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent())
{
@Override
public boolean apply(Throwable e)
{
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
return connector.isTransientException(e) && !(isStatementException && getEntry(id).isPresent());
}
}
); );
} }
catch (Exception e) { catch (Exception e) {
final boolean isStatementException = e instanceof StatementException || if (isStatementException(e) && getEntry(id).isPresent()) {
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getEntry(id).isPresent()) {
throw new EntryExistsException(id, e); throw new EntryExistsException(id, e);
} else { } else {
throw Throwables.propagate(e); throw new RuntimeException(e);
} }
} }
} }
@VisibleForTesting
protected static boolean isStatementException(Throwable e)
{
return e instanceof StatementException ||
(e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
}
@Override @Override
public boolean setStatus(final String entryId, final boolean active, final StatusType status) public boolean setStatus(final String entryId, final boolean active, final StatusType status)
{ {