HBASE-8783 RSSnapshotManager.ZKProcedureMemberRpcs may be initialized with the wrong server name

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1495946 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
mbertozzi 2013-06-24 08:15:56 +00:00
parent 3f7d269d8a
commit a809a7bdd0
7 changed files with 32 additions and 42 deletions

View File

@ -35,7 +35,7 @@ public interface ProcedureMemberRpcs extends Closeable {
/**
* Initialize and start any threads or connections the member needs.
*/
public void start(ProcedureMember member);
public void start(final String memberName, final ProcedureMember member);
/**
* Each subprocedure is being executed on a member. This is the identifier for the member.

View File

@ -164,7 +164,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
this.coordinator = coordinator;
try {
this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
this.zkProc = new ZKProcedureUtil(watcher, procedureType) {
@Override
public void nodeCreated(String path) {
if (!isInProcedurePath(path)) return;
@ -191,7 +191,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
return false;
}
LOG.debug("Starting the controller for procedure member:" + zkProc.getMemberName());
LOG.debug("Starting the controller for procedure member:" + coordName);
return true;
}

View File

@ -55,24 +55,23 @@ import com.google.protobuf.InvalidProtocolBufferException;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
private final String memberName;
private final ZKProcedureUtil zkController;
protected ProcedureMember member;
private ZKProcedureUtil zkController;
private String memberName;
/**
* Must call {@link #start(ProcedureMember)} before this can be used.
* Must call {@link #start(String, ProcedureMember)} before this can be used.
* @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
* {@link #close()}.
* @param procType name of the znode describing the procedure type
* @param memberName name of the member to join the procedure
* @throws KeeperException if we can't reach zookeeper
*/
public ZKProcedureMemberRpcs(ZooKeeperWatcher watcher,
String procType, String memberName) throws KeeperException {
this.zkController = new ZKProcedureUtil(watcher, procType, memberName) {
public ZKProcedureMemberRpcs(final ZooKeeperWatcher watcher, final String procType)
throws KeeperException {
this.zkController = new ZKProcedureUtil(watcher, procType) {
@Override
public void nodeCreated(String path) {
if (!isInProcedurePath(path)) {
@ -114,7 +113,6 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
}
}
};
this.memberName = memberName;
}
public ZKProcedureUtil getZkController() {
@ -337,9 +335,10 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
}
}
public void start(ProcedureMember listener) {
public void start(final String memberName, final ProcedureMember listener) {
LOG.debug("Starting procedure member '" + this.memberName + "'");
this.member = listener;
this.memberName = memberName;
watchForAbortedProcedures();
waitForNewProcedures();
}

View File

@ -64,8 +64,6 @@ public abstract class ZKProcedureUtil
protected final String reachedZnode;
protected final String abortZnode;
protected final String memberName;
/**
* Top-level watcher/controller for procedures across the cluster.
* <p>
@ -74,13 +72,11 @@ public abstract class ZKProcedureUtil
* @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
* {@link #close()}
* @param procDescription name of the znode describing the procedure to run
* @param memberName name of the member from which we are interacting with running procedures
* @throws KeeperException when the procedure znodes cannot be created
*/
public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription,
String memberName) throws KeeperException {
public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription)
throws KeeperException {
super(watcher);
this.memberName = memberName;
// make sure we are listening for events
watcher.registerListener(this);
// setup paths for the zknodes used in procedures
@ -127,10 +123,6 @@ public abstract class ZKProcedureUtil
return acquiredZnode;
}
public String getMemberName() {
return memberName;
}
/**
* Get the full znode path for the node used by the coordinator to trigger a global barrier
* acquire on each subprocedure.
@ -189,7 +181,7 @@ public abstract class ZKProcedureUtil
return path.equals(acquiredZnode);
}
/**
* Is this in the procedure barrier acquired znode path
*/

View File

@ -119,9 +119,8 @@ public class RegionServerSnapshotManager {
throws KeeperException {
this.rss = rss;
ZooKeeperWatcher zkw = rss.getZooKeeper();
String nodeName = rss.getServerName().toString();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, nodeName);
SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
// read in the snapshot request configuration properties
Configuration conf = rss.getConfiguration();
@ -129,7 +128,8 @@ public class RegionServerSnapshotManager {
int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
// create the actual snapshot procedure member
ThreadPoolExecutor pool = ProcedureMember.defaultPool(nodeName, opThreads, keepAlive);
ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
}
@ -137,7 +137,8 @@ public class RegionServerSnapshotManager {
* Start accepting snapshot requests.
*/
public void start() {
this.memberRpcs.start(member);
LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
this.memberRpcs.start(rss.getServerName().toString(), member);
}
/**
@ -282,7 +283,7 @@ public class RegionServerSnapshotManager {
boolean hasTasks() {
return futures.size() != 0;
}
/**
* Submit a task to the pool.
*

View File

@ -145,11 +145,11 @@ public class TestZKProcedure {
// start each member
for (String member : members) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
comms.start(procMember);
comms.start(member, procMember);
}
// setup mock member subprocedures
@ -219,11 +219,11 @@ public class TestZKProcedure {
expected.size());
for (String member : expected) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
controller.start(mem);
controller.start(member, mem);
}
// setup mock subprocedures
@ -311,7 +311,7 @@ public class TestZKProcedure {
try {
task.waitForCompleted();
} catch (ForeignException fe) {
// this may get caught or may not
// this may get caught or may not
}
// -------------

View File

@ -88,7 +88,7 @@ public class TestZKProcedureControllers {
final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
watcher, "testSimple", COHORT_NODE_NAME);
watcher, "testSimple");
// mock out cohort member callbacks
final ProcedureMember member = Mockito
@ -112,7 +112,7 @@ public class TestZKProcedureControllers {
}).when(member).receivedReachedGlobalBarrier(operationName);
// start running the listener
controller.start(member);
controller.start(COHORT_NODE_NAME, member);
// set a prepare node from a 'coordinator'
String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
@ -386,9 +386,8 @@ public class TestZKProcedureControllers {
List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
for (String nodeName : expected) {
ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
watcher, operationName, nodeName);
cc.start(member);
ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
cc.start(nodeName, member);
cohortControllers.add(cc);
}
return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
@ -411,9 +410,8 @@ public class TestZKProcedureControllers {
// make a cohort controller for each expected node
List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
for (String nodeName : expected) {
ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
watcher, operationName, nodeName);
cc.start(member);
ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(watcher, operationName);
cc.start(nodeName, member);
cohortControllers.add(cc);
}