diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java index 05ef141a8d6..fd15a481c07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -106,7 +105,6 @@ public class Procedure implements Callable, ForeignExceptionListener { private Object joinBarrierLock = new Object(); private final List acquiringMembers; private final List inBarrierMembers; - private final HashMap dataFromFinishedMembers; private ProcedureCoordinator coord; /** @@ -127,7 +125,6 @@ public class Procedure implements Callable, ForeignExceptionListener { this.coord = coord; this.acquiringMembers = new ArrayList(expectedMembers); this.inBarrierMembers = new ArrayList(acquiringMembers.size()); - this.dataFromFinishedMembers = new HashMap(); this.procName = procName; this.args = args; this.monitor = monitor; @@ -314,9 +311,8 @@ public class Procedure implements Callable, ForeignExceptionListener { * Call back triggered by a individual member upon successful local in-barrier execution and * release * @param member - * @param dataFromMember */ - public void barrierReleasedByMember(String member, byte[] dataFromMember) { + public void barrierReleasedByMember(String member) { boolean removed = false; synchronized (joinBarrierLock) { removed = this.inBarrierMembers.remove(member); @@ -332,7 +328,6 @@ public class Procedure implements Callable, ForeignExceptionListener { LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName + "', but we weren't waiting on it to release!"); } - dataFromFinishedMembers.put(member, dataFromMember); } /** @@ -346,19 +341,6 @@ public class Procedure implements Callable, ForeignExceptionListener { waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed"); } - /** - * Waits until the entire procedure has globally completed, or has been aborted. If an - * exception is thrown the procedure may or not have run cleanup to trigger the completion latch - * yet. - * @return data returned from procedure members upon successfully completing subprocedure. - * @throws ForeignException - * @throws InterruptedException - */ - public HashMap waitForCompletedWithRet() throws ForeignException, InterruptedException { - waitForCompleted(); - return dataFromFinishedMembers; - } - /** * Check if the entire procedure has globally completed, or has been aborted. * @throws ForeignException diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index fe7318b8bd2..516365d0180 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -284,15 +284,14 @@ public class ProcedureCoordinator { * via {@link Subprocedure#insideBarrier()}. * @param procName name of the subprocedure that finished * @param member name of the member that executed and released its barrier - * @param dataFromMember the data that the member returned along with the notification */ - void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) { + void memberFinishedBarrier(String procName, final String member) { Procedure proc = procedures.get(procName); if (proc == null) { LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'"); return; } - proc.barrierReleasedByMember(member, dataFromMember); + proc.barrierReleasedByMember(member); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java index b2754afb250..e6b391979ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMemberRpcs.java @@ -66,8 +66,7 @@ public interface ProcedureMemberRpcs extends Closeable { * needed to be done under the global barrier. * * @param sub the specified {@link Subprocedure} - * @param data the data the member returns to the coordinator along with the notification * @throws IOException if we can't reach the coordinator */ - void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException; + void sendMemberCompleted(Subprocedure sub) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java index fc234f653d5..ee3d1342f03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java @@ -179,12 +179,12 @@ abstract public class Subprocedure implements Callable { // semantics. LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator."); - byte[] dataToCoordinator = insideBarrier(); + insideBarrier(); LOG.debug("Subprocedure '" + barrierName + "' locally completed"); rethrowException(); // Ack that the member has executed and released local barrier - rpcs.sendMemberCompleted(this, dataToCoordinator); + rpcs.sendMemberCompleted(this); LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion"); // make sure we didn't get an external exception @@ -244,13 +244,12 @@ abstract public class Subprocedure implements Callable { * has been satisfied. Continuing the previous example, a condition could be that all RS's * globally have been quiesced, and procedures that require this precondition could be * implemented here. - * The implementation should also collect the result of the subprocedure as data to be returned - * to the coordinator upon successful completion. - * Users should override this method. - * @return the data the subprocedure wants to return to coordinator side. + * + * Users should override this method. If quiescense is not required, this can be a no-op + * * @throws ForeignException */ - abstract public byte[] insideBarrier() throws ForeignException; + abstract public void insideBarrier() throws ForeignException; /** * Users should override this method. This implementation of this method should rollback and @@ -326,9 +325,7 @@ abstract public class Subprocedure implements Callable { public void acquireBarrier() throws ForeignException {} @Override - public byte[] insideBarrier() throws ForeignException { - return new byte[0]; - } + public void insideBarrier() throws ForeignException {} @Override public void cleanup(Exception e) {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java index 7b53a3a0dba..4e2071896b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.procedure; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.Arrays; import java.util.List; @@ -121,23 +120,11 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { for (String node : nodeNames) { String znode = ZKUtil.joinZNode(reachedNode, node); if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { - byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode); - // ProtobufUtil.isPBMagicPrefix will check null - if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { - throw new IOException( - "Failed to get data from finished node or data is illegally formatted: " - + znode); - } else { - dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), - dataFromMember.length); - coordinator.memberFinishedBarrier(procName, node, dataFromMember); - } + coordinator.memberFinishedBarrier(procName, node); } } } catch (KeeperException e) { throw new IOException("Failed while creating reached node:" + reachedNode, e); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted while creating reached node:" + reachedNode); } } @@ -190,31 +177,8 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { // node was absent when we created the watch so zk event triggers the finished barrier. // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order. - String procName = ZKUtil.getNodeName(ZKUtil.getParent(path)); - String member = ZKUtil.getNodeName(path); - // get the data from the procedure member - try { - byte[] dataFromMember = ZKUtil.getData(watcher, path); - // ProtobufUtil.isPBMagicPrefix will check null - if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) { - ForeignException ee = new ForeignException(coordName, - "Failed to get data from finished node or data is illegally formatted:" - + path); - coordinator.abortProcedure(procName, ee); - } else { - dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(), - dataFromMember.length); - LOG.debug("Finished data from procedure '" + procName - + "' member '" + member + "': " + new String(dataFromMember)); - coordinator.memberFinishedBarrier(procName, member, dataFromMember); - } - } catch (KeeperException e) { - ForeignException ee = new ForeignException(coordName, e); - coordinator.abortProcedure(procName, ee); - } catch (InterruptedException e) { - ForeignException ee = new ForeignException(coordName, e); - coordinator.abortProcedure(procName, ee); - } + coordinator.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), + ZKUtil.getNodeName(path)); } else if (isAbortPathNode(path)) { abort(path); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java index cfb2040e755..4c1623c28fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java @@ -257,21 +257,16 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs { } /** - * This acts as the ack for a completed procedure + * This acts as the ack for a completed snapshot */ @Override - public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException { + public void sendMemberCompleted(Subprocedure sub) throws IOException { String procName = sub.getName(); LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName + "' in zk"); String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName); - // ProtobufUtil.prependPBMagic does not take care of null - if (data == null) { - data = new byte[0]; - } try { - ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath, - ProtobufUtil.prependPBMagic(data)); + ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath); } catch (KeeperException e) { member.controllerConnectionFailure("Failed to post zk node:" + joinPath + " to join procedure barrier.", new IOException(e)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index dd3c3822d52..370f18189ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -114,9 +114,8 @@ public class FlushTableSubprocedure extends Subprocedure { } @Override - public byte[] insideBarrier() throws ForeignException { + public void insideBarrier() throws ForeignException { // No-Op - return new byte[0]; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java index a7a5186ad09..b0a1b33d687 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java @@ -133,9 +133,8 @@ public class FlushSnapshotSubprocedure extends Subprocedure { * do a flush snapshot of every region on this rs from the target table. */ @Override - public byte[] insideBarrier() throws ForeignException { + public void insideBarrier() throws ForeignException { flushSnapshot(); - return new byte[0]; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java index d277c3a4815..f355bbc9aea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java @@ -247,9 +247,8 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager { * do a log roll. */ @Override - public byte[] insideBarrier() throws ForeignException { + public void insideBarrier() throws ForeignException { execute(); - return new byte[0]; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java index dc256c3ee06..b08bc8972a9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedure.java @@ -117,7 +117,7 @@ public class TestProcedure { // or was not called here. // member: trigger global barrier release - proc.barrierReleasedByMember(members.get(0), new byte[0]); + proc.barrierReleasedByMember(members.get(0)); // coordinator: wait for procedure to be completed proc.completedProcedure.await(); @@ -168,8 +168,8 @@ public class TestProcedure { verify(procspy).sendGlobalBarrierStart(); // old news // member 1, 2: trigger global barrier release - procspy.barrierReleasedByMember(members.get(0), new byte[0]); - procspy.barrierReleasedByMember(members.get(1), new byte[0]); + procspy.barrierReleasedByMember(members.get(0)); + procspy.barrierReleasedByMember(members.get(1)); // coordinator wait for procedure to be completed procspy.completedProcedure.await(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java index 5032570d42a..b23c392f355 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java @@ -218,12 +218,12 @@ public class TestProcedureCoordinator { // then do some fun where we commit before all nodes have prepared // "one" commits before anyone else is done ref.memberAcquiredBarrier(this.opName, this.cohort[0]); - ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]); + ref.memberFinishedBarrier(this.opName, this.cohort[0]); // but "two" takes a while ref.memberAcquiredBarrier(this.opName, this.cohort[1]); // "three"jumps ahead ref.memberAcquiredBarrier(this.opName, this.cohort[2]); - ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]); + ref.memberFinishedBarrier(this.opName, this.cohort[2]); // and "four" takes a while ref.memberAcquiredBarrier(this.opName, this.cohort[3]); } @@ -232,8 +232,8 @@ public class TestProcedureCoordinator { BarrierAnswer commit = new BarrierAnswer(procName, cohort) { @Override public void doWork() { - ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]); - ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]); + ref.memberFinishedBarrier(opName, this.cohort[1]); + ref.memberFinishedBarrier(opName, this.cohort[3]); } }; runCoordinatedOperation(spy, prepare, commit, cohort); @@ -343,8 +343,7 @@ public class TestProcedureCoordinator { public void doWork() { if (cohort == null) return; for (String member : cohort) { - TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member, - new byte[0]); + TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java index 8ede86044ad..b3a1f5840ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java @@ -146,7 +146,7 @@ public class TestProcedureMember { order.verify(spy).acquireBarrier(); order.verify(mockMemberComms).sendMemberAcquired(eq(spy)); order.verify(spy).insideBarrier(); - order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data)); + order.verify(mockMemberComms).sendMemberCompleted(eq(spy)); order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy), any(ForeignException.class)); } @@ -181,7 +181,7 @@ public class TestProcedureMember { // Later phases not run order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub)); order.verify(spySub, never()).insideBarrier(); - order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); + order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); @@ -217,7 +217,7 @@ public class TestProcedureMember { // Later phases not run order.verify(spySub, never()).insideBarrier(); - order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); + order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); @@ -260,7 +260,7 @@ public class TestProcedureMember { order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); // Later phases not run order.verify(spySub, never()).insideBarrier(); - order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); + order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); @@ -301,7 +301,7 @@ public class TestProcedureMember { order.verify(spySub).insideBarrier(); // Later phases not run - order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data)); + order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); @@ -329,7 +329,7 @@ public class TestProcedureMember { Thread.sleep(WAKE_FREQUENCY); return null; } - }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class), eq(data)); + }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class)); // run the operation // build a new operation @@ -343,7 +343,7 @@ public class TestProcedureMember { order.verify(spySub).acquireBarrier(); order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); order.verify(spySub).insideBarrier(); - order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data)); + order.verify(mockMemberComms).sendMemberCompleted(eq(spySub)); // error recovery path exercised order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java index 0529142b6b4..18f9bd1c0e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedureControllers.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.procedure; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertArrayEquals; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -61,8 +60,6 @@ public class TestZKProcedureControllers { private static final String CONTROLLER_NODE_NAME = "controller"; private static final VerificationMode once = Mockito.times(1); - private final byte[] memberData = new String("data from member").getBytes(); - @BeforeClass public static void setupTest() throws Exception { UTIL.startMiniZKCluster(); @@ -108,7 +105,7 @@ public class TestZKProcedureControllers { Mockito.doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - controller.sendMemberCompleted(sub, memberData); + controller.sendMemberCompleted(sub); committed.countDown(); return null; } @@ -179,11 +176,9 @@ public class TestZKProcedureControllers { CountDownLatch prepared = new CountDownLatch(expected.size()); CountDownLatch committed = new CountDownLatch(expected.size()); - ArrayList dataFromMembers = new ArrayList(); - // mock out coordinator so we can keep track of zk progress ProcedureCoordinator coordinator = setupMockCoordinator(operationName, - prepared, committed, dataFromMembers); + prepared, committed); ProcedureMember member = Mockito.mock(ProcedureMember.class); @@ -213,20 +208,14 @@ public class TestZKProcedureControllers { // post the committed node for each expected node for (ZKProcedureMemberRpcs cc : cohortControllers) { - cc.sendMemberCompleted(sub, memberData); + cc.sendMemberCompleted(sub); } // wait for all commit notifications to reach the coordinator committed.await(); // make sure we got the all the nodes and no more Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName), - Mockito.anyString(), Mockito.eq(memberData)); - - assertEquals("Incorrect number of members returnd data", expected.size(), - dataFromMembers.size()); - for (byte[] result : dataFromMembers) { - assertArrayEquals("Incorrect data from member", memberData, result); - } + Mockito.anyString()); controller.resetMembers(p); @@ -255,11 +244,9 @@ public class TestZKProcedureControllers { final CountDownLatch prepared = new CountDownLatch(expected.size()); final CountDownLatch committed = new CountDownLatch(expected.size()); - ArrayList dataFromMembers = new ArrayList(); - // mock out coordinator so we can keep track of zk progress ProcedureCoordinator coordinator = setupMockCoordinator(operationName, - prepared, committed, dataFromMembers); + prepared, committed); ProcedureMember member = Mockito.mock(ProcedureMember.class); Procedure p = Mockito.mock(Procedure.class); @@ -294,14 +281,14 @@ public class TestZKProcedureControllers { // post the committed node for each expected node for (ZKProcedureMemberRpcs cc : cohortControllers) { - cc.sendMemberCompleted(sub, memberData); + cc.sendMemberCompleted(sub); } // wait for all commit notifications to reach the coordiantor committed.await(); // make sure we got the all the nodes and no more Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName), - Mockito.anyString(), Mockito.eq(memberData)); + Mockito.anyString()); controller.resetMembers(p); @@ -312,13 +299,11 @@ public class TestZKProcedureControllers { } /** - * @param dataFromMembers * @return a mock {@link ProcedureCoordinator} that just counts down the * prepared and committed latch for called to the respective method */ private ProcedureCoordinator setupMockCoordinator(String operationName, - final CountDownLatch prepared, final CountDownLatch committed, - final ArrayList dataFromMembers) { + final CountDownLatch prepared, final CountDownLatch committed) { ProcedureCoordinator coordinator = Mockito .mock(ProcedureCoordinator.class); Mockito.mock(ProcedureCoordinator.class); @@ -332,12 +317,10 @@ public class TestZKProcedureControllers { Mockito.doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - dataFromMembers.add(memberData); committed.countDown(); return null; } - }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(), - Mockito.eq(memberData)); + }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString()); return coordinator; } @@ -373,7 +356,7 @@ public class TestZKProcedureControllers { // verify that we got all the expected nodes for (String node : expected) { verify(coordinator, once).memberAcquiredBarrier(operationName, node); - verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData); + verify(coordinator, once).memberFinishedBarrier(operationName, node); } }