Timeout for LockAcquireAction (#4461)

* Timeout for LockAcquireAction

* Static inner class.

* Rebase changes.

* makeAlert and throw exception incase of overlapping interval.

* Addressed comments.

* remove unused import.

* Addressed comments
This commit is contained in:
Akash Dwivedi 2017-07-11 02:59:32 -07:00 committed by Jihoon Son
parent a603c4b423
commit 5f411f14af
8 changed files with 134 additions and 16 deletions

View File

@ -1476,7 +1476,7 @@ public class KafkaIndexTaskTest
derby.metadataTablesConfigSupplier().get(), derby.metadataTablesConfigSupplier().get(),
derbyConnector derbyConnector
); );
taskLockbox = new TaskLockbox(taskStorage); taskLockbox = new TaskLockbox(taskStorage, 3000);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox, taskLockbox,
metadataStorageCoordinator, metadataStorageCoordinator,

View File

@ -40,7 +40,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair; import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.FunctionalIterable; import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.server.initialization.ServerConfig;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -52,6 +52,7 @@ import java.util.NavigableMap;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -67,6 +68,7 @@ public class TaskLockbox
private final TaskStorage taskStorage; private final TaskStorage taskStorage;
private final ReentrantLock giant = new ReentrantLock(true); private final ReentrantLock giant = new ReentrantLock(true);
private final Condition lockReleaseCondition = giant.newCondition(); private final Condition lockReleaseCondition = giant.newCondition();
protected final long lockTimeoutMillis;
private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
@ -76,10 +78,21 @@ public class TaskLockbox
@Inject @Inject
public TaskLockbox( public TaskLockbox(
TaskStorage taskStorage TaskStorage taskStorage,
ServerConfig serverConfig
) )
{ {
this.taskStorage = taskStorage; this.taskStorage = taskStorage;
this.lockTimeoutMillis = serverConfig.getMaxIdleTime().getMillis();
}
public TaskLockbox(
TaskStorage taskStorage,
long lockTimeoutMillis
)
{
this.taskStorage = taskStorage;
this.lockTimeoutMillis = lockTimeoutMillis;
} }
/** /**
@ -179,15 +192,36 @@ public class TaskLockbox
*/ */
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{ {
long timeout = lockTimeoutMillis;
giant.lock(); giant.lock();
try { try {
Optional<TaskLock> taskLock; Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) { while (!(taskLock = tryLock(task, interval)).isPresent()) {
lockReleaseCondition.await(); long startTime = System.currentTimeMillis();
lockReleaseCondition.await(timeout, TimeUnit.MILLISECONDS);
long timeDelta = System.currentTimeMillis() - startTime;
if (timeDelta >= timeout) {
log.error(
"Task [%s] can not acquire lock for interval [%s] within [%s] ms",
task.getId(),
interval,
lockTimeoutMillis
);
throw new InterruptedException(String.format(
"Task [%s] can not acquire lock for interval [%s] within [%s] ms",
task.getId(),
interval,
lockTimeoutMillis
));
} else {
timeout -= timeDelta;
}
} }
return taskLock.get(); return taskLock.get();
} finally { }
finally {
giant.unlock(); giant.unlock();
} }
} }
@ -247,6 +281,12 @@ public class TaskLockbox
if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) { if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) {
posseToUse = foundPosse; posseToUse = foundPosse;
} else { } else {
//Could be a deadlock for LockAcquireAction: same task trying to acquire lock for overlapping interval
if (foundPosse.getTaskIds().contains(task.getId())) {
log.makeAlert("Same Task is trying to acquire lock for overlapping interval")
.addData("task", task.getId())
.addData("interval", interval);
}
return Optional.absent(); return Optional.absent();
} }

View File

@ -25,6 +25,8 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
@ -37,8 +39,10 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec; import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -55,6 +59,14 @@ public class SegmentAllocateActionTest
private static final DateTime PARTY_TIME = new DateTime("1999"); private static final DateTime PARTY_TIME = new DateTime("1999");
private static final DateTime THE_DISTANT_FUTURE = new DateTime("3000"); private static final DateTime THE_DISTANT_FUTURE = new DateTime("3000");
@Before
public void setUp()
{
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
EasyMock.replay(emitter);
}
@Test @Test
public void testGranularitiesFinerThanDay() throws Exception public void testGranularitiesFinerThanDay() throws Exception
{ {

View File

@ -80,7 +80,7 @@ public class TaskActionTestKit extends ExternalResource
public void before() public void before()
{ {
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H"))); taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H")));
taskLockbox = new TaskLockbox(taskStorage); taskLockbox = new TaskLockbox(taskStorage, 300);
testDerbyConnector = new TestDerbyConnector( testDerbyConnector = new TestDerbyConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(metadataStorageTablesConfig) Suppliers.ofInstance(metadataStorageTablesConfig)

View File

@ -931,7 +931,7 @@ public class RealtimeIndexTaskTest
) )
{ {
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300);
try { try {
taskStorage.insert(task, TaskStatus.running(task.getId())); taskStorage.insert(task, TaskStatus.running(task.getId()));
} }

View File

@ -151,7 +151,7 @@ public class IngestSegmentFirehoseFactoryTest
} }
INDEX_MERGER_V9.persist(index, persistDir, indexSpec); INDEX_MERGER_V9.persist(index, persistDir, indexSpec);
final TaskLockbox tl = new TaskLockbox(ts); final TaskLockbox tl = new TaskLockbox(ts, 300);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
{ {
final private Set<DataSegment> published = Sets.newHashSet(); final private Set<DataSegment> published = Sets.newHashSet();

View File

@ -514,7 +514,7 @@ public class TaskLifecycleTest
Preconditions.checkNotNull(taskStorage); Preconditions.checkNotNull(taskStorage);
Preconditions.checkNotNull(emitter); Preconditions.checkNotNull(emitter);
taskLockbox = new TaskLockbox(taskStorage); taskLockbox = new TaskLockbox(taskStorage, 300);
tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock(
SupervisorManager.class))); SupervisorManager.class)));
File tmpDir = temporaryFolder.newFolder(); File tmpDir = temporaryFolder.newFolder();

View File

@ -19,16 +19,28 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.ISE;
import io.druid.server.initialization.ServerConfig;
import org.easymock.EasyMock;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Map;
public class TaskLockboxTest public class TaskLockboxTest
{ {
@ -36,11 +48,22 @@ public class TaskLockboxTest
private TaskLockbox lockbox; private TaskLockbox lockbox;
@Rule
public final ExpectedException exception = ExpectedException.none();
@Before @Before
public void setUp() public void setUp()
{ {
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
lockbox = new TaskLockbox(taskStorage); ServerConfig serverConfig = EasyMock.niceMock(ServerConfig.class);
EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100));
EasyMock.replay(serverConfig);
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
EasyMock.replay(emitter);
lockbox = new TaskLockbox(taskStorage, serverConfig);
} }
@Test @Test
@ -57,17 +80,19 @@ public class TaskLockboxTest
lockbox.lock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")); lockbox.lock(NoopTask.create(), new Interval("2015-01-01/2015-01-02"));
} }
@Test(expected = IllegalStateException.class) @Test
public void testLockAfterTaskComplete() throws InterruptedException public void testLockAfterTaskComplete() throws InterruptedException
{ {
Task task = NoopTask.create(); Task task = NoopTask.create();
exception.expect(ISE.class);
exception.expectMessage("Unable to grant lock to inactive Task");
lockbox.add(task); lockbox.add(task);
lockbox.remove(task); lockbox.remove(task);
lockbox.lock(task, new Interval("2015-01-01/2015-01-02")); lockbox.lock(task, new Interval("2015-01-01/2015-01-02"));
} }
@Test @Test
public void testTryLock() throws InterruptedException public void testTryLock()
{ {
Task task = NoopTask.create(); Task task = NoopTask.create();
lockbox.add(task); lockbox.add(task);
@ -86,7 +111,7 @@ public class TaskLockboxTest
} }
@Test @Test
public void testTrySmallerLock() throws InterruptedException public void testTrySmallerLock()
{ {
Task task = NoopTask.create(); Task task = NoopTask.create();
lockbox.add(task); lockbox.add(task);
@ -109,20 +134,61 @@ public class TaskLockboxTest
); );
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testTryLockForInactiveTask() throws InterruptedException public void testTryLockForInactiveTask()
{ {
Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent()); Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent());
} }
@Test(expected = IllegalStateException.class) @Test
public void testTryLockAfterTaskComplete() throws InterruptedException public void testTryLockAfterTaskComplete()
{ {
Task task = NoopTask.create(); Task task = NoopTask.create();
exception.expect(ISE.class);
exception.expectMessage("Unable to grant lock to inactive Task");
lockbox.add(task); lockbox.add(task);
lockbox.remove(task); lockbox.remove(task);
Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent());
} }
@Test
public void testTimeoutForLock() throws InterruptedException
{
Task task1 = NoopTask.create();
Task task2 = new SomeTask(null, 0, 0, null, null, null);
lockbox.add(task1);
lockbox.add(task2);
exception.expect(InterruptedException.class);
exception.expectMessage("can not acquire lock for interval");
lockbox.lock(task1, new Interval("2015-01-01/2015-01-02"));
lockbox.lock(task2, new Interval("2015-01-01/2015-01-15"));
}
public static class SomeTask extends NoopTask {
public SomeTask(
@JsonProperty("id") String id,
@JsonProperty("runTime") long runTime,
@JsonProperty("isReadyTime") long isReadyTime,
@JsonProperty("isReadyResult") String isReadyResult,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("context") Map<String, Object> context
)
{
super(id, runTime, isReadyTime, isReadyResult, firehoseFactory, context);
}
@Override
public String getType()
{
return "someTask";
}
@Override
public String getGroupId() { return "someGroupId";}
}
} }