HBASE-11201 Enable global procedure members to return values to procedure master

This commit is contained in:
Ted Yu 2014-05-29 02:56:01 +00:00
parent af1714c8be
commit cbd39422b4
13 changed files with 125 additions and 51 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.procedure;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -105,6 +106,7 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
private Object joinBarrierLock = new Object(); private Object joinBarrierLock = new Object();
private final List<String> acquiringMembers; private final List<String> acquiringMembers;
private final List<String> inBarrierMembers; private final List<String> inBarrierMembers;
private final HashMap<String, byte[]> dataFromFinishedMembers;
private ProcedureCoordinator coord; private ProcedureCoordinator coord;
/** /**
@ -125,6 +127,7 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
this.coord = coord; this.coord = coord;
this.acquiringMembers = new ArrayList<String>(expectedMembers); this.acquiringMembers = new ArrayList<String>(expectedMembers);
this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size()); this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
this.dataFromFinishedMembers = new HashMap<String, byte[]>();
this.procName = procName; this.procName = procName;
this.args = args; this.args = args;
this.monitor = monitor; this.monitor = monitor;
@ -311,8 +314,9 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
* Call back triggered by a individual member upon successful local in-barrier execution and * Call back triggered by a individual member upon successful local in-barrier execution and
* release * release
* @param member * @param member
* @param dataFromMember
*/ */
public void barrierReleasedByMember(String member) { public void barrierReleasedByMember(String member, byte[] dataFromMember) {
boolean removed = false; boolean removed = false;
synchronized (joinBarrierLock) { synchronized (joinBarrierLock) {
removed = this.inBarrierMembers.remove(member); removed = this.inBarrierMembers.remove(member);
@ -328,17 +332,20 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
+ "', but we weren't waiting on it to release!"); + "', but we weren't waiting on it to release!");
} }
dataFromFinishedMembers.put(member, dataFromMember);
} }
/** /**
* Waits until the entire procedure has globally completed, or has been aborted. If an * Waits until the entire procedure has globally completed, or has been aborted. If an
* exception is thrown the procedure may or not have run cleanup to trigger the completion latch * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
* yet. * yet.
* @return data returned from procedure members upon successfully completing subprocedure.
* @throws ForeignException * @throws ForeignException
* @throws InterruptedException * @throws InterruptedException
*/ */
public void waitForCompleted() throws ForeignException, InterruptedException { public HashMap<String, byte[]> waitForCompleted() throws ForeignException, InterruptedException {
waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed"); waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
return dataFromFinishedMembers;
} }
/** /**

View File

@ -284,14 +284,15 @@ public class ProcedureCoordinator {
* via {@link Subprocedure#insideBarrier()}. * via {@link Subprocedure#insideBarrier()}.
* @param procName name of the subprocedure that finished * @param procName name of the subprocedure that finished
* @param member name of the member that executed and released its barrier * @param member name of the member that executed and released its barrier
* @param dataFromMember the data that the member returned along with the notification
*/ */
void memberFinishedBarrier(String procName, final String member) { void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
Procedure proc = procedures.get(procName); Procedure proc = procedures.get(procName);
if (proc == null) { if (proc == null) {
LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'"); LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
return; return;
} }
proc.barrierReleasedByMember(member); proc.barrierReleasedByMember(member, dataFromMember);
} }
/** /**

View File

@ -66,7 +66,8 @@ public interface ProcedureMemberRpcs extends Closeable {
* needed to be done under the global barrier. * needed to be done under the global barrier.
* *
* @param sub the specified {@link Subprocedure} * @param sub the specified {@link Subprocedure}
* @param data the data the member returns to the coordinator along with the notification
* @throws IOException if we can't reach the coordinator * @throws IOException if we can't reach the coordinator
*/ */
void sendMemberCompleted(Subprocedure sub) throws IOException; void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException;
} }

View File

@ -179,12 +179,12 @@ abstract public class Subprocedure implements Callable<Void> {
// semantics. // semantics.
LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator."); LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
insideBarrier(); byte[] dataToCoordinator = insideBarrier();
LOG.debug("Subprocedure '" + barrierName + "' locally completed"); LOG.debug("Subprocedure '" + barrierName + "' locally completed");
rethrowException(); rethrowException();
// Ack that the member has executed and released local barrier // Ack that the member has executed and released local barrier
rpcs.sendMemberCompleted(this); rpcs.sendMemberCompleted(this, dataToCoordinator);
LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion"); LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
// make sure we didn't get an external exception // make sure we didn't get an external exception
@ -244,12 +244,13 @@ abstract public class Subprocedure implements Callable<Void> {
* has been satisfied. Continuing the previous example, a condition could be that all RS's * has been satisfied. Continuing the previous example, a condition could be that all RS's
* globally have been quiesced, and procedures that require this precondition could be * globally have been quiesced, and procedures that require this precondition could be
* implemented here. * implemented here.
* * The implementation should also collect the result of the subprocedure as data to be returned
* Users should override this method. If quiescense is not required, this can be a no-op * to the coordinator upon successful completion.
* * Users should override this method.
* @return the data the subprocedure wants to return to coordinator side.
* @throws ForeignException * @throws ForeignException
*/ */
abstract public void insideBarrier() throws ForeignException; abstract public byte[] insideBarrier() throws ForeignException;
/** /**
* Users should override this method. This implementation of this method should rollback and * Users should override this method. This implementation of this method should rollback and
@ -325,7 +326,9 @@ abstract public class Subprocedure implements Callable<Void> {
public void acquireBarrier() throws ForeignException {} public void acquireBarrier() throws ForeignException {}
@Override @Override
public void insideBarrier() throws ForeignException {} public byte[] insideBarrier() throws ForeignException {
return new byte[0];
}
@Override @Override
public void cleanup(Exception e) {} public void cleanup(Exception e) {}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.procedure; package org.apache.hadoop.hbase.procedure;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -120,11 +121,23 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
for (String node : nodeNames) { for (String node : nodeNames) {
String znode = ZKUtil.joinZNode(reachedNode, node); String znode = ZKUtil.joinZNode(reachedNode, node);
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) { if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
coordinator.memberFinishedBarrier(procName, node); byte[] dataFromMember = ZKUtil.getData(zkProc.getWatcher(), znode);
// ProtobufUtil.isPBMagicPrefix will check null
if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
throw new IOException(
"Failed to get data from finished node or data is illegally formatted: "
+ znode);
} else {
dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
dataFromMember.length);
coordinator.memberFinishedBarrier(procName, node, dataFromMember);
}
} }
} }
} catch (KeeperException e) { } catch (KeeperException e) {
throw new IOException("Failed while creating reached node:" + reachedNode, e); throw new IOException("Failed while creating reached node:" + reachedNode, e);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted while creating reached node:" + reachedNode);
} }
} }
@ -177,8 +190,31 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
// node was absent when we created the watch so zk event triggers the finished barrier. // node was absent when we created the watch so zk event triggers the finished barrier.
// TODO Nothing enforces that acquire and reached znodes from showing up in wrong order. // TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
coordinator.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), String procName = ZKUtil.getNodeName(ZKUtil.getParent(path));
ZKUtil.getNodeName(path)); String member = ZKUtil.getNodeName(path);
// get the data from the procedure member
try {
byte[] dataFromMember = ZKUtil.getData(watcher, path);
// ProtobufUtil.isPBMagicPrefix will check null
if (!ProtobufUtil.isPBMagicPrefix(dataFromMember)) {
ForeignException ee = new ForeignException(coordName,
"Failed to get data from finished node or data is illegally formatted:"
+ path);
coordinator.abortProcedure(procName, ee);
} else {
dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
dataFromMember.length);
LOG.debug("Finished data from procedure '" + procName
+ "' member '" + member + "': " + new String(dataFromMember));
coordinator.memberFinishedBarrier(procName, member, dataFromMember);
}
} catch (KeeperException e) {
ForeignException ee = new ForeignException(coordName, e);
coordinator.abortProcedure(procName, ee);
} catch (InterruptedException e) {
ForeignException ee = new ForeignException(coordName, e);
coordinator.abortProcedure(procName, ee);
}
} else if (isAbortPathNode(path)) { } else if (isAbortPathNode(path)) {
abort(path); abort(path);
} else { } else {

View File

@ -257,16 +257,21 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
} }
/** /**
* This acts as the ack for a completed snapshot * This acts as the ack for a completed procedure
*/ */
@Override @Override
public void sendMemberCompleted(Subprocedure sub) throws IOException { public void sendMemberCompleted(Subprocedure sub, byte[] data) throws IOException {
String procName = sub.getName(); String procName = sub.getName();
LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName
+ "' in zk"); + "' in zk");
String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName); String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
// ProtobufUtil.prependPBMagic does not take care of null
if (data == null) {
data = new byte[0];
}
try { try {
ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath); ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath,
ProtobufUtil.prependPBMagic(data));
} catch (KeeperException e) { } catch (KeeperException e) {
member.controllerConnectionFailure("Failed to post zk node:" + joinPath member.controllerConnectionFailure("Failed to post zk node:" + joinPath
+ " to join procedure barrier.", new IOException(e)); + " to join procedure barrier.", new IOException(e));

View File

@ -114,8 +114,9 @@ public class FlushTableSubprocedure extends Subprocedure {
} }
@Override @Override
public void insideBarrier() throws ForeignException { public byte[] insideBarrier() throws ForeignException {
// No-Op // No-Op
return new byte[0];
} }
/** /**

View File

@ -133,8 +133,9 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
* do a flush snapshot of every region on this rs from the target table. * do a flush snapshot of every region on this rs from the target table.
*/ */
@Override @Override
public void insideBarrier() throws ForeignException { public byte[] insideBarrier() throws ForeignException {
flushSnapshot(); flushSnapshot();
return new byte[0];
} }
/** /**

View File

@ -247,8 +247,9 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager {
* do a log roll. * do a log roll.
*/ */
@Override @Override
public void insideBarrier() throws ForeignException { public byte[] insideBarrier() throws ForeignException {
execute(); execute();
return new byte[0];
} }
/** /**

View File

@ -117,7 +117,7 @@ public class TestProcedure {
// or was not called here. // or was not called here.
// member: trigger global barrier release // member: trigger global barrier release
proc.barrierReleasedByMember(members.get(0)); proc.barrierReleasedByMember(members.get(0), new byte[0]);
// coordinator: wait for procedure to be completed // coordinator: wait for procedure to be completed
proc.completedProcedure.await(); proc.completedProcedure.await();
@ -168,8 +168,8 @@ public class TestProcedure {
verify(procspy).sendGlobalBarrierStart(); // old news verify(procspy).sendGlobalBarrierStart(); // old news
// member 1, 2: trigger global barrier release // member 1, 2: trigger global barrier release
procspy.barrierReleasedByMember(members.get(0)); procspy.barrierReleasedByMember(members.get(0), new byte[0]);
procspy.barrierReleasedByMember(members.get(1)); procspy.barrierReleasedByMember(members.get(1), new byte[0]);
// coordinator wait for procedure to be completed // coordinator wait for procedure to be completed
procspy.completedProcedure.await(); procspy.completedProcedure.await();

View File

@ -218,12 +218,12 @@ public class TestProcedureCoordinator {
// then do some fun where we commit before all nodes have prepared // then do some fun where we commit before all nodes have prepared
// "one" commits before anyone else is done // "one" commits before anyone else is done
ref.memberAcquiredBarrier(this.opName, this.cohort[0]); ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
ref.memberFinishedBarrier(this.opName, this.cohort[0]); ref.memberFinishedBarrier(this.opName, this.cohort[0], new byte[0]);
// but "two" takes a while // but "two" takes a while
ref.memberAcquiredBarrier(this.opName, this.cohort[1]); ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
// "three"jumps ahead // "three"jumps ahead
ref.memberAcquiredBarrier(this.opName, this.cohort[2]); ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
ref.memberFinishedBarrier(this.opName, this.cohort[2]); ref.memberFinishedBarrier(this.opName, this.cohort[2], new byte[0]);
// and "four" takes a while // and "four" takes a while
ref.memberAcquiredBarrier(this.opName, this.cohort[3]); ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
} }
@ -232,8 +232,8 @@ public class TestProcedureCoordinator {
BarrierAnswer commit = new BarrierAnswer(procName, cohort) { BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
@Override @Override
public void doWork() { public void doWork() {
ref.memberFinishedBarrier(opName, this.cohort[1]); ref.memberFinishedBarrier(opName, this.cohort[1], new byte[0]);
ref.memberFinishedBarrier(opName, this.cohort[3]); ref.memberFinishedBarrier(opName, this.cohort[3], new byte[0]);
} }
}; };
runCoordinatedOperation(spy, prepare, commit, cohort); runCoordinatedOperation(spy, prepare, commit, cohort);
@ -343,7 +343,8 @@ public class TestProcedureCoordinator {
public void doWork() { public void doWork() {
if (cohort == null) return; if (cohort == null) return;
for (String member : cohort) { for (String member : cohort) {
TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member); TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member,
new byte[0]);
} }
} }
} }

View File

@ -146,7 +146,7 @@ public class TestProcedureMember {
order.verify(spy).acquireBarrier(); order.verify(spy).acquireBarrier();
order.verify(mockMemberComms).sendMemberAcquired(eq(spy)); order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
order.verify(spy).insideBarrier(); order.verify(spy).insideBarrier();
order.verify(mockMemberComms).sendMemberCompleted(eq(spy)); order.verify(mockMemberComms).sendMemberCompleted(eq(spy), eq(data));
order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy), order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
any(ForeignException.class)); any(ForeignException.class));
} }
@ -181,7 +181,7 @@ public class TestProcedureMember {
// Later phases not run // Later phases not run
order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub)); order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
order.verify(spySub, never()).insideBarrier(); order.verify(spySub, never()).insideBarrier();
order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
// error recovery path exercised // error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class));
@ -217,7 +217,7 @@ public class TestProcedureMember {
// Later phases not run // Later phases not run
order.verify(spySub, never()).insideBarrier(); order.verify(spySub, never()).insideBarrier();
order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
// error recovery path exercised // error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class));
@ -260,7 +260,7 @@ public class TestProcedureMember {
order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
// Later phases not run // Later phases not run
order.verify(spySub, never()).insideBarrier(); order.verify(spySub, never()).insideBarrier();
order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
// error recovery path exercised // error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class));
@ -301,7 +301,7 @@ public class TestProcedureMember {
order.verify(spySub).insideBarrier(); order.verify(spySub).insideBarrier();
// Later phases not run // Later phases not run
order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub)); order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub), eq(data));
// error recovery path exercised // error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class));
@ -329,7 +329,7 @@ public class TestProcedureMember {
Thread.sleep(WAKE_FREQUENCY); Thread.sleep(WAKE_FREQUENCY);
return null; return null;
} }
}).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class)); }).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class), eq(data));
// run the operation // run the operation
// build a new operation // build a new operation
@ -343,7 +343,7 @@ public class TestProcedureMember {
order.verify(spySub).acquireBarrier(); order.verify(spySub).acquireBarrier();
order.verify(mockMemberComms).sendMemberAcquired(eq(spySub)); order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
order.verify(spySub).insideBarrier(); order.verify(spySub).insideBarrier();
order.verify(mockMemberComms).sendMemberCompleted(eq(spySub)); order.verify(mockMemberComms).sendMemberCompleted(eq(spySub), eq(data));
// error recovery path exercised // error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class)); order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class)); order.verify(spySub).cleanup(any(Exception.class));

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.procedure; package org.apache.hadoop.hbase.procedure;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -60,6 +61,8 @@ public class TestZKProcedureControllers {
private static final String CONTROLLER_NODE_NAME = "controller"; private static final String CONTROLLER_NODE_NAME = "controller";
private static final VerificationMode once = Mockito.times(1); private static final VerificationMode once = Mockito.times(1);
private final byte[] memberData = new String("data from member").getBytes();
@BeforeClass @BeforeClass
public static void setupTest() throws Exception { public static void setupTest() throws Exception {
UTIL.startMiniZKCluster(); UTIL.startMiniZKCluster();
@ -105,7 +108,7 @@ public class TestZKProcedureControllers {
Mockito.doAnswer(new Answer<Void>() { Mockito.doAnswer(new Answer<Void>() {
@Override @Override
public Void answer(InvocationOnMock invocation) throws Throwable { public Void answer(InvocationOnMock invocation) throws Throwable {
controller.sendMemberCompleted(sub); controller.sendMemberCompleted(sub, memberData);
committed.countDown(); committed.countDown();
return null; return null;
} }
@ -176,9 +179,11 @@ public class TestZKProcedureControllers {
CountDownLatch prepared = new CountDownLatch(expected.size()); CountDownLatch prepared = new CountDownLatch(expected.size());
CountDownLatch committed = new CountDownLatch(expected.size()); CountDownLatch committed = new CountDownLatch(expected.size());
ArrayList<byte[]> dataFromMembers = new ArrayList<byte[]>();
// mock out coordinator so we can keep track of zk progress // mock out coordinator so we can keep track of zk progress
ProcedureCoordinator coordinator = setupMockCoordinator(operationName, ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
prepared, committed); prepared, committed, dataFromMembers);
ProcedureMember member = Mockito.mock(ProcedureMember.class); ProcedureMember member = Mockito.mock(ProcedureMember.class);
@ -208,14 +213,20 @@ public class TestZKProcedureControllers {
// post the committed node for each expected node // post the committed node for each expected node
for (ZKProcedureMemberRpcs cc : cohortControllers) { for (ZKProcedureMemberRpcs cc : cohortControllers) {
cc.sendMemberCompleted(sub); cc.sendMemberCompleted(sub, memberData);
} }
// wait for all commit notifications to reach the coordinator // wait for all commit notifications to reach the coordinator
committed.await(); committed.await();
// make sure we got the all the nodes and no more // make sure we got the all the nodes and no more
Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName), Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
Mockito.anyString()); Mockito.anyString(), Mockito.eq(memberData));
assertEquals("Incorrect number of members returnd data", expected.size(),
dataFromMembers.size());
for (byte[] result : dataFromMembers) {
assertArrayEquals("Incorrect data from member", memberData, result);
}
controller.resetMembers(p); controller.resetMembers(p);
@ -244,9 +255,11 @@ public class TestZKProcedureControllers {
final CountDownLatch prepared = new CountDownLatch(expected.size()); final CountDownLatch prepared = new CountDownLatch(expected.size());
final CountDownLatch committed = new CountDownLatch(expected.size()); final CountDownLatch committed = new CountDownLatch(expected.size());
ArrayList<byte[]> dataFromMembers = new ArrayList<byte[]>();
// mock out coordinator so we can keep track of zk progress // mock out coordinator so we can keep track of zk progress
ProcedureCoordinator coordinator = setupMockCoordinator(operationName, ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
prepared, committed); prepared, committed, dataFromMembers);
ProcedureMember member = Mockito.mock(ProcedureMember.class); ProcedureMember member = Mockito.mock(ProcedureMember.class);
Procedure p = Mockito.mock(Procedure.class); Procedure p = Mockito.mock(Procedure.class);
@ -281,14 +294,14 @@ public class TestZKProcedureControllers {
// post the committed node for each expected node // post the committed node for each expected node
for (ZKProcedureMemberRpcs cc : cohortControllers) { for (ZKProcedureMemberRpcs cc : cohortControllers) {
cc.sendMemberCompleted(sub); cc.sendMemberCompleted(sub, memberData);
} }
// wait for all commit notifications to reach the coordiantor // wait for all commit notifications to reach the coordiantor
committed.await(); committed.await();
// make sure we got the all the nodes and no more // make sure we got the all the nodes and no more
Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName), Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
Mockito.anyString()); Mockito.anyString(), Mockito.eq(memberData));
controller.resetMembers(p); controller.resetMembers(p);
@ -299,11 +312,13 @@ public class TestZKProcedureControllers {
} }
/** /**
* @param dataFromMembers
* @return a mock {@link ProcedureCoordinator} that just counts down the * @return a mock {@link ProcedureCoordinator} that just counts down the
* prepared and committed latch for called to the respective method * prepared and committed latch for called to the respective method
*/ */
private ProcedureCoordinator setupMockCoordinator(String operationName, private ProcedureCoordinator setupMockCoordinator(String operationName,
final CountDownLatch prepared, final CountDownLatch committed) { final CountDownLatch prepared, final CountDownLatch committed,
final ArrayList<byte[]> dataFromMembers) {
ProcedureCoordinator coordinator = Mockito ProcedureCoordinator coordinator = Mockito
.mock(ProcedureCoordinator.class); .mock(ProcedureCoordinator.class);
Mockito.mock(ProcedureCoordinator.class); Mockito.mock(ProcedureCoordinator.class);
@ -317,10 +332,12 @@ public class TestZKProcedureControllers {
Mockito.doAnswer(new Answer<Void>() { Mockito.doAnswer(new Answer<Void>() {
@Override @Override
public Void answer(InvocationOnMock invocation) throws Throwable { public Void answer(InvocationOnMock invocation) throws Throwable {
dataFromMembers.add(memberData);
committed.countDown(); committed.countDown();
return null; return null;
} }
}).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString()); }).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString(),
Mockito.eq(memberData));
return coordinator; return coordinator;
} }
@ -356,7 +373,7 @@ public class TestZKProcedureControllers {
// verify that we got all the expected nodes // verify that we got all the expected nodes
for (String node : expected) { for (String node : expected) {
verify(coordinator, once).memberAcquiredBarrier(operationName, node); verify(coordinator, once).memberAcquiredBarrier(operationName, node);
verify(coordinator, once).memberFinishedBarrier(operationName, node); verify(coordinator, once).memberFinishedBarrier(operationName, node, memberData);
} }
} }