diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 8ff2d1289f0..a25217cfe91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -389,6 +389,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { while (tableIter.hasNext()) { count += tableIter.next().size(); } + + // Peer queues + final AvlTreeIterator peerIter = new AvlTreeIterator<>(peerMap); + while (peerIter.hasNext()) { + count += peerIter.next().size(); + } + return count; } @@ -1041,7 +1048,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @see #wakePeerExclusiveLock(Procedure, String) * @param procedure the procedure trying to acquire the lock * @param peerId peer to lock - * @return true if the procedure has to wait for the per to be available + * @return true if the procedure has to wait for the peer to be available */ public boolean waitPeerExclusiveLock(Procedure procedure, String peerId) { schedLock(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 0291165848e..fd77e1fb0f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -905,6 +905,27 @@ public class TestMasterProcedureScheduler { } } + public static class TestPeerProcedure extends TestProcedure implements PeerProcedureInterface { + private final String peerId; + private final PeerOperationType opType; + + public TestPeerProcedure(long procId, String peerId, PeerOperationType opType) { + super(procId); + this.peerId = peerId; + this.opType = opType; + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { + return opType; + } + } + private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception { LockProcedure procedure = new LockProcedure(); @@ -927,22 +948,19 @@ public class TestMasterProcedureScheduler { return createLockProcedure(LockType.SHARED, procId); } - private static void assertLockResource(LockedResource resource, - LockedResourceType resourceType, String resourceName) - { + private static void assertLockResource(LockedResource resource, LockedResourceType resourceType, + String resourceName) { assertEquals(resourceType, resource.getResourceType()); assertEquals(resourceName, resource.getResourceName()); } - private static void assertExclusiveLock(LockedResource resource, Procedure procedure) - { + private static void assertExclusiveLock(LockedResource resource, Procedure procedure) { assertEquals(LockType.EXCLUSIVE, resource.getLockType()); assertEquals(procedure, resource.getExclusiveLockOwnerProcedure()); assertEquals(0, resource.getSharedLockCount()); } - private static void assertSharedLock(LockedResource resource, int lockCount) - { + private static void assertSharedLock(LockedResource resource, int lockCount) { assertEquals(LockType.SHARED, resource.getLockType()); assertEquals(lockCount, resource.getSharedLockCount()); } @@ -1026,6 +1044,39 @@ public class TestMasterProcedureScheduler { assertTrue(regionResource.getWaitingProcedures().isEmpty()); } + @Test + public void testListLocksPeer() throws Exception { + String peerId = "1"; + LockProcedure procedure = createExclusiveLockProcedure(4); + queue.waitPeerExclusiveLock(procedure, peerId); + + List locks = queue.getLocks(); + assertEquals(1, locks.size()); + + LockedResource resource = locks.get(0); + assertLockResource(resource, LockedResourceType.PEER, peerId); + assertExclusiveLock(resource, procedure); + assertTrue(resource.getWaitingProcedures().isEmpty()); + + // Try to acquire the exclusive lock again with same procedure + assertFalse(queue.waitPeerExclusiveLock(procedure, peerId)); + + // Try to acquire the exclusive lock again with new procedure + LockProcedure procedure2 = createExclusiveLockProcedure(5); + assertTrue(queue.waitPeerExclusiveLock(procedure2, peerId)); + + // Same peerId, still only has 1 LockedResource + locks = queue.getLocks(); + assertEquals(1, locks.size()); + + resource = locks.get(0); + assertLockResource(resource, LockedResourceType.PEER, peerId); + // LockedResource owner still is the origin procedure + assertExclusiveLock(resource, procedure); + // The new procedure should in the waiting list + assertEquals(1, resource.getWaitingProcedures().size()); + } + @Test public void testListLocksWaiting() throws Exception { LockProcedure procedure1 = createExclusiveLockProcedure(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java index 2e8e52ae24e..4e67a632e25 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java @@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; +import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure; import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -63,6 +65,85 @@ public class TestMasterProcedureSchedulerConcurrency { queue.clear(); } + @Test + public void testConcurrentPeerOperations() throws Exception { + TestPeerProcedureSet procSet = new TestPeerProcedureSet(queue); + + int NUM_ITEMS = 10; + int NUM_PEERS = 5; + AtomicInteger opsCount = new AtomicInteger(0); + for (int i = 0; i < NUM_PEERS; ++i) { + String peerId = String.format("test-peer-%04d", i); + for (int j = 1; j < NUM_ITEMS; ++j) { + procSet.addBack(new TestPeerProcedure(i * 100 + j, peerId, PeerOperationType.ADD)); + opsCount.incrementAndGet(); + } + } + assertEquals(opsCount.get(), queue.size()); + + Thread[] threads = new Thread[NUM_PEERS * 2]; + HashSet concurrentPeers = new HashSet<>(); + ArrayList failures = new ArrayList<>(); + AtomicInteger concurrentCount = new AtomicInteger(0); + for (int i = 0; i < threads.length; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + while (opsCount.get() > 0) { + try { + TestPeerProcedure proc = procSet.acquire(); + if (proc == null) { + queue.signalAll(); + if (opsCount.get() > 0) { + continue; + } + break; + } + + String peerId = proc.getPeerId(); + synchronized (concurrentPeers) { + assertTrue("unexpected concurrency on " + peerId, concurrentPeers.add(peerId)); + } + assertTrue(opsCount.decrementAndGet() >= 0); + + try { + long procId = proc.getProcId(); + int concurrent = concurrentCount.incrementAndGet(); + assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_PEERS, + concurrent >= 1 && concurrent <= NUM_PEERS); + LOG.debug("[S] peerId="+ peerId +" procId="+ procId +" concurrent="+ concurrent); + Thread.sleep(2000); + concurrent = concurrentCount.decrementAndGet(); + LOG.debug("[E] peerId="+ peerId +" procId="+ procId +" concurrent="+ concurrent); + assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_PEERS); + } finally { + synchronized (concurrentPeers) { + assertTrue(concurrentPeers.remove(peerId)); + } + procSet.release(proc); + } + } catch (Throwable e) { + LOG.error("Failed " + e.getMessage(), e); + synchronized (failures) { + failures.add(e.getMessage()); + } + } finally { + queue.signalAll(); + } + } + } + }; + threads[i].start(); + } + + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + assertTrue(failures.toString(), failures.isEmpty()); + assertEquals(0, opsCount.get()); + assertEquals(0, queue.size()); + } + /** * Verify that "write" operations for a single table are serialized, * but different tables can be executed in parallel. @@ -221,4 +302,58 @@ public class TestMasterProcedureSchedulerConcurrency { return ((TableProcedureInterface)proc).getTableOperationType(); } } + + public static class TestPeerProcedureSet { + private final MasterProcedureScheduler queue; + + public TestPeerProcedureSet(final MasterProcedureScheduler queue) { + this.queue = queue; + } + + public void addBack(TestPeerProcedure proc) { + queue.addBack(proc); + } + + public TestPeerProcedure acquire() { + TestPeerProcedure proc = null; + boolean waiting = true; + while (waiting && queue.size() > 0) { + proc = (TestPeerProcedure) queue.poll(100000000L); + if (proc == null) { + continue; + } + switch (proc.getPeerOperationType()) { + case ADD: + case REMOVE: + case ENABLE: + case DISABLE: + case UPDATE_CONFIG: + waiting = queue.waitPeerExclusiveLock(proc, proc.getPeerId()); + break; + case REFRESH: + waiting = false; + break; + default: + throw new UnsupportedOperationException(); + } + } + return proc; + } + + public void release(TestPeerProcedure proc) { + switch (proc.getPeerOperationType()) { + case ADD: + case REMOVE: + case ENABLE: + case DISABLE: + case UPDATE_CONFIG: + queue.wakePeerExclusiveLock(proc, proc.getPeerId()); + break; + case REFRESH: + break; + default: + throw new UnsupportedOperationException(); + } + } + } }