HBASE-14017 Procedure v2 - MasterProcedureQueue fix concurrency issue on table queue deletion

Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
Matteo Bertozzi 2015-07-06 11:36:20 -07:00 committed by Sean Busbey
parent e28094fe4d
commit 80b0a3e914
2 changed files with 60 additions and 5 deletions

View File

@ -366,7 +366,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
if (queue != null) {
lock.lock();
try {
if (queue.isEmpty() && !queue.isLocked()) {
if (queue.isEmpty() && queue.acquireDeleteLock()) {
tableFairQ.remove(table);
// Remove the table lock
@ -390,7 +390,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
void addFront(Procedure proc);
void addBack(Procedure proc);
Long poll();
boolean isLocked();
boolean acquireDeleteLock();
}
/**
@ -443,6 +443,10 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
}
@Override
public synchronized boolean acquireDeleteLock() {
return tryExclusiveLock();
}
public synchronized boolean isLocked() {
return isExclusiveLock() || sharedLock > 0;
}

View File

@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -62,6 +63,56 @@ public class TestMasterProcedureQueue {
assertEquals(0, queue.size());
}
@Test
public void testConcurrentCreateDelete() throws Exception {
final MasterProcedureQueue procQueue = queue;
final TableName table = TableName.valueOf("testtb");
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failure = new AtomicBoolean(false);
Thread createThread = new Thread() {
@Override
public void run() {
try {
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(table, "create")) {
procQueue.releaseTableExclusiveLock(table);
}
}
} catch (Throwable e) {
LOG.error("create failed", e);
failure.set(true);
}
}
};
Thread deleteThread = new Thread() {
@Override
public void run() {
try {
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) {
procQueue.releaseTableExclusiveLock(table);
}
procQueue.markTableAsDeleted(table);
}
} catch (Throwable e) {
LOG.error("delete failed", e);
failure.set(true);
}
}
};
createThread.start();
deleteThread.start();
for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
Thread.sleep(100);
}
running.set(false);
createThread.join();
deleteThread.join();
assertEquals(false, failure.get());
}
/**
* Verify simple create/insert/fetch/delete of the table queue.
*/