HBASE-9426 Make custom distributed barrier procedure pluggable (Richard Ding)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1560234 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2014-01-22 01:42:49 +00:00
parent c7a72479b6
commit c079ba4660
22 changed files with 5170 additions and 219 deletions

View File

@ -24,8 +24,11 @@ import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@ -88,6 +91,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
@ -127,6 +132,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
@ -2991,6 +3000,103 @@ public class HBaseAdmin implements Abortable, Closeable {
waitUntilTableIsEnabled(tableName);
}
/**
 * Execute a distributed procedure on a cluster and wait for it to complete.
 * <p>
 * The request is sent to the master, whose response carries the maximum time this client is
 * expected to wait. The method then polls {@link #isProcedureFinished} with a bounded back-off
 * until the procedure reports completion or that time has elapsed.
 *
 * @param signature A distributed procedure is uniquely identified
 * by its signature (default the root ZK node name of the procedure).
 * @param instance The instance name of the procedure. For some procedures, this parameter is
 * optional.
 * @param props Property/Value pairs of properties passing to the procedure
 * @throws IOException if the request or a status check fails, if the waiting thread is
 * interrupted (as an {@link InterruptedIOException}), or if the procedure has not
 * completed within the time announced by the master
 */
public void execProcedure(String signature, String instance,
    Map<String, String> props) throws IOException {
  ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
  builder.setSignature(signature).setInstance(instance);
  for (Map.Entry<String, String> prop : props.entrySet()) {
    NameStringPair pair = NameStringPair.newBuilder().setName(prop.getKey())
        .setValue(prop.getValue()).build();
    builder.addConfiguration(pair);
  }

  final ExecProcedureRequest request = ExecProcedureRequest.newBuilder()
      .setProcedure(builder.build()).build();

  // run the procedure on the master
  ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>(
      getConnection()) {
    @Override
    public ExecProcedureResponse call() throws ServiceException {
      return master.execProcedure(null, request);
    }
  });

  long start = EnvironmentEdgeManager.currentTimeMillis();
  long max = response.getExpectedTimeout();
  // A retry count of zero would otherwise fail here with an ArithmeticException.
  long maxPauseTime = max / Math.max(1, this.numRetries);
  int tries = 0;
  LOG.debug("Waiting a max of " + max + " ms for procedure '" +
      signature + " : " + instance + "' to complete. (max " + maxPauseTime + " ms per retry)");
  boolean done = false;
  // Always poll at least once, then keep polling until done or the deadline passes.
  while (tries == 0
      || ((EnvironmentEdgeManager.currentTimeMillis() - start) < max && !done)) {
    try {
      // sleep a backoff <= pauseTime amount
      long sleep = getPauseTime(tries++);
      sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
      LOG.debug("(#" + tries + ") Sleeping: " + sleep +
          "ms while waiting for procedure completion.");
      Thread.sleep(sleep);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while waiting for procedure " + signature + " to complete");
      // Stop waiting: with the interrupt flag set, every further sleep would return
      // immediately and the loop would query the master without any pause.
      Thread.currentThread().interrupt();
      throw (InterruptedIOException) new InterruptedIOException(
          "Interrupted while waiting for procedure '" + signature + " : " + instance
          + "' to complete").initCause(e);
    }
    LOG.debug("Getting current status of procedure from master...");
    done = isProcedureFinished(signature, instance, props);
  }
  if (!done) {
    throw new IOException("Procedure '" + signature + " : " + instance
        + "' wasn't completed in expectedTime:" + max + " ms");
  }
}
/**
 * Ask the master for the current state of the specified procedure.
 * <p>
 * Possible outcomes:
 * <ul>
 * <li>the procedure is still running - <tt>false</tt> is returned</li>
 * <li>the procedure has finished - <tt>true</tt> is returned</li>
 * <li>the procedure finished with an error - the failure is thrown</li>
 * </ul>
 *
 * @param signature The signature that uniquely identifies a procedure
 * @param instance The instance name of the procedure
 * @param props Property/Value pairs of properties passing to the procedure
 * @return true if the specified procedure is finished successfully, false if it is still running
 * @throws IOException if the specified procedure finished with error
 */
public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
    throws IOException {
  // Rebuild the same description that was used to start the procedure.
  final ProcedureDescription.Builder descBuilder = ProcedureDescription.newBuilder();
  descBuilder.setSignature(signature);
  descBuilder.setInstance(instance);
  for (Map.Entry<String, String> prop : props.entrySet()) {
    descBuilder.addConfiguration(
        NameStringPair.newBuilder().setName(prop.getKey()).setValue(prop.getValue()).build());
  }
  final ProcedureDescription desc = descBuilder.build();

  MasterCallable<IsProcedureDoneResponse> statusCall =
      new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
        @Override
        public IsProcedureDoneResponse call() throws ServiceException {
          IsProcedureDoneRequest statusRequest =
              IsProcedureDoneRequest.newBuilder().setProcedure(desc).build();
          return master.isProcedureDone(null, statusRequest);
        }
      };
  IsProcedureDoneResponse status = executeCallable(statusCall);
  return status.getDone();
}
/**
* Execute Restore/Clone snapshot and wait for the server to complete (blocking).
* To check if the cloned table exists, use {@link #isTableAvailable} -- it is not safe to

View File

@ -2048,6 +2048,19 @@ public class HConnectionManager {
return stub.isRestoreSnapshotDone(controller, request);
}
// Pass-through: hands the ExecProcedure call, unchanged, to stub and returns its response.
@Override
public ExecProcedureResponse execProcedure(
RpcController controller, ExecProcedureRequest request)
throws ServiceException {
return stub.execProcedure(controller, request);
}
// Pass-through: hands the IsProcedureDone call, unchanged, to stub and returns its response.
@Override
public IsProcedureDoneResponse isProcedureDone(RpcController controller,
IsProcedureDoneRequest request) throws ServiceException {
return stub.isProcedureDone(controller, request);
}
@Override
public IsMasterRunningResponse isMasterRunning(
RpcController controller, IsMasterRunningRequest request)

View File

@ -1129,4 +1129,25 @@ possible configurations would overwhelm and obscure the important.
section of the HBase online manual.
</description>
</property>
<property>
<name>hbase.procedure.regionserver.classes</name>
<value></value>
<description>A comma-separated list of
org.apache.hadoop.hbase.procedure.RegionServerProcedureManager procedure managers that are
loaded by default on the active HRegionServer process. The lifecycle methods (init/start/stop)
will be called by the active HRegionServer process to perform the specific globally barriered
procedure. After implementing your own RegionServerProcedureManager, just put it in
HBase's classpath and add the fully qualified class name here.
</description>
</property>
<property>
<name>hbase.procedure.master.classes</name>
<value></value>
<description>A comma-separated list of
org.apache.hadoop.hbase.procedure.MasterProcedureManager procedure managers that are
loaded by default on the active HMaster process. A procedure is identified by its signature and
users can use the signature and an instance name to trigger an execution of a globally barriered
procedure. After implementing your own MasterProcedureManager, just put it in HBase's classpath
and add the fully qualified class name here.</description>
</property>
</configuration>

View File

@ -164,6 +164,16 @@ message SnapshotDescription {
optional int32 version = 5;
}
/**
* Description of a distributed procedure: the signature that selects the procedure,
* an optional instance name, and name/value properties handed to it.
*/
message ProcedureDescription {
required string signature = 1; // the unique signature of the procedure
optional string instance = 2; // the procedure instance name
// NOTE(review): unit and epoch are not stated in this file -- presumably milliseconds; confirm
optional int64 creation_time = 3 [default = 0];
// property name/value pairs passed along with the procedure request
repeated NameStringPair configuration = 4;
}
message EmptyMsg {
}

View File

@ -332,6 +332,23 @@ message IsMasterRunningResponse {
required bool is_master_running = 1;
}
// Request sent to the master to execute the described distributed procedure.
message ExecProcedureRequest {
required ProcedureDescription procedure = 1;
}
// Reply to ExecProcedureRequest.
message ExecProcedureResponse {
// maximum time the client should wait for the procedure to complete
// (the client logs and compares this value as milliseconds)
required int64 expected_timeout = 1;
}
// Asks the master whether the described procedure has completed.
message IsProcedureDoneRequest {
// the procedure to check; declared optional, although the master looks the
// procedure manager up by this description's signature
optional ProcedureDescription procedure = 1;
}
// Reply to IsProcedureDoneRequest.
message IsProcedureDoneResponse {
// true once the procedure has completed; false while it is still running
optional bool done = 1 [default = false];
// NOTE(review): field is named "snapshot" but carries a ProcedureDescription -- looks like a
// leftover from the snapshot messages; renaming would change the generated accessors, confirm
// before touching.
optional ProcedureDescription snapshot = 2;
}
service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@ -490,6 +507,19 @@ service MasterService {
*/
rpc IsRestoreSnapshotDone(IsRestoreSnapshotDoneRequest) returns(IsRestoreSnapshotDoneResponse);
/**
* Execute a distributed procedure.
*/
rpc ExecProcedure(ExecProcedureRequest) returns(ExecProcedureResponse);
/**
* Determine if the procedure is done yet.
*/
rpc IsProcedureDone(IsProcedureDoneRequest) returns(IsProcedureDoneResponse);
/** return true if master is available */
/** rpc IsMasterRunning(IsMasterRunningRequest) returns(IsMasterRunningResponse); */
/** Modify a namespace's metadata */
rpc ModifyNamespace(ModifyNamespaceRequest)
returns(ModifyNamespaceResponse);

View File

@ -106,6 +106,8 @@ import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@ -116,6 +118,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
@ -193,6 +196,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@ -378,6 +385,8 @@ MasterServices, Server {
// monitor for snapshot of hbase tables
private SnapshotManager snapshotManager;
// monitor for distributed procedures
private MasterProcedureManagerHost mpmHost;
/** The health check chore. */
private HealthCheckChore healthCheckChore;
@ -635,7 +644,7 @@ MasterServices, Server {
if (this.serverManager != null) this.serverManager.stop();
if (this.assignmentManager != null) this.assignmentManager.stop();
if (this.fileSystemManager != null) this.fileSystemManager.stop();
if (this.snapshotManager != null) this.snapshotManager.stop("server shutting down.");
if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
this.zooKeeper.close();
}
LOG.info("HMaster main thread exiting");
@ -700,8 +709,12 @@ MasterServices, Server {
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
", setting cluster-up flag (Was=" + wasUp + ")");
// create the snapshot manager
this.snapshotManager = new SnapshotManager(this, this.metricsMaster);
// create/initialize the snapshot manager and other procedure managers
this.snapshotManager = new SnapshotManager();
this.mpmHost = new MasterProcedureManagerHost();
this.mpmHost.register(this.snapshotManager);
this.mpmHost.loadProcedures(conf);
this.mpmHost.initialize(this, this.metricsMaster);
}
/**
@ -2164,7 +2177,7 @@ MasterServices, Server {
}
return info.getInfoPort();
}
/**
* @return array of coprocessor SimpleNames.
*/
@ -2949,6 +2962,68 @@ MasterServices, Server {
}
}
/**
 * Hands a distributed procedure request to the procedure manager registered for the
 * requested signature.
 * {@inheritDoc}
 *
 * @return a response carrying the maximum time the client should wait for completion
 * @throws ServiceException if no manager is registered for the signature, or if the manager
 *           fails to start the procedure
 */
@Override
public ExecProcedureResponse execProcedure(RpcController controller,
    ExecProcedureRequest request) throws ServiceException {
  final ProcedureDescription desc = request.getProcedure();
  final MasterProcedureManager manager =
      this.mpmHost.getProcedureManager(desc.getSignature());
  if (manager == null) {
    throw new ServiceException("The procedure is not registered: "
        + desc.getSignature());
  }

  LOG.info(getClientIdAuditPrefix() + " procedure request for: "
      + desc.getSignature());

  try {
    manager.execProcedure(desc);
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }

  // Tell the client how long, at most, it should wait for the procedure to complete.
  ExecProcedureResponse.Builder reply = ExecProcedureResponse.newBuilder();
  reply.setExpectedTimeout(SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
  return reply.build();
}
/**
 * Reports whether the specified procedure has completed, as answered by the procedure
 * manager registered for the requested signature.
 * @return a response whose done flag is true if the procedure is done,
 *         false if the procedure is in the process of completing
 * @throws ServiceException if invalid procedure, or
 *           a failed procedure with progress failure reason.
 */
@Override
public IsProcedureDoneResponse isProcedureDone(RpcController controller,
    IsProcedureDoneRequest request) throws ServiceException {
  final ProcedureDescription desc = request.getProcedure();
  final MasterProcedureManager manager =
      this.mpmHost.getProcedureManager(desc.getSignature());
  if (manager == null) {
    throw new ServiceException("The procedure is not registered: "
        + desc.getSignature());
  }

  LOG.debug("Checking to see if procedure from request:"
      + desc.getSignature() + " is done");

  try {
    boolean finished = manager.isProcedureDone(desc);
    return IsProcedureDoneResponse.newBuilder().setDone(finished).build();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
@Override
public ModifyNamespaceResponse modifyNamespace(RpcController controller,
ModifyNamespaceRequest request) throws ServiceException {

View File

@ -53,10 +53,13 @@ import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
@ -86,7 +89,7 @@ import org.apache.zookeeper.KeeperException;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SnapshotManager implements Stoppable {
public class SnapshotManager extends MasterProcedureManager implements Stoppable {
private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
/** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
@ -133,9 +136,9 @@ public class SnapshotManager implements Stoppable {
private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
private boolean stopped;
private final MasterServices master; // Needed by TableEventHandlers
private final MetricsMaster metricsMaster;
private final ProcedureCoordinator coordinator;
private MasterServices master; // Needed by TableEventHandlers
private MetricsMaster metricsMaster;
private ProcedureCoordinator coordinator;
// Is snapshot feature enabled?
private boolean isSnapshotSupported = false;
@ -154,37 +157,10 @@ public class SnapshotManager implements Stoppable {
private Map<TableName, SnapshotSentinel> restoreHandlers =
new HashMap<TableName, SnapshotSentinel>();
private final Path rootDir;
private final ExecutorService executorService;
private Path rootDir;
private ExecutorService executorService;
/**
* Construct a snapshot manager.
* @param master
*/
public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster)
throws KeeperException, IOException, UnsupportedOperationException {
this.master = master;
this.metricsMaster = metricsMaster;
this.rootDir = master.getMasterFileSystem().getRootDir();
checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
// get the configuration for the coordinator
Configuration conf = master.getConfiguration();
long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
// setup the default procedure coordinator
String name = master.getServerName().toString();
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
this.executorService = master.getExecutorService();
resetTempDir();
}
public SnapshotManager() {}
/**
* Fully specify all necessary components of a snapshot manager. Exposed for testing.
@ -1024,4 +1000,69 @@ public class SnapshotManager implements Stoppable {
}
}
}
/**
 * Wires this manager to the master: records the master services and metrics, checks snapshot
 * support, builds the procedure coordinator from the snapshot settings in the master
 * configuration, and resets the temporary directory.
 */
@Override
public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
    IOException, UnsupportedOperationException {
  this.master = master;
  this.metricsMaster = metricsMaster;
  this.rootDir = master.getMasterFileSystem().getRootDir();
  checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());

  // Coordinator settings come from the master configuration.
  final Configuration masterConf = master.getConfiguration();
  final long wakeMillis =
      masterConf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
  final long timeoutMs =
      masterConf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
  final int poolSize =
      masterConf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);

  // Default coordinator: a thread pool named after this server plus ZooKeeper-based RPCs.
  final String serverName = master.getServerName().toString();
  final ThreadPoolExecutor coordinatorPool =
      ProcedureCoordinator.defaultPool(serverName, poolSize);
  final ProcedureCoordinatorRpcs coordinatorRpcs = new ZKProcedureCoordinatorRpcs(
      master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, serverName);
  this.coordinator =
      new ProcedureCoordinator(coordinatorRpcs, coordinatorPool, timeoutMs, wakeMillis);
  this.executorService = master.getExecutorService();

  resetTempDir();
}
/**
* @return {@link #ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION}, the signature that identifies
* this manager's procedure
*/
@Override
public String getProcedureSignature() {
return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
}
/**
* Converts the procedure description into a snapshot description and passes it to
* takeSnapshot.
* @param desc procedure description; must carry an instance name and a "table" property
* @throws IOException if the description cannot be converted or takeSnapshot fails
*/
@Override
public void execProcedure(ProcedureDescription desc) throws IOException {
takeSnapshot(toSnapshotDescription(desc));
}
/**
* Converts the procedure description into a snapshot description and returns the result of
* isSnapshotDone for it.
* @param desc procedure description; must carry an instance name and a "table" property
* @throws IOException if the description cannot be converted or isSnapshotDone fails
*/
@Override
public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
return isSnapshotDone(toSnapshotDescription(desc));
}
/**
 * Translate a generic procedure description into a snapshot description.
 * The instance name becomes the snapshot name, the "table" property (name matched ignoring
 * case) becomes the table, and the type is always FLUSH.
 *
 * @param desc procedure description received from the client
 * @return the equivalent snapshot description
 * @throws IOException if the instance name or the "table" property is missing
 */
private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
    throws IOException {
  if (!desc.hasInstance()) {
    throw new IOException("Snapshot name is not defined: " + desc);
  }

  // When "table" is given more than once, the last occurrence is the one used.
  String tableProperty = null;
  for (NameStringPair property : desc.getConfigurationList()) {
    if ("table".equalsIgnoreCase(property.getName())) {
      tableProperty = property.getValue();
    }
  }
  if (tableProperty == null) {
    throw new IOException("Snapshot table is not defined: " + desc);
  }

  return SnapshotDescription.newBuilder()
      .setTable(TableName.valueOf(tableProperty).getNameAsString())
      .setName(desc.getInstance())
      .setType(SnapshotDescription.Type.FLUSH)
      .build();
}
}

View File

@ -192,6 +192,8 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
status.markComplete("Snapshot " + snapshot.getName() + " of table " + snapshotTable
+ " completed");
LOG.info("Snapshot " + snapshot.getName() + " of table " + snapshotTable
+ " completed");
metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
} catch (Exception e) {
status.abort("Failed to complete snapshot " + snapshot.getName() + " on table " +

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.zookeeper.KeeperException;
/**
* A life-cycle management interface for globally barriered procedures on master.
* See the following doc on details of globally barriered procedure:
* https://issues.apache.org/jira/secure/attachment/12555103/121127-global-barrier-proc.pdf
*
* To implement a custom globally barriered procedure, user needs to extend two classes:
* {@link MasterProcedureManager} and {@link RegionServerProcedureManager}. Implementation of
* {@link MasterProcedureManager} is loaded into {@link HMaster} process via configuration
* parameter 'hbase.procedure.master.classes', while implementation of
* {@link RegionServerProcedureManager} is loaded into {@link HRegionServer} process via
* configuration parameter 'hbase.procedure.regionserver.classes'.
*
* An example of globally barriered procedure implementation is {@link SnapshotManager} and
* {@link RegionServerSnapshotManager}.
*
* A globally barriered procedure is identified by its signature (usually it is the name of the
* procedure znode). During the initialization phase, the initialize methods are called by both
* {@link HMaster} and {@link HRegionServer} which create the procedure znode and register the
* listeners. A procedure can be triggered by its signature and an instance name (encapsulated in
* a {@link ProcedureDescription} object). When the servers are shutdown, the stop methods on both
* classes are called to clean up the data associated with the procedure.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class MasterProcedureManager extends ProcedureManager implements
Stoppable {
/**
* Initialize a globally barriered procedure for master.
*
* @param master Master service interface
* @param metricsMaster Master metrics object
* @throws KeeperException
* @throws IOException
* @throws UnsupportedOperationException
*/
public abstract void initialize(MasterServices master, MetricsMaster metricsMaster)
throws KeeperException, IOException, UnsupportedOperationException;
/**
* Execute a distributed procedure on cluster
*
* @param desc Procedure description
* @throws IOException
*/
public abstract void execProcedure(ProcedureDescription desc) throws IOException;
/**
* Check if the procedure is finished successfully
*
* @param desc Procedure description
* @return true if the specified procedure is finished successfully
* @throws IOException
*/
public abstract boolean isProcedureDone(ProcedureDescription desc) throws IOException;
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.zookeeper.KeeperException;
/**
 * Provides the globally barriered procedure framework and environment for
 * master oriented operations. {@link HMaster} interacts with the loaded
 * procedure manager through this class.
 */
public class MasterProcedureManagerHost extends
    ProcedureManagerHost<MasterProcedureManager> {

  /** Procedure managers keyed by their signature; filled by {@link #loadProcedures}. */
  private final Hashtable<String, MasterProcedureManager> procedureMgrMap
      = new Hashtable<String, MasterProcedureManager>();

  /**
   * Loads the user procedure managers named in the configuration, then indexes every known
   * manager (registered or loaded) by its signature.
   *
   * @param conf configuration that lists the master procedure manager classes
   */
  @Override
  public void loadProcedures(Configuration conf) {
    loadUserProcedures(conf, MASTER_PROCEUDRE_CONF_KEY);
    for (MasterProcedureManager mpm : getProcedureManagers()) {
      procedureMgrMap.put(mpm.getProcedureSignature(), mpm);
    }
  }

  /**
   * Initializes every known procedure manager with the given master services and metrics.
   *
   * @param master Master service interface
   * @param metricsMaster Master metrics object
   * @throws KeeperException if a manager's initialization throws it
   * @throws IOException if a manager's initialization throws it
   * @throws UnsupportedOperationException if a manager's initialization throws it
   */
  public void initialize(MasterServices master, final MetricsMaster metricsMaster)
      throws KeeperException, IOException, UnsupportedOperationException {
    for (MasterProcedureManager mpm : getProcedureManagers()) {
      mpm.initialize(master, metricsMaster);
    }
  }

  /**
   * Stops every known procedure manager.
   *
   * @param why reason passed on to each manager
   */
  public void stop(String why) {
    for (MasterProcedureManager mpm : getProcedureManagers()) {
      mpm.stop(why);
    }
  }

  /**
   * Looks up the procedure manager indexed under the given signature.
   *
   * @param signature procedure signature; may be null
   * @return the matching manager, or null if the signature is null or nothing is indexed
   *         under it
   */
  public MasterProcedureManager getProcedureManager(String signature) {
    // Hashtable rejects null keys with a NullPointerException; a missing signature simply
    // has no manager.
    if (signature == null) {
      return null;
    }
    return procedureMgrMap.get(signature);
  }
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class ProcedureManager {

  /**
   * Return the unique signature of the procedure. This signature uniquely
   * identifies the procedure. By default, this signature is the string used in
   * the procedure controller (i.e., the root ZK node name for the procedure)
   */
  public abstract String getProcedureSignature();

  /**
   * Two procedure managers are equal when their signatures are equal.
   */
  @Override
  public boolean equals(Object obj) {
    if (obj instanceof ProcedureManager) {
      String mine = this.getProcedureSignature();
      String theirs = ((ProcedureManager) obj).getProcedureSignature();
      return mine.equals(theirs);
    }
    return false;
  }

  /**
   * Hash of the signature, consistent with {@link #equals(Object)}.
   */
  @Override
  public int hashCode() {
    String signature = this.getProcedureSignature();
    return signature.hashCode();
  }
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
 * Provides the common setup framework and runtime services for globally
 * barriered procedure invocation from HBase services.
 * @param <E> the specific procedure management extension that a concrete
 *    implementation provides
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class ProcedureManagerHost<E extends ProcedureManager> {

  /** Configuration key listing the region server procedure manager classes. */
  public static final String REGIONSERVER_PROCEDURE_CONF_KEY =
      "hbase.procedure.regionserver.classes";
  /** Configuration key listing the master procedure manager classes. */
  public static final String MASTER_PROCEDURE_CONF_KEY =
      "hbase.procedure.master.classes";
  /**
   * Misspelled name of {@link #MASTER_PROCEDURE_CONF_KEY}, kept so that existing references
   * keep compiling. Same value.
   */
  public static final String MASTER_PROCEUDRE_CONF_KEY = MASTER_PROCEDURE_CONF_KEY;

  private static final Log LOG = LogFactory.getLog(ProcedureManagerHost.class);

  /** Every procedure manager known to this host, whether registered or loaded. */
  protected Set<E> procedures = new HashSet<E>();

  /**
   * Load user procedures. Reads the comma-separated class names stored under
   * <code>confKey</code>, instantiates each of them and adds the instances to the set of
   * known procedure managers. A class that cannot be found or instantiated, or that is not a
   * {@link ProcedureManager}, is logged and skipped.
   *
   * @param conf configuration to read the class names from
   * @param confKey configuration key that holds the class names
   */
  protected void loadUserProcedures(Configuration conf, String confKey) {
    // load default procedures from configure file
    String[] defaultProcClasses = conf.getStrings(confKey);
    if (defaultProcClasses == null || defaultProcClasses.length == 0) {
      return;
    }

    // NOTE(review): the context class loader is replaced and not restored afterwards, which
    // matches the previous behavior -- confirm that this is intended.
    ClassLoader cl = this.getClass().getClassLoader();
    Thread.currentThread().setContextClassLoader(cl);

    List<E> configured = new ArrayList<E>();
    for (String className : defaultProcClasses) {
      className = className.trim();
      try {
        Class<?> implClass = cl.loadClass(className);
        configured.add(loadInstance(implClass));
        LOG.info("User procedure " + className + " was loaded successfully.");
      } catch (ClassNotFoundException e) {
        LOG.warn("Class " + className + " cannot be found. " +
            e.getMessage(), e);
      } catch (IOException e) {
        LOG.warn("Load procedure " + className + " failed. " +
            e.getMessage(), e);
      }
    }

    // add entire set to the collection
    procedures.addAll(configured);
  }

  /**
   * Create an instance of the given class through its no-argument constructor.
   *
   * @param implClass class to instantiate; must be a {@link ProcedureManager}
   * @return the new instance
   * @throws IOException if the class is not a {@link ProcedureManager} or cannot be
   *           instantiated
   */
  @SuppressWarnings("unchecked")
  public E loadInstance(Class<?> implClass) throws IOException {
    // Reject foreign classes up front. Without this check the cast below fails with a
    // ClassCastException that loadUserProcedures does not catch, so one bad class name
    // aborts loading instead of being skipped. The check only covers the bound of E; a
    // ProcedureManager of the wrong concrete kind is still not detected here.
    if (!ProcedureManager.class.isAssignableFrom(implClass)) {
      throw new IOException("Class " + implClass.getName() + " is not a "
          + ProcedureManager.class.getName());
    }
    try {
      return (E) implClass.newInstance();
    } catch (InstantiationException e) {
      throw new IOException(e);
    } catch (IllegalAccessException e) {
      throw new IOException(e);
    }
  }

  /**
   * Register a procedure manager object.
   *
   * @param obj procedure manager to add to the set of known managers
   */
  public void register(E obj) {
    procedures.add(obj);
  }

  /**
   * @return a new set holding the known procedure managers; changes to the returned set do
   *         not affect this host
   */
  public Set<E> getProcedureManagers() {
    return new HashSet<E>(procedures);
  }

  /**
   * Load the procedure managers configured for the concrete host.
   *
   * @param conf configuration that lists the procedure manager classes
   */
  public abstract void loadProcedures(Configuration conf);
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.zookeeper.KeeperException;
/**
* A life-cycle management interface for globally barriered procedures on
* region servers.
* <p>
* Implementations are driven through {@link #initialize(RegionServerServices)},
* {@link #start()} and {@link #stop(boolean)}; RegionServerProcedureManagerHost
* invokes each of these on every procedure manager it holds.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class RegionServerProcedureManager extends ProcedureManager {
/**
* Initialize a globally barriered procedure for region servers.
*
* @param rss Region Server service interface
* @throws KeeperException if the implementation hits a ZooKeeper error
* while setting itself up
*/
public abstract void initialize(RegionServerServices rss) throws KeeperException;
/**
* Start accepting procedure requests.
*/
public abstract void start();
/**
* Close this procedure manager and all running procedure tasks.
*
* @param force forcefully stop all running tasks
* @throws IOException if the implementation fails to shut down cleanly
*/
public abstract void stop(boolean force) throws IOException;
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
import org.apache.zookeeper.KeeperException;
/**
 * Region server side host of the globally barriered procedure framework.
 * It fans the life-cycle calls (initialize, start, stop) out to every loaded
 * {@link RegionServerProcedureManager}; {@link HRegionServer} talks to the
 * loaded procedure managers through this class.
 */
public class RegionServerProcedureManagerHost extends
    ProcedureManagerHost<RegionServerProcedureManager> {

  private static final Log LOG = LogFactory
      .getLog(RegionServerProcedureManagerHost.class);

  /**
   * Initialize every loaded procedure manager, logging before and after each one.
   *
   * @param rss region server services handed to each manager
   * @throws KeeperException propagated from the first manager that fails to
   *         initialize; managers after it are not initialized
   */
  public void initialize(RegionServerServices rss) throws KeeperException {
    for (RegionServerProcedureManager manager : procedures) {
      LOG.info(describe(manager, " is initializing"));
      manager.initialize(rss);
      LOG.info(describe(manager, " is initialized"));
    }
  }

  /**
   * Start every loaded procedure manager, logging before and after each one.
   */
  public void start() {
    for (RegionServerProcedureManager manager : procedures) {
      LOG.info(describe(manager, " is starting"));
      manager.start();
      LOG.info(describe(manager, " is started"));
    }
  }

  /**
   * Stop every loaded procedure manager. An {@link IOException} from one
   * manager is logged and does not keep the others from being stopped.
   *
   * @param force passed through to each manager's stop call
   */
  public void stop(boolean force) {
    for (RegionServerProcedureManager manager : procedures) {
      try {
        manager.stop(force);
      } catch (IOException ioe) {
        String failure = "Failed to close procedure " + manager.getProcedureSignature()
            + " cleanly";
        LOG.warn(failure, ioe);
      }
    }
  }

  /**
   * Load the user-configured region server procedure managers, then add the
   * default snapshot manager.
   */
  @Override
  public void loadProcedures(Configuration conf) {
    loadUserProcedures(conf, REGIONSERVER_PROCEDURE_CONF_KEY);
    // the snapshot manager is always present, configured or not
    RegionServerSnapshotManager snapshotManager = new RegionServerSnapshotManager();
    procedures.add(snapshotManager);
  }

  /** Builds the "Procedure &lt;signature&gt;&lt;state&gt;" text used by the life-cycle log lines. */
  private static String describe(RegionServerProcedureManager manager, String state) {
    return "Procedure " + manager.getProcedureSignature() + state;
  }
}

View File

@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@ -489,8 +490,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
private RegionServerCoprocessorHost rsHost;
/** Handle all the snapshot requests to this server */
RegionServerSnapshotManager snapshotManager;
private RegionServerProcedureManagerHost rspmHost;
// configuration setting on if replay WAL edits directly to another RS
private final boolean distributedLogReplay;
@ -749,11 +749,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
this.abort("Failed to retrieve Cluster ID",e);
}
// watch for snapshots
// watch for snapshots and other procedures
try {
this.snapshotManager = new RegionServerSnapshotManager(this);
rspmHost = new RegionServerProcedureManagerHost();
rspmHost.loadProcedures(conf);
rspmHost.initialize(this);
} catch (KeeperException e) {
this.abort("Failed to reach zk cluster when creating snapshot handler.");
this.abort("Failed to reach zk cluster when creating procedure handler.", e);
}
this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper,
ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode));
@ -856,8 +858,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
}
if (!this.stopped && isHealthy()){
// start the snapshot handler, since the server is ready to run
this.snapshotManager.start();
// start the snapshot handler and other procedure handlers,
// since the server is ready to run
rspmHost.start();
}
// We registered with the Master. Go into run mode.
@ -945,12 +948,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
this.nonceManagerChore.interrupt();
}
// Stop the snapshot handler, forcefully killing all running tasks
try {
if (snapshotManager != null) snapshotManager.stop(this.abortRequested || this.killed);
} catch (IOException e) {
LOG.warn("Failed to close snapshot handler cleanly", e);
}
// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
rspmHost.stop(this.abortRequested || this.killed);
if (this.killed) {
// Just skip out w/o closing regions. Used when testing.

View File

@ -42,10 +42,10 @@ import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -71,7 +71,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RegionServerSnapshotManager {
public class RegionServerSnapshotManager extends RegionServerProcedureManager {
private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
/** Maximum number of snapshot region tasks that can run concurrently */
@ -93,9 +93,9 @@ public class RegionServerSnapshotManager {
/** Default amount of time to check for errors while regions finish snapshotting */
private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
private final RegionServerServices rss;
private final ProcedureMemberRpcs memberRpcs;
private final ProcedureMember member;
private RegionServerServices rss;
private ProcedureMemberRpcs memberRpcs;
private ProcedureMember member;
/**
* Exposed for testing.
@ -111,32 +111,12 @@ public class RegionServerSnapshotManager {
this.member = procMember;
}
/**
* Create a default snapshot handler - uses a zookeeper based member controller.
* @param rss region server running the handler
* @throws KeeperException if the zookeeper cluster cannot be reached
*/
public RegionServerSnapshotManager(RegionServerServices rss)
throws KeeperException {
this.rss = rss;
ZooKeeperWatcher zkw = rss.getZooKeeper();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
// read in the snapshot request configuration properties
Configuration conf = rss.getConfiguration();
long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
// create the actual snapshot procedure member
ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
}
public RegionServerSnapshotManager() {}
/**
* Start accepting snapshot requests.
*/
@Override
public void start() {
LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
this.memberRpcs.start(rss.getServerName().toString(), member);
@ -147,6 +127,7 @@ public class RegionServerSnapshotManager {
* @param force forcefully stop all running tasks
* @throws IOException
*/
@Override
public void stop(boolean force) throws IOException {
String mode = force ? "abruptly" : "gracefully";
LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
@ -373,4 +354,33 @@ public class RegionServerSnapshotManager {
this.executor.shutdownNow();
}
}
/**
 * Set this manager up as a member of the online snapshot procedure, using a
 * zookeeper based member controller. The worker pool is sized from the
 * snapshot request thread and timeout settings of the region server's
 * configuration.
 * @param rss region server running the handler
 * @throws KeeperException if the zookeeper cluster cannot be reached
 */
@Override
public void initialize(RegionServerServices rss) throws KeeperException {
  this.rss = rss;
  this.memberRpcs = new ZKProcedureMemberRpcs(rss.getZooKeeper(),
      SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);

  // snapshot request settings come from the region server's configuration
  final Configuration rsConf = rss.getConfiguration();
  final long keepAliveTime =
      rsConf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
  final int workerThreads =
      rsConf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);

  // the procedure member that actually runs the snapshot subprocedures
  final String memberName = rss.getServerName().toString();
  final ThreadPoolExecutor workers =
      ProcedureMember.defaultPool(memberName, workerThreads, keepAliveTime);
  this.member = new ProcedureMember(memberRpcs, workers, new SnapshotSubprocedureBuilder());
}
/**
* @return the online snapshot controller description, the same value this
* manager passes to its zookeeper based member controller in initialize
*/
@Override
public String getProcedureSignature() {
return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
}
}

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
import org.apache.zookeeper.KeeperException;
/**
 * Minimal master side procedure manager used to exercise the pluggable
 * procedure framework: it starts a distributed procedure on all online
 * region servers and waits for it to complete.
 */
public class SimpleMasterProcedureManager extends MasterProcedureManager {

  /** Signature under which this procedure is registered and looked up. */
  public static final String SIMPLE_SIGNATURE = "simle_test";

  private static final Log LOG = LogFactory.getLog(SimpleMasterProcedureManager.class);

  private MasterServices master;
  private ProcedureCoordinator coordinator;
  private ExecutorService executorService;
  /** Set once the most recently executed procedure has completed. */
  private boolean done;

  /** Only logs the reason; no state is changed. */
  @Override
  public void stop(String why) {
    LOG.info("stop: " + why);
  }

  /** Always reports {@code false}. */
  @Override
  public boolean isStopped() {
    return false;
  }

  /**
   * Remember the master and build a ZooKeeper backed procedure coordinator
   * with a single-threaded default pool.
   */
  @Override
  public void initialize(MasterServices master, MetricsMaster metricsMaster)
      throws KeeperException, IOException, UnsupportedOperationException {
    this.master = master;
    this.done = false;

    // setup the default procedure coordinator
    final String serverName = master.getServerName().toString();
    final ThreadPoolExecutor coordinatorPool = ProcedureCoordinator.defaultPool(serverName, 1);
    final ProcedureCoordinatorRpcs rpcs = new ZKProcedureCoordinatorRpcs(
        master.getZooKeeper(), getProcedureSignature(), serverName);
    this.coordinator = new ProcedureCoordinator(rpcs, coordinatorPool);
    this.executorService = master.getExecutorService();
  }

  @Override
  public String getProcedureSignature() {
    return SIMPLE_SIGNATURE;
  }

  /**
   * Start the procedure named by {@code desc} on every online region server
   * and block until it completes.
   *
   * @throws IOException if the procedure could not be submitted
   */
  @Override
  public void execProcedure(ProcedureDescription desc) throws IOException {
    this.done = false;
    ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher(desc.getInstance());

    // every online region server takes part in the procedure
    List<ServerName> online = master.getServerManager().getOnlineServersList();
    List<String> participants = new ArrayList<String>();
    for (ServerName server : online) {
      participants.add(server.toString());
    }

    Procedure procedure =
        coordinator.startProcedure(dispatcher, desc.getInstance(), new byte[0], participants);
    if (procedure == null) {
      String failure = "Failed to submit distributed procedure for '"
          + getProcedureSignature() + "'";
      LOG.error(failure);
      throw new HBaseSnapshotException(failure);
    }

    awaitCompletion(procedure, dispatcher, desc);
  }

  /**
   * Wait for the procedure to complete, marking it done on success. Failures
   * and interruption are handed to the dispatcher rather than rethrown.
   */
  private void awaitCompletion(Procedure procedure, ForeignExceptionDispatcher dispatcher,
      ProcedureDescription desc) {
    try {
      // A timer thread is kicked off that should cancel this if it takes too long.
      procedure.waitForCompleted();
      LOG.info("Done waiting - exec procedure for " + desc.getInstance());
      this.done = true;
    } catch (InterruptedException ie) {
      dispatcher.receive(
          new ForeignException("Interrupted while waiting for procdure to finish", ie));
      // keep the interrupt visible to callers further up the stack
      Thread.currentThread().interrupt();
    } catch (ForeignException fe) {
      dispatcher.receive(fe);
    }
  }

  /** @return whether the last call to execProcedure ran to completion */
  @Override
  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
    return done;
  }
}

View File

@ -0,0 +1,263 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.zookeeper.KeeperException;
/**
 * Minimal region server side procedure manager used to exercise the
 * pluggable procedure framework. It registers under the same signature as
 * {@link SimpleMasterProcedureManager} and, for each procedure, runs a
 * trivial task that only logs.
 */
public class SimpleRSProcedureManager extends RegionServerProcedureManager {

  private static final Log LOG = LogFactory.getLog(SimpleRSProcedureManager.class);

  private RegionServerServices rss;
  private ProcedureMemberRpcs memberRpcs;
  private ProcedureMember member;

  /**
   * Create the ZooKeeper backed member controller and the procedure member
   * (single-threaded default pool) for this region server.
   */
  @Override
  public void initialize(RegionServerServices rss) throws KeeperException {
    this.rss = rss;
    ZooKeeperWatcher zkw = rss.getZooKeeper();
    this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature());

    ThreadPoolExecutor pool =
        ProcedureMember.defaultPool(rss.getServerName().toString(), 1);
    this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder());
    LOG.info("Initialized: " + rss.getServerName().toString());
  }

  /** Start the member controller so procedure requests are received. */
  @Override
  public void start() {
    this.memberRpcs.start(rss.getServerName().toString(), member);
    LOG.info("Started.");
  }

  /**
   * Close the procedure member and then the member controller; the controller
   * is closed even if closing the member fails.
   */
  @Override
  public void stop(boolean force) throws IOException {
    LOG.info("stop: " + force);
    try {
      this.member.close();
    } finally {
      this.memberRpcs.close();
    }
  }

  @Override
  public String getProcedureSignature() {
    return SimpleMasterProcedureManager.SIMPLE_SIGNATURE;
  }

  /**
   * If in a running state, creates the specified subprocedure for handling a procedure.
   * Each subprocedure gets its own {@link SimpleSubprocedurePool}.
   * @return Subprocedure to submit to the ProcedureMember.
   * @throws IllegalStateException if the region server is stopping or stopped
   */
  public Subprocedure buildSubprocedure(String name) {

    // don't run a procedure if the parent is stop(ping)
    if (rss.isStopping() || rss.isStopped()) {
      throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName()
          + ", because stopping/stopped!");
    }

    LOG.info("Attempting to run a procedure.");
    ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
    Configuration conf = rss.getConfiguration();

    SimpleSubprocedurePool taskManager =
        new SimpleSubprocedurePool(rss.getServerName().toString(), conf);
    return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name);
  }

  /**
   * Build the actual procedure runner that will do all the 'hard' work
   */
  public class SimleSubprocedureBuilder implements SubprocedureFactory {

    @Override
    public Subprocedure buildSubprocedure(String name, byte[] data) {
      LOG.info("Building procedure: " + name);
      return SimpleRSProcedureManager.this.buildSubprocedure(name);
    }
  }

  /**
   * Single-threaded task pool backing one subprocedure. It tracks the futures
   * of submitted tasks so they can be awaited or cancelled together.
   */
  public class SimpleSubprocedurePool implements Closeable, Abortable {

    private final ExecutorCompletionService<Void> taskPool;
    private final ThreadPoolExecutor executor;
    private volatile boolean aborted;
    private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
    private final String name;

    public SimpleSubprocedurePool(String name, Configuration conf) {
      this.name = name;
      executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
          new LinkedBlockingQueue<Runnable>(),
          new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
      taskPool = new ExecutorCompletionService<Void>(executor);
    }

    /**
     * Submit a task to the pool.
     */
    public void submitTask(final Callable<Void> task) {
      Future<Void> f = this.taskPool.submit(task);
      futures.add(f);
    }

    /**
     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
     * Whatever the outcome, tasks that have not finished by the time this
     * method leaves are cancelled.
     *
     * @return <tt>true</tt> on success, <tt>false</tt> if the wait was
     *         interrupted without the pool having been aborted
     * @throws ForeignException if a task failed, or if the wait was
     *         interrupted after the pool was aborted
     */
    public boolean waitForOutstandingTasks() throws ForeignException {
      LOG.debug("Waiting for procedure to finish.");

      try {
        for (Future<Void> f: futures) {
          f.get();
        }
        return true;
      } catch (InterruptedException e) {
        if (aborted) throw new ForeignException(
            "Interrupted and found to be aborted while waiting for tasks!", e);
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        if (e.getCause() instanceof ForeignException) {
          throw (ForeignException) e.getCause();
        }
        throw new ForeignException(name, e.getCause());
      } finally {
        // close off remaining tasks
        for (Future<Void> f: futures) {
          if (!f.isDone()) {
            f.cancel(true);
          }
        }
      }
      return false;
    }

    /**
     * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly
     * finish
     */
    @Override
    public void close() {
      executor.shutdown();
    }

    /**
     * Mark the pool aborted and interrupt its tasks; later calls are no-ops.
     */
    @Override
    public void abort(String why, Throwable e) {
      if (this.aborted) return;

      this.aborted = true;
      LOG.warn("Aborting because: " + why, e);
      this.executor.shutdownNow();
    }

    @Override
    public boolean isAborted() {
      return this.aborted;
    }
  }

  /**
   * The region server's share of the simple procedure: inside the barrier it
   * submits one {@link RSSimpleTask} to its pool and waits for it.
   */
  public class SimpleSubprocedure extends Subprocedure {
    private final RegionServerServices rss;
    private final SimpleSubprocedurePool taskManager;

    public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member,
        ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) {
      super(member, name, errorListener, 500, 60000);
      LOG.info("Constructing a SimpleSubprocedure.");
      this.rss = rss;
      this.taskManager = taskManager;
    }

    /**
     * Callable task that only logs the server it runs on.
     * TODO. A thread pool is not needed for work this small. This can be simplified
     * with no use of subprocedurepool.
     */
    class RSSimpleTask implements Callable<Void> {
      RSSimpleTask() {}

      @Override
      public Void call() throws Exception {
        LOG.info("Execute subprocedure on " + rss.getServerName().toString());
        return null;
      }

    }

    /**
     * Submit the task and wait for it, checking the error monitor between steps.
     */
    private void execute() throws ForeignException {
      try {
        monitor.rethrowException();

        // running a task (e.g., roll log, flush table) on region server
        taskManager.submitTask(new RSSimpleTask());
        monitor.rethrowException();

        // wait for everything to complete.
        taskManager.waitForOutstandingTasks();
        monitor.rethrowException();
      } finally {
        // The pool was created for this subprocedure only and nothing else
        // shuts it down when the work succeeds; without this its worker
        // thread would stay alive after every procedure. shutdown() lets a
        // still-running task finish, and a later abort() remains harmless.
        taskManager.close();
      }
    }

    @Override
    public void acquireBarrier() throws ForeignException {
      // do nothing, executing in inside barrier step.
    }

    /**
     * Run the simple task and wait for it to finish.
     */
    @Override
    public void insideBarrier() throws ForeignException {
      execute();
    }

    /**
     * Cancel threads if they haven't finished.
     */
    @Override
    public void cleanup(Exception e) {
      taskManager.abort("Aborting simple subprocedure tasks due to error", e);
    }
  }

}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Starts a mini cluster with the simple master and region server procedure
 * managers configured, then runs the simple procedure through the admin API.
 */
@Category(SmallTests.class)
public class TestProcedureManager {

  static final Log LOG = LogFactory.getLog(TestProcedureManager.class);
  private static final int NUM_RS = 2;
  private static HBaseTestingUtility util = new HBaseTestingUtility();

  /**
   * Point both procedure class settings at the simple test managers before
   * the cluster is started.
   */
  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    Configuration clusterConf = util.getConfiguration();

    // procedure manager to load on the master
    clusterConf.set(ProcedureManagerHost.MASTER_PROCEUDRE_CONF_KEY,
        SimpleMasterProcedureManager.class.getName());
    // procedure manager to load on the region servers
    clusterConf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
        SimpleRSProcedureManager.class.getName());

    util.startMiniCluster(NUM_RS);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Executes the simple procedure with no properties; the test passes when
   * the call returns without throwing.
   */
  @Test
  public void testSimpleProcedureManager() throws IOException {
    HBaseAdmin hbaseAdmin = util.getHBaseAdmin();
    HashMap<String, String> noProps = new HashMap<String, String>();
    hbaseAdmin.execProcedure(SimpleMasterProcedureManager.SIMPLE_SIGNATURE, "mytest", noProps);
  }
}

View File

@ -24,8 +24,10 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -185,6 +187,60 @@ public class TestFlushSnapshotFromClient {
admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);
}
/**
 * Test taking a flush snapshot of an online table by invoking the snapshot
 * procedure through the generic {@code execProcedure} admin call, then
 * verifying that exactly one valid snapshot exists.
 * @throws Exception on any failure while loading data, running the procedure
 *         or validating the snapshot
 */
@Test (timeout=300000)
public void testFlushTableSnapshotWithProcedure() throws Exception {
  HBaseAdmin admin = UTIL.getHBaseAdmin();
  // make sure we don't fail on listing snapshots
  SnapshotTestingUtils.assertNoSnapshots(admin);

  // put some stuff in the table
  HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME);
  try {
    SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM);
  } finally {
    // the table handle is not needed past this point; release it instead of leaking it
    table.close();
  }

  // get the name of all the regionservers hosting the snapshotted table
  Set<String> snapshotServers = new HashSet<String>();
  List<RegionServerThread> servers = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
  for (RegionServerThread server : servers) {
    if (server.getRegionServer().getOnlineRegions(TABLE_NAME).size() > 0) {
      snapshotServers.add(server.getRegionServer().getServerName().toString());
    }
  }

  LOG.debug("FS state before snapshot:");
  FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);

  // take a snapshot of the enabled table
  String snapshotString = "offlineTableSnapshot";
  byte[] snapshot = Bytes.toBytes(snapshotString);
  Map<String, String> props = new HashMap<String, String>();
  props.put("table", STRING_TABLE_NAME);
  admin.execProcedure(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION,
      snapshotString, props);

  LOG.debug("Snapshot completed.");

  // make sure we have the snapshot
  List<SnapshotDescription> snapshots = SnapshotTestingUtils.assertOneSnapshotThatMatches(admin,
      snapshot, TABLE_NAME);

  // make sure its a valid snapshot
  FileSystem fs = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
  Path rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
  LOG.debug("FS state after snapshot:");
  FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
      FSUtils.getRootDir(UTIL.getConfiguration()), LOG);

  SnapshotTestingUtils.confirmSnapshotValid(snapshots.get(0), TABLE_NAME, TEST_FAM, rootDir,
      admin, fs, false, new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), snapshotServers);
}
@Test (timeout=300000)
public void testSnapshotFailsOnNonExistantTable() throws Exception {
HBaseAdmin admin = UTIL.getHBaseAdmin();