From 6e158704cb058e7c50db025eef1afc796ca80780 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 10 Jun 2023 12:15:44 +0530 Subject: [PATCH] Do not retry INSERT task into metadata if max_allowed_packet limit is violated (#14271) Changes - Add a `DruidException` which contains a user-facing error message, HTTP response code - Make `EntryExistsException` extend `DruidException` - If metadata store max_allowed_packet limit is violated while inserting a new task, throw `DruidException` with response code 400 (bad request) to prevent retries - Add `SQLMetadataConnector.isRootCausePacketTooBigException` with impl for MySQL --- .../supervisor/KinesisSupervisorTest.java | 2 +- .../storage/mysql/MySQLConnector.java | 20 +- .../storage/mysql/MySQLConnectorTest.java | 36 +++- .../parallel/ParallelIndexPhaseRunner.java | 2 +- .../overlord/HeapMemoryTaskStorage.java | 2 +- .../overlord/MetadataTaskStorage.java | 48 ++--- .../druid/indexing/overlord/TaskQueue.java | 2 +- .../overlord/http/OverlordResource.java | 30 +-- .../indexing/overlord/TaskLockboxTest.java | 38 ++-- .../SeekableStreamSupervisorStateTest.java | 20 +- .../common/exception/DruidException.java | 53 +++++ .../druid/metadata/EntryExistsException.java | 17 +- .../MetadataStorageConnectorConfig.java | 2 +- .../common/exception/DruidExceptionTest.java | 40 ++++ .../metadata/EntryExistsExceptionTest.java | 40 ++++ .../MetadataStorageConnectorConfigTest.java | 13 +- .../druid/metadata/SQLMetadataConnector.java | 20 +- .../SQLMetadataStorageActionHandler.java | 123 +++++++++--- .../metadata/SQLMetadataConnectorTest.java | 182 ++++++++---------- .../SQLMetadataStorageActionHandlerTest.java | 10 +- 20 files changed, 458 insertions(+), 242 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/common/exception/DruidException.java create mode 100644 processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java create mode 100644 processing/src/test/java/org/apache/druid/metadata/EntryExistsExceptionTest.java diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 258f80ebeb6..19caa96ead9 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -3465,7 +3465,7 @@ public class KinesisSupervisorTest extends EasyMockSupport } @Test - public void testSuspendedNoRunningTasks() throws Exception + public void testSuspendedNoRunningTasks() { supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true); diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java index 4098e30a544..91abb44380f 100644 --- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java +++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java @@ -46,7 +46,12 @@ public class MySQLConnector extends SQLMetadataConnector private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT"; private static 
final String QUOTE_STRING = "`"; private static final String COLLATION = "CHARACTER SET utf8mb4 COLLATE utf8mb4_bin"; - private static final String MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME = "com.mysql.jdbc.exceptions.MySQLTransientException"; + private static final String MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME + = "com.mysql.jdbc.exceptions.MySQLTransientException"; + private static final String MARIA_DB_PACKET_EXCEPTION_CLASS_NAME + = "org.mariadb.jdbc.internal.util.exceptions.MaxAllowedPacketException"; + private static final String MYSQL_PACKET_EXCEPTION_CLASS_NAME + = "com.mysql.jdbc.PacketTooBigException"; @Nullable private final Class myTransientExceptionClass; @@ -218,6 +223,19 @@ public class MySQLConnector extends SQLMetadataConnector return false; } + @Override + protected boolean isRootCausePacketTooBigException(Throwable t) + { + if (t == null) { + return false; + } + + final String className = t.getClass().getName(); + return MARIA_DB_PACKET_EXCEPTION_CLASS_NAME.equals(className) + || MYSQL_PACKET_EXCEPTION_CLASS_NAME.equals(className) + || isRootCausePacketTooBigException(t.getCause()); + } + @Override public Void insertOrUpdate( final String tableName, diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java index 23ce4628223..514d01b6a59 100644 --- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java +++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import java.sql.SQLException; import java.sql.SQLTransientConnectionException; +import java.sql.SQLTransientException; public class MySQLConnectorTest { @@ -59,10 +60,10 @@ public class MySQLConnectorTest Assert.assertTrue(connector.connectorIsTransientException(new MySQLTransientException())); Assert.assertTrue(connector.connectorIsTransientException(new MySQLTransactionRollbackException())); Assert.assertTrue( - connector.connectorIsTransientException(new SQLException("some transient failure", "wtf", 1317)) + connector.connectorIsTransientException(new SQLException("some transient failure", "s0", 1317)) ); Assert.assertFalse( - connector.connectorIsTransientException(new SQLException("totally realistic test data", "wtf", 1337)) + connector.connectorIsTransientException(new SQLException("totally realistic test data", "s0", 1337)) ); // this method does not specially handle normal transient exceptions either, since it is not vendor specific Assert.assertFalse( @@ -82,16 +83,43 @@ public class MySQLConnectorTest // no vendor specific for MariaDb, so should always be false Assert.assertFalse(connector.connectorIsTransientException(new MySQLTransientException())); Assert.assertFalse( - connector.connectorIsTransientException(new SQLException("some transient failure", "wtf", 1317)) + connector.connectorIsTransientException(new SQLException("some transient failure", "s0", 1317)) ); Assert.assertFalse( - connector.connectorIsTransientException(new SQLException("totally realistic test data", "wtf", 1337)) + connector.connectorIsTransientException(new SQLException("totally realistic test data", "s0", 1337)) ); Assert.assertFalse( connector.connectorIsTransientException(new SQLTransientConnectionException("transient")) ); } + @Test + public void 
testIsRootCausePacketTooBigException() + { + MySQLConnector connector = new MySQLConnector( + CONNECTOR_CONFIG_SUPPLIER, + TABLES_CONFIG_SUPPLIER, + new MySQLConnectorSslConfig(), + MYSQL_DRIVER_CONFIG + ); + + // The test method should return true only for + // mariadb.MaxAllowedPacketException or mysql.PacketTooBigException. + // Verifying this requires creating a mock Class object, but Class is final + // and has only a private constructor. It would be overkill to try to mock it. + + // Verify some of the false cases + Assert.assertFalse( + connector.isRootCausePacketTooBigException(new SQLException()) + ); + Assert.assertFalse( + connector.isRootCausePacketTooBigException(new SQLTransientException()) + ); + Assert.assertFalse( + connector.isRootCausePacketTooBigException(new MySQLTransientException()) + ); + } + @Test public void testLimitClause() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index 4725b43d64f..3b725388f94 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -242,7 +242,7 @@ public abstract class ParallelIndexPhaseRunner spec ) { - LOG.info("Submit a new task for spec[%s]", spec.getId()); + LOG.info("Submitting a new task for spec[%s]", spec.getId()); final ListenableFuture> future = taskMonitor.submit(spec); Futures.addCallback( future, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 8ef5970982a..efbd3ef181f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -90,7 +90,7 @@ public class HeapMemoryTaskStorage implements TaskStorage TaskStuff newTaskStuff = new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource()); TaskStuff alreadyExisted = tasks.putIfAbsent(task.getId(), newTaskStuff); if (alreadyExisted != null) { - throw new EntryExistsException(task.getId()); + throw new EntryExistsException("Task", task.getId()); } log.info("Inserted task %s with status: %s", task.getId(), status); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 7c7fbd3f53c..2735802b2d7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -20,13 +20,13 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.inject.Inject; +import org.apache.druid.common.exception.DruidException; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import 
org.apache.druid.indexer.TaskStatusPlus; @@ -139,7 +139,7 @@ public class MetadataTaskStorage implements TaskStorage status.getId() ); - log.info("Inserting task %s with status: %s", task.getId(), status); + log.info("Inserting task [%s] with status [%s].", task.getId(), status); try { handler.insert( @@ -153,12 +153,11 @@ public class MetadataTaskStorage implements TaskStorage task.getGroupId() ); } + catch (DruidException e) { + throw e; + } catch (Exception e) { - if (e instanceof EntryExistsException) { - throw (EntryExistsException) e; - } else { - throw new RuntimeException(e); - } + throw new RuntimeException(e); } } @@ -167,15 +166,12 @@ public class MetadataTaskStorage implements TaskStorage { Preconditions.checkNotNull(status, "status"); - log.info("Updating task %s to status: %s", status.getId(), status); + final String taskId = status.getId(); + log.info("Updating status of task [%s] to [%s].", taskId, status); - final boolean set = handler.setStatus( - status.getId(), - status.isRunnable(), - status - ); + final boolean set = handler.setStatus(taskId, status.isRunnable(), status); if (!set) { - throw new ISE("Active task not found: %s", status.getId()); + throw new ISE("No active task for id [%s]", taskId); } } @@ -279,10 +275,8 @@ public class MetadataTaskStorage implements TaskStorage Preconditions.checkNotNull(taskLock, "taskLock"); log.info( - "Adding lock on interval[%s] version[%s] for task: %s", - taskLock.getInterval(), - taskLock.getVersion(), - taskid + "Adding lock on interval[%s] version[%s] for task [%s].", + taskLock.getInterval(), taskLock.getVersion(), taskid ); handler.addLock(taskid, taskLock); @@ -296,15 +290,13 @@ public class MetadataTaskStorage implements TaskStorage Preconditions.checkNotNull(newLock, "newLock"); log.info( - "Replacing an existing lock[%s] with a new lock[%s] for task: %s", - oldLock, - newLock, - taskid + "Replacing an existing lock[%s] with a new lock[%s] for task [%s].", + oldLock, newLock, taskid ); final Long oldLockId = handler.getLockId(taskid, oldLock); if (oldLockId == null) { - throw new ISE("Cannot find an existing lock[%s]", oldLock); + throw new ISE("Cannot find lock[%s] for task [%s]", oldLock, taskid); } handler.replaceLock(taskid, oldLockId, newLock); @@ -336,14 +328,8 @@ public class MetadataTaskStorage implements TaskStorage { return ImmutableList.copyOf( Iterables.transform( - getLocksWithIds(taskid).entrySet(), new Function, TaskLock>() - { - @Override - public TaskLock apply(Map.Entry e) - { - return e.getValue(); - } - } + getLocksWithIds(taskid).entrySet(), + Entry::getValue ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index c508876f0ce..0beccce3dd0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -482,7 +482,7 @@ public class TaskQueue IdUtils.validateId("Task ID", task.getId()); if (taskStorage.getTask(task.getId()).isPresent()) { - throw new EntryExistsException(StringUtils.format("Task %s already exists", task.getId())); + throw new EntryExistsException("Task", task.getId()); } // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec. 
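
(Illustrative sketch, not part of this patch: how the new exception contract is meant to flow from task submission up to the HTTP layer. `DruidException` and `EntryExistsException` are the classes introduced in this patch; the wrapper class and method names below are hypothetical.)

    import org.apache.druid.common.exception.DruidException;
    import org.apache.druid.metadata.EntryExistsException;
    import java.util.Set;

    // Hypothetical caller, for illustration only.
    public class TaskSubmissionSketch
    {
      // Returns the HTTP status code a resource such as OverlordResource would send back.
      public int submit(String taskId, Set<String> existingTaskIds)
      {
        try {
          if (!existingTaskIds.add(taskId)) {
            // EntryExistsException is now a DruidException carrying response code 400.
            throw new EntryExistsException("Task", taskId);
          }
          return 200; // accepted
        }
        catch (DruidException e) {
          // User-facing failure: surface the exception's own response code, no special-casing.
          return e.getResponseCode();
        }
      }
    }

Because `EntryExistsException` now carries its own response code and message, the HTTP layer (see the OverlordResource change that follows) can handle any `DruidException` generically instead of special-casing duplicate-task errors.
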
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index a64b67b3b83..c6de461f1ff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -36,6 +36,7 @@ import org.apache.druid.client.indexing.IndexingWorker; import org.apache.druid.client.indexing.IndexingWorkerInfo; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.common.exception.DruidException; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskLocation; @@ -64,7 +65,6 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; @@ -222,25 +222,15 @@ public class OverlordResource return asLeaderWith( taskMaster.getTaskQueue(), - new Function() - { - @Override - public Response apply(TaskQueue taskQueue) - { - try { - taskQueue.add(task); - return Response.ok(ImmutableMap.of("task", task.getId())).build(); - } - catch (EntryExistsException e) { - return Response.status(Response.Status.BAD_REQUEST) - .entity( - ImmutableMap.of( - "error", - StringUtils.format("Task[%s] already exists!", task.getId()) - ) - ) - .build(); - } + taskQueue -> { + try { + taskQueue.add(task); + return Response.ok(ImmutableMap.of("task", task.getId())).build(); + } + catch (DruidException e) { + return Response.status(e.getResponseCode()) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); } } ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 15e59227c27..c9f9d72beaa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -149,7 +149,7 @@ public class TaskLockboxTest } @Test - public void testLock() throws Exception + public void testLock() { validator.expectLockCreated( TaskLockType.EXCLUSIVE, @@ -176,7 +176,7 @@ public class TaskLockboxTest } @Test - public void testTrySharedLock() throws Exception + public void testTrySharedLock() { final Interval interval = Intervals.of("2017-01/2017-02"); @@ -427,7 +427,7 @@ public class TaskLockboxTest } @Test - public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws Exception + public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() { // ensure that if we don't know how to deserialize a task it won't explode the lockbox // (or anything else that uses taskStorage.getActiveTasks() and doesn't expect null which is most things) @@ -1201,7 +1201,7 @@ public class TaskLockboxTest } @Test - public void testGetLockedIntervalsForLowPriorityTask() throws Exception + public void testGetLockedIntervalsForLowPriorityTask() { // Acquire lock for a low priority task final Task 
lowPriorityTask = NoopTask.create(5); @@ -1221,7 +1221,7 @@ public class TaskLockboxTest } @Test - public void testGetLockedIntervalsForEqualPriorityTask() throws Exception + public void testGetLockedIntervalsForEqualPriorityTask() { // Acquire lock for a low priority task final Task task = NoopTask.create(5); @@ -1245,7 +1245,7 @@ public class TaskLockboxTest } @Test - public void testExclusiveLockCompatibility() throws Exception + public void testExclusiveLockCompatibility() { final TaskLock theLock = validator.expectLockCreated( TaskLockType.EXCLUSIVE, @@ -1282,7 +1282,7 @@ public class TaskLockboxTest } @Test - public void testExclusiveLockCanRevokeAllIncompatible() throws Exception + public void testExclusiveLockCanRevokeAllIncompatible() { final TaskLockboxValidator validator = new TaskLockboxValidator(lockbox, taskStorage); @@ -1325,7 +1325,7 @@ public class TaskLockboxTest } @Test - public void testSharedLockCompatibility() throws Exception + public void testSharedLockCompatibility() { final TaskLock theLock = validator.expectLockCreated( TaskLockType.SHARED, @@ -1374,7 +1374,7 @@ public class TaskLockboxTest } @Test - public void testSharedLockCanRevokeAllIncompatible() throws Exception + public void testSharedLockCanRevokeAllIncompatible() { final TaskLock exclusiveLock = validator.expectLockCreated( TaskLockType.EXCLUSIVE, @@ -1415,7 +1415,7 @@ public class TaskLockboxTest } @Test - public void testAppendLockCompatibility() throws Exception + public void testAppendLockCompatibility() { final TaskLock theLock = validator.expectLockCreated( TaskLockType.APPEND, @@ -1473,7 +1473,7 @@ public class TaskLockboxTest } @Test - public void testAppendLockCanRevokeAllIncompatible() throws Exception + public void testAppendLockCanRevokeAllIncompatible() { final TaskLock sharedLock = validator.expectLockCreated( TaskLockType.SHARED, @@ -1521,7 +1521,7 @@ public class TaskLockboxTest @Test - public void testReplaceLockCompatibility() throws Exception + public void testReplaceLockCompatibility() { final TaskLock theLock = validator.expectLockCreated( TaskLockType.REPLACE, @@ -1566,7 +1566,7 @@ public class TaskLockboxTest } @Test - public void testReplaceLockCanRevokeAllIncompatible() throws Exception + public void testReplaceLockCanRevokeAllIncompatible() { final TaskLock appendLock0 = validator.expectLockCreated( TaskLockType.APPEND, @@ -1619,7 +1619,7 @@ public class TaskLockboxTest } @Test - public void testGetLockedIntervalsForRevokedLocks() throws Exception + public void testGetLockedIntervalsForRevokedLocks() { // Acquire lock for a low priority task final Task lowPriorityTask = NoopTask.create(5); @@ -1663,7 +1663,7 @@ public class TaskLockboxTest } @Test - public void testFailedToReacquireTaskLock() throws Exception + public void testFailedToReacquireTaskLock() { // Tasks to be failed have a group id with the substring "FailingLockAcquisition" // Please refer to NullLockPosseTaskLockbox @@ -1704,7 +1704,7 @@ public class TaskLockboxTest } @Test - public void testConflictsWithOverlappingSharedLocks() throws Exception + public void testConflictsWithOverlappingSharedLocks() { TaskLock conflictingLock = validator.expectLockCreated( TaskLockType.SHARED, @@ -1744,7 +1744,7 @@ public class TaskLockboxTest this.taskStorage = taskStorage; } - public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority) throws Exception + public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority) { final TaskLock lock = tryTaskLock(type, interval, priority); 
Assert.assertNotNull(lock); @@ -1757,7 +1757,7 @@ public class TaskLockboxTest lockbox.revokeLock(lockToTaskIdMap.get(lock), lock); } - public void expectLockNotGranted(TaskLockType type, Interval interval, int priority) throws Exception + public void expectLockNotGranted(TaskLockType type, Interval interval, int priority) { final TaskLock lock = tryTaskLock(type, interval, priority); Assert.assertNull(lock); @@ -1785,7 +1785,7 @@ public class TaskLockboxTest } } - private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority) throws Exception + private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority) { final Task task = NoopTask.create(priority); tasks.add(task); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d622984b0ba..b8d5a556eef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -139,7 +139,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport taskClientFactory = createMock(SeekableStreamIndexTaskClientFactory.class); spec = createMock(SeekableStreamSupervisorSpec.class); indexTaskClient = createMock(SeekableStreamIndexTaskClient.class); - recordSupplier = (RecordSupplier) createMock(RecordSupplier.class); + recordSupplier = createMock(RecordSupplier.class); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -175,7 +175,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testRunning() throws Exception + public void testRunning() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -214,7 +214,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testRunningStreamGetSequenceNumberReturnsNull() throws Exception + public void testRunningStreamGetSequenceNumberReturnsNull() { EasyMock.reset(recordSupplier); EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes(); @@ -266,7 +266,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testConnectingToStreamFail() throws Exception + public void testConnectingToStreamFail() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)) @@ -321,7 +321,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testConnectingToStreamFailRecoveryFailRecovery() throws Exception + public void testConnectingToStreamFailRecoveryFailRecovery() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)) @@ -395,7 +395,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testDiscoveringInitialTasksFailRecoveryFail() throws Exception + public void testDiscoveringInitialTasksFailRecoveryFail() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); 
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -560,7 +560,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testCreatingTasksFailRecoveryFail() throws Exception + public void testCreatingTasksFailRecoveryFail() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -638,7 +638,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testSuspended() throws Exception + public void testSuspended() { EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -677,7 +677,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testStopping() throws Exception + public void testStopping() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); @@ -718,7 +718,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } @Test - public void testStoppingGracefully() throws Exception + public void testStoppingGracefully() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); diff --git a/processing/src/main/java/org/apache/druid/common/exception/DruidException.java b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java new file mode 100644 index 00000000000..638653fc5ce --- /dev/null +++ b/processing/src/main/java/org/apache/druid/common/exception/DruidException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.exception; + +/** + * A generic exception thrown by Druid. + */ +public class DruidException extends RuntimeException +{ + public static final int HTTP_CODE_SERVER_ERROR = 500; + public static final int HTTP_CODE_BAD_REQUEST = 400; + + private final int responseCode; + private final boolean isTransient; + + public DruidException(String message, int responseCode, Throwable cause, boolean isTransient) + { + super(message, cause); + this.responseCode = responseCode; + this.isTransient = isTransient; + } + + public int getResponseCode() + { + return responseCode; + } + + /** + * Returns true if this is a transient exception and might go away if the + * operation is retried. 
+ */ + public boolean isTransient() + { + return isTransient; + } +} diff --git a/processing/src/main/java/org/apache/druid/metadata/EntryExistsException.java b/processing/src/main/java/org/apache/druid/metadata/EntryExistsException.java index 76d71de217b..f8a7d860d53 100644 --- a/processing/src/main/java/org/apache/druid/metadata/EntryExistsException.java +++ b/processing/src/main/java/org/apache/druid/metadata/EntryExistsException.java @@ -19,19 +19,26 @@ package org.apache.druid.metadata; +import org.apache.druid.common.exception.DruidException; import org.apache.druid.java.util.common.StringUtils; -public class EntryExistsException extends Exception +/** + * A non-transient Druid metadata exception thrown when trying to insert a + * duplicate entry in the metadata. + */ +public class EntryExistsException extends DruidException { - public EntryExistsException(String entryId, Throwable t) + private static final int HTTP_BAD_REQUEST = 400; + + public EntryExistsException(String entryType, String entryId) { - super(StringUtils.format("Entry already exists: %s", entryId), t); + this(entryType, entryId, null); } - public EntryExistsException(String entryId) + public EntryExistsException(String entryType, String entryId, Throwable t) { - this(entryId, null); + super(StringUtils.format("%s [%s] already exists.", entryType, entryId), HTTP_BAD_REQUEST, t, false); } } diff --git a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java b/processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java index 2e6767f0b77..38f06f0b53c 100644 --- a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java +++ b/processing/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java @@ -56,7 +56,7 @@ public class MetadataStorageConnectorConfig String connectUri, String user, String password, - Map properties + Map properties ) { MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig(); diff --git a/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java b/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java new file mode 100644 index 00000000000..9f0d53930e5 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/common/exception/DruidExceptionTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.common.exception; + +import org.junit.Assert; +import org.junit.Test; + +public class DruidExceptionTest +{ + @Test + public void testExceptionMessageAndResponseCode() + { + DruidException exception = Assert.assertThrows( + DruidException.class, + () -> { + throw new DruidException("an error has occurred", 401, null, true); + } + ); + Assert.assertEquals("an error has occurred", exception.getMessage()); + Assert.assertEquals(401, exception.getResponseCode()); + Assert.assertTrue(exception.isTransient()); + } +} diff --git a/processing/src/test/java/org/apache/druid/metadata/EntryExistsExceptionTest.java b/processing/src/test/java/org/apache/druid/metadata/EntryExistsExceptionTest.java new file mode 100644 index 00000000000..68b8a39b0d4 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/metadata/EntryExistsExceptionTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import org.apache.druid.common.exception.DruidException; +import org.junit.Assert; +import org.junit.Test; + +public class EntryExistsExceptionTest +{ + @Test + public void testExceptionMessageAndResponseCode() + { + EntryExistsException exception = Assert.assertThrows( + EntryExistsException.class, + () -> { + throw new EntryExistsException("task", "100"); + } + ); + Assert.assertEquals("task [100] already exists.", exception.getMessage()); + Assert.assertEquals(DruidException.HTTP_CODE_BAD_REQUEST, exception.getResponseCode()); + } +} diff --git a/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java b/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java index 03a10c9540a..31c3ad33bcc 100644 --- a/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java +++ b/processing/src/test/java/org/apache/druid/metadata/MetadataStorageConnectorConfigTest.java @@ -77,8 +77,8 @@ public class MetadataStorageConnectorConfigTest "user", "\"nothing\"" ); - Assert.assertTrue(metadataStorageConnectorConfig.equals(metadataStorageConnectorConfig2)); - Assert.assertTrue(metadataStorageConnectorConfig.hashCode() == metadataStorageConnectorConfig2.hashCode()); + Assert.assertEquals(metadataStorageConnectorConfig, metadataStorageConnectorConfig2); + Assert.assertEquals(metadataStorageConnectorConfig.hashCode(), metadataStorageConnectorConfig2.hashCode()); } private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); @@ -193,12 +193,9 @@ public class MetadataStorageConnectorConfigTest @Test public void testCreate() { - Map props = ImmutableMap.of("key", "value"); - MetadataStorageConnectorConfig config = MetadataStorageConnectorConfig.create( - "connectURI", - "user", - 
"pwd", - props); + Map props = ImmutableMap.of("key", "value"); + MetadataStorageConnectorConfig config = + MetadataStorageConnectorConfig.create("connectURI", "user", "pwd", props); Assert.assertEquals("connectURI", config.getConnectURI()); Assert.assertEquals("user", config.getUser()); Assert.assertEquals("pwd", config.getPassword()); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 2e24f97aa1c..108128e09c2 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -169,7 +169,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector || e instanceof SQLTransientException || e instanceof SQLRecoverableException || e instanceof UnableToObtainConnectionException - || e instanceof UnableToExecuteStatementException + || (e instanceof UnableToExecuteStatementException && isTransientException(e.getCause())) || connectorIsTransientException(e) || (e instanceof SQLException && isTransientException(e.getCause())) || (e instanceof DBIException && isTransientException(e.getCause()))); @@ -183,6 +183,17 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector return false; } + /** + * Checks if the root cause of the given exception is a PacketTooBigException. + * + * @return false by default. Specific implementations should override this method + * to correctly classify their packet exceptions. + */ + protected boolean isRootCausePacketTooBigException(Throwable t) + { + return false; + } + public void createTable(final String tableName, final Iterable sql) { try { @@ -336,12 +347,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector { try { DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData(); - ResultSet columns = databaseMetaData.getColumns( - null, - null, - table, - column - ); + ResultSet columns = databaseMetaData.getColumns(null, null, table, column); return columns.next(); } catch (SQLException e) { diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index ef083fd36e0..8d84432ff1e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import org.apache.druid.common.exception.DruidException; import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.java.util.common.DateTimes; @@ -165,34 +166,61 @@ public abstract class SQLMetadataStorageActionHandler) handle -> { - final String sql = StringUtils.format( - "INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) " - + "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)", - getEntryTable() - ); - handle.createStatement(sql) - .bind("id", id) - .bind("created_date", timestamp.toString()) - .bind("datasource", dataSource) - .bind("payload", jsonMapper.writeValueAsBytes(entry)) - .bind("type", type) - .bind("group_id", groupId) - 
.bind("active", active) - .bind("status_payload", jsonMapper.writeValueAsBytes(status)) - .execute(); - return null; - }, - e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent()) + handle -> insertEntryWithHandle(handle, id, timestamp, dataSource, entry, active, status, type, groupId), + this::isTransientDruidException ); } + catch (CallbackFailedException e) { + propagateAsRuntimeException(e.getCause()); + } catch (Exception e) { - if (isStatementException(e) && getEntry(id).isPresent()) { - throw new EntryExistsException(id, e); - } else { - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); - } + propagateAsRuntimeException(e); + } + } + + private void propagateAsRuntimeException(Throwable t) + { + Throwables.propagateIfPossible(t); + throw new RuntimeException(t); + } + + /** + * Inserts the given entry into the metadata store. This method wraps any + * exception thrown in a {@link DruidException}. When used in a HandleCallback, + * that exception is further wrapped in a {@link CallbackFailedException}. + */ + private Void insertEntryWithHandle( + Handle handle, + String entryId, + DateTime timestamp, + String dataSource, + EntryType entry, + boolean active, + StatusType status, + String type, + String groupId + ) + { + try { + final String sql = StringUtils.format( + "INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) " + + "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)", + getEntryTable() + ); + handle.createStatement(sql) + .bind("id", entryId) + .bind("created_date", timestamp.toString()) + .bind("datasource", dataSource) + .bind("payload", jsonMapper.writeValueAsBytes(entry)) + .bind("type", type) + .bind("group_id", groupId) + .bind("active", active) + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) + .execute(); + return null; + } + catch (Throwable t) { + throw wrapInDruidException(entryId, t); } } @@ -202,6 +230,17 @@ public abstract class SQLMetadataStorageActionHandler> query; switch (entry.getKey()) { case ACTIVE: - query = createActiveTaskStreamingQuery( - handle, - dataSource - ); + query = createActiveTaskStreamingQuery(handle, dataSource); tasks.addAll(query.map(taskInfoMapper).list()); break; case COMPLETE: @@ -390,6 +426,35 @@ public abstract class SQLMetadataStorageActionHandler() - { - @Override - public Void withHandle(Handle handle) - { - for (String table : tables) { - Assert.assertTrue( - StringUtils.format("table %s was not created!", table), - connector.tableExists(handle, table) - ); - } - - String taskTable = tablesConfig.getTasksTable(); - for (String column : Arrays.asList("type", "group_id")) { - Assert.assertTrue( - StringUtils.format("Tasks table column %s was not created!", column), - connector.tableContainsColumn(handle, taskTable, column) - ); - } - - return null; + handle -> { + for (String table : tables) { + Assert.assertTrue( + StringUtils.format("table %s was not created!", table), + connector.tableExists(handle, table) + ); } + + String taskTable = tablesConfig.getTasksTable(); + for (String column : Arrays.asList("type", "group_id")) { + Assert.assertTrue( + StringUtils.format("Tasks table column %s was not created!", column), + connector.tableContainsColumn(handle, taskTable, column) + ); + } + + return null; } ); @@ -147,16 +146,8 @@ public class SQLMetadataConnectorTest private void dropTable(final String tableName) { connector.getDBI().withHandle( - new HandleCallback() - 
{ - @Override - public Void withHandle(Handle handle) - { - handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) - .execute(); - return null; - } - } + handle -> handle.createStatement(StringUtils.format("DROP TABLE %s", tableName)) + .execute() ); } @@ -213,79 +204,74 @@ public class SQLMetadataConnectorTest } } - private MetadataStorageConnectorConfig getDbcpPropertiesFile( - boolean createTables, - String host, - int port, - String connectURI, - String user, - String pwdString, - String pwd - ) throws Exception - { - return JSON_MAPPER.readValue( - "{" + - "\"createTables\": \"" + createTables + "\"," + - "\"host\": \"" + host + "\"," + - "\"port\": \"" + port + "\"," + - "\"connectURI\": \"" + connectURI + "\"," + - "\"user\": \"" + user + "\"," + - "\"password\": " + pwdString + "," + - "\"dbcp\": {\n" + - " \"maxConnLifetimeMillis\" : 1200000,\n" + - " \"defaultQueryTimeout\" : \"30000\"\n" + - "}" + - "}", - MetadataStorageConnectorConfig.class - ); - } - @Test - public void testBasicDataSourceCreation() throws Exception + public void testBasicDataSourceCreation() { - MetadataStorageConnectorConfig config = getDbcpPropertiesFile( - true, - "host", - 1234, - "connectURI", - "user", - "{\"type\":\"default\",\"password\":\"nothing\"}", - "nothing" + Map props = ImmutableMap.of( + "maxConnLifetimeMillis", "1200000", + "defaultQueryTimeout", "30000" ); + MetadataStorageConnectorConfig config = + MetadataStorageConnectorConfig.create("connectURI", "user", "password", props); + TestSQLMetadataConnector testSQLMetadataConnector = new TestSQLMetadataConnector( Suppliers.ofInstance(config), Suppliers.ofInstance(tablesConfig) ); BasicDataSource dataSource = testSQLMetadataConnector.getDatasource(); Assert.assertEquals(dataSource.getMaxConnLifetimeMillis(), 1200000); - Assert.assertEquals((long) dataSource.getDefaultQueryTimeout(), 30000); + Assert.assertEquals(dataSource.getDefaultQueryTimeout().intValue(), 30000); } - private boolean verifyTaskTypeAndGroupId(String table, String id, String type, String groupId) + @Test + public void testIsTransientException() { - try { - return connector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws SQLException - { - Statement statement = handle.getConnection().createStatement(); - ResultSet resultSet = statement.executeQuery( - StringUtils.format("SELECT * FROM %1$s WHERE id = '%2$s'", table, id) - ); - resultSet.next(); - boolean flag = type.equals(resultSet.getString("type")) - && groupId.equals(resultSet.getString("group_id")); - statement.close(); - return flag; - } - } - ); - } - catch (Exception e) { - e.printStackTrace(); - return false; - } + MetadataStorageConnectorConfig config = + MetadataStorageConnectorConfig.create("connectURI", "user", "password", Collections.emptyMap()); + TestSQLMetadataConnector metadataConnector = new TestSQLMetadataConnector( + Suppliers.ofInstance(config), + Suppliers.ofInstance(tablesConfig) + ); + + // Transient exceptions + Assert.assertTrue(metadataConnector.isTransientException(new RetryTransactionException(""))); + Assert.assertTrue(metadataConnector.isTransientException(new SQLRecoverableException())); + Assert.assertTrue(metadataConnector.isTransientException(new SQLTransientException())); + Assert.assertTrue(metadataConnector.isTransientException(new SQLTransientConnectionException())); + + // Non transient exceptions + Assert.assertFalse(metadataConnector.isTransientException(null)); + 
Assert.assertFalse(metadataConnector.isTransientException(new SQLException())); + Assert.assertFalse(metadataConnector.isTransientException(new UnableToExecuteStatementException(""))); + + // Nested transient exceptions + Assert.assertTrue( + metadataConnector.isTransientException( + new CallbackFailedException(new SQLTransientException()) + ) + ); + Assert.assertTrue( + metadataConnector.isTransientException( + new UnableToObtainConnectionException(new SQLException()) + ) + ); + Assert.assertTrue( + metadataConnector.isTransientException( + new UnableToExecuteStatementException(new SQLTransientException()) + ) + ); + + // Nested non-transient exceptions + Assert.assertFalse( + metadataConnector.isTransientException( + new CallbackFailedException(new SQLException()) + ) + ); + Assert.assertFalse( + metadataConnector.isTransientException( + new UnableToExecuteStatementException(new SQLException()) + ) + ); + } } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index a7c19ff978b..2ade4f96019 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -127,7 +127,7 @@ public class SQLMetadataStorageActionHandlerTest } @Test - public void testEntryAndStatus() throws Exception + public void testEntryAndStatus() { Map entry = ImmutableMap.of("numericId", 1234); Map status1 = ImmutableMap.of("count", 42); @@ -256,7 +256,7 @@ public class SQLMetadataStorageActionHandlerTest } @Test(timeout = 60_000L) - public void testRepeatInsert() throws Exception + public void testRepeatInsert() { final String entryId = "abcd"; Map entry = ImmutableMap.of("a", 1); @@ -269,7 +269,7 @@ public class SQLMetadataStorageActionHandlerTest } @Test - public void testLogs() throws Exception + public void testLogs() { final String entryId = "abcd"; Map entry = ImmutableMap.of("a", 1); @@ -301,7 +301,7 @@ public class SQLMetadataStorageActionHandlerTest @Test - public void testLocks() throws Exception + public void testLocks() { final String entryId = "ABC123"; Map entry = ImmutableMap.of("a", 1); @@ -404,7 +404,7 @@ public class SQLMetadataStorageActionHandlerTest } @Test - public void testRemoveTasksOlderThan() throws Exception + public void testRemoveTasksOlderThan() { final String entryId1 = "1234"; Map entry1 = ImmutableMap.of("numericId", 1234);
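
(Illustrative sketch, not part of this patch: the classification behaviour the commit message describes for metadata INSERT failures. The body of `wrapInDruidException` is not visible in the hunks above, so the mapping below is an assumption based on the stated intent; only the `DruidException` constructor and its constants are taken from this patch, and the class and parameter names are hypothetical.)

    import org.apache.druid.common.exception.DruidException;

    // Hypothetical classifier, for illustration only.
    class InsertFailureClassifierSketch
    {
      DruidException classify(Throwable cause, boolean packetTooBig, boolean transientSqlError)
      {
        if (packetTooBig) {
          // max_allowed_packet violated: the payload can never fit, so mark the
          // failure non-transient with a 400 response code to prevent retries.
          return new DruidException(
              "Payload exceeds the metadata store packet limit (max_allowed_packet)",
              DruidException.HTTP_CODE_BAD_REQUEST,
              cause,
              false
          );
        } else {
          // Everything else stays a server-side error; transient failures may be retried.
          return new DruidException(
              "Encountered exception while inserting entry into metadata store",
              DruidException.HTTP_CODE_SERVER_ERROR,
              cause,
              transientSqlError
          );
        }
      }
    }
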