HBASE-7212 Globally Barriered Procedure Mechanism

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445818 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2013-02-13 18:39:51 +00:00
parent 836f786f63
commit 8d117a6117
16 changed files with 4170 additions and 11 deletions

View File

@ -0,0 +1,360 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
import com.google.common.collect.Lists;
/**
* A globally-barriered distributed procedure. This class encapsulates state and methods for
* tracking and managing a distributed procedure, as well as aborting if any member encounters
* a problem or if a cancellation is requested.
* <p>
* All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
* method. The procedure contacts all members and waits for all subprocedures to execute
* {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
* send acquisition info back to the coordinator. If all acquisitions at subprocedures succeed,
* the coordinator then will call {@link #sendGlobalBarrierReached()}. This notifies members to
* execute the {@link Subprocedure#insideBarrier()} method. The procedure is blocked until all
* {@link Subprocedure#insideBarrier} executions complete at the members. When
* {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
* the coordinator. Once all members complete, the coordinator calls
* {@link #sendGlobalBarrierComplete()}.
* <p>
* If errors are encountered remotely, they are forwarded to the coordinator, and
* {@link Subprocedure#cleanup(Exception)} is called.
* <p>
* Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
* limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
* an {@link ForeignException} to abort the procedure. This is particularly useful for situations
* when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
* amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
* <p>
* Users should generally not directly create or subclass instances of this. They are created
* for them implicitly via {@link ProcedureCoordinator#startProcedure(String, byte[], List)}}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Procedure implements Callable<Void>, ForeignExceptionListener {
private static final Log LOG = LogFactory.getLog(Procedure.class);
//
// Arguments and naming
//
// Name of the procedure
final private String procName;
// Arguments for this procedure execution
final private byte[] args;
//
// Execution State
//
/** latch for waiting until all members have acquire in barrier state */
final CountDownLatch acquiredBarrierLatch;
/** latch for waiting until all members have executed and released their in barrier state */
final CountDownLatch releasedBarrierLatch;
/** latch for waiting until a procedure has completed */
final CountDownLatch completedLatch;
/** monitor to check for errors */
private final ForeignExceptionDispatcher monitor;
//
// Execution Timeout Handling.
//
/** frequency to check for errors (ms) */
protected final long wakeFrequency;
protected final TimeoutExceptionInjector timeoutInjector;
//
// Members' and Coordinator's state
//
/** lock to prevent nodes from acquiring and then releasing before we can track them */
private Object joinBarrierLock = new Object();
private final List<String> acquiringMembers;
private final List<String> inBarrierMembers;
private ProcedureCoordinator coord;
/**
* Creates a procedure. (FOR TESTING)
*
* {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
* @param coord coordinator to call back to for general errors (e.g.
* {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
* @param monitor error monitor to check for external errors
* @param wakeFreq frequency to check for errors while waiting
* @param timeout amount of time to allow the procedure to run before cancelling
* @param procName name of the procedure instance
* @param args argument data associated with the procedure instance
* @param expectedMembers names of the expected members
*/
public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
long timeout, String procName, byte[] args, List<String> expectedMembers) {
this.coord = coord;
this.acquiringMembers = new ArrayList<String>(expectedMembers);
this.inBarrierMembers = new ArrayList<String>(acquiringMembers.size());
this.procName = procName;
this.args = args;
this.monitor = monitor;
this.wakeFrequency = wakeFreq;
int count = expectedMembers.size();
this.acquiredBarrierLatch = new CountDownLatch(count);
this.releasedBarrierLatch = new CountDownLatch(count);
this.completedLatch = new CountDownLatch(1);
this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
}
/**
* Create a procedure.
*
* Users should generally not directly create instances of this. They are created them
* implicitly via {@link ProcedureCoordinator#createProcedure(String, byte[], List)}}
*
* @param coord coordinator to call back to for general errors (e.g.
* {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
* @param wakeFreq frequency to check for errors while waiting
* @param timeout amount of time to allow the procedure to run before cancelling
* @param procName name of the procedure instance
* @param args argument data associated with the procedure instance
* @param expectedMembers names of the expected members
*/
public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
String procName, byte[] args, List<String> expectedMembers) {
this(coord, new ForeignExceptionDispatcher(), wakeFreq, timeout, procName, args,
expectedMembers);
}
public String getName() {
return procName;
}
/**
* Get the ExternalErrorDispatcher
* @return the Procedure's monitor.
*/
public ForeignExceptionDispatcher getErrorMonitor() {
return monitor;
}
/**
* This call is the main execution thread of the barriered procedure. It sends messages and
* essentially blocks until all procedure members acquire or later complete but periodically
* checks for foreign exceptions.
*/
@Override
@SuppressWarnings("finally")
final public Void call() {
LOG.info("Starting procedure '" + procName + "'");
// start the timer
timeoutInjector.start();
// run the procedure
try {
// start by checking for error first
monitor.rethrowException();
LOG.debug("Procedure '" + procName + "' starting 'acquire'");
sendGlobalBarrierStart();
// wait for all the members to report acquisition
LOG.debug("Waiting for all members to 'acquire'");
waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
monitor.rethrowException();
LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
sendGlobalBarrierReached();
// wait for all members to report barrier release
waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
// make sure we didn't get an error during in barrier execution and release
monitor.rethrowException();
LOG.info("Procedure '" + procName + "' execution completed");
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
String msg = "Procedure '" + procName +"' execution failed!";
LOG.error(msg, e);
receive(new ForeignException(getName(), e));
} finally {
LOG.debug("Running finish phase.");
sendGlobalBarrierComplete();
completedLatch.countDown();
// tell the timer we are done, if we get here successfully
timeoutInjector.complete();
return null;
}
}
/**
* Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
* the {@link Subprocedure#acquireBarrier} step.
* @throws ForeignException
*/
public void sendGlobalBarrierStart() throws ForeignException {
// start the procedure
LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
try {
// send procedure barrier start to specified list of members. cloning the list to avoid
// concurrent modification from the controller setting the prepared nodes
coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
} catch (IOException e) {
coord.rpcConnectionFailure("Can't reach controller.", e);
} catch (IllegalArgumentException e) {
throw new ForeignException(getName(), e);
}
}
/**
* Sends a message to all members that the global barrier condition has been satisfied. This
* should only be executed after all members have completed its
* {@link Subprocedure#acquireBarrier()} call successfully. This triggers the member
* {@link Subprocedure#insideBarrier} method.
* @throws ForeignException
*/
public void sendGlobalBarrierReached() throws ForeignException {
try {
// trigger to have member run {@link Subprocedure#insideBarrier}
coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
} catch (IOException e) {
coord.rpcConnectionFailure("Can't reach controller.", e);
}
}
/**
* Sends a message to members that all {@link Subprocedure#inBarrier} calls have completed.
* After this executes, the coordinator can assume that any state resources about this barrier
* procedure state has been released.
*/
public void sendGlobalBarrierComplete() {
LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
try {
coord.getRpcs().resetMembers(this);
} catch (IOException e) {
coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
}
}
//
// Call backs from other external processes.
//
/**
* Call back triggered by an individual member upon successful local barrier acquisition
* @param member
*/
public void barrierAcquiredByMember(String member) {
LOG.debug("member: '" + member + "' joining prepared barrier for procedure '" + procName
+ "' on coordinator");
if (this.acquiringMembers.contains(member)) {
synchronized (joinBarrierLock) {
if (this.acquiringMembers.remove(member)) {
this.inBarrierMembers.add(member);
acquiredBarrierLatch.countDown();
}
}
LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to aqcuire global barrier");
} else {
LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
" Continuting on.");
}
}
/**
* Call back triggered by a individual member upon successful local in-barrier execution and
* release
* @param member
*/
public void barrierReleasedByMember(String member) {
boolean removed = false;
synchronized (joinBarrierLock) {
removed = this.inBarrierMembers.remove(member);
if (removed) {
releasedBarrierLatch.countDown();
}
}
if (removed) {
LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
+ "', counting down latch");
} else {
LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
+ "', but we weren't waiting on it to release!");
}
}
/**
* Waits until the entire procedure has globally completed, or has been aborted.
* @throws ForeignException
* @throws InterruptedException
*/
public void waitForCompleted() throws ForeignException, InterruptedException {
waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
}
/**
* A callback that handles incoming ExternalExceptions.
*/
@Override
public void receive(ForeignException e) {
monitor.receive(e);
}
/**
* Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
* check for errors
* @param latch latch to wait on
* @param monitor monitor to check for errors while waiting
* @param wakeFrequency frequency to wake up and check for errors (in
* {@link TimeUnit#MILLISECONDS})
* @param latchDescription description of the latch, for logging
* @throws ForeignException type of error the monitor can throw, if the task fails
* @throws InterruptedException if we are interrupted while waiting on latch
*/
public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
long wakeFrequency, String latchDescription) throws ForeignException,
InterruptedException {
boolean released = false;
while (!released) {
if (monitor != null) {
monitor.rethrowException();
}
ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:" + wakeFrequency + " ms)");
released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
}
}
}

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import com.google.common.collect.MapMaker;
/**
* This is the master side of a distributed complex procedure execution.
* <p>
* The {@link Procedure} is generic and subclassing or customization shouldn't be
* necessary -- any customization should happen just in {@link Subprocedure}s.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ProcedureCoordinator {
private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
final static long TIMEOUT_MILLIS_DEFAULT = 60000;
final static long WAKE_MILLIS_DEFAULT = 500;
private final ProcedureCoordinatorRpcs rpcs;
private final ExecutorService pool;
// Running procedure table. Maps procedure name to running procedure reference
private final ConcurrentMap<String, Procedure> procedures =
new MapMaker().concurrencyLevel(4).weakValues().makeMap();
/**
* Create and start a ProcedureCoordinator.
*
* The rpc object registers the ProcedureCoordinator and starts any threads in this
* constructor.
*
* @param rpcs
* @param factory Builder for building Procedures
* @param pool Used for executing procedures.
*/
public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
this.rpcs = rpcs;
this.pool = pool;
this.rpcs.start(this);
}
/**
* Default thread pool for the procedure
*/
public static ThreadPoolExecutor defaultPool(String coordName, long keepAliveTime, int opThreads,
long wakeFrequency) {
return new ThreadPoolExecutor(1, opThreads, keepAliveTime, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
}
/**
* Shutdown the thread pools and release rpc resources
* @throws IOException
*/
public void close() throws IOException {
// have to use shutdown now to break any latch waiting
pool.shutdownNow();
rpcs.close();
}
/**
* Submit an procedure to kick off its dependent subprocedures.
* @param proc Procedure to execute
* @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the
* procedure or any subprocedures could not be started. Failure could be due to
* submitting a procedure multiple times (or one with the same name), or some sort
* of IO problem. On errors, the procedure's monitor holds a reference to the exception
* that caused the failure.
*/
boolean submitProcedure(Procedure proc) {
// if the submitted procedure was null, then we don't want to run it
if (proc == null) {
return false;
}
String procName = proc.getName();
// make sure we aren't already running an procedure of that name
synchronized (procedures) {
if (procedures.get(procName) != null) {
return false;
}
}
// kick off the procedure's execution in a separate thread
Future<Void> f = null;
try {
f = this.pool.submit(proc);
// if everything got started properly, we can add it known running procedures
synchronized (procedures) {
this.procedures.put(procName, proc);
}
return true;
} catch (RejectedExecutionException e) {
// the thread pool is full and we can't run the procedure
proc.receive(new ForeignException(procName, e));
// cancel procedure proactively
if (f != null) {
f.cancel(true);
}
}
return false;
}
/**
* The connection to the rest of the procedure group (members and coordinator) has been
* broken/lost/failed. This should fail any interested procedures, but not attempt to notify other
* members since we cannot reach them anymore.
* @param message description of the error
* @param cause the actual cause of the failure
*/
void rpcConnectionFailure(final String message, final IOException cause) {
Collection<Procedure> toNotify = procedures.values();
for (Procedure proc : toNotify) {
if (proc == null) {
continue;
}
// notify the elements, if they aren't null
proc.receive(new ForeignException(proc.getName(), cause));
}
}
/**
* Abort the procedure with the given name
* @param procName name of the procedure to abort
* @param reason serialized information about the abort
*/
public void abortProcedure(String procName, ForeignException reason) {
// if we know about the Procedure, notify it
synchronized(procedures) {
Procedure proc = procedures.get(procName);
if (proc == null) {
return;
}
proc.receive(reason);
}
}
/**
* Exposed for hooking with unit tests.
* @param procName
* @param procArgs
* @param expectedMembers
* @return
*/
Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
List<String> expectedMembers) {
// build the procedure
return new Procedure(this, fed, WAKE_MILLIS_DEFAULT, TIMEOUT_MILLIS_DEFAULT,
procName, procArgs, expectedMembers);
}
/**
* Kick off the named procedure
* @param procName name of the procedure to start
* @param procArgs arguments for the procedure
* @param expectedMembers expected members to start
* @return handle to the running procedure, if it was started correctly, <tt>null</tt> otherwise
* @throws RejectedExecutionException if there are no more available threads to run the procedure
*/
public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
List<String> expectedMembers) throws RejectedExecutionException {
Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
if (!this.submitProcedure(proc)) {
LOG.error("Failed to submit procedure '" + procName + "'");
return null;
}
return proc;
}
/**
* Notification that the procedure had the specified member acquired its part of the barrier
* via {@link Subprocedure#acquireBarrier()}.
* @param procName name of the procedure that acquired
* @param member name of the member that acquired
*/
void memberAcquiredBarrier(String procName, final String member) {
Procedure proc = procedures.get(procName);
if (proc != null) {
proc.barrierAcquiredByMember(member);
}
}
/**
* Notification that the procedure had another member finished executing its in-barrier subproc
* via {@link Subprocedure#insideBarrier()}.
* @param procName name of the subprocedure that finished
* @param member name of the member that executed and released its barrier
*/
void memberFinishedBarrier(String procName, final String member) {
Procedure proc = procedures.get(procName);
if (proc != null) {
proc.barrierReleasedByMember(member);
}
}
/**
* @return the rpcs implementation for all current procedures
*/
ProcedureCoordinatorRpcs getRpcs() {
return rpcs;
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
/**
* RPCs for the coordinator to run a barriered procedure with subprocedures executed at
* distributed members.
* @see ProcedureCoordinator
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ProcedureCoordinatorRpcs extends Closeable {
/**
* Initialize and start threads necessary to connect an implementation's rpc mechanisms.
* @param listener
* @return true if succeed, false if encountered initialization errors.
*/
public boolean start(final ProcedureCoordinator listener);
/**
* Notify the members that the coordinator has aborted the procedure and that it should release
* barrier resources.
*
* @param procName name of the procedure that was aborted
* @param cause the reason why the procedure needs to be aborted
* @throws IOException if the rpcs can't reach the other members of the procedure (and can't
* recover).
*/
public void sendAbortToMembers(Procedure procName, ForeignException cause) throws IOException;
/**
* Notify the members to acquire barrier for the procedure
*
* @param procName name of the procedure to start
* @param info information that should be passed to all members
* @param members names of the members requested to reach the acquired phase
* @throws IllegalArgumentException if the procedure was already marked as failed
* @throws IOException if we can't reach the remote notification mechanism
*/
public void sendGlobalBarrierAcquire(Procedure procName, byte[] info, List<String> members)
throws IOException, IllegalArgumentException;
/**
* Notify members that all members have acquired their parts of the barrier and that they can
* now execute under the global barrier.
*
* Must come after calling {@link #sendGlobalBarrierAcquire(String, byte[], List)}
*
* @param procName name of the procedure to start
* @param members members to tell we have reached in-barrier phase
* @throws IOException if we can't reach the remote notification mechanism
*/
public void sendGlobalBarrierReached(Procedure procName, List<String> members) throws IOException;
/**
* Notify Members to reset the distributed state for procedure
* @param procName name of the procedure to reset
* @throws IOException if the remote notification mechanism cannot be reached
*/
public void resetMembers(Procedure procName) throws IOException;
}

View File

@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import com.google.common.collect.MapMaker;
/**
* Process to kick off and manage a running {@link Subprocedure} on a member. This is the
* specialized part of a {@link Procedure} that actually does procedure type-specific work
* and reports back to the coordinator as it completes each phase.
* <p>
* If there is a connection error ({@link #controllerConnectionFailure(String, IOException)}), all
* currently running subprocedures are notify to failed since there is no longer a way to reach any
* other members or coordinators since the rpcs are down.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ProcedureMember implements Closeable {
private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
private final SubprocedureFactory builder;
private final ProcedureMemberRpcs rpcs;
// private final WeakValueMapping<String, Subprocedure> subprocs = new WeakValueMapping<String, Subprocedure>();
private final ConcurrentMap<String,Subprocedure> subprocs = new MapMaker().concurrencyLevel(4).weakValues().makeMap();
private final ExecutorService pool;
/**
* Instantiate a new ProcedureMember. This is a slave that executes subprocedures.
*
* @param rpcs controller used to send notifications to the procedure coordinator
* @param pool thread pool to submit subprocedures
* @param factory class that creates instances of a subprocedure.
*/
public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
SubprocedureFactory factory) {
this.pool = pool;
this.rpcs = rpcs;
this.builder = factory;
}
public static ThreadPoolExecutor defaultPool(long wakeFrequency, long keepAlive,
int procThreads, String memberName) {
return new ThreadPoolExecutor(1, procThreads, keepAlive, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new DaemonThreadFactory("( member-" + memberName + ") subprocedure-pool"));
}
/**
* Package exposed. Not for public use.
*
* @return reference to the Procedure member's rpcs object
*/
ProcedureMemberRpcs getRpcs() {
return rpcs;
}
/**
* This is separated from execution so that we can detect and handle the case where the
* subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
* sent here)
* @param opName
* @param data
* @return
*/
public Subprocedure createSubprocedure(String opName, byte[] data) {
return builder.buildSubprocedure(opName, data);
}
/**
* Submit an subprocedure for execution. This starts the local acquire phase.
* @param subproc the subprocedure to execute.
* @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
* could not be started. In the latter case, the subprocedure holds a reference to
* the exception that caused the failure.
*/
public boolean submitSubprocedure(Subprocedure subproc) {
// if the submitted subprocedure was null, bail.
if (subproc == null) {
LOG.warn("Submitted null subprocedure, nothing to run here.");
return false;
}
String procName = subproc.getName();
if (procName == null || procName.length() == 0) {
LOG.error("Subproc name cannot be null or the empty string");
return false;
}
// make sure we aren't already running an subprocedure of that name
Subprocedure rsub;
synchronized (subprocs) {
rsub = subprocs.get(procName);
}
if (rsub != null) {
if (!rsub.isComplete()) {
LOG.error("Subproc '" + procName + "' is already running. Bailing out");
return false;
}
LOG.warn("A completed old subproc " + procName + " is still present, removing");
subprocs.remove(procName);
}
LOG.debug("Submitting new Subprocedure:" + procName);
// kick off the subprocedure
Future<Void> future = null;
try {
future = this.pool.submit(subproc);
synchronized (subprocs) {
subprocs.put(procName, subproc);
}
return true;
} catch (RejectedExecutionException e) {
// the thread pool is full and we can't run the subprocedure
String msg = "Subprocedure pool is full!";
subproc.cancel(msg, e.getCause());
// cancel all subprocedures proactively
if (future != null) {
future.cancel(true);
}
}
LOG.error("Failed to start subprocedure '" + procName + "'");
return false;
}
/**
* Notification that procedure coordinator has reached the global barrier
* @param procName name of the subprocedure that should start running the the in-barrier phase
*/
public void receivedReachedGlobalBarrier(String procName) {
Subprocedure subproc = subprocs.get(procName);
if (subproc == null) {
LOG.warn("Unexpected reached glabal barrier message for Procedure '" + procName + "'");
}
subproc.receiveReachedGlobalBarrier();
}
/**
* Best effort attempt to close the threadpool via Thread.interrupt.
*/
@Override
public void close() throws IOException {
// have to use shutdown now to break any latch waiting
pool.shutdownNow();
}
/**
* Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
* @param timeoutMs timeout limit in millis
* @return true if successfully, false if bailed due to timeout.
* @throws InterruptedException
*/
public boolean closeAndWait(long timeoutMs) throws InterruptedException {
pool.shutdown();
return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
}
/**
* The connection to the rest of the procedure group (member and coordinator) has been
* broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
* other members since we cannot reach them anymore.
* @param message description of the error
* @param cause the actual cause of the failure
*
* TODO i'm tempted to just remove this code completely and treat it like any other abort.
* Implementation wise, if this happens it is a ZK failure which means the RS will abort.
*/
public void controllerConnectionFailure(final String message, final IOException cause) {
Collection<Subprocedure> toNotify = subprocs.values();
for (Subprocedure sub : toNotify) {
// TODO notify the elements, if they aren't null
LOG.error(message, cause);
sub.cancel(message, cause);
}
}
/**
* Send abort to the specified procedure
* @param procName name of the procedure to about
* @param reason serialized information about the abort
*/
public void receiveAbortProcedure(String procName, ForeignException ee) {
// if we know about the procedure, notify it
Subprocedure sub = subprocs.get(procName);
if (sub == null) {
LOG.info("Received abort on procedure with no local subprocedure " + procName +
", ignoring it.", ee);
return; // Procedure has already completed
}
LOG.error("Remote procedure failure, not propagating error:" + ee);
sub.monitor.receive(ee);
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
/**
* This is the notification interface for Procedures that encapsulates message passing from
* members to a coordinator. Each of these calls should send a message to the coordinator.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ProcedureMemberRpcs extends Closeable {
/**
* Initialize and start any threads or connections the member needs.
*/
public void start(ProcedureMember member);
/**
* Each subprocedure is being executed on a member. This is the identifier for the member.
* @return the member name
*/
public String getMemberName();
/**
* Notify the coordinator that we aborted the specified {@link Subprocedure}
*
* @param sub the {@link Subprocedure} we are aborting
* @param cause the reason why the member's subprocedure aborted
* @throws IOException thrown when the rpcs can't reach the other members of the procedure (and
* thus can't recover).
*/
public void sendMemberAborted(Subprocedure sub, ForeignException cause) throws IOException;
/**
* Notify the coordinator that the specified {@link Subprocedure} has acquired the locally required
* barrier condition.
*
* @param sub the specified {@link Subprocedure}
* @throws IOException if we can't reach the coordinator
*/
public void sendMemberAcquired(Subprocedure sub) throws IOException;
/**
* Notify the coordinator that the specified {@link Subprocedure} has completed the work that
* needed to be done under the global barrier.
*
* @param sub the specified {@link Subprocedure}
* @throws IOException if we can't reach the coordinator
*/
public void sendMemberCompleted(Subprocedure sub) throws IOException;
}

View File

@ -0,0 +1,330 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
/**
* Distributed procedure member's Subprocedure. A procedure is sarted on a ProcedureCoordinator
* which communicates with ProcedureMembers who create and start its part of the Procedure. This
* sub part is called a Subprocedure
*
* Users should subclass this and implement {@link #acquireBarrier()} (get local barrier for this
* member), {@link #insideBarrier()} (execute while globally barriered and release barrier) and
* {@link #cleanup(Exception)} (release state associated with subprocedure.)
*
* When submitted to a ProcedureMemeber, the call method is executed in a separate thread.
* Latches are use too block its progress and trigger continuations when barrier conditions are
* met.
*
* Exceptions that make it out from calls to {@link #acquireBarrier()} or {@link #insideBarrier()}
* get converted into {@link ExternalExceptions}, which will get propagated to the
* {@link ProcedureCoordinator}.
*
* There is a category of procedure (ex: online-snapshots), and a user-specified instance-specific
* barrierName. (ex: snapshot121126).
*/
abstract public class Subprocedure implements Callable<Void> {
private static final Log LOG = LogFactory.getLog(Subprocedure.class);
// Name of the procedure
final private String barrierName;
//
// Execution state
//
/** wait on before allowing the in barrier phase to proceed */
private final CountDownLatch inGlobalBarrier;
/** counted down when the Subprocedure has completed */
private final CountDownLatch releasedLocalBarrier;
//
// Error handling
//
/** monitor to check for errors */
protected final ForeignExceptionDispatcher monitor;
/** frequency to check for errors (ms) */
protected final long wakeFrequency;
protected final TimeoutExceptionInjector executionTimeoutTimer;
protected final ProcedureMemberRpcs rpcs;
private volatile boolean complete = false;
/**
* @param member reference to the member managing this subprocedure
* @param procName name of the procedure this subprocedure is associated with
* @param monitor notified if there is an error in the subprocedure
* @param wakeFrequency time in millis to wake to check if there is an error via the monitor (in
* milliseconds).
* @param timeout time in millis that will trigger a subprocedure abort if it has not completed
*/
public Subprocedure(ProcedureMember member, String procName, ForeignExceptionDispatcher monitor,
long wakeFrequency, long timeout) {
// Asserts should be caught during unit testing
assert member != null : "procedure member should be non-null";
assert member.getRpcs() != null : "rpc handlers should be non-null";
assert procName != null : "procedure name should be non-null";
assert monitor != null : "monitor should be non-null";
// Default to a very large timeout
this.rpcs = member.getRpcs();
this.barrierName = procName;
this.monitor = monitor;
// forward any failures to coordinator. Since this is a dispatcher, resend loops should not be
// possible.
this.monitor.addListener(new ForeignExceptionListener() {
@Override
public void receive(ForeignException ee) {
// if this is a notification from a remote source, just log
if (ee.isRemote()) {
LOG.debug("Can't reach controller, not propagating error", ee);
return;
}
// if it is local, then send it to the coordinator
try {
rpcs.sendMemberAborted(Subprocedure.this, ee);
} catch (IOException e) {
// this will fail all the running procedures, since the connection is down
LOG.error("Can't reach controller, not propagating error", e);
}
}
});
this.wakeFrequency = wakeFrequency;
this.inGlobalBarrier = new CountDownLatch(1);
this.releasedLocalBarrier = new CountDownLatch(1);
// accept error from timer thread, this needs to be started.
this.executionTimeoutTimer = new TimeoutExceptionInjector(monitor, timeout);
}
public String getName() {
return barrierName;
}
public String getMemberName() {
return rpcs.getMemberName();
}
private void rethrowException() throws ForeignException {
monitor.rethrowException();
}
/**
* Execute the Subprocedure {@link #acquireBarrier()} and {@link #insideBarrier()} methods
* while keeping some state for other threads to access.
*
* This would normally be executed by the ProcedureMemeber when a acquire message comes from the
* coordinator. Rpcs are used to spend message back to the coordinator after different phases
* are executed. Any exceptions caught during the execution (except for InterrupedException) get
* converted and propagated to coordinator via {@link ProcedureMemberRpcs#sendAbort(Exception)}.
*/
@SuppressWarnings("finally")
final public Void call() {
LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " + executionTimeoutTimer.getMaxTime() + "ms");
// start the execution timeout timer
executionTimeoutTimer.start();
try {
// start by checking for error first
rethrowException();
LOG.debug("Subprocedure '" + barrierName + "' starting 'acquire' stage");
acquireBarrier();
LOG.debug("Subprocedure '" + barrierName + "' locally acquired");
// vote yes to coordinator about being prepared
rpcs.sendMemberAcquired(this);
LOG.debug("Subprocedure '" + barrierName + "' coordinator notified of 'acquire', waiting on" +
" 'reached' or 'abort' from coordinator");
// wait for the procedure to reach global barrier before proceding
waitForReachedGlobalBarrier();
rethrowException(); // if Coordinator aborts, will bail from here with exception
// In traditional 2PC, if a member reaches this state the TX has been committed and the
// member is responsible for rolling forward and recovering and completing the subsequent
// operations in the case of failure. It cannot rollback.
//
// This implementation is not 2PC since it can still rollback here, and thus has different
// semantics.
LOG.debug("Subprocedure '" + barrierName + "' received 'reached' from coordinator.");
insideBarrier();
LOG.debug("Subprocedure '" + barrierName + "' locally completed");
// Ack that the member has executed and relased local barrier
rpcs.sendMemberCompleted(this);
LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
// make sure we didn't get an external exception
rethrowException();
LOG.debug("Subprocedure '" + barrierName + "' locally completed");
} catch (Exception e) {
String msg = null;
if (e instanceof InterruptedException) {
msg = "Procedure '" + barrierName + "' aborting due to interrupt!" +
" Likely due to pool shutdown.";
Thread.currentThread().interrupt();
} else if (e instanceof ForeignException) {
msg = "Subprocedure '" + barrierName + "' aborting due to external exception!";
} else {
msg = "Subprocedure '" + barrierName + "' failed!";
}
LOG.error(msg , e);
cancel(msg, e);
LOG.debug("Subprocedure '" + barrierName + "' Running cleanup.");
cleanup(e);
} finally {
releasedLocalBarrier.countDown();
// tell the timer we are done, if we get here successfully
executionTimeoutTimer.complete();
complete = true;
LOG.debug("Subprocedure '" + barrierName + "' completed.");
return null;
}
}
boolean isComplete() {
return complete;
}
/**
* exposed for testing.
*/
ForeignExceptionSnare getErrorCheckable() {
return this.monitor;
}
/**
* The implementation of this method should gather and hold required resources (locks, disk
* space, etc) to satisfy the Procedures barrier condition. For example, this would be where
* to make all the regions on a RS on the quiescent for an procedure that required all regions
* to be globally quiesed.
*
* Users should override this method. If a quiescent is not required, this is overkill but
* can still be used to execute a procedure on all members and to propagate any exceptions.
*
* @throws ForeignException
*/
abstract public void acquireBarrier() throws ForeignException;
/**
* The implementation of this method should act with the assumption that the barrier condition
* has been satisfied. Continuing the previous example, a condition could be that all RS's
* globally have been quiesced, and procedures that require this precondition could be
* implemented here.
*
* Users should override this method. If quiescense is not required, this can be a no-op
*
* @throws ForeignException
*/
abstract public void insideBarrier() throws ForeignException;
/**
* Users should override this method. This implementation of this method should rollback and
* cleanup any temporary or partially completed state that the {@link #acquireBarrier()} may have
* created.
* @param e
*/
abstract public void cleanup(Exception e);
/**
* Method to cancel the Subprocedure by injecting an exception from and external source.
* @param cause
*/
public void cancel(String msg, Throwable cause) {
LOG.error(msg, cause);
if (cause instanceof ForeignException) {
monitor.receive((ForeignException) cause);
} else {
monitor.receive(new ForeignException(getMemberName(), cause));
}
}
/**
* Callback for the member rpcs to call when the global barrier has been reached. This
* unblocks the main subprocedure exectuion thread so that the Subprocedure's
* {@link #insideBarrier()} method can be run.
*/
public void receiveReachedGlobalBarrier() {
inGlobalBarrier.countDown();
}
//
// Subprocedure Internal State interface
//
/**
* Wait for the reached global barrier notification.
*
* Package visibility for testing
*
* @throws ForeignException
* @throws InterruptedException
*/
void waitForReachedGlobalBarrier() throws ForeignException, InterruptedException {
Procedure.waitForLatch(inGlobalBarrier, monitor, wakeFrequency,
barrierName + ":remote acquired");
}
/**
* Waits until the entire procedure has globally completed, or has been aborted.
* @throws ForeignException
* @throws InterruptedException
*/
public void waitForLocallyCompleted() throws ForeignException, InterruptedException {
Procedure.waitForLatch(releasedLocalBarrier, monitor, wakeFrequency,
barrierName + ":completed");
}
/**
* Empty Subprocedure for testing.
*
* Must be public for stubbing used in testing to work.
*/
public static class SubprocedureImpl extends Subprocedure {
public SubprocedureImpl(ProcedureMember member, String opName,
ForeignExceptionDispatcher monitor, long wakeFrequency, long timeout) {
super(member, opName, monitor, wakeFrequency, timeout);
}
@Override
public void acquireBarrier() throws ForeignException {}
@Override
public void insideBarrier() throws ForeignException {}
@Override
public void cleanup(Exception e) {}
};
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Task builder to build instances of a {@link ProcedureMember}'s {@link Subporocedure}s.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface SubprocedureFactory {
/**
* Build {@link Subprocedure} when requested.
* @param procName name of the procedure associated with this subprocedure
* @param procArgs arguments passed from the coordinator about the procedure
* @return {@link Subprocedure} to run or <tt>null</tt> if the no operation should be run
* @throws IllegalArgumentException if the operation could not be run because of errors in the
* request
* @throws IllegalStateException if the current runner cannot accept any more new requests
*/
public Subprocedure buildSubprocedure(String procName, byte[] procArgs);
}

View File

@ -0,0 +1,265 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
public static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
private ZKProcedureUtil zkProc = null;
protected ProcedureCoordinator coordinator = null; // if started this should be non-null
ZooKeeperWatcher watcher;
String procedureType;
String coordName;
/**
* @param watcher zookeeper watcher. Owned by <tt>this</tt> and closed via {@link #close()}
* @param procedureType procedure type name is a category for when there are multiple kinds of
* procedures.-- this becomes a znode so be aware of the naming restrictions
* @param coordName name of the node running the coordinator
* @throws KeeperException if an unexpected zk error occurs
*/
public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
String procedureClass, String coordName) throws KeeperException {
this.watcher = watcher;
this.procedureType = procedureClass;
this.coordName = coordName;
}
/**
* The "acquire" phase. The coordinator creates a new procType/acquired/ znode dir. If znodes
* appear, first acquire to relevant listener or sets watch waiting for notification of
* the acquire node
*
* @throws IOException if any failure occurs.
*/
@Override
final public void sendGlobalBarrierAcquire(Procedure proc, byte[] info, List<String> nodeNames)
throws IOException, IllegalArgumentException {
String procName = proc.getName();
// start watching for the abort node
String abortNode = zkProc.getAbortZNode(procName);
try {
// check to see if the abort node already exists
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
abort(abortNode);
}
// If we get an abort node watch triggered here, we'll go complete creating the acquired
// znode but then handle the acquire znode and bail out
} catch (KeeperException e) {
LOG.error("Failed to create abort", e);
throw new IOException("Failed while watching abort node:" + abortNode, e);
}
// create the acquire barrier
String acquire = zkProc.getAcquiredBarrierNode(procName);
LOG.debug("Creating acquire znode:" + acquire);
try {
// notify all the procedure listeners to look for the acquire node
byte[] data = ProtobufUtil.prependPBMagic(info);
ZKUtil.createWithParents(zkProc.getWatcher(), acquire, data);
// loop through all the children of the acquire phase and watch for them
for (String node : nodeNames) {
String znode = ZKUtil.joinZNode(acquire, node);
LOG.debug("Watching for acquire node:" + znode);
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
coordinator.memberAcquiredBarrier(procName, node);
}
}
} catch (KeeperException e) {
throw new IOException("Failed while creating acquire node:" + acquire, e);
}
}
@Override
public void sendGlobalBarrierReached(Procedure proc, List<String> nodeNames) throws IOException {
String procName = proc.getName();
String reachedNode = zkProc.getReachedBarrierNode(procName);
LOG.debug("Creating reached barrier zk node:" + reachedNode);
try {
// create the reached znode and watch for the reached znodes
ZKUtil.createWithParents(zkProc.getWatcher(), reachedNode);
// loop through all the children of the acquire phase and watch for them
for (String node : nodeNames) {
String znode = ZKUtil.joinZNode(reachedNode, node);
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), znode)) {
coordinator.memberFinishedBarrier(procName, node);
}
}
} catch (KeeperException e) {
throw new IOException("Failed while creating reached node:" + reachedNode, e);
}
}
/**
* Delete znodes that are no longer in use.
*/
@Override
final public void resetMembers(Procedure proc) throws IOException {
String procName = proc.getName();
boolean stillGettingNotifications = false;
do {
try {
LOG.debug("Attempting to clean out zk node for op:" + procName);
zkProc.clearZNodes(procName);
stillGettingNotifications = false;
} catch (KeeperException.NotEmptyException e) {
// recursive delete isn't transactional (yet) so we need to deal with cases where we get
// children trickling in
stillGettingNotifications = true;
} catch (KeeperException e) {
throw new IOException("Failed to complete reset procedure " + procName, e);
}
} while (stillGettingNotifications);
}
/**
* Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
* @return true if succeed, false if encountered initialization errors.
*/
final public boolean start(final ProcedureCoordinator listener) {
if (this.coordinator != null) {
throw new IllegalStateException("ZKProcedureCoordinator already started and already has listener installed");
}
this.coordinator = listener;
try {
this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
@Override
public void nodeCreated(String path) {
if (!zkProc.isInProcedurePath(path)) return;
LOG.debug("Node created: " + path);
logZKTree(this.baseZNode);
if (zkProc.isAcquiredPathNode(path)) {
// node wasn't present when we created the watch so zk event triggers acquire
listener.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), ZKUtil.getNodeName(path));
}
if (zkProc.isReachedPathNode(path)) {
// node wasn't present when we created the watch so zk event triggers the finished barrier.
// TODO Nothing enforces that acquire and reached znodes from showing up in the wrong order.
listener.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), ZKUtil.getNodeName(path));
}
if (zkProc.isAbortPathNode(path)) {
abort(path);
}
}
};
zkProc.clearChildZNodes();
} catch (KeeperException e) {
LOG.error("Unable to start the ZK-based Procedure Coordinator rpcs.", e);
return false;
}
LOG.debug("Starting the controller for procedure member:" + zkProc.getMemberName());
return true;
}
/**
* This is the abort message being sent by the coordinator to member
*
* TODO this code isn't actually used but can be used to issue a cancellation from the
* coordinator.
*/
@Override
final public void sendAbortToMembers(Procedure proc, ForeignException ee) {
String procName = proc.getName();
LOG.debug("Aborting procedure '" + procName + "' in zk");
String procAbortNode = zkProc.getAbortZNode(procName);
try {
LOG.debug("Creating abort znode:" + procAbortNode);
String source = (ee.getSource() == null) ? coordName : ee.getSource();
byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
// first create the znode for the procedure
ZKUtil.createAndFailSilent(zkProc.getWatcher(), procAbortNode, errorInfo);
LOG.debug("Finished creating abort node:" + procAbortNode);
} catch (KeeperException e) {
// possible that we get this error for the procedure if we already reset the zk state, but in
// that case we should still get an error for that procedure anyways
zkProc.logZKTree(zkProc.baseZNode);
coordinator.rpcConnectionFailure("Failed to post zk node:" + procAbortNode
+ " to abort procedure '" + procName + "'", new IOException(e));
}
}
/**
* Receive a notification and propagate it to the local coordinator
* @param abortNode full znode path to the failed procedure information
*/
protected void abort(String abortNode) {
String procName = ZKUtil.getNodeName(abortNode);
ForeignException ee = null;
try {
byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
if (!ProtobufUtil.isPBMagicPrefix(data)) {
LOG.warn("Got an error notification for op:" + abortNode
+ " but we can't read the information. Killing the procedure.");
// we got a remote exception, but we can't describe it
ee = new ForeignException(coordName, "Data in abort node is illegally formatted. ignoring content.");
} else {
data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
ee = ForeignException.deserialize(data);
}
} catch (InvalidProtocolBufferException e) {
LOG.warn("Got an error notification for op:" + abortNode
+ " but we can't read the information. Killing the procedure.");
// we got a remote exception, but we can't describe it
ee = new ForeignException(coordName, e);
} catch (KeeperException e) {
coordinator.rpcConnectionFailure("Failed to get data for abort node:" + abortNode
+ zkProc.getAbortZnode(), new IOException(e));
}
coordinator.abortProcedure(procName, ee);
}
@Override
final public void close() throws IOException {
zkProc.close();
}
/**
* Used in testing
*/
final ZKProcedureUtil getZkProcedureUtil() {
return zkProc;
}
}

View File

@ -0,0 +1,345 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* ZooKeeper based controller for a procedure member.
* <p>
* There can only be one {@link ZKProcedureMemberRpcs} per procedure type per member,
* since each procedure type is bound to a single set of znodes. You can have multiple
* {@link ZKProcedureMemberRpcs} on the same server, each serving a different member
* name, but each individual rpcs is still bound to a single member name (and since they are
* used to determine global progress, its important to not get this wrong).
* <p>
* To make this slightly more confusing, you can run multiple, concurrent procedures at the same
* time (as long as they have different types), from the same controller, but the same node name
* must be used for each procedure (though there is no conflict between the two procedure as long
* as they have distinct names).
* <p>
* There is no real error recovery with this mechanism currently -- if any the coordinator fails,
* its re-initialization will delete the znodes and require all in progress subprocedures to start
* anew.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
private static final Log LOG = LogFactory.getLog(ZKProcedureMemberRpcs.class);
private final String memberName;
protected ProcedureMember member;
private ZKProcedureUtil zkController;
/**
* Must call {@link #start(ProcedureMember)} before this is can be used.
* @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
* {@link #close()}.
* @param procType name of the znode describing the procedure type
* @param memberName name of the member to join the procedure
* @throws KeeperException if we can't reach zookeeper
*/
public ZKProcedureMemberRpcs(ZooKeeperWatcher watcher,
String procType, String memberName) throws KeeperException {
this.zkController = new ZKProcedureUtil(watcher, procType, memberName) {
@Override
public void nodeCreated(String path) {
if (path.startsWith(this.baseZNode)) {
LOG.info("Received created event:" + path);
// if it is a simple start/end/abort then we just rewatch the node
if (path.equals(this.acquiredZnode)) {
waitForNewProcedures();
return;
} else if (path.equals(this.abortZnode)) {
watchForAbortedProcedures();
return;
}
String parent = ZKUtil.getParent(path);
// if its the end barrier, the procedure can be completed
if (parent.equals(this.reachedZnode)) {
recievedReachedGlobalBarrier(path);
return;
} else if (parent.equals(this.abortZnode)) {
abort(path);
return;
} else if (parent.equals(this.acquiredZnode)) {
startNewSubprocedure(path);
} else {
LOG.debug("Ignoring created notification for node:" + path);
}
}
}
@Override
public void nodeChildrenChanged(String path) {
LOG.info("Received children changed event:" + path);
if (path.equals(this.acquiredZnode)) {
LOG.info("Recieved start event.");
waitForNewProcedures();
} else if (path.equals(this.abortZnode)) {
LOG.info("Recieved abort event.");
watchForAbortedProcedures();
}
}
};
this.memberName = memberName;
}
public ZKProcedureUtil getZkController() {
return zkController;
}
public void start() {
LOG.debug("Starting the procedure member");
watchForAbortedProcedures();
waitForNewProcedures();
}
@Override
public String getMemberName() {
return memberName;
}
/**
* Pass along the procedure global barrier notification to any listeners
* @param path full znode path that cause the notification
*/
private void recievedReachedGlobalBarrier(String path) {
LOG.debug("Recieved reached global barrier:" + path);
String procName = ZKUtil.getNodeName(path);
this.member.receivedReachedGlobalBarrier(procName);
}
private void watchForAbortedProcedures() {
LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
try {
// this is the list of the currently aborted procedues
for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(), zkController.getAbortZnode())) {
String abortNode = ZKUtil.joinZNode(zkController.getAbortZnode(), node);
abort(abortNode);
}
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to list children for abort node:"
+ zkController.getAbortZnode(), new IOException(e));
}
}
private void waitForNewProcedures() {
// watch for new procedues that we need to start subprocedures for
LOG.debug("Looking for new procedures under znode: '" + zkController.getAcquiredBarrier() + "'");
List<String> runningProcedure = null;
try {
runningProcedure = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(), zkController.getAcquiredBarrier());
if (runningProcedure == null) {
LOG.debug("No running procedures.");
return;
}
} catch (KeeperException e) {
member.controllerConnectionFailure("General failure when watching for new procedures",
new IOException(e));
}
for (String procName : runningProcedure) {
// then read in the procedure information
String path = ZKUtil.joinZNode(zkController.getAcquiredBarrier(), procName);
startNewSubprocedure(path);
}
}
/**
* Kick off a new procedure on the listener with the data stored in the passed znode.
* <p>
* Will attempt to create the same procedure multiple times if an procedure znode with the same
* name is created. It is left up the coordinator to ensure this doesn't occur.
* @param path full path to the znode for the procedure to start
*/
private synchronized void startNewSubprocedure(String path) {
LOG.debug("Found procedure znode: " + path);
String opName = ZKUtil.getNodeName(path);
// start watching for an abort notification for the procedure
String abortZNode = zkController.getAbortZNode(opName);
try {
if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), abortZNode)) {
LOG.debug("Not starting:" + opName + " because we already have an abort notification.");
return;
}
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
+ ") for procedure :" + opName, new IOException(e));
return;
}
// get the data for the procedure
Subprocedure subproc = null;
try {
byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
LOG.debug("start proc data length is " + data.length);
if (!ProtobufUtil.isPBMagicPrefix(data)) {
String msg = "Data in for starting procuedure " + opName + " is illegally formatted. "
+ "Killing the procedure.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
LOG.debug("Found data for znode:" + path);
subproc = member.createSubprocedure(opName, data);
member.submitSubprocedure(subproc);
} catch (IllegalArgumentException iae ) {
LOG.error("Illegal argument exception", iae);
sendMemberAborted(subproc, new ForeignException(getMemberName(), iae));
} catch (IllegalStateException ise) {
LOG.error("Illegal state exception ", ise);
sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
new IOException(e));
}
}
/**
* This attempts to create an acquired state znode for the procedure (snapshot name).
*
* It then looks for the reached znode to trigger in-barrier execution. If not present we
* have a watcher, if present then trigger the in-barrier action.
*/
@Override
public void sendMemberAcquired(Subprocedure sub) throws IOException {
String procName = sub.getName();
try {
LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
+ ") in zk");
String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(zkController, procName), memberName);
ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
// watch for the complete node for this snapshot
String reachedBarrier = zkController.getReachedBarrierNode(procName);
LOG.debug("Watch for global barrier reached:" + reachedBarrier);
if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
recievedReachedGlobalBarrier(reachedBarrier);
}
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
+ procName + " and member: " + memberName, new IOException(e));
}
}
/**
* This acts as the ack for a completed
*/
@Override
public void sendMemberCompleted(Subprocedure sub) throws IOException {
String procName = sub.getName();
LOG.debug("Marking procedure '" + procName + "' completed for member '" + memberName
+ "' in zk");
String joinPath = ZKUtil.joinZNode(zkController.getReachedBarrierNode(procName), memberName);
try {
ZKUtil.createAndFailSilent(zkController.getWatcher(), joinPath);
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to post zk node:" + joinPath
+ " to join procedure barrier.", new IOException(e));
}
}
/**
* This should be called by the member and should write a serialized root cause exception as
* to the abort znode.
*/
@Override
public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
if (sub == null) {
LOG.error("Failed due to null subprocedure", ee);
}
String procName = sub.getName();
LOG.debug("Aborting procedure (" + procName + ") in zk");
String procAbortZNode = zkController.getAbortZNode(procName);
try {
LOG.debug("Creating abort znode:" + procAbortZNode);
String source = (ee.getSource() == null) ? memberName: ee.getSource();
byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
LOG.debug("Finished creating abort znode:" + procAbortZNode);
} catch (KeeperException e) {
// possible that we get this error for the procedure if we already reset the zk state, but in
// that case we should still get an error for that procedure anyways
zkController.logZKTree(zkController.getBaseZnode());
member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
+ " to abort procedure", new IOException(e));
}
}
/**
* Pass along the found abort notification to the listener
* @param abortZNode full znode path to the failed procedure information
*/
protected void abort(String abortZNode) {
String opName = ZKUtil.getNodeName(abortZNode);
try {
byte[] data = ZKUtil.getData(zkController.getWatcher(), abortZNode);
// figure out the data we need to pass
ForeignException ee;
try {
if (!ProtobufUtil.isPBMagicPrefix(data)) {
String msg = "Illegally formatted data in abort node for proc " + opName
+ ". Killing the procedure.";
LOG.error(msg);
// we got a remote exception, but we can't describe it so just return exn from here
ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
}
data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
ee = ForeignException.deserialize(data);
} catch (InvalidProtocolBufferException e) {
LOG.warn("Got an error notification for op:" + opName
+ " but we can't read the information. Killing the procedure.");
// we got a remote exception, but we can't describe it so just return exn from here
ee = new ForeignException(getMemberName(), e);
}
this.member.receiveAbortProcedure(opName, ee);
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
+ zkController.getAbortZnode(), new IOException(e));
}
}
public void start(ProcedureMember listener) {
LOG.debug("Starting procedure member '" + this.memberName + "'");
this.member = listener;
this.start();
}
@Override
public void close() throws IOException {
zkController.close();
}
}

View File

@ -0,0 +1,260 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* This is a shared ZooKeeper-based znode management utils for distributed procedure. All znode
* operations should go through the provided methods in coordinators and members.
*
* Layout of nodes in ZK is
* /hbase/[op name]/acquired/
* [op instance] - op data/
* /[nodes that have acquired]
* /reached/
* [op instance]/
* /[nodes that have completed]
* /abort/
* [op instance] - failure data
*
* NOTE: while acquired and completed are znode dirs, abort is actually just a znode.
*
* Assumption here that procedure names are unique
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class ZKProcedureUtil
extends ZooKeeperListener implements Closeable {
private static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
public static final String ACQUIRED_BARRIER_ZNODE_DEFAULT = "acquired";
public static final String REACHED_BARRIER_ZNODE_DEFAULT = "reached";
public static final String ABORT_ZNODE_DEFAULT = "abort";
public final String baseZNode;
protected final String acquiredZnode;
protected final String reachedZnode;
protected final String abortZnode;
protected final String memberName;
/**
* Top-level watcher/controller for procedures across the cluster.
* <p>
* On instantiation, this ensures the procedure znodes exists. This however requires calling
* {@link #start} to start monitoring for running procedures.
* @param watcher watcher for the cluster ZK. Owned by <tt>this</tt> and closed via
* {@link #close()}
* @param procDescription name of the znode describing the procedure to run
* @param memberName name of the member from which we are interacting with running procedures
* @throws KeeperException when the procedure znodes cannot be created
*/
public ZKProcedureUtil(ZooKeeperWatcher watcher, String procDescription,
String memberName) throws KeeperException {
super(watcher);
this.memberName = memberName;
// make sure we are listening for events
watcher.registerListener(this);
// setup paths for the zknodes used in procedures
this.baseZNode = ZKUtil.joinZNode(watcher.baseZNode, procDescription);
acquiredZnode = ZKUtil.joinZNode(baseZNode, ACQUIRED_BARRIER_ZNODE_DEFAULT);
reachedZnode = ZKUtil.joinZNode(baseZNode, REACHED_BARRIER_ZNODE_DEFAULT);
abortZnode = ZKUtil.joinZNode(baseZNode, ABORT_ZNODE_DEFAULT);
// first make sure all the ZK nodes exist
// make sure all the parents exist (sometimes not the case in tests)
ZKUtil.createWithParents(watcher, acquiredZnode);
// regular create because all the parents exist
ZKUtil.createAndFailSilent(watcher, reachedZnode);
ZKUtil.createAndFailSilent(watcher, abortZnode);
}
@Override
public void close() throws IOException {
if (watcher != null) {
watcher.close();
}
}
public String getAcquiredBarrierNode(String opInstanceName) {
return ZKProcedureUtil.getAcquireBarrierNode(this, opInstanceName);
}
public String getReachedBarrierNode(String opInstanceName) {
return ZKProcedureUtil.getReachedBarrierNode(this, opInstanceName);
}
public String getAbortZNode(String opInstanceName) {
return ZKProcedureUtil.getAbortNode(this, opInstanceName);
}
public String getAbortZnode() {
return abortZnode;
}
public String getBaseZnode() {
return baseZNode;
}
public String getAcquiredBarrier() {
return acquiredZnode;
}
public String getMemberName() {
return memberName;
}
/**
* Get the full znode path for the node used by the coordinator to trigger a global barrier
* acquire on each subprocedure.
* @param controller controller running the procedure
* @param opInstanceName name of the running procedure instance (not the procedure description).
* @return full znode path to the prepare barrier/start node
*/
public static String getAcquireBarrierNode(ZKProcedureUtil controller,
String opInstanceName) {
return ZKUtil.joinZNode(controller.acquiredZnode, opInstanceName);
}
/**
* Get the full znode path for the node used by the coordinator to trigger a global barrier
* execution and release on each subprocedure.
* @param controller controller running the procedure
* @param opInstanceName name of the running procedure instance (not the procedure description).
* @return full znode path to the commit barrier
*/
public static String getReachedBarrierNode(ZKProcedureUtil controller,
String opInstanceName) {
return ZKUtil.joinZNode(controller.reachedZnode, opInstanceName);
}
/**
* Get the full znode path for the node used by the coordinator or member to trigger an abort
* of the global barrier acquisition or execution in subprocedures.
* @param controller controller running the procedure
* @param opInstanceName name of the running procedure instance (not the procedure description).
* @return full znode path to the abort znode
*/
public static String getAbortNode(ZKProcedureUtil controller, String opInstanceName) {
return ZKUtil.joinZNode(controller.abortZnode, opInstanceName);
}
public ZooKeeperWatcher getWatcher() {
return watcher;
}
/**
* Is this a procedure related znode path?
*
* TODO: this is not strict, can return true if had name just starts with same prefix but is
* different zdir.
*
* @return true if starts with baseZnode
*/
public boolean isInProcedurePath(String path) {
return path.startsWith(baseZNode);
}
/**
* Is this in the procedure barrier acquired znode path
*/
public boolean isAcquiredPathNode(String path) {
return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode);
}
/**
* Is this in the procedure barrier reached znode path
*/
public boolean isReachedPathNode(String path) {
return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode);
}
/**
* Is this in the procedure barrier abort znode path
*/
public boolean isAbortPathNode(String path) {
return path.startsWith(this.abortZnode) && !path.equals(abortZnode);
}
// --------------------------------------------------------------------------
// internal debugging methods
// --------------------------------------------------------------------------
/**
* Recursively print the current state of ZK (non-transactional)
* @param root name of the root directory in zk to print
* @throws KeeperException
*/
public void logZKTree(String root) {
if (!LOG.isDebugEnabled()) return;
LOG.debug("Current zk system:");
String prefix = "|-";
LOG.debug(prefix + root);
try {
logZKTree(root, prefix);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
/**
* Helper method to print the current state of the ZK tree.
* @see #logZKTree(String)
* @throws KeeperException if an unexpected exception occurs
*/
protected void logZKTree(String root, String prefix) throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(watcher, root);
if (children == null) return;
for (String child : children) {
LOG.debug(prefix + child);
String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
logZKTree(node, prefix + "---");
}
}
public void clearChildZNodes() throws KeeperException {
// TODO This is potentially racy since not atomic. update when we support zk that has multi
// If the coordinator was shutdown mid-procedure, then we are going to lose
// an procedure that was previously started by cleaning out all the previous state. Its much
// harder to figure out how to keep an procedure going and the subject of HBASE-5487.
ZKUtil.deleteChildrenRecursively(watcher, acquiredZnode);
ZKUtil.deleteChildrenRecursively(watcher, reachedZnode);
ZKUtil.deleteChildrenRecursively(watcher, abortZnode);
}
public void clearZNodes(String procedureName) throws KeeperException {
// TODO This is potentially racy since not atomic. update when we support zk that has multi
ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
}
}

View File

@ -23,21 +23,16 @@ import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import javax.security.auth.login.LoginException;
import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -49,6 +44,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -56,9 +53,9 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.server.ZooKeeperSaslServer; import org.apache.zookeeper.server.ZooKeeperSaslServer;
/** /**
@ -849,6 +846,10 @@ public class ZKUtil {
/** /**
* Set data into node creating node if it doesn't yet exist. * Set data into node creating node if it doesn't yet exist.
* Does not set watch. * Does not set watch.
*
* WARNING: this is not atomic -- it is possible to get a 0-byte data value in the znode before
* data is written
*
* @param zkw zk reference * @param zkw zk reference
* @param znode path of node * @param znode path of node
* @param data data to set for node * @param data data to set for node
@ -1070,7 +1071,7 @@ public class ZKUtil {
} }
/** /**
* Creates the specified node, if the node does not exist. Does not set a * Creates the specified node, iff the node does not exist. Does not set a
* watch and fails silently if the node already exists. * watch and fails silently if the node already exists.
* *
* The node created is persistent and open access. * The node created is persistent and open access.
@ -1081,12 +1082,29 @@ public class ZKUtil {
*/ */
public static void createAndFailSilent(ZooKeeperWatcher zkw, public static void createAndFailSilent(ZooKeeperWatcher zkw,
String znode) String znode)
throws KeeperException {
createAndFailSilent(zkw, znode, new byte[0]);
}
/**
* Creates the specified node containing specified data, iff the node does not exist. Does
* not set a watch and fails silently if the node already exists.
*
* The node created is persistent and open access.
*
* @param zkw zk reference
* @param znode path of node
* @param data a byte array data to store in the znode
* @throws KeeperException if unexpected zookeeper exception
*/
public static void createAndFailSilent(ZooKeeperWatcher zkw,
String znode, byte[] data)
throws KeeperException { throws KeeperException {
try { try {
RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper(); RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
waitForZKConnectionIfAuthenticating(zkw); waitForZKConnectionIfAuthenticating(zkw);
if (zk.exists(znode, false) == null) { if (zk.exists(znode, false) == null) {
zk.create(znode, new byte[0], createACL(zkw,znode), zk.create(znode, data, createACL(zkw,znode),
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
} }
} catch(KeeperException.NodeExistsException nee) { } catch(KeeperException.NodeExistsException nee) {
@ -1117,13 +1135,31 @@ public class ZKUtil {
* @throws KeeperException if unexpected zookeeper exception * @throws KeeperException if unexpected zookeeper exception
*/ */
public static void createWithParents(ZooKeeperWatcher zkw, String znode) public static void createWithParents(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
createWithParents(zkw, znode, new byte[0]);
}
/**
* Creates the specified node and all parent nodes required for it to exist. The creation of
* parent znodes is not atomic with the leafe znode creation but the data is written atomically
* when the leaf node is created.
*
* No watches are set and no errors are thrown if the node already exists.
*
* The nodes created are persistent and open access.
*
* @param zkw zk reference
* @param znode path of node
* @throws KeeperException if unexpected zookeeper exception
*/
public static void createWithParents(ZooKeeperWatcher zkw, String znode, byte[] data)
throws KeeperException { throws KeeperException {
try { try {
if(znode == null) { if(znode == null) {
return; return;
} }
waitForZKConnectionIfAuthenticating(zkw); waitForZKConnectionIfAuthenticating(zkw);
zkw.getRecoverableZooKeeper().create(znode, new byte[0], createACL(zkw, znode), zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
CreateMode.PERSISTENT); CreateMode.PERSISTENT);
} catch(KeeperException.NodeExistsException nee) { } catch(KeeperException.NodeExistsException nee) {
return; return;
@ -1422,4 +1458,37 @@ public class ZKUtil {
ke.initCause(e); ke.initCause(e);
return ke; return ke;
} }
/**
* Recursively print the current state of ZK (non-transactional)
* @param root name of the root directory in zk to print
* @throws KeeperException
*/
public static void logZKTree(ZooKeeperWatcher zkw, String root) {
if (!LOG.isDebugEnabled()) return;
LOG.debug("Current zk system:");
String prefix = "|-";
LOG.debug(prefix + root);
try {
logZKTree(zkw, root, prefix);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
/**
* Helper method to print the current state of the ZK tree.
* @see #logZKTree(String)
* @throws KeeperException if an unexpected exception occurs
*/
protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix) throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);
if (children == null) return;
for (String child : children) {
LOG.debug(prefix + child);
String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
logZKTree(zkw, node, prefix + "---");
}
}
} }

View File

@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Demonstrate how Procedure handles single members, multiple members, and errors semantics
*/
@Category(SmallTests.class)
public class TestProcedure {
ProcedureCoordinator coord;
@Before
public void setup() {
coord = mock(ProcedureCoordinator.class);
final ProcedureCoordinatorRpcs comms = mock(ProcedureCoordinatorRpcs.class);
when(coord.getRpcs()).thenReturn(comms); // make it not null
}
class LatchedProcedure extends Procedure {
CountDownLatch startedAcquireBarrier = new CountDownLatch(1);
CountDownLatch startedDuringBarrier = new CountDownLatch(1);
CountDownLatch completedProcedure = new CountDownLatch(1);
public LatchedProcedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor,
long wakeFreq, long timeout, String opName, byte[] data,
List<String> expectedMembers) {
super(coord, monitor, wakeFreq, timeout, opName, data, expectedMembers);
}
@Override
public void sendGlobalBarrierStart() {
startedAcquireBarrier.countDown();
}
@Override
public void sendGlobalBarrierReached() {
startedDuringBarrier.countDown();
}
@Override
public void sendGlobalBarrierComplete() {
completedProcedure.countDown();
}
};
/**
* With a single member, verify ordered execution. The Coordinator side is run in a separate
* thread so we can only trigger from members and wait for particular state latches.
*/
@Test(timeout = 1000)
public void testSingleMember() throws Exception {
// The member
List<String> members = new ArrayList<String>();
members.add("member");
LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
Integer.MAX_VALUE, "op", null, members);
final LatchedProcedure procspy = spy(proc);
// coordinator: start the barrier procedure
new Thread() {
public void run() {
procspy.call();
}
}.start();
// coordinator: wait for the barrier to be acquired, then send start barrier
proc.startedAcquireBarrier.await();
// we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
verify(procspy).sendGlobalBarrierStart();
verify(procspy, never()).sendGlobalBarrierReached();
verify(procspy, never()).sendGlobalBarrierComplete();
verify(procspy, never()).barrierAcquiredByMember(anyString());
// member: trigger global barrier acquisition
proc.barrierAcquiredByMember(members.get(0));
// coordinator: wait for global barrier to be acquired.
proc.acquiredBarrierLatch.await();
verify(procspy).sendGlobalBarrierStart(); // old news
// since two threads, we cannot guarantee that {@link Procedure#sendSatsifiedBarrier()} was
// or was not called here.
// member: trigger global barrier release
proc.barrierReleasedByMember(members.get(0));
// coordinator: wait for procedure to be completed
proc.completedProcedure.await();
verify(procspy).sendGlobalBarrierReached();
verify(procspy).sendGlobalBarrierComplete();
verify(procspy, never()).receive(any(ForeignException.class));
}
@Test(timeout=1000)
public void testMultipleMember() throws Exception {
// 2 members
List<String> members = new ArrayList<String>();
members.add("member1");
members.add("member2");
LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
Integer.MAX_VALUE, "op", null, members);
final LatchedProcedure procspy = spy(proc);
// start the barrier procedure
new Thread() {
public void run() {
procspy.call();
}
}.start();
// coordinator: wait for the barrier to be acquired, then send start barrier
procspy.startedAcquireBarrier.await();
// we only know that {@link Procedure#sendStartBarrier()} was called, and others are blocked.
verify(procspy).sendGlobalBarrierStart();
verify(procspy, never()).sendGlobalBarrierReached();
verify(procspy, never()).sendGlobalBarrierComplete();
verify(procspy, never()).barrierAcquiredByMember(anyString()); // no externals
// member0: [1/2] trigger global barrier acquisition.
procspy.barrierAcquiredByMember(members.get(0));
// coordinator not satisified.
verify(procspy).sendGlobalBarrierStart();
verify(procspy, never()).sendGlobalBarrierReached();
verify(procspy, never()).sendGlobalBarrierComplete();
// member 1: [2/2] trigger global barrier acquisition.
procspy.barrierAcquiredByMember(members.get(1));
// coordinator: wait for global barrier to be acquired.
procspy.startedDuringBarrier.await();
verify(procspy).sendGlobalBarrierStart(); // old news
// member 1, 2: trigger global barrier release
procspy.barrierReleasedByMember(members.get(0));
procspy.barrierReleasedByMember(members.get(1));
// coordinator wait for procedure to be completed
procspy.completedProcedure.await();
verify(procspy).sendGlobalBarrierReached();
verify(procspy).sendGlobalBarrierComplete();
verify(procspy, never()).receive(any(ForeignException.class));
}
@Test(timeout = 1000)
public void testErrorPropagation() throws Exception {
List<String> members = new ArrayList<String>();
members.add("member");
Procedure proc = new Procedure(coord, new ForeignExceptionDispatcher(), 100,
Integer.MAX_VALUE, "op", null, members);
final Procedure procspy = spy(proc);
ForeignException cause = new ForeignException("SRC", "External Exception");
proc.receive(cause);
// start the barrier procedure
Thread t = new Thread() {
public void run() {
procspy.call();
}
};
t.start();
t.join();
verify(procspy, never()).sendGlobalBarrierStart();
verify(procspy, never()).sendGlobalBarrierReached();
verify(procspy).sendGlobalBarrierComplete();
}
@Test(timeout = 1000)
public void testBarrieredErrorPropagation() throws Exception {
List<String> members = new ArrayList<String>();
members.add("member");
LatchedProcedure proc = new LatchedProcedure(coord, new ForeignExceptionDispatcher(), 100,
Integer.MAX_VALUE, "op", null, members);
final LatchedProcedure procspy = spy(proc);
// start the barrier procedure
Thread t = new Thread() {
public void run() {
procspy.call();
}
};
t.start();
// now test that we can put an error in before the commit phase runs
procspy.startedAcquireBarrier.await();
ForeignException cause = new ForeignException("SRC", "External Exception");
procspy.receive(cause);
procspy.barrierAcquiredByMember(members.get(0));
t.join();
// verify state of all the object
verify(procspy).sendGlobalBarrierStart();
verify(procspy).sendGlobalBarrierComplete();
verify(procspy, never()).sendGlobalBarrierReached();
}
}

View File

@ -0,0 +1,349 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
/**
* Test Procedure coordinator operation.
* <p>
* This only works correctly when we do <i>class level parallelization</i> of tests. If we do method
* level serialization this class will likely throw all kinds of errors.
*/
@Category(SmallTests.class)
public class TestProcedureCoordinator {
// general test constants
private static final long WAKE_FREQUENCY = 1000;
private static final long TIMEOUT = 100000;
private static final long POOL_KEEP_ALIVE = 1;
private static final String nodeName = "node";
private static final String procName = "some op";
private static final byte[] procData = new byte[0];
private static final List<String> expected = Lists.newArrayList("remote1", "remote2");
// setup the mocks
private final ProcedureCoordinatorRpcs controller = mock(ProcedureCoordinatorRpcs.class);
private final Procedure task = mock(Procedure.class);
private final ForeignExceptionDispatcher monitor = mock(ForeignExceptionDispatcher.class);
// handle to the coordinator for each test
private ProcedureCoordinator coordinator;
@After
public void resetTest() throws IOException {
// reset all the mocks used for the tests
reset(controller, task, monitor);
// close the open coordinator, if it was used
if (coordinator != null) coordinator.close();
}
private ProcedureCoordinator buildNewCoordinator() {
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
return spy(new ProcedureCoordinator(controller, pool));
}
/**
* Currently we can only handle one procedure at a time. This makes sure we handle that and
* reject submitting more.
*/
@Test
public void testThreadPoolSize() throws Exception {
ProcedureCoordinator coordinator = buildNewCoordinator();
Procedure proc = new Procedure(coordinator, monitor,
WAKE_FREQUENCY, TIMEOUT, procName, procData, expected);
Procedure procSpy = spy(proc);
Procedure proc2 = new Procedure(coordinator, monitor,
WAKE_FREQUENCY, TIMEOUT, procName +"2", procData, expected);
Procedure procSpy2 = spy(proc2);
when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
.thenReturn(procSpy, procSpy2);
coordinator.startProcedure(procSpy.getErrorMonitor(), procName, procData, expected);
// null here means second procedure failed to start.
assertNull("Coordinator successfully ran two tasks at once with a single thread pool.",
coordinator.startProcedure(proc2.getErrorMonitor(), "another op", procData, expected));
}
/**
* Check handling a connection failure correctly if we get it during the acquiring phase
*/
@Test(timeout = 5000)
public void testUnreachableControllerDuringPrepare() throws Exception {
coordinator = buildNewCoordinator();
// setup the proc
List<String> expected = Arrays.asList("cohort");
Procedure proc = new Procedure(coordinator, WAKE_FREQUENCY,
TIMEOUT, procName, procData, expected);
final Procedure procSpy = spy(proc);
when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
.thenReturn(procSpy);
// use the passed controller responses
IOException cause = new IOException("Failed to reach comms during acquire");
doThrow(cause).when(controller)
.sendGlobalBarrierAcquire(eq(procSpy), eq(procData), anyListOf(String.class));
// run the operation
proc = coordinator.startProcedure(proc.getErrorMonitor(), procName, procData, expected);
// and wait for it to finish
proc.waitForCompleted();
verify(procSpy, atLeastOnce()).receive(any(ForeignException.class));
verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
verify(controller, times(1)).sendGlobalBarrierAcquire(procSpy, procData, expected);
verify(controller, never()).sendGlobalBarrierReached(any(Procedure.class),
anyListOf(String.class));
}
/**
* Check handling a connection failure correctly if we get it during the barrier phase
*/
@Test(timeout = 5000)
public void testUnreachableControllerDuringCommit() throws Exception {
coordinator = buildNewCoordinator();
// setup the task and spy on it
List<String> expected = Arrays.asList("cohort");
final Procedure spy = spy(new Procedure(coordinator,
WAKE_FREQUENCY, TIMEOUT, procName, procData, expected));
when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
.thenReturn(spy);
// use the passed controller responses
IOException cause = new IOException("Failed to reach controller during prepare");
doAnswer(new AcquireBarrierAnswer(procName, new String[] { "cohort" }))
.when(controller).sendGlobalBarrierAcquire(eq(spy), eq(procData), anyListOf(String.class));
doThrow(cause).when(controller).sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
// run the operation
Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
// and wait for it to finish
task.waitForCompleted();
verify(spy, atLeastOnce()).receive(any(ForeignException.class));
verify(coordinator, times(1)).rpcConnectionFailure(anyString(), eq(cause));
verify(controller, times(1)).sendGlobalBarrierAcquire(eq(spy),
eq(procData), anyListOf(String.class));
verify(controller, times(1)).sendGlobalBarrierReached(any(Procedure.class),
anyListOf(String.class));
}
@Test(timeout = 1000)
public void testNoCohort() throws Exception {
runSimpleProcedure();
}
@Test(timeout = 1000)
public void testSingleCohortOrchestration() throws Exception {
runSimpleProcedure("one");
}
@Test(timeout = 1000)
public void testMultipleCohortOrchestration() throws Exception {
runSimpleProcedure("one", "two", "three", "four");
}
public void runSimpleProcedure(String... members) throws Exception {
coordinator = buildNewCoordinator();
Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
TIMEOUT, procName, procData, Arrays.asList(members));
final Procedure spy = spy(task);
runCoordinatedProcedure(spy, members);
}
/**
* Test that if nodes join the barrier early we still correctly handle the progress
*/
@Test(timeout = 1000)
public void testEarlyJoiningBarrier() throws Exception {
final String[] cohort = new String[] { "one", "two", "three", "four" };
coordinator = buildNewCoordinator();
final ProcedureCoordinator ref = coordinator;
Procedure task = new Procedure(coordinator, monitor, WAKE_FREQUENCY,
TIMEOUT, procName, procData, Arrays.asList(cohort));
final Procedure spy = spy(task);
AcquireBarrierAnswer prepare = new AcquireBarrierAnswer(procName, cohort) {
public void doWork() {
// then do some fun where we commit before all nodes have prepared
// "one" commits before anyone else is done
ref.memberAcquiredBarrier(this.opName, this.cohort[0]);
ref.memberFinishedBarrier(this.opName, this.cohort[0]);
// but "two" takes a while
ref.memberAcquiredBarrier(this.opName, this.cohort[1]);
// "three"jumps ahead
ref.memberAcquiredBarrier(this.opName, this.cohort[2]);
ref.memberFinishedBarrier(this.opName, this.cohort[2]);
// and "four" takes a while
ref.memberAcquiredBarrier(this.opName, this.cohort[3]);
}
};
BarrierAnswer commit = new BarrierAnswer(procName, cohort) {
@Override
public void doWork() {
ref.memberFinishedBarrier(opName, this.cohort[1]);
ref.memberFinishedBarrier(opName, this.cohort[3]);
}
};
runCoordinatedOperation(spy, prepare, commit, cohort);
}
/**
* Just run a procedure with the standard name and data, with not special task for the mock
* coordinator (it works just like a regular coordinator). For custom behavior see
* {@link #runCoordinatedOperation(Procedure, AcquireBarrierAnswer, BarrierAnswer, String[])}
* .
* @param spy Spy on a real {@link Procedure}
* @param cohort expected cohort members
* @throws Exception on failure
*/
public void runCoordinatedProcedure(Procedure spy, String... cohort) throws Exception {
runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort),
new BarrierAnswer(procName, cohort), cohort);
}
public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepare,
String... cohort) throws Exception {
runCoordinatedOperation(spy, prepare, new BarrierAnswer(procName, cohort), cohort);
}
public void runCoordinatedOperation(Procedure spy, BarrierAnswer commit,
String... cohort) throws Exception {
runCoordinatedOperation(spy, new AcquireBarrierAnswer(procName, cohort), commit, cohort);
}
public void runCoordinatedOperation(Procedure spy, AcquireBarrierAnswer prepareOperation,
BarrierAnswer commitOperation, String... cohort) throws Exception {
List<String> expected = Arrays.asList(cohort);
when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(procName), eq(procData), anyListOf(String.class)))
.thenReturn(spy);
// use the passed controller responses
doAnswer(prepareOperation).when(controller).sendGlobalBarrierAcquire(spy, procData, expected);
doAnswer(commitOperation).when(controller)
.sendGlobalBarrierReached(eq(spy), anyListOf(String.class));
// run the operation
Procedure task = coordinator.startProcedure(spy.getErrorMonitor(), procName, procData, expected);
// and wait for it to finish
task.waitForCompleted();
// make sure we mocked correctly
prepareOperation.ensureRan();
// we never got an exception
InOrder inorder = inOrder(spy, controller);
inorder.verify(spy).sendGlobalBarrierStart();
inorder.verify(controller).sendGlobalBarrierAcquire(task, procData, expected);
inorder.verify(spy).sendGlobalBarrierReached();
inorder.verify(controller).sendGlobalBarrierReached(eq(task), anyListOf(String.class));
}
private abstract class OperationAnswer implements Answer<Void> {
private boolean ran = false;
public void ensureRan() {
assertTrue("Prepare mocking didn't actually run!", ran);
}
@Override
public final Void answer(InvocationOnMock invocation) throws Throwable {
this.ran = true;
doWork();
return null;
}
protected abstract void doWork() throws Throwable;
}
/**
* Just tell the current coordinator that each of the nodes has prepared
*/
private class AcquireBarrierAnswer extends OperationAnswer {
protected final String[] cohort;
protected final String opName;
public AcquireBarrierAnswer(String opName, String... cohort) {
this.cohort = cohort;
this.opName = opName;
}
@Override
public void doWork() {
if (cohort == null) return;
for (String member : cohort) {
TestProcedureCoordinator.this.coordinator.memberAcquiredBarrier(opName, member);
}
}
}
/**
* Just tell the current coordinator that each of the nodes has committed
*/
private class BarrierAnswer extends OperationAnswer {
protected final String[] cohort;
protected final String opName;
public BarrierAnswer(String opName, String... cohort) {
this.cohort = cohort;
this.opName = opName;
}
@Override
public void doWork() {
if (cohort == null) return;
for (String member : cohort) {
TestProcedureCoordinator.this.coordinator.memberFinishedBarrier(opName, member);
}
}
}
}

View File

@ -0,0 +1,444 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.TimeoutException;
import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Test the procedure member, and it's error handling mechanisms.
*/
@Category(SmallTests.class)
public class TestProcedureMember {
private static final long WAKE_FREQUENCY = 100;
private static final long TIMEOUT = 100000;
private static final long POOL_KEEP_ALIVE = 1;
private final String op = "some op";
private final byte[] data = new byte[0];
private final ForeignExceptionDispatcher mockListener = Mockito
.spy(new ForeignExceptionDispatcher());
private final SubprocedureFactory mockBuilder = mock(SubprocedureFactory.class);
private final ProcedureMemberRpcs mockMemberComms = Mockito
.mock(ProcedureMemberRpcs.class);
private ProcedureMember member;
private ForeignExceptionDispatcher dispatcher;
Subprocedure spySub;
/**
* Reset all the mock objects
*/
@After
public void resetTest() {
reset(mockListener, mockBuilder, mockMemberComms);
if (member != null)
try {
member.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Build a member using the class level mocks
* @return member to use for tests
*/
private ProcedureMember buildCohortMember() {
String name = "node";
ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
return new ProcedureMember(mockMemberComms, pool, mockBuilder);
}
/**
* Setup a procedure member that returns the spied-upon {@link Subprocedure}.
*/
private void buildCohortMemberPair() throws IOException {
dispatcher = new ForeignExceptionDispatcher();
String name = "node";
ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
Subprocedure subproc = new EmptySubprocedure(member, dispatcher);
spySub = spy(subproc);
when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spySub);
addCommitAnswer();
}
/**
* Add a 'in barrier phase' response to the mock controller when it gets a acquired notification
*/
private void addCommitAnswer() throws IOException {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
member.receivedReachedGlobalBarrier(op);
return null;
}
}).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
}
/**
* Test the normal sub procedure execution case.
*/
@Test(timeout = 500)
public void testSimpleRun() throws Exception {
member = buildCohortMember();
EmptySubprocedure subproc = new EmptySubprocedure(member, mockListener);
EmptySubprocedure spy = spy(subproc);
when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
// when we get a prepare, then start the commit phase
addCommitAnswer();
// run the operation
// build a new operation
Subprocedure subproc1 = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc1);
// and wait for it to finish
subproc.waitForLocallyCompleted();
// make sure everything ran in order
InOrder order = inOrder(mockMemberComms, spy);
order.verify(spy).acquireBarrier();
order.verify(mockMemberComms).sendMemberAcquired(eq(spy));
order.verify(spy).insideBarrier();
order.verify(mockMemberComms).sendMemberCompleted(eq(spy));
order.verify(mockMemberComms, never()).sendMemberAborted(eq(spy),
any(ForeignException.class));
}
/**
* Make sure we call cleanup etc, when we have an exception during
* {@link Subprocedure#acquireBarrier()}.
*/
@Test(timeout = 1000)
public void testMemberPrepareException() throws Exception {
buildCohortMemberPair();
// mock an exception on Subprocedure's prepare
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Forced IOException in member acquireBarrier");
}
}).when(spySub).acquireBarrier();
// run the operation
// build a new operation
Subprocedure subproc = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc);
// if the operation doesn't die properly, then this will timeout
member.closeAndWait(TIMEOUT);
// make sure everything ran in order
InOrder order = inOrder(mockMemberComms, spySub);
order.verify(spySub).acquireBarrier();
// Later phases not run
order.verify(mockMemberComms, never()).sendMemberAcquired(eq(spySub));
order.verify(spySub, never()).insideBarrier();
order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
// error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class));
}
/**
* Make sure we call cleanup etc, when we have an exception during prepare.
*/
@Test(timeout = 1000)
public void testSendMemberAcquiredCommsFailure() throws Exception {
buildCohortMemberPair();
// mock an exception on Subprocedure's prepare
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Forced IOException in memeber prepare");
}
}).when(mockMemberComms).sendMemberAcquired(any(Subprocedure.class));
// run the operation
// build a new operation
Subprocedure subproc = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc);
// if the operation doesn't die properly, then this will timeout
member.closeAndWait(TIMEOUT);
// make sure everything ran in order
InOrder order = inOrder(mockMemberComms, spySub);
order.verify(spySub).acquireBarrier();
order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
// Later phases not run
order.verify(spySub, never()).insideBarrier();
order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
// error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class));
}
/**
* Fail correctly if coordinator aborts the procedure. The subprocedure will not interrupt a
* running {@link Subprocedure#prepare} -- prepare needs to finish first, and the the abort
* is checked. Thus, the {@link Subprocedure#prepare} should succeed but later get rolled back
* via {@link Subprocedure#cleanup}.
*/
@Test(timeout = 1000)
public void testCoordinatorAbort() throws Exception {
buildCohortMemberPair();
// mock that another node timed out or failed to prepare
final TimeoutException oate = new TimeoutException("bogus timeout", 1,2,0);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
// inject a remote error (this would have come from an external thread)
spySub.cancel("bogus message", oate);
// sleep the wake frequency since that is what we promised
Thread.sleep(WAKE_FREQUENCY);
return null;
}
}).when(spySub).waitForReachedGlobalBarrier();
// run the operation
// build a new operation
Subprocedure subproc = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc);
// if the operation doesn't die properly, then this will timeout
member.closeAndWait(TIMEOUT);
// make sure everything ran in order
InOrder order = inOrder(mockMemberComms, spySub);
order.verify(spySub).acquireBarrier();
order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
// Later phases not run
order.verify(spySub, never()).insideBarrier();
order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
// error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class));
}
/**
* Handle failures if a member's commit phase fails.
*
* NOTE: This is the core difference that makes this different from traditional 2PC. In true
* 2PC the transaction is committed just before the coordinator sends commit messages to the
* member. Members are then responsible for reading its TX log. This implementation actually
* rolls back, and thus breaks the normal TX guarantees.
*/
@Test(timeout = 1000)
public void testMemberCommitException() throws Exception {
buildCohortMemberPair();
// mock an exception on Subprocedure's prepare
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Forced IOException in memeber prepare");
}
}).when(spySub).insideBarrier();
// run the operation
// build a new operation
Subprocedure subproc = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc);
// if the operation doesn't die properly, then this will timeout
member.closeAndWait(TIMEOUT);
// make sure everything ran in order
InOrder order = inOrder(mockMemberComms, spySub);
order.verify(spySub).acquireBarrier();
order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
order.verify(spySub).insideBarrier();
// Later phases not run
order.verify(mockMemberComms, never()).sendMemberCompleted(eq(spySub));
// error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class));
}
/**
* Handle Failures if a member's commit phase succeeds but notification to coordinator fails
*
* NOTE: This is the core difference that makes this different from traditional 2PC. In true
* 2PC the transaction is committed just before the coordinator sends commit messages to the
* member. Members are then responsible for reading its TX log. This implementation actually
* rolls back, and thus breaks the normal TX guarantees.
*/
@Test(timeout = 1000)
public void testMemberCommitCommsFailure() throws Exception {
buildCohortMemberPair();
final TimeoutException oate = new TimeoutException("bogus timeout",1,2,0);
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
// inject a remote error (this would have come from an external thread)
spySub.cancel("commit comms fail", oate);
// sleep the wake frequency since that is what we promised
Thread.sleep(WAKE_FREQUENCY);
return null;
}
}).when(mockMemberComms).sendMemberCompleted(any(Subprocedure.class));
// run the operation
// build a new operation
Subprocedure subproc = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc);
// if the operation doesn't die properly, then this will timeout
member.closeAndWait(TIMEOUT);
// make sure everything ran in order
InOrder order = inOrder(mockMemberComms, spySub);
order.verify(spySub).acquireBarrier();
order.verify(mockMemberComms).sendMemberAcquired(eq(spySub));
order.verify(spySub).insideBarrier();
order.verify(mockMemberComms).sendMemberCompleted(eq(spySub));
// error recovery path exercised
order.verify(spySub).cancel(anyString(), any(Exception.class));
order.verify(spySub).cleanup(any(Exception.class));
}
/**
* Fail correctly on getting an external error while waiting for the prepared latch
* @throws Exception on failure
*/
@Test(timeout = 1000)
public void testPropagateConnectionErrorBackToManager() throws Exception {
// setup the operation
member = buildCohortMember();
ProcedureMember memberSpy = spy(member);
// setup the commit and the spy
final ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
ForeignExceptionDispatcher dispSpy = spy(dispatcher);
Subprocedure commit = new EmptySubprocedure(member, dispatcher);
Subprocedure spy = spy(commit);
when(mockBuilder.buildSubprocedure(op, data)).thenReturn(spy);
// fail during the prepare phase
doThrow(new ForeignException("SRC", "prepare exception")).when(spy).acquireBarrier();
// and throw a connection error when we try to tell the controller about it
doThrow(new IOException("Controller is down!")).when(mockMemberComms)
.sendMemberAborted(eq(spy), any(ForeignException.class));
// run the operation
// build a new operation
Subprocedure subproc = memberSpy.createSubprocedure(op, data);
memberSpy.submitSubprocedure(subproc);
// if the operation doesn't die properly, then this will timeout
memberSpy.closeAndWait(TIMEOUT);
// make sure everything ran in order
InOrder order = inOrder(mockMemberComms, spy, dispSpy);
// make sure we acquire.
order.verify(spy).acquireBarrier();
order.verify(mockMemberComms, never()).sendMemberAcquired(spy);
// TODO Need to do another refactor to get this to propagate to the coordinator.
// make sure we pass a remote exception back the controller
// order.verify(mockMemberComms).sendMemberAborted(eq(spy),
// any(ExternalException.class));
// order.verify(dispSpy).receiveError(anyString(),
// any(ExternalException.class), any());
}
/**
* Test that the cohort member correctly doesn't attempt to start a task when the builder cannot
* correctly build a new task for the requested operation
* @throws Exception on failure
*/
@Test
public void testNoTaskToBeRunFromRequest() throws Exception {
ThreadPoolExecutor pool = mock(ThreadPoolExecutor.class);
when(mockBuilder.buildSubprocedure(op, data)).thenReturn(null)
.thenThrow(new IllegalStateException("Wrong state!"), new IllegalArgumentException("can't understand the args"));
member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
// builder returns null
// build a new operation
Subprocedure subproc = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc);
// throws an illegal state exception
try {
// build a new operation
Subprocedure subproc2 = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc2);
} catch (IllegalStateException ise) {
}
// throws an illegal argument exception
try {
// build a new operation
Subprocedure subproc3 = member.createSubprocedure(op, data);
member.submitSubprocedure(subproc3);
} catch (IllegalArgumentException iae) {
}
// no request should reach the pool
verifyZeroInteractions(pool);
// get two abort requests
// TODO Need to do another refactor to get this to propagate to the coordinator.
// verify(mockMemberComms, times(2)).sendMemberAborted(any(Subprocedure.class), any(ExternalException.class));
}
/**
* Helper {@link Procedure} who's phase for each step is just empty
*/
public class EmptySubprocedure extends SubprocedureImpl {
public EmptySubprocedure(ProcedureMember member, ForeignExceptionDispatcher dispatcher) {
super( member, op, dispatcher,
// TODO 1000000 is an arbitrary number that I picked.
WAKE_FREQUENCY, TIMEOUT);
}
}
}

View File

@ -0,0 +1,405 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.TimeoutException;
import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.internal.matchers.ArrayEquals;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import com.google.common.collect.Lists;
/**
* Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
*/
@Category(MediumTests.class)
public class TestZKProcedure {
private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final String COORDINATOR_NODE_NAME = "coordinator";
private static final long KEEP_ALIVE = 100; // seconds
private static final int POOL_SIZE = 1;
private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
private static final long WAKE_FREQUENCY = 500;
private static final String opName = "op";
private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
private static final VerificationMode once = Mockito.times(1);
@BeforeClass
public static void setupTest() throws Exception {
UTIL.startMiniZKCluster();
}
@AfterClass
public static void cleanupTest() throws Exception {
UTIL.shutdownMiniZKCluster();
}
private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
@Override
public void abort(String why, Throwable e) {
throw new RuntimeException(
"Unexpected abort in distributed three phase commit test:" + why, e);
}
@Override
public boolean isAborted() {
return false;
}
});
}
@Test
public void testEmptyMemberSet() throws Exception {
runCommit();
}
@Test
public void testSingleMember() throws Exception {
runCommit("one");
}
@Test
public void testMultipleMembers() throws Exception {
runCommit("one", "two", "three", "four" );
}
private void runCommit(String... members) throws Exception {
// make sure we just have an empty list
if (members == null) {
members = new String[0];
}
List<String> expected = Arrays.asList(members);
// setup the constants
ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
String opDescription = "coordination test - " + members.length + " cohort members";
// start running the controller
ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
coordZkw, opDescription, COORDINATOR_NODE_NAME);
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
@Override
public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
List<String> expectedMembers) {
return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
}
};
// build and start members
// NOTE: There is a single subprocedure builder for all members here.
SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
members.length);
// start each member
for (String member : members) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
comms.start(procMember);
}
// setup mock member subprocedures
final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
for (int i = 0; i < procMembers.size(); i++) {
ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
Subprocedure commit = Mockito
.spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
TIMEOUT, WAKE_FREQUENCY));
subprocs.add(commit);
}
// link subprocedure to buildNewOperation invocation.
final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
(byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
new Answer<Subprocedure>() {
@Override
public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
int index = i.getAndIncrement();
LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
Subprocedure commit = subprocs.get(index);
return commit;
}
});
// setup spying on the coordinator
// Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
// Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
// start running the operation
Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
// assertEquals("Didn't mock coordinator task", proc, task);
// verify all things ran as expected
// waitAndVerifyProc(proc, once, once, never(), once, false);
waitAndVerifyProc(task, once, once, never(), once, false);
verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
// close all the things
closeAll(coordinator, coordinatorComms, procMembers);
}
/**
* Test a distributed commit with multiple cohort members, where one of the cohort members has a
* timeout exception during the prepare stage.
*/
@Test
public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
String opDescription = "error injection coordination";
String[] cohortMembers = new String[] { "one", "two", "three" };
List<String> expected = Lists.newArrayList(cohortMembers);
// error constants
final int memberErrorIndex = 2;
final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
// start running the coordinator and its controller
ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
// start a member for each node
SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
expected.size());
for (String member : expected) {
ZooKeeperWatcher watcher = newZooKeeperWatcher();
ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
controller.start(mem);
}
// setup mock subprocedures
final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
final int[] elem = new int[1];
for (int i = 0; i < members.size(); i++) {
ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
ProcedureMember comms = members.get(i).getFirst();
Subprocedure commit = Mockito
.spy(new SubprocedureImpl(comms, opName, cohortMonitor, TIMEOUT, WAKE_FREQUENCY));
// This nasty bit has one of the impls throw a TimeoutException
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
int index = elem[0];
if (index == memberErrorIndex) {
LOG.debug("Sending error to coordinator");
ForeignException remoteCause = new ForeignException("TIMER",
new TimeoutException("subprocTimeout" , 1, 2, 0));
Subprocedure r = ((Subprocedure) invocation.getMock());
LOG.error("Remote commit failure, not propagating error:" + remoteCause);
r.monitor.receive(remoteCause);
// don't complete the error phase until the coordinator has gotten the error
// notification (which ensures that we never progress past prepare)
try {
Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
WAKE_FREQUENCY, "coordinator received error");
} catch (InterruptedException e) {
LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
// reset the interrupt status on the thread
Thread.currentThread().interrupt();
}
}
elem[0] = ++index;
return null;
}
}).when(commit).acquireBarrier();
cohortTasks.add(commit);
}
// pass out a task per member
final int[] i = new int[] { 0 };
Mockito.when(
subprocFactory.buildSubprocedure(Mockito.eq(opName),
(byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
new Answer<Subprocedure>() {
@Override
public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
int index = i[0];
Subprocedure commit = cohortTasks.get(index);
index++;
i[0] = index;
return commit;
}
});
// setup spying on the coordinator
ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
.spy(new ForeignExceptionDispatcher());
Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
opName, data, expected));
when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
.thenReturn(coordinatorTask);
// count down the error latch when we get the remote error
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
// pass on the error to the master
invocation.callRealMethod();
// then count down the got error latch
coordinatorReceivedErrorLatch.countDown();
return null;
}
}).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
// ----------------------------
// start running the operation
// ----------------------------
Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
assertEquals("Didn't mock coordinator task", coordinatorTask, task);
// wait for the task to complete
try {
task.waitForCompleted();
} catch (ForeignException fe) {
// this may get caught or may not
}
// -------------
// verification
// -------------
waitAndVerifyProc(coordinatorTask, once, never(), once, once, true);
verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
once, true);
// close all the open things
closeAll(coordinator, coordinatorController, members);
}
/**
* Wait for the coordinator task to complete, and verify all the mocks
* @param task to wait on
* @throws Exception on unexpected failure
*/
private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
throws Exception {
boolean caughtError = false;
try {
proc.waitForCompleted();
} catch (ForeignException fe) {
caughtError = true;
}
// make sure that the task called all the expected phases
Mockito.verify(proc, prepare).sendGlobalBarrierStart();
Mockito.verify(proc, commit).sendGlobalBarrierReached();
Mockito.verify(proc, finish).sendGlobalBarrierComplete();
assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
.hasException());
assertEquals("Operation error state was unexpected", opHasError, caughtError);
}
/**
* Wait for the coordinator task to complete, and verify all the mocks
* @param task to wait on
* @throws Exception on unexpected failure
*/
private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
throws Exception {
boolean caughtError = false;
try {
op.waitForLocallyCompleted();
} catch (ForeignException fe) {
caughtError = true;
}
// make sure that the task called all the expected phases
Mockito.verify(op, prepare).acquireBarrier();
Mockito.verify(op, commit).insideBarrier();
// We cannot guarantee that cleanup has run so we don't check it.
assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
.hasException());
assertEquals("Operation error state was unexpected", opHasError, caughtError);
}
private void verifyCohortSuccessful(List<String> cohortNames,
SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
VerificationMode finish, boolean opHasError) throws Exception {
// make sure we build the correct number of cohort members
Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
// verify that we ran each of the operations cleanly
int j = 0;
for (Subprocedure op : cohortTasks) {
LOG.debug("Checking mock:" + (j++));
waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
}
}
private void closeAll(
ProcedureCoordinator coordinator,
ZKProcedureCoordinatorRpcs coordinatorController,
List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
throws IOException {
// make sure we close all the resources
for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
member.getFirst().close();
member.getSecond().close();
}
coordinator.close();
coordinatorController.close();
}
}

View File

@ -0,0 +1,429 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import com.google.common.collect.Lists;
/**
* Test zookeeper-based, procedure controllers
*/
@Category(MediumTests.class)
public class TestZKProcedureControllers {
static final Log LOG = LogFactory.getLog(TestZKProcedureControllers.class);
private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final String COHORT_NODE_NAME = "expected";
private static final String CONTROLLER_NODE_NAME = "controller";
private static final VerificationMode once = Mockito.times(1);
@BeforeClass
public static void setupTest() throws Exception {
UTIL.startMiniZKCluster();
}
@AfterClass
public static void cleanupTest() throws Exception {
UTIL.shutdownMiniZKCluster();
}
/**
* Smaller test to just test the actuation on the cohort member
* @throws Exception on failure
*/
@Test(timeout = 15000)
public void testSimpleZKCohortMemberController() throws Exception {
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
final String operationName = "instanceTest";
final Subprocedure sub = Mockito.mock(Subprocedure.class);
Mockito.when(sub.getName()).thenReturn(operationName);
final byte[] data = new byte[] { 1, 2, 3 };
final CountDownLatch prepared = new CountDownLatch(1);
final CountDownLatch committed = new CountDownLatch(1);
final ForeignExceptionDispatcher monitor = spy(new ForeignExceptionDispatcher());
final ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(
watcher, "testSimple", COHORT_NODE_NAME);
// mock out cohort member callbacks
final ProcedureMember member = Mockito
.mock(ProcedureMember.class);
Mockito.doReturn(sub).when(member).createSubprocedure(operationName, data);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
controller.sendMemberAcquired(sub);
prepared.countDown();
return null;
}
}).when(member).submitSubprocedure(sub);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
controller.sendMemberCompleted(sub);
committed.countDown();
return null;
}
}).when(member).receivedReachedGlobalBarrier(operationName);
// start running the listener
controller.start(member);
// set a prepare node from a 'coordinator'
String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller.getZkController(), operationName);
ZKUtil.createSetData(watcher, prepare, ProtobufUtil.prependPBMagic(data));
// wait for the operation to be prepared
prepared.await();
// create the commit node so we update the operation to enter the commit phase
String commit = ZKProcedureUtil.getReachedBarrierNode(controller.getZkController(), operationName);
LOG.debug("Found prepared, posting commit node:" + commit);
ZKUtil.createAndFailSilent(watcher, commit);
LOG.debug("Commit node:" + commit + ", exists:" + ZKUtil.checkExists(watcher, commit));
committed.await();
verify(monitor, never()).receive(Mockito.any(ForeignException.class));
// XXX: broken due to composition.
// verify(member, never()).getManager().controllerConnectionFailure(Mockito.anyString(),
// Mockito.any(IOException.class));
// cleanup after the test
ZKUtil.deleteNodeRecursively(watcher, controller.getZkController().getBaseZnode());
assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
}
@Test(timeout = 15000)
public void testZKCoordinatorControllerWithNoCohort() throws Exception {
final String operationName = "no cohort controller test";
final byte[] data = new byte[] { 1, 2, 3 };
runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data);
runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data);
}
@Test(timeout = 15000)
public void testZKCoordinatorControllerWithSingleMemberCohort() throws Exception {
final String operationName = "single member controller test";
final byte[] data = new byte[] { 1, 2, 3 };
runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort");
runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort");
}
@Test(timeout = 15000)
public void testZKCoordinatorControllerMultipleCohort() throws Exception {
final String operationName = "multi member controller test";
final byte[] data = new byte[] { 1, 2, 3 };
runMockCommitWithOrchestratedControllers(startCoordinatorFirst, operationName, data, "cohort",
"cohort2", "cohort3");
runMockCommitWithOrchestratedControllers(startCohortFirst, operationName, data, "cohort",
"cohort2", "cohort3");
}
private void runMockCommitWithOrchestratedControllers(StartControllers controllers,
String operationName, byte[] data, String... cohort) throws Exception {
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
List<String> expected = Lists.newArrayList(cohort);
final Subprocedure sub = Mockito.mock(Subprocedure.class);
Mockito.when(sub.getName()).thenReturn(operationName);
CountDownLatch prepared = new CountDownLatch(expected.size());
CountDownLatch committed = new CountDownLatch(expected.size());
// mock out coordinator so we can keep track of zk progress
ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
prepared, committed);
ProcedureMember member = Mockito.mock(ProcedureMember.class);
Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
.start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
ZKProcedureCoordinatorRpcs controller = pair.getFirst();
List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
// start the operation
Procedure p = Mockito.mock(Procedure.class);
Mockito.when(p.getName()).thenReturn(operationName);
controller.sendGlobalBarrierAcquire(p, data, expected);
// post the prepare node for each expected node
for (ZKProcedureMemberRpcs cc : cohortControllers) {
cc.sendMemberAcquired(sub);
}
// wait for all the notifications to reach the coordinator
prepared.await();
// make sure we got the all the nodes and no more
Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
Mockito.anyString());
// kick off the commit phase
controller.sendGlobalBarrierReached(p, expected);
// post the committed node for each expected node
for (ZKProcedureMemberRpcs cc : cohortControllers) {
cc.sendMemberCompleted(sub);
}
// wait for all commit notifications to reach the coordinator
committed.await();
// make sure we got the all the nodes and no more
Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
Mockito.anyString());
controller.resetMembers(p);
// verify all behavior
verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
verifyCohort(member, cohortControllers.size(), operationName, data);
verifyCoordinator(operationName, coordinator, expected);
}
// TODO Broken by composition.
// @Test
// public void testCoordinatorControllerHandlesEarlyPrepareNodes() throws Exception {
// runEarlyPrepareNodes(startCoordinatorFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
// "cohort1", "cohort2");
// runEarlyPrepareNodes(startCohortFirst, "testEarlyPreparenodes", new byte[] { 1, 2, 3 },
// "cohort1", "cohort2");
// }
public void runEarlyPrepareNodes(StartControllers controllers, String operationName, byte[] data,
String... cohort) throws Exception {
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
List<String> expected = Lists.newArrayList(cohort);
final Subprocedure sub = Mockito.mock(Subprocedure.class);
Mockito.when(sub.getName()).thenReturn(operationName);
final CountDownLatch prepared = new CountDownLatch(expected.size());
final CountDownLatch committed = new CountDownLatch(expected.size());
// mock out coordinator so we can keep track of zk progress
ProcedureCoordinator coordinator = setupMockCoordinator(operationName,
prepared, committed);
ProcedureMember member = Mockito.mock(ProcedureMember.class);
Procedure p = Mockito.mock(Procedure.class);
Mockito.when(p.getName()).thenReturn(operationName);
Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> pair = controllers
.start(watcher, operationName, coordinator, CONTROLLER_NODE_NAME, member, expected);
ZKProcedureCoordinatorRpcs controller = pair.getFirst();
List<ZKProcedureMemberRpcs> cohortControllers = pair.getSecond();
// post 1/2 the prepare nodes early
for (int i = 0; i < cohortControllers.size() / 2; i++) {
cohortControllers.get(i).sendMemberAcquired(sub);
}
// start the operation
controller.sendGlobalBarrierAcquire(p, data, expected);
// post the prepare node for each expected node
for (ZKProcedureMemberRpcs cc : cohortControllers) {
cc.sendMemberAcquired(sub);
}
// wait for all the notifications to reach the coordinator
prepared.await();
// make sure we got the all the nodes and no more
Mockito.verify(coordinator, times(expected.size())).memberAcquiredBarrier(Mockito.eq(operationName),
Mockito.anyString());
// kick off the commit phase
controller.sendGlobalBarrierReached(p, expected);
// post the committed node for each expected node
for (ZKProcedureMemberRpcs cc : cohortControllers) {
cc.sendMemberCompleted(sub);
}
// wait for all commit notifications to reach the coordiantor
committed.await();
// make sure we got the all the nodes and no more
Mockito.verify(coordinator, times(expected.size())).memberFinishedBarrier(Mockito.eq(operationName),
Mockito.anyString());
controller.resetMembers(p);
// verify all behavior
verifyZooKeeperClean(operationName, watcher, controller.getZkProcedureUtil());
verifyCohort(member, cohortControllers.size(), operationName, data);
verifyCoordinator(operationName, coordinator, expected);
}
/**
* @return a mock {@link ProcedureCoordinator} that just counts down the
* prepared and committed latch for called to the respective method
*/
private ProcedureCoordinator setupMockCoordinator(String operationName,
final CountDownLatch prepared, final CountDownLatch committed) {
ProcedureCoordinator coordinator = Mockito
.mock(ProcedureCoordinator.class);
Mockito.mock(ProcedureCoordinator.class);
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
prepared.countDown();
return null;
}
}).when(coordinator).memberAcquiredBarrier(Mockito.eq(operationName), Mockito.anyString());
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
committed.countDown();
return null;
}
}).when(coordinator).memberFinishedBarrier(Mockito.eq(operationName), Mockito.anyString());
return coordinator;
}
/**
* Verify that the prepare, commit and abort nodes for the operation are removed from zookeeper
*/
private void verifyZooKeeperClean(String operationName, ZooKeeperWatcher watcher,
ZKProcedureUtil controller) throws Exception {
String prepare = ZKProcedureUtil.getAcquireBarrierNode(controller, operationName);
String commit = ZKProcedureUtil.getReachedBarrierNode(controller, operationName);
String abort = ZKProcedureUtil.getAbortNode(controller, operationName);
assertEquals("Didn't delete prepare node", -1, ZKUtil.checkExists(watcher, prepare));
assertEquals("Didn't delete commit node", -1, ZKUtil.checkExists(watcher, commit));
assertEquals("Didn't delete abort node", -1, ZKUtil.checkExists(watcher, abort));
}
/**
* Verify the cohort controller got called once per expected node to start the operation
*/
private void verifyCohort(ProcedureMember member, int cohortSize,
String operationName, byte[] data) {
// verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.eq(operationName),
// (byte[]) Mockito.argThat(new ArrayEquals(data)));
verify(member, Mockito.times(cohortSize)).submitSubprocedure(Mockito.any(Subprocedure.class));
}
/**
* Verify that the coordinator only got called once for each expected node
*/
private void verifyCoordinator(String operationName,
ProcedureCoordinator coordinator, List<String> expected) {
// verify that we got all the expected nodes
for (String node : expected) {
verify(coordinator, once).memberAcquiredBarrier(operationName, node);
verify(coordinator, once).memberFinishedBarrier(operationName, node);
}
}
/**
* Specify how the controllers that should be started (not spy/mockable) for the test.
*/
private abstract class StartControllers {
public abstract Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
ZooKeeperWatcher watcher, String operationName,
ProcedureCoordinator coordinator, String controllerName,
ProcedureMember member, List<String> cohortNames) throws Exception;
}
private final StartControllers startCoordinatorFirst = new StartControllers() {
@Override
public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
ZooKeeperWatcher watcher, String operationName,
ProcedureCoordinator coordinator, String controllerName,
ProcedureMember member, List<String> expected) throws Exception {
// start the controller
ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
watcher, operationName, CONTROLLER_NODE_NAME);
controller.start(coordinator);
// make a cohort controller for each expected node
List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
for (String nodeName : expected) {
ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
watcher, operationName, nodeName);
cc.start(member);
cohortControllers.add(cc);
}
return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
controller, cohortControllers);
}
};
/**
* Check for the possible race condition where a cohort member starts after the controller and
* therefore could miss a new operation
*/
private final StartControllers startCohortFirst = new StartControllers() {
@Override
public Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>> start(
ZooKeeperWatcher watcher, String operationName,
ProcedureCoordinator coordinator, String controllerName,
ProcedureMember member, List<String> expected) throws Exception {
// make a cohort controller for each expected node
List<ZKProcedureMemberRpcs> cohortControllers = new ArrayList<ZKProcedureMemberRpcs>();
for (String nodeName : expected) {
ZKProcedureMemberRpcs cc = new ZKProcedureMemberRpcs(
watcher, operationName, nodeName);
cc.start(member);
cohortControllers.add(cc);
}
// start the controller
ZKProcedureCoordinatorRpcs controller = new ZKProcedureCoordinatorRpcs(
watcher, operationName, CONTROLLER_NODE_NAME);
controller.start(coordinator);
return new Pair<ZKProcedureCoordinatorRpcs, List<ZKProcedureMemberRpcs>>(
controller, cohortControllers);
}
};
}