HBASE-19520 Add UTs for the new lock type PEER

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Guanghao Zhang 2017-12-20 16:43:38 +08:00 committed by zhangduo
parent 7c2a0d7e21
commit 9ead5934c0
3 changed files with 201 additions and 8 deletions

View File

@ -277,6 +277,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
while (tableIter.hasNext()) {
count += tableIter.next().size();
}
// Peer queues
final AvlTreeIterator<PeerQueue> peerIter = new AvlTreeIterator<>(peerMap);
while (peerIter.hasNext()) {
count += peerIter.next().size();
}
return count;
}
@ -807,7 +814,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @see #wakePeerExclusiveLock(Procedure, String)
* @param procedure the procedure trying to acquire the lock
* @param peerId peer to lock
* @return true if the procedure has to wait for the per to be available
* @return true if the procedure has to wait for the peer to be available
*/
public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
schedLock();

View File

@ -908,6 +908,27 @@ public class TestMasterProcedureScheduler {
}
}
public static class TestPeerProcedure extends TestProcedure implements PeerProcedureInterface {
private final String peerId;
private final PeerOperationType opType;
public TestPeerProcedure(long procId, String peerId, PeerOperationType opType) {
super(procId);
this.peerId = peerId;
this.opType = opType;
}
@Override
public String getPeerId() {
return peerId;
}
@Override
public PeerOperationType getPeerOperationType() {
return opType;
}
}
private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception {
LockProcedure procedure = new LockProcedure();
@ -930,22 +951,19 @@ public class TestMasterProcedureScheduler {
return createLockProcedure(LockType.SHARED, procId);
}
private static void assertLockResource(LockedResource resource,
LockedResourceType resourceType, String resourceName)
{
private static void assertLockResource(LockedResource resource, LockedResourceType resourceType,
String resourceName) {
assertEquals(resourceType, resource.getResourceType());
assertEquals(resourceName, resource.getResourceName());
}
private static void assertExclusiveLock(LockedResource resource, Procedure<?> procedure)
{
private static void assertExclusiveLock(LockedResource resource, Procedure<?> procedure) {
assertEquals(LockType.EXCLUSIVE, resource.getLockType());
assertEquals(procedure, resource.getExclusiveLockOwnerProcedure());
assertEquals(0, resource.getSharedLockCount());
}
private static void assertSharedLock(LockedResource resource, int lockCount)
{
private static void assertSharedLock(LockedResource resource, int lockCount) {
assertEquals(LockType.SHARED, resource.getLockType());
assertEquals(lockCount, resource.getSharedLockCount());
}
@ -1029,6 +1047,39 @@ public class TestMasterProcedureScheduler {
assertTrue(regionResource.getWaitingProcedures().isEmpty());
}
@Test
public void testListLocksPeer() throws Exception {
String peerId = "1";
LockProcedure procedure = createExclusiveLockProcedure(4);
queue.waitPeerExclusiveLock(procedure, peerId);
List<LockedResource> locks = queue.getLocks();
assertEquals(1, locks.size());
LockedResource resource = locks.get(0);
assertLockResource(resource, LockedResourceType.PEER, peerId);
assertExclusiveLock(resource, procedure);
assertTrue(resource.getWaitingProcedures().isEmpty());
// Try to acquire the exclusive lock again with same procedure
assertFalse(queue.waitPeerExclusiveLock(procedure, peerId));
// Try to acquire the exclusive lock again with new procedure
LockProcedure procedure2 = createExclusiveLockProcedure(5);
assertTrue(queue.waitPeerExclusiveLock(procedure2, peerId));
// Same peerId, still only has 1 LockedResource
locks = queue.getLocks();
assertEquals(1, locks.size());
resource = locks.get(0);
assertLockResource(resource, LockedResourceType.PEER, peerId);
// LockedResource owner still is the origin procedure
assertExclusiveLock(resource, procedure);
// The new procedure should in the waiting list
assertEquals(1, resource.getWaitingProcedures().size());
}
@Test
public void testListLocksWaiting() throws Exception {
LockProcedure procedure1 = createExclusiveLockProcedure(1);

View File

@ -26,6 +26,8 @@ import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -63,6 +65,85 @@ public class TestMasterProcedureSchedulerConcurrency {
queue.clear();
}
@Test
public void testConcurrentPeerOperations() throws Exception {
TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue);
int NUM_ITEMS = 10;
int NUM_PEERS = 5;
AtomicInteger opsCount = new AtomicInteger(0);
for (int i = 0; i < NUM_PEERS; ++i) {
String peerId = String.format("test-peer-%04d", i);
for (int j = 1; j < NUM_ITEMS; ++j) {
procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD));
opsCount.incrementAndGet();
}
}
assertEquals(opsCount.get(), queue.size());
Thread[] threads = new Thread[NUM_PEERS * 2];
HashSet<String> concurrentPeers = new HashSet<>();
ArrayList<String> failures = new ArrayList<>();
AtomicInteger concurrentCount = new AtomicInteger(0);
for (int i = 0; i < threads.length; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
while (opsCount.get() > 0) {
try {
TestPeerProcedure proc = procSet.acquire();
if (proc == null) {
queue.signalAll();
if (opsCount.get() > 0) {
continue;
}
break;
}
String peerId = proc.getPeerId();
synchronized (concurrentPeers) {
assertTrue("unexpected concurrency on " + peerId, concurrentPeers.add(peerId));
}
assertTrue(opsCount.decrementAndGet() >= 0);
try {
long procId = proc.getProcId();
int concurrent = concurrentCount.incrementAndGet();
assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_PEERS,
concurrent >= 1 && concurrent <= NUM_PEERS);
LOG.debug("[S] peerId="+ peerId +" procId="+ procId +" concurrent="+ concurrent);
Thread.sleep(2000);
concurrent = concurrentCount.decrementAndGet();
LOG.debug("[E] peerId="+ peerId +" procId="+ procId +" concurrent="+ concurrent);
assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_PEERS);
} finally {
synchronized (concurrentPeers) {
assertTrue(concurrentPeers.remove(peerId));
}
procSet.release(proc);
}
} catch (Throwable e) {
LOG.error("Failed " + e.getMessage(), e);
synchronized (failures) {
failures.add(e.getMessage());
}
} finally {
queue.signalAll();
}
}
}
};
threads[i].start();
}
for (int i = 0; i < threads.length; ++i) {
threads[i].join();
}
assertTrue(failures.toString(), failures.isEmpty());
assertEquals(0, opsCount.get());
assertEquals(0, queue.size());
}
/**
* Verify that "write" operations for a single table are serialized,
* but different tables can be executed in parallel.
@ -221,4 +302,58 @@ public class TestMasterProcedureSchedulerConcurrency {
return ((TableProcedureInterface)proc).getTableOperationType();
}
}
public static class TestPeerProcedureSet {
private final MasterProcedureScheduler queue;
public TestPeerProcedureSet(final MasterProcedureScheduler queue) {
this.queue = queue;
}
public void addBack(TestPeerProcedure proc) {
queue.addBack(proc);
}
public TestPeerProcedure acquire() {
TestPeerProcedure proc = null;
boolean waiting = true;
while (waiting && queue.size() > 0) {
proc = (TestPeerProcedure) queue.poll(100000000L);
if (proc == null) {
continue;
}
switch (proc.getPeerOperationType()) {
case ADD:
case REMOVE:
case ENABLE:
case DISABLE:
case UPDATE_CONFIG:
waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId());
break;
case REFRESH:
waiting = false;
break;
default:
throw new UnsupportedOperationException();
}
}
return proc;
}
public void release(TestPeerProcedure proc) {
switch (proc.getPeerOperationType()) {
case ADD:
case REMOVE:
case ENABLE:
case DISABLE:
case UPDATE_CONFIG:
queue.wakePeerExclusiveLock(proc, proc.getPeerId());
break;
case REFRESH:
break;
default:
throw new UnsupportedOperationException();
}
}
}
}