diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java index fd15a481c07..05ef141a8d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -105,6 +106,7 @@ public class Procedure implements Callable, ForeignExceptionListener { private Object joinBarrierLock = new Object(); private final List acquiringMembers; private final List inBarrierMembers; + private final HashMap dataFromFinishedMembers; private ProcedureCoordinator coord; /** @@ -125,6 +127,7 @@ public class Procedure implements Callable, ForeignExceptionListener { this.coord = coord; this.acquiringMembers = new ArrayList(expectedMembers); this.inBarrierMembers = new ArrayList(acquiringMembers.size()); + this.dataFromFinishedMembers = new HashMap(); this.procName = procName; this.args = args; this.monitor = monitor; @@ -311,8 +314,9 @@ public class Procedure implements Callable, ForeignExceptionListener { * Call back triggered by a individual member upon successful local in-barrier execution and * release * @param member + * @param dataFromMember */ - public void barrierReleasedByMember(String member) { + public void barrierReleasedByMember(String member, byte[] dataFromMember) { boolean removed = false; synchronized (joinBarrierLock) { removed = this.inBarrierMembers.remove(member); @@ -328,6 +332,7 @@ public class Procedure implements Callable, ForeignExceptionListener { LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName + "', but we weren't waiting on it to release!"); } + dataFromFinishedMembers.put(member, dataFromMember); } /** @@ -341,6 +346,19 @@ public class Procedure implements Callable, ForeignExceptionListener { waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed"); } + /** + * Waits until the entire procedure has globally completed, or has been aborted. If an + * exception is thrown the procedure may or not have run cleanup to trigger the completion latch + * yet. + * @return data returned from procedure members upon successfully completing subprocedure. + * @throws ForeignException + * @throws InterruptedException + */ + public HashMap waitForCompletedWithRet() throws ForeignException, InterruptedException { + waitForCompleted(); + return dataFromFinishedMembers; + } + /** * Check if the entire procedure has globally completed, or has been aborted. * @throws ForeignException diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index 516365d0180..fe7318b8bd2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -284,14 +284,15 @@ public class ProcedureCoordinator { * via {@link Subprocedure#insideBarrier()}. * @param procName name of the subprocedure that finished * @param member name of the member that executed and released its barrier + * @param dataFromMember the data that the member returned along with the notification */ - void memberFinishedBarrier(String procName, final String member) { + void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) { Procedure proc = procedures.get(procName); if (proc == null) { LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'"); return; } - proc.barrierReleasedByMember(member); + proc.barrierReleasedByMember(member, dataFromMember); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java index e6b391979ce..b2754afb250 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java @@ -66,7 +66,8 @@ public interface ProcedureMemberRpcs extends Closeable { * needed to be done under the global barrier. * * @param sub the specified {@link Subprocedure} + * @param data the data the member returns to the coordinator along with the notification * @throws IOException if we can't reach the coordinator */ - void sendMemberCompleted(Subprocedure sub) throws IOException; + void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java index ee3d1342f03..fc234f653d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java @@ -179,12 +179,12 @@ abstract public class Subprocedure implements Callable { // semantics. LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator."); - insideBarrier(); + byte[] dataToCoordinator = insideBarrier(); LOG.debug("Subprocedure '" + barrierName + "' locally completed"); rethrowException(); // Ack that the member has executed and released local barrier - rpcs.sendMemberCompleted(this); + rpcs.sendMemberCompleted(this, dataToCoordinator); LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion"); // make sure we didn't get an external exception @@ -244,12 +244,13 @@ abstract public class Subprocedure implements Callable { * has been satisfied. Continuing the previous example, a condition could be that all RS's * globally have been quiesced, and procedures that require this precondition could be * implemented here. - * - * Users should override this method. If quiescense is not required, this can be a no-op - * + * The implementation should also collect the result of the subprocedure as data to be returned + * to the coordinator upon successful completion. + * Users should override this method. + * @return the data the subprocedure wants to return to coordinator side. * @throws ForeignException */ - abstract public void insideBarrier() throws ForeignException; + abstract public byte[] insideBarrier() throws ForeignException; /** * Users should override this method. This implementation of this method should rollback and @@ -325,7 +326,9 @@ abstract public class Subprocedure implements Callable { public void acquireBarrier() throws ForeignException {} @Override - public void insideBarrier() throws ForeignException {} + public byte[] insideBarrier() throws ForeignException { + return new byte[0]; + } @Override public void cleanup(Exception e) {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java index 4e2071896b5..7b53a3a0dba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.procedure; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.Arrays; import java.util.List; @@ -120,11 +121,23 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { for (String node : nodeNames) { String znode = ZKUtil.joinZNode(reachedNode, node); if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { - coordinator.memberFinishedBarrier(procName, node); + byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode); + // ProtobufUtil.isPBMagicPrefix will check null + if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { + throw new IOException( + "Failed to get data from finished node or data is illegally formatted: " + + znode); + } else { + dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), + dataFromMember.length); + coordinator.memberFinishedBarrier(procName, node, dataFromMember); + } } } } catch (KeeperException e) { throw new IOException("Failed while creating reached node:" + reachedNode, e); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted while creating reached node:" + reachedNode); } } @@ -177,8 +190,31 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { // node was absent when we created the watch so zk event triggers the finished barrier. // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order. - coordinator.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), - ZKUtil.getNodeName(path)); + String procName = ZKUtil.getNodeName(ZKUtil.getParent(path)); + String member = ZKUtil.getNodeName(path); + // get the data from the procedure member + try { + byte[] dataFromMember = ZKUtil.getData(watcher, path); + // ProtobufUtil.isPBMagicPrefix will check null + if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { + ForeignException ee = new ForeignException(coordName, + "Failed to get data from finished node or data is illegally formatted:" + + path); + coordinator.abortProcedure(procName, ee); + } else { + dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), + dataFromMember.length); + LOG.debug("Finished data from procedure '" + procName + + "' member '" + member + "': " + new String(dataFromMember)); + coordinator.memberFinishedBarrier(procName, member, dataFromMember); + } + } catch (KeeperException e) { + ForeignException ee = new ForeignException(coordName, e); + coordinator.abortProcedure(procName, ee); + } catch (InterruptedException e) { + ForeignException ee = new ForeignException(coordName, e); + coordinator.abortProcedure(procName, ee); + } } else if (isAbortPathNode(path)) { abort(path); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java index 4c1623c28fb..cfb2040e755 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java @@ -257,16 +257,21 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs { } /** - * This acts as the ack for a completed snapshot + * This acts as the ack for a completed procedure */ @Override - public void sendMemberCompleted(Subprocedure sub) throws IOException { + public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException { String procName = sub.getName(); LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName + "' in zk"); String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName); + // ProtobufUtil.prependPBMagic does not take care of null + if (data == null) { + data = new byte[0]; + } try { - ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath); + ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath, + ProtobufUtil.prependPBMagic(data)); } catch (KeeperException e) { member.controllerConnectionFailure("Failed to post zk node:" + joinPath + " to join procedure barrier.", new IOException(e)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index 370f18189ea..dd3c3822d52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -114,8 +114,9 @@ public class FlushTableSubprocedure extends Subprocedure { } @Override - public void insideBarrier() throws ForeignException { + public byte[] insideBarrier() throws ForeignException { // No-Op + return new byte[0]; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java index b0a1b33d687..a7a5186ad09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java @@ -133,8 +133,9 @@ public class FlushSnapshotSubprocedure extends Subprocedure { * do a flush snapshot of every region on this rs from the target table. */ @Override - public void insideBarrier() throws ForeignException { + public byte[] insideBarrier() throws ForeignException { flushSnapshot(); + return new byte[0]; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java index f355bbc9aea..d277c3a4815 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java @@ -247,8 +247,9 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager { * do a log roll. */ @Override - public void insideBarrier() throws ForeignException { + public byte[] insideBarrier() throws ForeignException { execute(); + return new byte[0]; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java index b08bc8972a9..dc256c3ee06 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java @@ -117,7 +117,7 @@ public class TestProcedure { // or was not called here. // member: trigger global barrier release - proc.barrierReleasedByMember(members.get(0)); + proc.barrierReleasedByMember(members.get(0), new byte[0]); // coordinator: wait for procedure to be completed proc.completedProcedure.await(); @@ -168,8 +168,8 @@ public class TestProcedure { verify(procspy).sendGlobalBarrierStart(); // old news // member 1, 2: trigger global barrier release - procspy.barrierReleasedByMember(members.get(0)); - procspy.barrierReleasedByMember(members.get(1)); + procspy.barrierReleasedByMember(members.get(0), new byte[0]); + procspy.barrierReleasedByMember(members.get(1), new byte[0]); // coordinator wait for procedure to be completed procspy.completedProcedure.await(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java index b23c392f355..5032570d42a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java @@ -218,12 +218,12 @@ public class TestProcedureCoordinator { // then do some fun where we commit before all nodes have prepared // "one" commits before anyone else is done ref.memberAcquiredBarrier(this.opName, this.cohort[0]); - ref.memberFinishedBarrier(this.opName, this.cohort[0]); + ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]); // but "two" takes a while ref.memberAcquiredBarrier(this.opName, this.cohort[1]); // "three"jumps ahead ref.memberAcquiredBarrier(this.opName, this.cohort[2]); - ref.memberFinishedBarrier(this.opName, this.cohort[2]); + ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]); // and "four" takes a while ref.memberAcquiredBarrier(this.opName, this.cohort[3]); } @@ -232,8 +232,8 @@ public class TestProcedureCoordinator { BarrierAnswer commit = new BarrierAnswer(procName, cohort) { @Override public void doWork() { - ref.memberFinishedBarrier(opName, this.cohort[1]); - ref.memberFinishedBarrier(opName, this.cohort[3]); + ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]); + ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]); } }; runCoordinatedOperation(spy, prepare, commit, cohort); @@ -343,7 +343,8 @@ public class TestProcedureCoordinator { public void doWork() { if (cohort == null) return; for (String member : cohort) { - TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member); + TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member, + new byte[0]); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java index b3a1f5840ea..8ede86044ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java @@ -146,7 +146,7 @@ public class TestProcedureMember { order.verify(spy).acquireBarrier(); order.verify(mockMemberComms).sendMemberAcquired(eq(spy)); order.verify(spy).insideBarrier(); - order.verify(mockMemberComms).sendMemberCompleted(eq(spy)); + order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data)); order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy), any(ForeignException.class)); } @@ -181,7 +181,7 @@ public class TestProcedureMember { // Later phases not run order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub)); order.verify(spySub, never()).insideBarrier(); - order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); + order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); @@ -217,7 +217,7 @@ public class TestProcedureMember { // Later phases not run order.verify(spySub, never()).insideBarrier(); - order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); + order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); @@ -260,7 +260,7 @@ public class TestProcedureMember { order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); // Later phases not run order.verify(spySub, never()).insideBarrier(); - order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); + order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); @@ -301,7 +301,7 @@ public class TestProcedureMember { order.verify(spySub).insideBarrier(); // Later phases not run - order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); + order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); @@ -329,7 +329,7 @@ public class TestProcedureMember { Thread.sleep(WAKE_FREQUENCY); return null; } - }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class)); + }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class), eq(data)); // run the operation // build a new operation @@ -343,7 +343,7 @@ public class TestProcedureMember { order.verify(spySub).acquireBarrier(); order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); order.verify(spySub).insideBarrier(); - order.verify(mockMemberComms).sendMemberCompleted(eq(spySub)); + order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java index 18f9bd1c0e6..0529142b6b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.procedure; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -60,6 +61,8 @@ public class TestZKProcedureControllers { private static final String CONTROLLER_NODE_NAME = "controller"; private static final VerificationMode once = Mockito.times(1); + private final byte[] memberData = new String("data from member").getBytes(); + @BeforeClass public static void setupTest() throws Exception { UTIL.startMiniZKCluster(); @@ -105,7 +108,7 @@ public class TestZKProcedureControllers { Mockito.doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - controller.sendMemberCompleted(sub); + controller.sendMemberCompleted(sub, memberData); committed.countDown(); return null; } @@ -176,9 +179,11 @@ public class TestZKProcedureControllers { CountDownLatch prepared = new CountDownLatch(expected.size()); CountDownLatch committed = new CountDownLatch(expected.size()); + ArrayList dataFromMembers = new ArrayList(); + // mock out coordinator so we can keep track of zk progress ProcedureCoordinator coordinator = setupMockCoordinator(operationName, - prepared, committed); + prepared, committed, dataFromMembers); ProcedureMember member = Mockito.mock(ProcedureMember.class); @@ -208,14 +213,20 @@ public class TestZKProcedureControllers { // post the committed node for each expected node for (ZKProcedureMemberRpcs cc : cohortControllers) { - cc.sendMemberCompleted(sub); + cc.sendMemberCompleted(sub, memberData); } // wait for all commit notifications to reach the coordinator committed.await(); // make sure we got the all the nodes and no more Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName), - Mockito.anyString()); + Mockito.anyString(), Mockito.eq(memberData)); + + assertEquals("Incorrect number of members returnd data", expected.size(), + dataFromMembers.size()); + for (byte[] result : dataFromMembers) { + assertArrayEquals("Incorrect data from member", memberData, result); + } controller.resetMembers(p); @@ -244,9 +255,11 @@ public class TestZKProcedureControllers { final CountDownLatch prepared = new CountDownLatch(expected.size()); final CountDownLatch committed = new CountDownLatch(expected.size()); + ArrayList dataFromMembers = new ArrayList(); + // mock out coordinator so we can keep track of zk progress ProcedureCoordinator coordinator = setupMockCoordinator(operationName, - prepared, committed); + prepared, committed, dataFromMembers); ProcedureMember member = Mockito.mock(ProcedureMember.class); Procedure p = Mockito.mock(Procedure.class); @@ -281,14 +294,14 @@ public class TestZKProcedureControllers { // post the committed node for each expected node for (ZKProcedureMemberRpcs cc : cohortControllers) { - cc.sendMemberCompleted(sub); + cc.sendMemberCompleted(sub, memberData); } // wait for all commit notifications to reach the coordiantor committed.await(); // make sure we got the all the nodes and no more Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName), - Mockito.anyString()); + Mockito.anyString(), Mockito.eq(memberData)); controller.resetMembers(p); @@ -299,11 +312,13 @@ public class TestZKProcedureControllers { } /** + * @param dataFromMembers * @return a mock {@link ProcedureCoordinator} that just counts down the * prepared and committed latch for called to the respective method */ private ProcedureCoordinator setupMockCoordinator(String operationName, - final CountDownLatch prepared, final CountDownLatch committed) { + final CountDownLatch prepared, final CountDownLatch committed, + final ArrayList dataFromMembers) { ProcedureCoordinator coordinator = Mockito .mock(ProcedureCoordinator.class); Mockito.mock(ProcedureCoordinator.class); @@ -317,10 +332,12 @@ public class TestZKProcedureControllers { Mockito.doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { + dataFromMembers.add(memberData); committed.countDown(); return null; } - }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString()); + }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(), + Mockito.eq(memberData)); return coordinator; } @@ -356,7 +373,7 @@ public class TestZKProcedureControllers { // verify that we got all the expected nodes for (String node : expected) { verify(coordinator, once).memberAcquiredBarrier(operationName, node); - verify(coordinator, once).memberFinishedBarrier(operationName, node); + verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData); } }