From 1524af703dac18bb4dd05e579f37e920606f9f22 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 27 Jul 2018 10:43:32 -0700 Subject: [PATCH] Fix IllegalArgumentException in TaskLockBox.syncFromStorage() (#6050) --- .../druid/indexing/kafka/KafkaIndexTask.java | 4 +- .../indexing/common/task/AbstractTask.java | 33 ++++++---- .../AppenderatorDriverRealtimeIndexTask.java | 6 +- .../indexing/common/task/CompactionTask.java | 4 +- .../indexing/common/task/HadoopIndexTask.java | 6 +- .../druid/indexing/common/task/IndexTask.java | 6 +- .../indexing/common/task/MergeTaskBase.java | 6 +- .../druid/indexing/common/task/NoopTask.java | 6 ++ .../common/task/RealtimeIndexTask.java | 6 +- .../io/druid/indexing/common/task/Task.java | 24 +++++-- .../overlord/http/OverlordResource.java | 6 ++ .../indexing/overlord/TaskLockboxTest.java | 2 +- .../indexing/overlord/http/OverlordTest.java | 8 +++ .../DerbyMetadataStorageActionHandler.java | 2 + .../SQLMetadataStorageActionHandler.java | 66 ++++++++----------- 15 files changed, 111 insertions(+), 74 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 2f8f224289f..d3a58240d0f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -165,9 +165,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); + return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index 446078ab1f0..d40e42de67a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -36,6 +36,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,17 +65,17 @@ public abstract class AbstractTask implements Task protected AbstractTask( String id, - String groupId, - TaskResource taskResource, + @Nullable String groupId, + @Nullable TaskResource taskResource, String dataSource, - Map context + @Nullable Map context ) { this.id = Preconditions.checkNotNull(id, "id"); this.groupId = groupId == null ? id : groupId; this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource; this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); - this.context = context; + this.context = context == null ? new HashMap<>() : context; } static String getOrMakeId(String id, final String typeName, String dataSource) @@ -162,11 +163,13 @@ public abstract class AbstractTask implements Task @Override public String toString() { - return Objects.toStringHelper(this) - .add("id", id) - .add("type", getType()) - .add("dataSource", dataSource) - .toString(); + return "AbstractTask{" + + "id='" + id + '\'' + + ", groupId='" + groupId + '\'' + + ", taskResource=" + taskResource + + ", dataSource='" + dataSource + '\'' + + ", context=" + context + + '}'; } /** @@ -207,13 +210,21 @@ public abstract class AbstractTask implements Task return false; } - return true; + if (!groupId.equals(that.groupId)) { + return false; + } + + if (!dataSource.equals(that.dataSource)) { + return false; + } + + return context.equals(that.context); } @Override public int hashCode() { - return id.hashCode(); + return Objects.hashCode(id, groupId, dataSource, context); } static List getTaskLocks(TaskActionClient client) throws IOException diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index aaf8bfd04ae..5dcc35bde4a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -39,13 +39,13 @@ import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.discovery.LookupNodeService; import io.druid.indexer.IngestionState; +import io.druid.indexer.TaskStatus; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import io.druid.indexing.common.TaskReport; -import io.druid.indexer.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -202,9 +202,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); + return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 53855574db2..2ffd5be8a90 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -178,9 +178,9 @@ public class CompactionTask extends AbstractTask } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); + return Tasks.DEFAULT_MERGE_TASK_PRIORITY; } @VisibleForTesting diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index b91f5edfa82..a49239abcc7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -39,12 +39,12 @@ import io.druid.indexer.IngestionState; import io.druid.indexer.MetadataStorageUpdaterJobHandler; import io.druid.indexer.TaskMetricsGetter; import io.druid.indexer.TaskMetricsUtils; +import io.druid.indexer.TaskStatus; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskReport; -import io.druid.indexer.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockTryAcquireAction; @@ -171,9 +171,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index a6d3c53060f..0baafbdb81a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -41,6 +41,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.Rows; import io.druid.hll.HyperLogLogCollector; import io.druid.indexer.IngestionState; +import io.druid.indexer.TaskStatus; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -48,7 +49,6 @@ import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import io.druid.indexing.common.TaskReport; -import io.druid.indexer.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -240,9 +240,9 @@ public class IndexTask extends AbstractTask implements ChatHandler } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 77b607e1246..675f27a64f6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -33,8 +33,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; -import io.druid.indexing.common.TaskLock; import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -133,9 +133,9 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); + return Tasks.DEFAULT_MERGE_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index 6571364ac16..fab5a9d6b06 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -151,6 +151,12 @@ public class NoopTask extends AbstractTask } } + @Override + public int getDefaultPriority() + { + return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + } + public static NoopTask create() { return new NoopTask(null, null, 0, 0, null, null, null); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 00bb1bdac1c..5ea6c081020 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -33,10 +33,10 @@ import io.druid.data.input.FirehoseFactory; import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.discovery.LookupNodeService; +import io.druid.indexer.TaskStatus; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; -import io.druid.indexer.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockReleaseAction; @@ -161,9 +161,9 @@ public class RealtimeIndexTask extends AbstractTask } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); + return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 293bb74350d..8848f7fa8a5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -27,7 +27,6 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.query.Query; import io.druid.query.QueryRunner; -import javax.annotation.Nullable; import java.util.Map; /** @@ -85,15 +84,28 @@ public interface Task * Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can * be used for task scheduling, cluster resource management, etc. * + * The task priority must be in taskContext if the task is submitted to the proper Overlord endpoint. + * + * It might not be in taskContext in rolling update. This returns {@link Tasks#DEFAULT_TASK_PRIORITY} in this case. + * * @return task priority * * @see Tasks for default task priorities + * @see io.druid.indexing.overlord.http.OverlordResource#taskPost */ default int getPriority() { return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY); } + /** + * Returns the default task priority. It can vary depending on the task type. + */ + default int getDefaultPriority() + { + return Tasks.DEFAULT_TASK_PRIORITY; + } + /** * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may * require. @@ -180,13 +192,17 @@ public interface Task */ TaskStatus run(TaskToolbox toolbox) throws Exception; - @Nullable + default Map addToContext(String key, Object val) + { + getContext().put(key, val); + return getContext(); + } + Map getContext(); - @Nullable default ContextValueType getContextValue(String key) { - return getContext() == null ? null : (ContextValueType) getContext().get(key); + return (ContextValueType) getContext().get(key); } default ContextValueType getContextValue(String key, ContextValueType defaultValue) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 68b5ff69167..5c59eefaf82 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -44,6 +44,7 @@ import io.druid.indexer.TaskStatusPlus; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; +import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; @@ -174,6 +175,11 @@ public class OverlordResource public Response apply(TaskQueue taskQueue) { try { + // Set default priority if needed + final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY); + if (priority == null) { + task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority()); + } taskQueue.add(task); return Response.ok(ImmutableMap.of("task", task.getId())).build(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 4b01bb163f5..f2dceb0ee30 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -21,9 +21,9 @@ package io.druid.indexing.overlord; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; +import io.druid.indexer.TaskStatus; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; -import io.druid.indexer.TaskStatus; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 7dc3c1f2880..ca3eda8512f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -37,6 +37,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; +import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskLockbox; @@ -80,6 +81,7 @@ import javax.ws.rs.core.Response; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -231,6 +233,12 @@ public class OverlordTest Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity()); + final Map context = task_0.getContext(); + Assert.assertEquals(1, context.size()); + final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY); + Assert.assertNotNull(priority); + Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue()); + // Duplicate task - should fail response = overlordResource.taskPost(task_0, req); Assert.assertEquals(400, response.getStatus()); diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java index ce05ec2c903..6986cf7906d 100644 --- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java +++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java @@ -20,6 +20,7 @@ package io.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import io.druid.java.util.common.StringUtils; import org.joda.time.DateTime; import org.skife.jdbi.v2.Handle; @@ -31,6 +32,7 @@ import java.util.Map; public class DerbyMetadataStorageActionHandler extends SQLMetadataStorageActionHandler { + @VisibleForTesting DerbyMetadataStorageActionHandler( SQLMetadataConnector connector, ObjectMapper jsonMapper, diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java index 8374ecee07d..2c26e34ae57 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java @@ -22,9 +22,8 @@ package io.druid.metadata; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.druid.indexer.TaskInfo; @@ -128,53 +127,42 @@ public abstract class SQLMetadataStorageActionHandler() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - StringUtils.format( - "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", - entryTable - ) - ) - .bind("id", id) - .bind("created_date", timestamp.toString()) - .bind("datasource", dataSource) - .bind("payload", jsonMapper.writeValueAsBytes(entry)) - .bind("active", active) - .bind("status_payload", jsonMapper.writeValueAsBytes(status)) - .execute(); - return null; - } + getConnector().retryWithHandle( + (HandleCallback) handle -> { + final String sql = StringUtils.format( + "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) " + + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", + getEntryTable() + ); + handle.createStatement(sql) + .bind("id", id) + .bind("created_date", timestamp.toString()) + .bind("datasource", dataSource) + .bind("payload", jsonMapper.writeValueAsBytes(entry)) + .bind("active", active) + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) + .execute(); + return null; }, - new Predicate() - { - @Override - public boolean apply(Throwable e) - { - final boolean isStatementException = e instanceof StatementException || - (e instanceof CallbackFailedException - && e.getCause() instanceof StatementException); - return connector.isTransientException(e) && !(isStatementException && getEntry(id).isPresent()); - } - } + e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent()) ); } catch (Exception e) { - final boolean isStatementException = e instanceof StatementException || - (e instanceof CallbackFailedException - && e.getCause() instanceof StatementException); - if (isStatementException && getEntry(id).isPresent()) { + if (isStatementException(e) && getEntry(id).isPresent()) { throw new EntryExistsException(id, e); } else { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } } + @VisibleForTesting + protected static boolean isStatementException(Throwable e) + { + return e instanceof StatementException || + (e instanceof CallbackFailedException && e.getCause() instanceof StatementException); + } + @Override public boolean setStatus(final String entryId, final boolean active, final StatusType status) {