Fix duplicated locks after sync from storage (#4521)

* Fix duplicated locks after sync from storage

* Remove unnecessary table creation
This commit is contained in:
Jihoon Son 2017-07-12 02:10:11 +09:00 committed by Fangjin Yang
parent 98b1385bcd
commit 6d2df2a542
2 changed files with 172 additions and 51 deletions

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
@ -138,12 +139,18 @@ public class TaskLockbox
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
final Optional<TaskLock> acquiredTaskLock = tryLock(
final TaskLockPosse taskLockPosse = tryAddTaskToLockPosse(
task,
savedTaskLock.getInterval(),
Optional.of(savedTaskLock.getVersion())
);
if (acquiredTaskLock.isPresent() && savedTaskLock.getVersion().equals(acquiredTaskLock.get().getVersion())) {
if (taskLockPosse != null) {
taskLockPosse.getTaskIds().add(task.getId());
final TaskLock taskLock = taskLockPosse.getTaskLock();
if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
taskLockCount ++;
log.info(
"Reacquired lock on interval[%s] version[%s] for task: %s",
@ -151,17 +158,18 @@ public class TaskLockbox
savedTaskLock.getVersion(),
task.getId()
);
} else if (acquiredTaskLock.isPresent()) {
} else {
taskLockCount ++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
acquiredTaskLock.get().getVersion(),
taskLock.getVersion(),
task.getId()
);
}
} else {
log.info(
throw new ISE(
"Could not reacquire lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
@ -264,6 +272,51 @@ public class TaskLockbox
throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
}
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
final TaskLockPosse posseToUse = tryAddTaskToLockPosse(task, interval, preferredVersion);
if (posseToUse != null) {
// Add to existing TaskLockPosse, if necessary
if (posseToUse.getTaskIds().add(task.getId())) {
log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
// Update task storage facility. If it fails, revoke the lock.
try {
taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
return Optional.of(posseToUse.getTaskLock());
} catch(Exception e) {
log.makeAlert("Failed to persist lock in storage")
.addData("task", task.getId())
.addData("dataSource", posseToUse.getTaskLock().getDataSource())
.addData("interval", posseToUse.getTaskLock().getInterval())
.addData("version", posseToUse.getTaskLock().getVersion())
.emit();
unlock(task, interval);
return Optional.absent();
}
} else {
log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
return Optional.of(posseToUse.getTaskLock());
}
} else {
return Optional.absent();
}
}
finally {
giant.unlock();
}
}
private TaskLockPosse tryAddTaskToLockPosse(
final Task task,
final Interval interval,
final Optional<String> preferredVersion
)
{
giant.lock();
try {
final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
final TaskLockPosse posseToUse;
@ -271,7 +324,7 @@ public class TaskLockbox
if (foundPosses.size() > 1) {
// Too many existing locks.
return Optional.absent();
return null;
} else if (foundPosses.size() == 1) {
@ -287,7 +340,7 @@ public class TaskLockbox
.addData("task", task.getId())
.addData("interval", interval);
}
return Optional.absent();
return null;
}
} else {
@ -321,33 +374,11 @@ public class TaskLockbox
log.info("Created new TaskLockPosse: %s", posseToUse);
}
// Add to existing TaskLockPosse, if necessary
if (posseToUse.getTaskIds().add(task.getId())) {
log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
// Update task storage facility. If it fails, revoke the lock.
try {
taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
return Optional.of(posseToUse.getTaskLock());
} catch(Exception e) {
log.makeAlert("Failed to persist lock in storage")
.addData("task", task.getId())
.addData("dataSource", posseToUse.getTaskLock().getDataSource())
.addData("interval", posseToUse.getTaskLock().getInterval())
.addData("version", posseToUse.getTaskLock().getVersion())
.emit();
unlock(task, interval);
return Optional.absent();
}
} else {
log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
return Optional.of(posseToUse.getTaskLock());
}
return posseToUse;
}
finally {
giant.unlock();
}
}
/**
@ -572,7 +603,19 @@ public class TaskLockbox
}
}
private static class TaskLockPosse
@VisibleForTesting
Set<String> getActiveTasks()
{
return activeTasks;
}
@VisibleForTesting
Map<String, NavigableMap<Interval, TaskLockPosse>> getAllLocks()
{
return running;
}
static class TaskLockPosse
{
final private TaskLock taskLock;
final private Set<String> taskIds;
@ -593,6 +636,31 @@ public class TaskLockbox
return taskIds;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!getClass().equals(o.getClass())) {
return false;
}
final TaskLockPosse that = (TaskLockPosse) o;
if (!taskLock.equals(that.taskLock)) {
return false;
}
return taskIds.equals(that.taskIds);
}
@Override
public int hashCode()
{
return Objects.hashCode(taskLock, taskIds);
}
@Override
public String toString()
{

View File

@ -20,16 +20,23 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.metadata.EntryExistsException;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.TestDerbyConnector;
import io.druid.server.initialization.ServerConfig;
import org.easymock.EasyMock;
import org.joda.time.Interval;
@ -40,23 +47,39 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class TaskLockboxTest
{
private TaskStorage taskStorage;
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper objectMapper = new DefaultObjectMapper();
private ServerConfig serverConfig;
private TaskStorage taskStorage;
private TaskLockbox lockbox;
@Rule
public final ExpectedException exception = ExpectedException.none();
@Before
public void setUp()
public void setup()
{
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
ServerConfig serverConfig = EasyMock.niceMock(ServerConfig.class);
EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100));
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createTaskTables();
taskStorage = new MetadataTaskStorage(
derbyConnector,
new TaskStorageConfig(null),
new SQLMetadataStorageActionHandlerFactory(
derbyConnector,
derby.metadataTablesConfigSupplier().get(),
objectMapper
)
);
serverConfig = EasyMock.niceMock(ServerConfig.class);
EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)).anyTimes();
EasyMock.replay(serverConfig);
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
@ -166,6 +189,37 @@ public class TaskLockboxTest
lockbox.lock(task2, new Interval("2015-01-01/2015-01-15"));
}
@Test
public void testSyncFromStorage() throws EntryExistsException
{
final TaskLockbox originalBox = new TaskLockbox(taskStorage, serverConfig);
for (int i = 0; i < 5; i++) {
final Task task = NoopTask.create();
taskStorage.insert(task, TaskStatus.running(task.getId()));
originalBox.add(task);
Assert.assertTrue(
originalBox.tryLock(task, new Interval(StringUtils.format("2017-01-0%d/2017-01-0%d", (i + 1), (i + 2))))
.isPresent()
);
}
final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream()
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
.collect(Collectors.toList());
final TaskLockbox newBox = new TaskLockbox(taskStorage, serverConfig);
newBox.syncFromStorage();
Assert.assertEquals(originalBox.getAllLocks(), newBox.getAllLocks());
Assert.assertEquals(originalBox.getActiveTasks(), newBox.getActiveTasks());
final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
.collect(Collectors.toList());
Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
}
public static class SomeTask extends NoopTask {
public SomeTask(
@ -190,5 +244,4 @@ public class TaskLockboxTest
public String getGroupId() { return "someGroupId";}
}
}