diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index a59462a98af..2d53a47a3b2 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1476,7 +1476,7 @@ public class KafkaIndexTaskTest derby.metadataTablesConfigSupplier().get(), derbyConnector ); - taskLockbox = new TaskLockbox(taskStorage); + taskLockbox = new TaskLockbox(taskStorage, 3000); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, metadataStorageCoordinator, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 0c496973873..fbc3303e2cb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -40,7 +40,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.FunctionalIterable; - +import io.druid.server.initialization.ServerConfig; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -52,6 +52,7 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -67,6 +68,7 @@ public class TaskLockbox private final TaskStorage taskStorage; private final ReentrantLock giant = new ReentrantLock(true); private final Condition lockReleaseCondition = giant.newCondition(); + protected final long lockTimeoutMillis; private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); @@ -76,10 +78,21 @@ public class TaskLockbox @Inject public TaskLockbox( - TaskStorage taskStorage + TaskStorage taskStorage, + ServerConfig serverConfig ) { this.taskStorage = taskStorage; + this.lockTimeoutMillis = serverConfig.getMaxIdleTime().getMillis(); + } + + public TaskLockbox( + TaskStorage taskStorage, + long lockTimeoutMillis + ) + { + this.taskStorage = taskStorage; + this.lockTimeoutMillis = lockTimeoutMillis; } /** @@ -179,15 +192,36 @@ public class TaskLockbox */ public TaskLock lock(final Task task, final Interval interval) throws InterruptedException { + long timeout = lockTimeoutMillis; giant.lock(); try { Optional taskLock; while (!(taskLock = tryLock(task, interval)).isPresent()) { - lockReleaseCondition.await(); + long startTime = System.currentTimeMillis(); + lockReleaseCondition.await(timeout, TimeUnit.MILLISECONDS); + long timeDelta = System.currentTimeMillis() - startTime; + if (timeDelta >= timeout) { + log.error( + "Task [%s] can not acquire lock for interval [%s] within [%s] ms", + task.getId(), + interval, + lockTimeoutMillis + ); + + throw new InterruptedException(String.format( + "Task [%s] can not acquire lock for interval [%s] within [%s] ms", + task.getId(), + interval, + lockTimeoutMillis + )); + } else { + timeout -= timeDelta; + } } return taskLock.get(); - } finally { + } + finally { giant.unlock(); } } @@ -247,6 +281,12 @@ public class TaskLockbox if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) { posseToUse = foundPosse; } else { + //Could be a deadlock for LockAcquireAction: same task trying to acquire lock for overlapping interval + if (foundPosse.getTaskIds().contains(task.getId())) { + log.makeAlert("Same Task is trying to acquire lock for overlapping interval") + .addData("task", task.getId()) + .addData("interval", interval); + } return Optional.absent(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java index 4547917a251..a587b38e1c1 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; @@ -37,8 +39,10 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.SingleDimensionShardSpec; +import org.easymock.EasyMock; import org.joda.time.DateTime; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -55,6 +59,14 @@ public class SegmentAllocateActionTest private static final DateTime PARTY_TIME = new DateTime("1999"); private static final DateTime THE_DISTANT_FUTURE = new DateTime("3000"); + @Before + public void setUp() + { + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + EasyMock.replay(emitter); + } + @Test public void testGranularitiesFinerThanDay() throws Exception { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java index 3fe74a780eb..8a3442d9dbb 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java @@ -80,7 +80,7 @@ public class TaskActionTestKit extends ExternalResource public void before() { taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H"))); - taskLockbox = new TaskLockbox(taskStorage); + taskLockbox = new TaskLockbox(taskStorage, 300); testDerbyConnector = new TestDerbyConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance(metadataStorageTablesConfig) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index a060f78591c..d097904c2ac 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -931,7 +931,7 @@ public class RealtimeIndexTaskTest ) { final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); - final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); + final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300); try { taskStorage.insert(task, TaskStatus.running(task.getId())); } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 067ec8b4cd4..1c8c371dbcf 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -151,7 +151,7 @@ public class IngestSegmentFirehoseFactoryTest } INDEX_MERGER_V9.persist(index, persistDir, indexSpec); - final TaskLockbox tl = new TaskLockbox(ts); + final TaskLockbox tl = new TaskLockbox(ts, 300); final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) { final private Set published = Sets.newHashSet(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index c86041f0fa3..a2383602a59 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -514,7 +514,7 @@ public class TaskLifecycleTest Preconditions.checkNotNull(taskStorage); Preconditions.checkNotNull(emitter); - taskLockbox = new TaskLockbox(taskStorage); + taskLockbox = new TaskLockbox(taskStorage, 300); tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( SupervisorManager.class))); File tmpDir = temporaryFolder.newFolder(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 600b696cf3d..c9b4245f91c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -19,16 +19,28 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; +import io.druid.server.initialization.ServerConfig; +import org.easymock.EasyMock; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Map; public class TaskLockboxTest { @@ -36,11 +48,22 @@ public class TaskLockboxTest private TaskLockbox lockbox; + @Rule + public final ExpectedException exception = ExpectedException.none(); + @Before public void setUp() { taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); - lockbox = new TaskLockbox(taskStorage); + ServerConfig serverConfig = EasyMock.niceMock(ServerConfig.class); + EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)); + EasyMock.replay(serverConfig); + + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + EasyMock.replay(emitter); + + lockbox = new TaskLockbox(taskStorage, serverConfig); } @Test @@ -57,17 +80,19 @@ public class TaskLockboxTest lockbox.lock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")); } - @Test(expected = IllegalStateException.class) + @Test public void testLockAfterTaskComplete() throws InterruptedException { Task task = NoopTask.create(); + exception.expect(ISE.class); + exception.expectMessage("Unable to grant lock to inactive Task"); lockbox.add(task); lockbox.remove(task); lockbox.lock(task, new Interval("2015-01-01/2015-01-02")); } @Test - public void testTryLock() throws InterruptedException + public void testTryLock() { Task task = NoopTask.create(); lockbox.add(task); @@ -86,7 +111,7 @@ public class TaskLockboxTest } @Test - public void testTrySmallerLock() throws InterruptedException + public void testTrySmallerLock() { Task task = NoopTask.create(); lockbox.add(task); @@ -109,20 +134,61 @@ public class TaskLockboxTest ); } + @Test(expected = IllegalStateException.class) - public void testTryLockForInactiveTask() throws InterruptedException + public void testTryLockForInactiveTask() { Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent()); } - @Test(expected = IllegalStateException.class) - public void testTryLockAfterTaskComplete() throws InterruptedException + @Test + public void testTryLockAfterTaskComplete() { Task task = NoopTask.create(); + exception.expect(ISE.class); + exception.expectMessage("Unable to grant lock to inactive Task"); lockbox.add(task); lockbox.remove(task); Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); } + @Test + public void testTimeoutForLock() throws InterruptedException + { + Task task1 = NoopTask.create(); + Task task2 = new SomeTask(null, 0, 0, null, null, null); + + lockbox.add(task1); + lockbox.add(task2); + exception.expect(InterruptedException.class); + exception.expectMessage("can not acquire lock for interval"); + lockbox.lock(task1, new Interval("2015-01-01/2015-01-02")); + lockbox.lock(task2, new Interval("2015-01-01/2015-01-15")); + } + + public static class SomeTask extends NoopTask { + + public SomeTask( + @JsonProperty("id") String id, + @JsonProperty("runTime") long runTime, + @JsonProperty("isReadyTime") long isReadyTime, + @JsonProperty("isReadyResult") String isReadyResult, + @JsonProperty("firehose") FirehoseFactory firehoseFactory, + @JsonProperty("context") Map context + ) + { + super(id, runTime, isReadyTime, isReadyResult, firehoseFactory, context); + } + + @Override + public String getType() + { + return "someTask"; + } + + @Override + public String getGroupId() { return "someGroupId";} + + } }