HBASE-19520 Add UTs for the new lock type PEER
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
62496b5f36
commit
17762c60cc
|
@ -389,6 +389,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
while (tableIter.hasNext()) {
|
while (tableIter.hasNext()) {
|
||||||
count += tableIter.next().size();
|
count += tableIter.next().size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Peer queues
|
||||||
|
final AvlTreeIterator<PeerQueue> peerIter = new AvlTreeIterator<>(peerMap);
|
||||||
|
while (peerIter.hasNext()) {
|
||||||
|
count += peerIter.next().size();
|
||||||
|
}
|
||||||
|
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1041,7 +1048,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
* @see #wakePeerExclusiveLock(Procedure, String)
|
* @see #wakePeerExclusiveLock(Procedure, String)
|
||||||
* @param procedure the procedure trying to acquire the lock
|
* @param procedure the procedure trying to acquire the lock
|
||||||
* @param peerId peer to lock
|
* @param peerId peer to lock
|
||||||
* @return true if the procedure has to wait for the per to be available
|
* @return true if the procedure has to wait for the peer to be available
|
||||||
*/
|
*/
|
||||||
public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
|
public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
|
||||||
schedLock();
|
schedLock();
|
||||||
|
|
|
@ -905,6 +905,27 @@ public class TestMasterProcedureScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TestPeerProcedure extends TestProcedure implements PeerProcedureInterface {
|
||||||
|
private final String peerId;
|
||||||
|
private final PeerOperationType opType;
|
||||||
|
|
||||||
|
public TestPeerProcedure(long procId, String peerId, PeerOperationType opType) {
|
||||||
|
super(procId);
|
||||||
|
this.peerId = peerId;
|
||||||
|
this.opType = opType;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getPeerId() {
|
||||||
|
return peerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PeerOperationType getPeerOperationType() {
|
||||||
|
return opType;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception {
|
private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception {
|
||||||
LockProcedure procedure = new LockProcedure();
|
LockProcedure procedure = new LockProcedure();
|
||||||
|
|
||||||
|
@ -927,22 +948,19 @@ public class TestMasterProcedureScheduler {
|
||||||
return createLockProcedure(LockType.SHARED, procId);
|
return createLockProcedure(LockType.SHARED, procId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertLockResource(LockedResource resource,
|
private static void assertLockResource(LockedResource resource, LockedResourceType resourceType,
|
||||||
LockedResourceType resourceType, String resourceName)
|
String resourceName) {
|
||||||
{
|
|
||||||
assertEquals(resourceType, resource.getResourceType());
|
assertEquals(resourceType, resource.getResourceType());
|
||||||
assertEquals(resourceName, resource.getResourceName());
|
assertEquals(resourceName, resource.getResourceName());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertExclusiveLock(LockedResource resource, Procedure<?> procedure)
|
private static void assertExclusiveLock(LockedResource resource, Procedure<?> procedure) {
|
||||||
{
|
|
||||||
assertEquals(LockType.EXCLUSIVE, resource.getLockType());
|
assertEquals(LockType.EXCLUSIVE, resource.getLockType());
|
||||||
assertEquals(procedure, resource.getExclusiveLockOwnerProcedure());
|
assertEquals(procedure, resource.getExclusiveLockOwnerProcedure());
|
||||||
assertEquals(0, resource.getSharedLockCount());
|
assertEquals(0, resource.getSharedLockCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertSharedLock(LockedResource resource, int lockCount)
|
private static void assertSharedLock(LockedResource resource, int lockCount) {
|
||||||
{
|
|
||||||
assertEquals(LockType.SHARED, resource.getLockType());
|
assertEquals(LockType.SHARED, resource.getLockType());
|
||||||
assertEquals(lockCount, resource.getSharedLockCount());
|
assertEquals(lockCount, resource.getSharedLockCount());
|
||||||
}
|
}
|
||||||
|
@ -1026,6 +1044,39 @@ public class TestMasterProcedureScheduler {
|
||||||
assertTrue(regionResource.getWaitingProcedures().isEmpty());
|
assertTrue(regionResource.getWaitingProcedures().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListLocksPeer() throws Exception {
|
||||||
|
String peerId = "1";
|
||||||
|
LockProcedure procedure = createExclusiveLockProcedure(4);
|
||||||
|
queue.waitPeerExclusiveLock(procedure, peerId);
|
||||||
|
|
||||||
|
List<LockedResource> locks = queue.getLocks();
|
||||||
|
assertEquals(1, locks.size());
|
||||||
|
|
||||||
|
LockedResource resource = locks.get(0);
|
||||||
|
assertLockResource(resource, LockedResourceType.PEER, peerId);
|
||||||
|
assertExclusiveLock(resource, procedure);
|
||||||
|
assertTrue(resource.getWaitingProcedures().isEmpty());
|
||||||
|
|
||||||
|
// Try to acquire the exclusive lock again with same procedure
|
||||||
|
assertFalse(queue.waitPeerExclusiveLock(procedure, peerId));
|
||||||
|
|
||||||
|
// Try to acquire the exclusive lock again with new procedure
|
||||||
|
LockProcedure procedure2 = createExclusiveLockProcedure(5);
|
||||||
|
assertTrue(queue.waitPeerExclusiveLock(procedure2, peerId));
|
||||||
|
|
||||||
|
// Same peerId, still only has 1 LockedResource
|
||||||
|
locks = queue.getLocks();
|
||||||
|
assertEquals(1, locks.size());
|
||||||
|
|
||||||
|
resource = locks.get(0);
|
||||||
|
assertLockResource(resource, LockedResourceType.PEER, peerId);
|
||||||
|
// LockedResource owner still is the origin procedure
|
||||||
|
assertExclusiveLock(resource, procedure);
|
||||||
|
// The new procedure should in the waiting list
|
||||||
|
assertEquals(1, resource.getWaitingProcedures().size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListLocksWaiting() throws Exception {
|
public void testListLocksWaiting() throws Exception {
|
||||||
LockProcedure procedure1 = createExclusiveLockProcedure(1);
|
LockProcedure procedure1 = createExclusiveLockProcedure(1);
|
||||||
|
|
|
@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
|
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
@ -63,6 +65,85 @@ public class TestMasterProcedureSchedulerConcurrency {
|
||||||
queue.clear();
|
queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConcurrentPeerOperations() throws Exception {
|
||||||
|
TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue);
|
||||||
|
|
||||||
|
int NUM_ITEMS = 10;
|
||||||
|
int NUM_PEERS = 5;
|
||||||
|
AtomicInteger opsCount = new AtomicInteger(0);
|
||||||
|
for (int i = 0; i < NUM_PEERS; ++i) {
|
||||||
|
String peerId = String.format("test-peer-%04d", i);
|
||||||
|
for (int j = 1; j < NUM_ITEMS; ++j) {
|
||||||
|
procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD));
|
||||||
|
opsCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(opsCount.get(), queue.size());
|
||||||
|
|
||||||
|
Thread[] threads = new Thread[NUM_PEERS * 2];
|
||||||
|
HashSet<String> concurrentPeers = new HashSet<>();
|
||||||
|
ArrayList<String> failures = new ArrayList<>();
|
||||||
|
AtomicInteger concurrentCount = new AtomicInteger(0);
|
||||||
|
for (int i = 0; i < threads.length; ++i) {
|
||||||
|
threads[i] = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (opsCount.get() > 0) {
|
||||||
|
try {
|
||||||
|
TestPeerProcedure proc = procSet.acquire();
|
||||||
|
if (proc == null) {
|
||||||
|
queue.signalAll();
|
||||||
|
if (opsCount.get() > 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
String peerId = proc.getPeerId();
|
||||||
|
synchronized (concurrentPeers) {
|
||||||
|
assertTrue("unexpected concurrency on " + peerId, concurrentPeers.add(peerId));
|
||||||
|
}
|
||||||
|
assertTrue(opsCount.decrementAndGet() >= 0);
|
||||||
|
|
||||||
|
try {
|
||||||
|
long procId = proc.getProcId();
|
||||||
|
int concurrent = concurrentCount.incrementAndGet();
|
||||||
|
assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_PEERS,
|
||||||
|
concurrent >= 1 && concurrent <= NUM_PEERS);
|
||||||
|
LOG.debug("[S] peerId="+ peerId +" procId="+ procId +" concurrent="+ concurrent);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
concurrent = concurrentCount.decrementAndGet();
|
||||||
|
LOG.debug("[E] peerId="+ peerId +" procId="+ procId +" concurrent="+ concurrent);
|
||||||
|
assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_PEERS);
|
||||||
|
} finally {
|
||||||
|
synchronized (concurrentPeers) {
|
||||||
|
assertTrue(concurrentPeers.remove(peerId));
|
||||||
|
}
|
||||||
|
procSet.release(proc);
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.error("Failed " + e.getMessage(), e);
|
||||||
|
synchronized (failures) {
|
||||||
|
failures.add(e.getMessage());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
queue.signalAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < threads.length; ++i) {
|
||||||
|
threads[i].join();
|
||||||
|
}
|
||||||
|
assertTrue(failures.toString(), failures.isEmpty());
|
||||||
|
assertEquals(0, opsCount.get());
|
||||||
|
assertEquals(0, queue.size());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify that "write" operations for a single table are serialized,
|
* Verify that "write" operations for a single table are serialized,
|
||||||
* but different tables can be executed in parallel.
|
* but different tables can be executed in parallel.
|
||||||
|
@ -221,4 +302,58 @@ public class TestMasterProcedureSchedulerConcurrency {
|
||||||
return ((TableProcedureInterface)proc).getTableOperationType();
|
return ((TableProcedureInterface)proc).getTableOperationType();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TestPeerProcedureSet {
|
||||||
|
private final MasterProcedureScheduler queue;
|
||||||
|
|
||||||
|
public TestPeerProcedureSet(final MasterProcedureScheduler queue) {
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addBack(TestPeerProcedure proc) {
|
||||||
|
queue.addBack(proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestPeerProcedure acquire() {
|
||||||
|
TestPeerProcedure proc = null;
|
||||||
|
boolean waiting = true;
|
||||||
|
while (waiting && queue.size() > 0) {
|
||||||
|
proc = (TestPeerProcedure) queue.poll(100000000L);
|
||||||
|
if (proc == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
switch (proc.getPeerOperationType()) {
|
||||||
|
case ADD:
|
||||||
|
case REMOVE:
|
||||||
|
case ENABLE:
|
||||||
|
case DISABLE:
|
||||||
|
case UPDATE_CONFIG:
|
||||||
|
waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId());
|
||||||
|
break;
|
||||||
|
case REFRESH:
|
||||||
|
waiting = false;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return proc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void release(TestPeerProcedure proc) {
|
||||||
|
switch (proc.getPeerOperationType()) {
|
||||||
|
case ADD:
|
||||||
|
case REMOVE:
|
||||||
|
case ENABLE:
|
||||||
|
case DISABLE:
|
||||||
|
case UPDATE_CONFIG:
|
||||||
|
queue.wakePeerExclusiveLock(proc, proc.getPeerId());
|
||||||
|
break;
|
||||||
|
case REFRESH:
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue