HBASE-7208 Transition Offline Snapshots to ForeignExceptions and Refactor for merge

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445813 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Hsieh 2013-02-13 18:36:03 +00:00
parent 8da4eec9a4
commit bb5c15894e
49 changed files with 976 additions and 2068 deletions

View File

@@ -51,11 +51,6 @@ public class ForeignException extends IOException {
*/
private final String source;
/**
* Name of the original throwable's class. Must be non-null.
*/
private final String clazz;
/**
* Create a new ForeignException that can be serialized. It is assumed that this came from a
* remote source.
@@ -68,7 +63,6 @@ public class ForeignException extends IOException {
assert cause != null;
assert clazz != null;
this.source = source;
this.clazz = clazz;
}
/**
@@ -82,7 +76,6 @@ public class ForeignException extends IOException {
assert source != null;
assert cause != null;
this.source = source;
this.clazz = getCause().getClass().getName();
}
/**
@@ -94,17 +87,12 @@ public class ForeignException extends IOException {
public ForeignException(String source, String msg) {
super(new IllegalArgumentException(msg));
this.source = source;
this.clazz = getCause().getClass().getName();
}
public String getSource() {
return source;
}
public String getSourceClass() {
return clazz;
}
/**
* The cause of a ForeignException can be an exception that was generated on a local in-process
* thread, or a thread from a 'remote' separate process.
@@ -120,7 +108,8 @@ public class ForeignException extends IOException {
@Override
public String toString() {
return clazz + " via " + getSource() + ":" + getLocalizedMessage();
String className = getCause().getClass().getName();
return className + " via " + getSource() + ":" + getLocalizedMessage();
}
/**
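For context, a minimal usage sketch (not part of the commit) of the refactored class: the cause's class name is now derived on demand in toString() rather than stored in the removed clazz field. The (source, cause) constructor and output shape follow the hunks above; the host name is hypothetical.

import org.apache.hadoop.hbase.errorhandling.ForeignException;

public class ForeignExceptionSketch {
  public static void main(String[] args) {
    // "rs1.example.com" is a made-up source; any non-null cause works.
    ForeignException fe =
        new ForeignException("rs1.example.com", new IllegalStateException("region closed"));
    // Prints: java.lang.IllegalStateException via rs1.example.com:<localized message>
    System.out.println(fe);
  }
}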

View File

@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.master;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -41,18 +40,10 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.management.ObjectName;
import com.google.common.collect.Maps;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Chore;
@@ -85,7 +76,6 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
@@ -186,18 +176,9 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotDoesNotExistException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotExistsException;
import org.apache.hadoop.hbase.snapshot.exception.TablePartiallyOpenException;
import org.apache.hadoop.hbase.snapshot.exception.UnknownSnapshotException;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
@@ -223,7 +204,12 @@ import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import com.google.common.collect.Maps;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
@@ -346,7 +332,6 @@ Server {
private SpanReceiverHost spanReceiverHost;
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
// monitor for snapshot of hbase tables
@@ -592,10 +577,8 @@ Server {
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
", cluster-up flag was=" + wasUp);
// create the snapshot monitor
// TODO should this be config based?
this.snapshotManager = new SnapshotManager(this, zooKeeper, this.executorService);
snapshotManager.start();
// create the snapshot manager
this.snapshotManager = new SnapshotManager(this);
}
/**
@@ -2447,6 +2430,10 @@ Server {
return this.snapshotManager;
}
/**
* Triggers an asynchronous attempt to take a snapshot.
* {@inheritDoc}
*/
@Override
public TakeSnapshotResponse snapshot(RpcController controller, TakeSnapshotRequest request)
throws ServiceException {
@@ -2454,72 +2441,8 @@ Server {
// get the snapshot information
SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(request.getSnapshot(),
this.conf);
// check to see if we already completed the snapshot
if (isSnapshotCompleted(snapshot)) {
throw new ServiceException(new SnapshotExistsException("Snapshot '" + snapshot.getName()
+ "' already stored on the filesystem.", snapshot));
}
LOG.debug("No existing snapshot, attempting snapshot...");
// check to see if the table exists
HTableDescriptor desc = null;
try {
desc = this.tableDescriptors.get(snapshot.getTable());
} catch (FileNotFoundException e) {
String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
LOG.error(msg);
throw new ServiceException(new SnapshotCreationException(msg, e, snapshot));
} catch (IOException e) {
throw new ServiceException(new SnapshotCreationException(
"Error while geting table description for table " + snapshot.getTable(), e, snapshot));
}
if (desc == null) {
throw new ServiceException(new SnapshotCreationException("Table '" + snapshot.getTable()
+ "' doesn't exist, can't take snapshot.", snapshot));
}
// set the snapshot version, now that we are ready to take it
snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
.build();
try {
if (cpHost != null) {
cpHost.preSnapshot(snapshot, desc);
}
} catch (IOException e) {
throw new ServiceException(e);
}
// if the table is enabled, then have the RSs actually run the snapshot work
if (this.assignmentManager.getZKTable().isEnabledTable(snapshot.getTable())) {
LOG.debug("Table enabled, starting distributed snapshot.");
throw new ServiceException(new UnsupportedOperationException(
"Enabled table snapshots are not yet supported"));
}
// For disabled table, snapshot is created by the master
else if (this.assignmentManager.getZKTable().isDisabledTable(snapshot.getTable())) {
LOG.debug("Table is disabled, running snapshot entirely on master.");
try {
snapshotManager.snapshotDisabledTable(snapshot);
} catch (HBaseSnapshotException e) {
throw new ServiceException(e);
}
LOG.debug("Started snapshot: " + snapshot);
} else {
LOG.error("Can't snapshot table '" + snapshot.getTable()
+ "', isn't open or closed, we don't know what to do!");
throw new ServiceException(new SnapshotCreationException(
"Table is not entirely open or closed", new TablePartiallyOpenException(
snapshot.getTable() + " isn't fully open."), snapshot));
}
try {
if (cpHost != null) {
cpHost.postSnapshot(snapshot, desc);
}
snapshotManager.takeSnapshot(snapshot);
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -2538,41 +2461,11 @@ Server {
throws ServiceException {
try {
ListSnapshotResponse.Builder builder = ListSnapshotResponse.newBuilder();
List<SnapshotDescription> snapshots = snapshotManager.getCompletedSnapshots();
// first create the snapshot description and check to see if it exists
Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(this.getMasterFileSystem()
.getRootDir());
// if there are no snapshots, return an empty list
if (!this.getMasterFileSystem().getFileSystem().exists(snapshotDir)) {
return builder.build();
}
FileSystem fs = this.getMasterFileSystem().getFileSystem();
// ignore all the snapshots in progress
FileStatus[] snapshots = fs.listStatus(snapshotDir,
new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
// look through all the completed snapshots
for (FileStatus snapshot : snapshots) {
Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
// if the snapshot is bad
if (!fs.exists(info)) {
LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
continue;
}
FSDataInputStream in = null;
try {
in = fs.open(info);
SnapshotDescription desc = SnapshotDescription.parseFrom(in);
builder.addSnapshots(desc);
} catch (IOException e) {
LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
} finally {
if (in != null) {
in.close();
}
}
// convert to protobuf
for (SnapshotDescription snapshot : snapshots) {
builder.addSnapshots(snapshot);
}
return builder.build();
} catch (IOException e) {
@@ -2580,109 +2473,40 @@ Server {
}
}
/**
* Execute Delete Snapshot operation.
* @return DeleteSnapshotResponse (a protobuf wrapped void) if the snapshot existed and was
* deleted properly.
* @throws ServiceException wrapping SnapshotDoesNotExistException if specified snapshot did not
* exist.
*/
@Override
public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
DeleteSnapshotRequest request) throws ServiceException {
try {
if (cpHost != null) {
cpHost.preDeleteSnapshot(request.getSnapshot());
}
// check to see if it is completed
if (!isSnapshotCompleted(request.getSnapshot())) {
throw new SnapshotDoesNotExistException(request.getSnapshot());
}
String snapshotName = request.getSnapshot().getName();
LOG.debug("Deleting snapshot: " + snapshotName);
// first create the snapshot description and check to see if it exists
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, this
.getMasterFileSystem().getRootDir());
// delete the existing snapshot
if (!this.getMasterFileSystem().getFileSystem().delete(snapshotDir, true)) {
throw new ServiceException("Failed to delete snapshot directory: " + snapshotDir);
}
if (cpHost != null) {
cpHost.postDeleteSnapshot(request.getSnapshot());
}
snapshotManager.deleteSnapshot(request.getSnapshot());
return DeleteSnapshotResponse.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
/**
* Checks if the specified snapshot is done.
* @return true if the snapshot is in the file system, ready to use,
* false if the snapshot is in the process of completing
* @throws ServiceException wrapping UnknownSnapshotException if invalid snapshot, or
* a wrapped HBaseSnapshotException with progress failure reason.
*/
@Override
public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
IsSnapshotDoneRequest request) throws ServiceException {
LOG.debug("Checking to see if snapshot from request:" + request + " is done");
try {
// check the request to make sure it has a snapshot
if (!request.hasSnapshot()) {
throw new UnknownSnapshotException(
"No snapshot name passed in request, can't figure out which snapshot you want to check.");
}
SnapshotDescription expected = request.getSnapshot();
IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder();
// check to see if the sentinel exists
SnapshotSentinel sentinel = this.snapshotManager.getCurrentSnapshotSentinel();
if (sentinel != null) {
// pass on any failure we find in the sentinel
HBaseSnapshotException e = sentinel.getExceptionIfFailed();
if (e != null) throw e;
// get the current snapshot and compare it against the requested
SnapshotDescription snapshot = sentinel.getSnapshot();
LOG.debug("Have a snapshot to compare:" + snapshot);
if (expected.getName().equals(snapshot.getName())) {
LOG.trace("Running snapshot (" + snapshot.getName() + ") does match request:"
+ expected.getName());
// check to see if we are done
if (sentinel.isFinished()) {
builder.setDone(true);
LOG.debug("Snapshot " + snapshot + " has completed, notifying client.");
} else if (LOG.isDebugEnabled()) {
LOG.debug("Sentinel isn't finished with snapshot!");
}
return builder.build();
}
}
// check to see if the snapshot is already on the fs
if (!isSnapshotCompleted(expected)) {
throw new UnknownSnapshotException("Snapshot:" + expected.getName()
+ " is not currently running or one of the known completed snapshots.");
}
builder.setDone(true);
boolean done = snapshotManager.isSnapshotDone(request.getSnapshot());
builder.setDone(done);
return builder.build();
} catch (HBaseSnapshotException e) {
throw new ServiceException(e);
}
}
/**
* Check to see if the snapshot is one of the currently completed snapshots
* @param expected snapshot to check
* @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
* not stored
* @throws IOException if the filesystem throws an unexpected exception
*/
private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws ServiceException {
final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, this
.getMasterFileSystem().getRootDir());
FileSystem fs = this.getMasterFileSystem().getFileSystem();
// check to see if the snapshot already exists
try {
return fs.exists(snapshotDir);
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -2704,56 +2528,9 @@ Server {
@Override
public RestoreSnapshotResponse restoreSnapshot(RpcController controller,
RestoreSnapshotRequest request) throws ServiceException {
SnapshotDescription reqSnapshot = request.getSnapshot();
FileSystem fs = this.getMasterFileSystem().getFileSystem();
Path rootDir = this.getMasterFileSystem().getRootDir();
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(request.getSnapshot(), rootDir);
try {
// check if the snapshot exists
if (!fs.exists(snapshotDir)) {
LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
throw new SnapshotDoesNotExistException(reqSnapshot);
}
// read snapshot information
SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
HTableDescriptor snapshotTableDesc = FSTableDescriptors.getTableDescriptor(fs, snapshotDir);
String tableName = reqSnapshot.getTable();
// Execute the restore/clone operation
if (MetaReader.tableExists(catalogTracker, tableName)) {
if (this.assignmentManager.getZKTable().isEnabledTable(snapshot.getTable())) {
throw new ServiceException(new UnsupportedOperationException("Table '" +
snapshot.getTable() + "' must be disabled in order to perform a restore operation."));
}
if (cpHost != null) {
cpHost.preRestoreSnapshot(snapshot, snapshotTableDesc);
}
snapshotManager.restoreSnapshot(snapshot, snapshotTableDesc);
LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
if (cpHost != null) {
cpHost.postRestoreSnapshot(snapshot, snapshotTableDesc);
}
} else {
HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc,
Bytes.toBytes(tableName));
if (cpHost != null) {
cpHost.preCloneSnapshot(snapshot, htd);
}
snapshotManager.cloneSnapshot(snapshot, htd);
LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
if (cpHost != null) {
cpHost.postCloneSnapshot(snapshot, htd);
}
}
SnapshotDescription reqSnapshot = request.getSnapshot();
snapshotManager.restoreSnapshot(reqSnapshot);
return RestoreSnapshotResponse.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
@@ -2775,28 +2552,12 @@ Server {
IsRestoreSnapshotDoneRequest request) throws ServiceException {
try {
SnapshotDescription snapshot = request.getSnapshot();
SnapshotSentinel sentinel = this.snapshotManager.getRestoreSnapshotSentinel(snapshot.getTable());
IsRestoreSnapshotDoneResponse.Builder builder = IsRestoreSnapshotDoneResponse.newBuilder();
LOG.debug("Verify snapshot=" + snapshot.getName() + " against=" + sentinel.getSnapshot().getName() +
" table=" + snapshot.getTable());
if (sentinel != null && sentinel.getSnapshot().getName().equals(snapshot.getName())) {
HBaseSnapshotException e = sentinel.getExceptionIfFailed();
if (e != null) throw e;
// check to see if we are done
if (sentinel.isFinished()) {
LOG.debug("Restore snapshot=" + snapshot + " has completed. Notifying the client.");
} else {
builder.setDone(false);
if (LOG.isDebugEnabled()) {
LOG.debug("Sentinel is not yet finished with restoring snapshot=" + snapshot);
}
}
}
boolean isRestoring = snapshotManager.isRestoringTable(snapshot);
builder.setDone(!isRestoring);
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}
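Because snapshot() above only triggers an asynchronous attempt, callers are expected to poll isSnapshotDone(). A hedged client-side sketch, assuming only the protobuf request/response types visible in this diff; takeSnapshotAndWait is a hypothetical helper, and the 500 ms interval mirrors the manager's default wake frequency.

// Hypothetical helper: 'master' is the master RPC endpoint and 'desc' the
// SnapshotDescription to take; both are assumptions for illustration.
static void takeSnapshotAndWait(HMaster master, SnapshotDescription desc)
    throws Exception {
  master.snapshot(null, TakeSnapshotRequest.newBuilder().setSnapshot(desc).build());
  IsSnapshotDoneRequest done =
      IsSnapshotDoneRequest.newBuilder().setSnapshot(desc).build();
  while (!master.isSnapshotDone(null, done).getDone()) {
    Thread.sleep(500); // isSnapshotDone surfaces failures as ServiceException
  }
}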

View File

@@ -19,16 +19,15 @@ package org.apache.hadoop.hbase.master;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
/**
* Watch the current snapshot in progress
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface SnapshotSentinel extends Stoppable {
public interface SnapshotSentinel {
/**
* Check to see if the snapshot is finished, where finished may be success or failure.
@@ -37,6 +36,12 @@ public interface SnapshotSentinel extends Stoppable {
*/
public boolean isFinished();
/**
* Actively cancel a running snapshot.
* @param why Reason for cancellation.
*/
public void cancel(String why);
/**
* @return the description of the snapshot being run
*/
@@ -44,9 +49,9 @@ public interface SnapshotSentinel extends Stoppable {
/**
* Get the exception that caused the snapshot to fail, if the snapshot has failed.
* @return <tt>null</tt> if the snapshot succeeded, or the {@link HBaseSnapshotException} that
* caused the snapshot to fail.
* @return {@link ForeignException} that caused the snapshot to fail, or <tt>null</tt> if the
* snapshot is still in progress or has succeeded
*/
public HBaseSnapshotException getExceptionIfFailed();
public ForeignException getExceptionIfFailed();
}
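A sketch of how a caller is meant to consume the revised contract (hypothetical code; in this commit the sentinel is obtained inside SnapshotManager). isFinished() covers both success and failure, and the ForeignException, if any, is fetched afterwards.

static void waitForSnapshot(SnapshotSentinel sentinel) throws Exception {
  while (!sentinel.isFinished()) {
    Thread.sleep(500); // finished means success OR failure
  }
  ForeignException failure = sentinel.getExceptionIfFailed();
  if (failure != null) {
    throw failure; // rethrow exactly what was dispatched, local or remote
  }
}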

View File

@@ -184,6 +184,7 @@ public class DisableTableHandler extends EventHandler {
while (!server.isStopped() && remaining > 0) {
Thread.sleep(waitingTimeForEvents);
regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName);
LOG.debug("Disable waiting until done; " + remaining + " ms remaining; " + regions);
if (regions.isEmpty()) break;
remaining = timeout - (System.currentTimeMillis() - startTime);
}

View File

@@ -190,12 +190,14 @@ public abstract class TableEventHandler extends EventHandler {
}
/**
* Gets a TableDescriptor from the masterServices. May throw exceptions.
*
* @return Table descriptor for this table
* @throws TableExistsException
* @throws FileNotFoundException
* @throws IOException
*/
protected HTableDescriptor getTableDescriptor()
public HTableDescriptor getTableDescriptor()
throws FileNotFoundException, IOException {
final String name = Bytes.toString(tableName);
HTableDescriptor htd =

View File

@@ -21,34 +21,28 @@ package org.apache.hadoop.hbase.master.snapshot;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CancellationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Handler to Clone a snapshot.
@@ -60,9 +54,11 @@ import org.apache.zookeeper.KeeperException;
public class CloneSnapshotHandler extends CreateTableHandler implements SnapshotSentinel {
private static final Log LOG = LogFactory.getLog(CloneSnapshotHandler.class);
private final static String NAME = "Master CloneSnapshotHandler";
private final SnapshotDescription snapshot;
private final SnapshotExceptionSnare monitor;
private final ForeignExceptionDispatcher monitor;
private volatile boolean stopped = false;
@@ -77,7 +73,7 @@ public class CloneSnapshotHandler extends CreateTableHandler implements Snapshot
this.snapshot = snapshot;
// Monitor
this.monitor = new SnapshotExceptionSnare(snapshot);
this.monitor = new ForeignExceptionDispatcher();
}
@Override
@@ -100,8 +96,11 @@ public class CloneSnapshotHandler extends CreateTableHandler implements Snapshot
} catch (Exception e) {
String msg = "clone snapshot=" + snapshot + " failed";
LOG.error(msg, e);
monitor.snapshotFailure("Failed due to exception:" + e.getMessage(), snapshot, e);
throw new RestoreSnapshotException(msg, e);
IOException rse = new RestoreSnapshotException(msg, e);
// these handlers aren't futures so we need to register the error here.
this.monitor.receive(new ForeignException(NAME, rse));
throw rse;
} finally {
this.stopped = true;
}
@@ -118,25 +117,15 @@ public class CloneSnapshotHandler extends CreateTableHandler implements Snapshot
}
@Override
public void stop(String why) {
public void cancel(String why) {
if (this.stopped) return;
this.stopped = true;
LOG.info("Stopping clone snapshot=" + snapshot + " because: " + why);
this.monitor.snapshotFailure("Failing clone snapshot because server is stopping.", snapshot);
this.monitor.receive(new ForeignException(NAME, new CancellationException(why)));
}
@Override
public boolean isStopped() {
return this.stopped;
}
@Override
public HBaseSnapshotException getExceptionIfFailed() {
try {
this.monitor.failOnError();
} catch (HBaseSnapshotException e) {
return e;
}
return null;
public ForeignException getExceptionIfFailed() {
return this.monitor.getException();
}
}
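The error-propagation idiom above is shared by all the refactored handlers: because these handlers are not futures, a failure must be pushed into the ForeignExceptionDispatcher explicitly so later pollers can see it. A minimal sketch under that assumption (runCloneSteps is a hypothetical stand-in for the handler's real work):

private static final String NAME = "Example handler"; // source label, as in the handlers above
private final ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();

void runGuarded() throws IOException {
  try {
    runCloneSteps(); // hypothetical stand-in
  } catch (IOException e) {
    // register the error so getExceptionIfFailed() can report it to pollers
    monitor.receive(new ForeignException(NAME, e));
    throw e;
  }
}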

View File

@@ -26,29 +26,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.snapshot.TakeSnapshotUtils;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.server.snapshot.task.CopyRecoveredEditsTask;
import org.apache.hadoop.hbase.server.snapshot.task.ReferenceRegionHFilesTask;
import org.apache.hadoop.hbase.server.snapshot.task.TableInfoCopyTask;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
/**
* Take a snapshot of a disabled table.
@@ -57,27 +50,9 @@ import org.apache.hadoop.hbase.util.Pair;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DisabledTableSnapshotHandler extends EventHandler implements SnapshotSentinel {
public class DisabledTableSnapshotHandler extends TakeSnapshotHandler {
private static final Log LOG = LogFactory.getLog(DisabledTableSnapshotHandler.class);
private volatile boolean stopped = false;
protected final Configuration conf;
protected final FileSystem fs;
protected final Path rootDir;
private final MasterServices masterServices;
private final SnapshotDescription snapshot;
private final Path workingDir;
private final String tableName;
private final OperationAttemptTimer timer;
private final SnapshotExceptionSnare monitor;
private final MasterSnapshotVerifier verify;
private final TimeoutExceptionInjector timeoutInjector;
/**
* @param snapshot descriptor of the snapshot to take
@@ -85,52 +60,23 @@ public class DisabledTableSnapshotHandler extends EventHandler implements Snapsh
* @param masterServices master services provider
* @throws IOException on unexpected error
*/
public DisabledTableSnapshotHandler(SnapshotDescription snapshot, Server server,
final MasterServices masterServices)
throws IOException {
super(server, EventType.C_M_SNAPSHOT_TABLE);
this.masterServices = masterServices;
this.tableName = snapshot.getTable();
this.snapshot = snapshot;
this.monitor = new SnapshotExceptionSnare(snapshot);
this.conf = this.masterServices.getConfiguration();
this.fs = this.masterServices.getMasterFileSystem().getFileSystem();
this.rootDir = FSUtils.getRootDir(this.conf);
this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
// prepare the verifier
this.verify = new MasterSnapshotVerifier(masterServices, snapshot, rootDir);
public DisabledTableSnapshotHandler(SnapshotDescription snapshot,
final MasterServices masterServices) throws IOException {
super(snapshot, masterServices);
// setup the timer
timer = TakeSnapshotUtils.getMasterTimerAndBindToMonitor(snapshot, conf, monitor);
timeoutInjector = TakeSnapshotUtils.getMasterTimerAndBindToMonitor(snapshot, conf, monitor);
}
// TODO consider parallelizing these operations since they are independent. Right now it's just
// easier to keep them serial though
@Override
public void process() {
LOG.info("Running table snapshot operation " + eventType + " on table " + tableName);
public void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regionsAndLocations) throws IOException,
KeeperException {
try {
timer.start();
// write down the snapshot info in the working directory
SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
timeoutInjector.start();
// 1. get all the regions hosting this table.
List<Pair<HRegionInfo, ServerName>> regionsAndLocations = null;
while (regionsAndLocations == null) {
try {
regionsAndLocations = MetaReader.getTableRegionsAndLocations(
this.server.getCatalogTracker(), Bytes.toBytes(tableName), true);
} catch (InterruptedException e) {
// check to see if we failed, in which case return
if (this.monitor.checkForError()) return;
// otherwise, just reset the interrupt and keep on going
Thread.currentThread().interrupt();
}
}
// extract each pair to separate lists
Set<String> serverNames = new HashSet<String>();
@@ -149,88 +95,35 @@ public class DisabledTableSnapshotHandler extends EventHandler implements Snapsh
regionInfo.getEncodedName());
HRegion.writeRegioninfoOnFilesystem(regionInfo, snapshotRegionDir, fs, conf);
// check for error for each region
monitor.failOnError();
monitor.rethrowException();
// 2.2 for each region, copy over its recovered.edits directory
Path regionDir = HRegion.getRegionDir(rootDir, regionInfo);
new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).run();
monitor.failOnError();
new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir).call();
monitor.rethrowException();
// 2.3 reference all the files in the region
new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).run();
monitor.failOnError();
new ReferenceRegionHFilesTask(snapshot, monitor, regionDir, fs, snapshotRegionDir).call();
monitor.rethrowException();
}
// 3. write the table info to disk
LOG.info("Starting to copy tableinfo for offline snapshot:\n" + snapshot);
TableInfoCopyTask tableInfo = new TableInfoCopyTask(this.monitor, snapshot, fs,
FSUtils.getRootDir(conf));
tableInfo.run();
monitor.failOnError();
// 4. verify the snapshot is valid
verify.verifySnapshot(this.workingDir, serverNames);
// 5. complete the snapshot
SnapshotDescriptionUtils.completeSnapshot(this.snapshot, this.rootDir, this.workingDir,
this.fs);
tableInfo.call();
monitor.rethrowException();
} catch (Exception e) {
// make sure we capture the exception to propagate back to the client later
monitor.snapshotFailure("Failed due to exception:" + e.getMessage(), snapshot, e);
String reason = "Failed due to exception:" + e.getMessage();
ForeignException ee = new ForeignException(reason, e);
monitor.receive(ee);
} finally {
LOG.debug("Marking snapshot" + this.snapshot + " as finished.");
this.stopped = true;
// 6. mark the timer as finished - even if we got an exception, we don't need to time the
// operation any further
timer.complete();
LOG.debug("Launching cleanup of working dir:" + workingDir);
try {
// don't mark the snapshot as a failure if we can't cleanup - the snapshot worked.
if (!this.fs.delete(this.workingDir, true)) {
LOG.error("Couldn't delete snapshot working directory:" + workingDir);
}
} catch (IOException e) {
LOG.error("Couldn't delete snapshot working directory:" + workingDir);
}
timeoutInjector.complete();
}
}
@Override
public boolean isFinished() {
return this.stopped;
}
@Override
public SnapshotDescription getSnapshot() {
return snapshot;
}
@Override
public void stop(String why) {
if (this.stopped) return;
this.stopped = true;
LOG.info("Stopping disabled snapshot because: " + why);
// pass along the stop as a failure. This keeps all the 'should I stop running?' logic in a
// single place, though it is technically a little bit of an overload of how the error handler
// should be used.
this.monitor.snapshotFailure("Failing snapshot because server is stopping.", snapshot);
}
@Override
public boolean isStopped() {
return this.stopped;
}
@Override
public HBaseSnapshotException getExceptionIfFailed() {
try {
this.monitor.failOnError();
} catch (HBaseSnapshotException e) {
return e;
}
return null;
}
}
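One pattern in this handler worth spelling out: the renamed timeoutInjector must be bracketed by start() and complete(), with complete() in a finally block so a timeout error is never injected after the work has already ended. A hedged sketch using only the calls visible above (snapshot, conf, monitor, and regionsAndLocations come from the surrounding handler):

TimeoutExceptionInjector timeoutInjector =
    TakeSnapshotUtils.getMasterTimerAndBindToMonitor(snapshot, conf, monitor);
timeoutInjector.start();
try {
  snapshotRegions(regionsAndLocations); // long-running per-region work
} finally {
  timeoutInjector.complete(); // stop the timer even on failure
}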

View File

@@ -159,6 +159,11 @@ public final class MasterSnapshotVerifier {
List<HRegionInfo> regions = MetaReader.getTableRegions(this.services.getCatalogTracker(),
Bytes.toBytes(tableName));
for (HRegionInfo region : regions) {
// if offline split parent, skip it
if (region.isOffline() || region.isSplit() || region.isSplitParent()) {
continue;
}
verifyRegion(fs, snapshotDir, region);
}
}
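Read as a predicate, the new skip condition keeps only regions that can carry snapshot data (a restatement of the loop above, not commit code):

static boolean shouldVerifyRegion(HRegionInfo region) {
  // offline regions and split parents leave no live files to verify
  return !(region.isOffline() || region.isSplit() || region.isSplitParent());
}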

View File

@@ -21,34 +21,28 @@ package org.apache.hadoop.hbase.master.snapshot;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CancellationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.handler.TableEventHandler;
import org.apache.hadoop.hbase.master.snapshot.manage.SnapshotManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
/**
* Handler to Restore a snapshot.
@@ -63,7 +57,7 @@ public class RestoreSnapshotHandler extends TableEventHandler implements Snapsho
private final HTableDescriptor hTableDescriptor;
private final SnapshotDescription snapshot;
private final SnapshotExceptionSnare monitor;
private final ForeignExceptionDispatcher monitor;
private volatile boolean stopped = false;
public RestoreSnapshotHandler(final MasterServices masterServices,
@@ -75,7 +69,7 @@ public class RestoreSnapshotHandler extends TableEventHandler implements Snapsho
this.snapshot = snapshot;
// Monitor
this.monitor = new SnapshotExceptionSnare(snapshot);
this.monitor = new ForeignExceptionDispatcher();
// Check table exists.
getTableDescriptor();
@@ -114,7 +108,7 @@ public class RestoreSnapshotHandler extends TableEventHandler implements Snapsho
} catch (IOException e) {
String msg = "restore snapshot=" + snapshot + " failed";
LOG.error(msg, e);
monitor.snapshotFailure("Failed due to exception:" + e.getMessage(), snapshot, e);
monitor.receive(new ForeignException(masterServices.getServerName().toString(), e));
throw new RestoreSnapshotException(msg, e);
} finally {
this.stopped = true;
@@ -132,25 +126,16 @@ public class RestoreSnapshotHandler extends TableEventHandler implements Snapsho
}
@Override
public void stop(String why) {
public void cancel(String why) {
if (this.stopped) return;
this.stopped = true;
LOG.info("Stopping restore snapshot=" + snapshot + " because: " + why);
this.monitor.snapshotFailure("Failing restore because server is stopping.", snapshot);
String msg = "Stopping restore snapshot=" + snapshot + " because: " + why;
LOG.info(msg);
CancellationException ce = new CancellationException(why);
this.monitor.receive(new ForeignException(masterServices.getServerName().toString(), ce));
}
@Override
public boolean isStopped() {
return this.stopped;
}
@Override
public HBaseSnapshotException getExceptionIfFailed() {
try {
this.monitor.failOnError();
} catch (HBaseSnapshotException e) {
return e;
}
return null;
public ForeignException getExceptionIfFailed() {
return this.monitor.getException();
}
}

View File

@@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.snapshot;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.task.TableInfoCopyTask;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
/**
* A handler for taking snapshots from the master.
*
* This is not a subclass of TableEventHandler because using that would incur an extra META scan.
*/
@InterfaceAudience.Private
public abstract class TakeSnapshotHandler extends EventHandler implements SnapshotSentinel,
ForeignExceptionSnare {
private static final Log LOG = LogFactory.getLog(TakeSnapshotHandler.class);
private volatile boolean finished;
// none of these should ever be null
protected final MasterServices master;
protected final SnapshotDescription snapshot;
protected final Configuration conf;
protected final FileSystem fs;
protected final Path rootDir;
private final Path snapshotDir;
protected final Path workingDir;
private final MasterSnapshotVerifier verifier;
protected final ForeignExceptionDispatcher monitor;
/**
* @param snapshot descriptor of the snapshot to take
* @param masterServices master services provider
* @throws IOException on unexpected error
*/
public TakeSnapshotHandler(SnapshotDescription snapshot,
final MasterServices masterServices) throws IOException {
super(masterServices, EventType.C_M_SNAPSHOT_TABLE);
assert snapshot != null : "SnapshotDescription must not be null";
assert masterServices != null : "MasterServices must not be null";
this.master = masterServices;
this.snapshot = snapshot;
this.conf = this.master.getConfiguration();
this.fs = this.master.getMasterFileSystem().getFileSystem();
this.rootDir = this.master.getMasterFileSystem().getRootDir();
this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
this.monitor = new ForeignExceptionDispatcher();
// prepare the verifier
this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, rootDir);
}
private HTableDescriptor loadTableDescriptor()
throws FileNotFoundException, IOException {
final String name = snapshot.getTable();
HTableDescriptor htd =
this.master.getTableDescriptors().get(name);
if (htd == null) {
throw new IOException("HTableDescriptor missing for " + name);
}
return htd;
}
/**
* Execute the core common portions of taking a snapshot. The {@link #snapshotRegions(List)}
* call should get implemented for each snapshot flavor.
*/
@Override
public void process() {
LOG.info("Running table snapshot operation " + eventType + " on table " + snapshot.getTable());
try {
loadTableDescriptor(); // check that .tableinfo is present
byte[] ssbytes = Bytes.toBytes(snapshot.getTable());
List<Pair<HRegionInfo, ServerName>> regionsAndLocations = MetaReader.getTableRegionsAndLocations(
this.server.getCatalogTracker(), ssbytes, true);
// If regions move after this meta scan, the region specific snapshot should fail, triggering
// an external exception that gets captured here.
// write down the snapshot info in the working directory
SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, this.fs);
new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
monitor.rethrowException();
// run the snapshot
snapshotRegions(regionsAndLocations);
// extract each pair to separate lists
Set<String> serverNames = new HashSet<String>();
for (Pair<HRegionInfo, ServerName> p : regionsAndLocations) {
serverNames.add(p.getSecond().toString());
}
// verify the snapshot is valid
verifier.verifySnapshot(this.workingDir, serverNames);
// complete the snapshot, atomically moving from tmp to .snapshot dir.
completeSnapshot(this.snapshotDir, this.workingDir, this.fs);
} catch (Exception e) {
LOG.error("Got exception taking snapshot", e);
String reason = "Failed due to exception:" + e.getMessage();
ForeignException ee = new ForeignException(reason, e);
monitor.receive(ee);
// need to mark this completed to close off and allow cleanup to happen.
cancel("Failed to take snapshot '" + snapshot.getName() + "' due to exception");
} finally {
LOG.debug("Launching cleanup of working dir:" + workingDir);
try {
// if the working dir is still present, the snapshot has failed and we
// delete it.
if (fs.exists(workingDir) && !this.fs.delete(workingDir, true)) {
LOG.error("Couldn't delete snapshot working directory:" + workingDir);
}
} catch (IOException e) {
LOG.error("Couldn't delete snapshot working directory:" + workingDir);
}
}
}
/**
* Complete the snapshot by moving it from the working directory to its final location
*
* @param snapshotDir final path of the snapshot
* @param workingDir directory where the in progress snapshot was built
* @param fs {@link FileSystem} where the snapshot was built
* @throws SnapshotCreationException if the snapshot could not be moved
* @throws IOException if the filesystem could not be reached
*/
public void completeSnapshot(Path snapshotDir, Path workingDir, FileSystem fs)
throws SnapshotCreationException, IOException {
LOG.debug("Sentinel is done, just moving the snapshot from " + workingDir + " to "
+ snapshotDir);
if (!fs.rename(workingDir, snapshotDir)) {
throw new SnapshotCreationException("Failed to move working directory(" + workingDir
+ ") to completed directory(" + snapshotDir + ").");
}
finished = true;
}
/**
* Snapshot the specified regions
*/
protected abstract void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regions) throws IOException,
KeeperException;
@Override
public void cancel(String why) {
if (finished) return;
this.finished = true;
LOG.info("Stop taking snapshot=" + snapshot + " because: " + why);
CancellationException ce = new CancellationException(why);
monitor.receive(new ForeignException(master.getServerName().toString(), ce));
}
@Override
public boolean isFinished() {
return finished;
}
@Override
public SnapshotDescription getSnapshot() {
return snapshot;
}
@Override
public ForeignException getExceptionIfFailed() {
return monitor.getException();
}
@Override
public void rethrowException() throws ForeignException {
monitor.rethrowException();
}
@Override
public boolean hasException() {
return monitor.hasException();
}
@Override
public ForeignException getException() {
return monitor.getException();
}
}
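To make the template concrete, a hypothetical minimal subclass showing the one hook a snapshot flavor must implement; DisabledTableSnapshotHandler earlier in this commit is the real example. process() supplies the table-info copy, verification, and the atomic move into the completed directory.

public class NoopSnapshotHandler extends TakeSnapshotHandler {
  public NoopSnapshotHandler(SnapshotDescription snapshot, MasterServices services)
      throws IOException {
    super(snapshot, services); // base class wires fs, dirs, verifier, monitor
  }

  @Override
  protected void snapshotRegions(List<Pair<HRegionInfo, ServerName>> regions)
      throws IOException, KeeperException {
    // flavor-specific region work goes here
  }
}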

View File

@@ -17,79 +17,175 @@
*/
package org.apache.hadoop.hbase.master.snapshot.manage;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.snapshot.CloneSnapshotHandler;
import org.apache.hadoop.hbase.master.snapshot.DisabledTableSnapshotHandler;
import org.apache.hadoop.hbase.master.snapshot.RestoreSnapshotHandler;
import org.apache.hadoop.hbase.master.snapshot.TakeSnapshotHandler;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotDoesNotExistException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotExistsException;
import org.apache.hadoop.hbase.snapshot.exception.TablePartiallyOpenException;
import org.apache.hadoop.hbase.snapshot.exception.UnknownSnapshotException;
import org.apache.hadoop.hbase.snapshot.restore.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import com.google.protobuf.ServiceException;
/**
* This class monitors the whole process of snapshots via ZooKeeper. There is only one
* SnapshotMonitor for the master.
* This class manages the procedure of taking and restoring snapshots. There is only one
* SnapshotManager for the master.
* <p>
* Start monitoring a snapshot by calling method monitor() before the snapshot is started across the
* cluster via ZooKeeper. SnapshotMonitor would stop monitoring this snapshot only if it is finished
* or aborted.
* The class provides methods for monitoring in-progress snapshot actions.
* <p>
* Note: There could be only one snapshot being processed and monitored at a time over the cluster.
* Start monitoring a snapshot only when the previous one reaches an end status.
* Note: Currently only one snapshot can be taken at a time over the cluster. This is a
* simplification in the current implementation.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SnapshotManager implements Stoppable {
private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
// TODO - enable having multiple snapshots with multiple monitors
/** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
public static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
/**
* Conf key for # of ms elapsed between checks for snapshot errors while waiting for
* completion.
*/
public static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
/** By default, consider a snapshot failed if it is not complete within this many ms */
public static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;
/**
* Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
* completion.
*/
public static final String SNAPSHOT_TIMEMOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";
/** Name of the operation to use in the controller */
public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
// TODO - enable having multiple snapshots with multiple monitors/threads
// this needs to be configuration based when running multiple snapshots is implemented
/** number of current operations running on the master */
private static final int opThreads = 1;
private boolean stopped;
private final long wakeFrequency;
private final MasterServices master; // Needed by TableEventHandlers
// A reference to a handler. If the handler is non-null, then it is assumed that a snapshot is
// in progress currently
// TODO: this is a bad smell; likely replace with a collection in the future. Also this gets
// reset by every operation.
private TakeSnapshotHandler handler;
private final Path rootDir;
private final ExecutorService executorService;
// Restore Sentinels map, with table name as key
private Map<String, SnapshotSentinel> restoreHandlers = new HashMap<String, SnapshotSentinel>();
private final MasterServices master;
private SnapshotSentinel handler;
private ExecutorService pool;
private final Path rootDir;
private boolean stopped;
public SnapshotManager(final MasterServices master, final ZooKeeperWatcher watcher,
final ExecutorService executorService) throws KeeperException {
/**
* Construct a snapshot manager.
* @param master services for the master where the manager is running
*/
public SnapshotManager(final MasterServices master) throws IOException {
this.master = master;
this.pool = executorService;
// get the configuration for the coordinator
Configuration conf = master.getConfiguration();
this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
this.rootDir = master.getMasterFileSystem().getRootDir();
this.executorService = master.getExecutorService();
resetTempDir();
}
/**
* Start running the manager.
* <p>
* <ol>
* <li>Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
* snapshot/export attempts</li>
* </ol>
* Gets the list of all completed snapshots.
* @return list of SnapshotDescriptions
* @throws IOException File system exception
*/
public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
// first create the snapshot root path and check to see if it exists
Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
FileSystem fs = master.getMasterFileSystem().getFileSystem();
// if there are no snapshots, return an empty list
if (!fs.exists(snapshotDir)) {
return snapshotDescs;
}
// ignore all the snapshots in progress
FileStatus[] snapshots = fs.listStatus(snapshotDir,
new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
// loop through all the completed snapshots
for (FileStatus snapshot : snapshots) {
Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
// if the snapshot is bad
if (!fs.exists(info)) {
LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
continue;
}
FSDataInputStream in = null;
try {
in = fs.open(info);
SnapshotDescription desc = SnapshotDescription.parseFrom(in);
snapshotDescs.add(desc);
} catch (IOException e) {
LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
} finally {
if (in != null) {
in.close();
}
}
}
return snapshotDescs;
}
/**
* Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
* snapshot attempts.
*
* @throws IOException if we can't reach the filesystem
*/
public void start() throws IOException {
void resetTempDir() throws IOException {
// cleanup any existing snapshots.
Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
if (master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
@@ -98,16 +194,122 @@ public class SnapshotManager implements Stoppable {
}
/**
* @return <tt>true</tt> if there is a snapshot currently being taken, <tt>false</tt> otherwise
* Delete the specified snapshot
* @param snapshot
* @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
* @throws IOException For filesystem IOExceptions
*/
public boolean isTakingSnapshot() {
public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
// call coproc pre hook
MasterCoprocessorHost cpHost = master.getCoprocessorHost();
if (cpHost != null) {
cpHost.preDeleteSnapshot(snapshot);
}
// check to see if it is completed
if (!isSnapshotCompleted(snapshot)) {
throw new SnapshotDoesNotExistException(snapshot);
}
String snapshotName = snapshot.getName();
LOG.debug("Deleting snapshot: " + snapshotName);
// first create the snapshot description and check to see if it exists
MasterFileSystem fs = master.getMasterFileSystem();
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
// delete the existing snapshot
if (!fs.getFileSystem().delete(snapshotDir, true)) {
throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
}
// call coproc post hook
if (cpHost != null) {
cpHost.postDeleteSnapshot(snapshot);
}
}
/**
* Return the handler if it is currently running and has the same snapshot target name.
* @param snapshot
* @return null if doesn't match, else a live handler.
*/
TakeSnapshotHandler getTakeSnapshotHandler(SnapshotDescription snapshot) {
TakeSnapshotHandler h = this.handler;
if (h == null) {
return null;
}
if (!h.getSnapshot().getName().equals(snapshot.getName())) {
// specified snapshot is not the one currently running
return null;
}
return h;
}
/**
* Check if the specified snapshot is done
* @param expected
* @return true if snapshot is ready to be restored, false if it is still being taken.
* @throws IOException if there is an error from HDFS or RPC
* @throws UnknownSnapshotException if snapshot is invalid or does not exist.
*/
public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
// check the request to make sure it has a snapshot
if (expected == null) {
throw new UnknownSnapshotException(
"No snapshot name passed in request, can't figure out which snapshot you want to check.");
}
// check to see if the sentinel exists
TakeSnapshotHandler handler = getTakeSnapshotHandler(expected);
if (handler == null) {
// doesn't exist, check if it is already completely done.
if (!isSnapshotCompleted(expected)) {
throw new UnknownSnapshotException("Snapshot:" + expected.getName()
+ " is not currently running or one of the known completed snapshots.");
}
// was done, return true;
return true;
}
// pass on any failure we find in the sentinel
try {
handler.rethrowException();
} catch (ForeignException e) {
throw new HBaseSnapshotException("Snapshot error from RS", e, expected);
}
// check to see if we are done
if (handler.isFinished()) {
LOG.debug("Snapshot '" + expected.getName() + "' has completed, notifying client.");
return true;
} else if (LOG.isDebugEnabled()) {
LOG.debug("Sentinel isn't finished with snapshot '" + expected.getName() + "'!");
}
return false;
}
/**
* Check to see if there are any snapshots in progress. We currently allow only a single
* snapshot attempt at a time.
* @return <tt>true</tt> if there are any snapshots in progress, <tt>false</tt> otherwise
* @throws SnapshotCreationException if the snapshot failed
*/
synchronized boolean isTakingSnapshot() throws SnapshotCreationException {
// TODO later when we handle multiple there would be a map with ssname to handler.
return handler != null && !handler.isFinished();
}
/*
/**
* Check to see if the specified table has a snapshot in progress. Currently we have a
* limitation only allowing a single snapshot attempt at a time.
* @param tableName name of the table being snapshotted.
* @return <tt>true</tt> if there is a snapshot in progress on the specified table.
*/
public boolean isTakingSnapshot(final String tableName) {
private boolean isTakingSnapshot(final String tableName) {
if (handler != null && handler.getSnapshot().getTable().equals(tableName)) {
return !handler.isFinished();
}
@ -138,7 +340,8 @@ public class SnapshotManager implements Stoppable {
}
try {
// delete the working directory, since we aren't running the snapshot
// delete the working directory, since we aren't running the snapshot. Likely leftovers
// from a failed attempt.
fs.delete(workingDir, true);
// recreate the working directory for the snapshot
@ -154,6 +357,77 @@ public class SnapshotManager implements Stoppable {
}
}
/**
* Take a snapshot based on the enabled/disabled state of the table.
*
* @param snapshot
* @throws HBaseSnapshotException when a snapshot specific exception occurs.
* @throws IOException when some sort of generic IO exception occurs.
*/
public void takeSnapshot(SnapshotDescription snapshot) throws HBaseSnapshotException, IOException {
// check to see if we already completed the snapshot
if (isSnapshotCompleted(snapshot)) {
throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
+ "' already stored on the filesystem.", snapshot);
}
LOG.debug("No existing snapshot, attempting snapshot...");
// check to see if the table exists
HTableDescriptor desc = null;
try {
desc = master.getTableDescriptors().get(snapshot.getTable());
} catch (FileNotFoundException e) {
String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
LOG.error(msg);
throw new SnapshotCreationException(msg, e, snapshot);
} catch (IOException e) {
throw new SnapshotCreationException("Error while geting table description for table "
+ snapshot.getTable(), e, snapshot);
}
if (desc == null) {
throw new SnapshotCreationException("Table '" + snapshot.getTable()
+ "' doesn't exist, can't take snapshot.", snapshot);
}
// set the snapshot version, now that we are ready to take it
snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
.build();
// call pre coproc hook
MasterCoprocessorHost cpHost = master.getCoprocessorHost();
if (cpHost != null) {
cpHost.preSnapshot(snapshot, desc);
}
// setup the snapshot
prepareToTakeSnapshot(snapshot);
// if the table is enabled, then have the region servers actually run the snapshot work
AssignmentManager assignmentMgr = master.getAssignmentManager();
if (assignmentMgr.getZKTable().isEnabledTable(snapshot.getTable())) {
LOG.debug("Table enabled, starting distributed snapshot.");
throw new UnsupportedOperationException("Snapshots of enabled tables are not yet supported");
}
// For disabled table, snapshot is created by the master
else if (assignmentMgr.getZKTable().isDisabledTable(snapshot.getTable())) {
LOG.debug("Table is disabled, running snapshot entirely on master.");
snapshotDisabledTable(snapshot);
LOG.debug("Started snapshot: " + snapshot);
} else {
LOG.error("Can't snapshot table '" + snapshot.getTable()
+ "', isn't open or closed, we don't know what to do!");
TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
+ " isn't fully open.");
throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
}
// call post coproc hook
if (cpHost != null) {
cpHost.postSnapshot(snapshot, desc);
}
}
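// Editor's sketch, not part of this patch: a minimal snapshot request for a
// disabled table. Table and snapshot names are hypothetical; the version field
// is filled in by takeSnapshot() itself.
private void exampleTakeSnapshot() throws IOException {
SnapshotDescription request = SnapshotDescription.newBuilder()
.setName("pre-upgrade").setTable("usertable").build();
takeSnapshot(request); // throws SnapshotExistsException if already stored
}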
/**
* Take a snapshot of a disabled table.
* <p>
@ -162,21 +436,19 @@ public class SnapshotManager implements Stoppable {
* @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
* @throws HBaseSnapshotException if the snapshot could not be started
*/
public synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
throws HBaseSnapshotException {
// setup the snapshot
prepareToTakeSnapshot(snapshot);
// mark the snapshot as a disabled-table snapshot, since the client doesn't set the type
snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
DisabledTableSnapshotHandler handler;
try {
handler = new DisabledTableSnapshotHandler(snapshot, this.master, this.master);
handler = new DisabledTableSnapshotHandler(snapshot, this.master);
this.handler = handler;
this.pool.submit(handler);
this.executorService.submit(handler);
} catch (IOException e) {
// cleanup the working directory
// cleanup the working directory by trying to delete it from the fs.
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
try {
if (this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
@ -192,10 +464,35 @@ public class SnapshotManager implements Stoppable {
}
/**
* @return the current handler for the snapshot
* Set the handler for the current snapshot
* <p>
* Exposed for TESTING
* @param handler handler the master should use
*
* TODO get rid of this if possible; may require repackaging and modifying tests.
*/
public SnapshotSentinel getCurrentSnapshotSentinel() {
return this.handler;
public synchronized void setSnapshotHandlerForTesting(TakeSnapshotHandler handler) {
this.handler = handler;
}
/**
* Check to see if the snapshot is one of the currently completed snapshots
* @param expected snapshot to check
* @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if it
* is not stored
* @throws IOException if the filesystem throws an unexpected exception
* @throws IllegalArgumentException if snapshot name is invalid.
*/
private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
try {
final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
FileSystem fs = master.getMasterFileSystem().getFileSystem();
// check to see if the snapshot already exists
return fs.exists(snapshotDir);
} catch (IllegalArgumentException iae) {
throw new UnknownSnapshotException("Unexpected exception thrown", iae);
}
}
/**
@ -206,10 +503,9 @@ public class SnapshotManager implements Stoppable {
* @param hTableDescriptor Table Descriptor of the table to create
* @param waitTime timeout before considering the clone failed
*/
public synchronized void cloneSnapshot(final SnapshotDescription snapshot,
synchronized void cloneSnapshot(final SnapshotDescription snapshot,
final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
String tableName = hTableDescriptor.getNameAsString();
cleanupRestoreSentinels();
// make sure we aren't running a snapshot on the same table
if (isTakingSnapshot(tableName)) {
@ -224,7 +520,7 @@ public class SnapshotManager implements Stoppable {
try {
CloneSnapshotHandler handler =
new CloneSnapshotHandler(master, snapshot, hTableDescriptor);
this.pool.submit(handler);
this.executorService.submit(handler);
restoreHandlers.put(tableName, handler);
} catch (Exception e) {
String msg = "Couldn't clone the snapshot=" + snapshot + " on table=" + tableName;
@ -233,6 +529,62 @@ public class SnapshotManager implements Stoppable {
}
}
/**
* Restore the specified snapshot
* @param reqSnapshot
* @throws IOException
*/
public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
FileSystem fs = master.getMasterFileSystem().getFileSystem();
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
MasterCoprocessorHost cpHost = master.getCoprocessorHost();
// check if the snapshot exists
if (!fs.exists(snapshotDir)) {
LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
throw new SnapshotDoesNotExistException(reqSnapshot);
}
// read snapshot information
SnapshotDescription fsSnapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
HTableDescriptor snapshotTableDesc = FSTableDescriptors.getTableDescriptor(fs, snapshotDir);
String tableName = reqSnapshot.getTable();
// stop tracking completed restores
cleanupRestoreSentinels();
// Execute the restore/clone operation
if (MetaReader.tableExists(master.getCatalogTracker(), tableName)) {
if (master.getAssignmentManager().getZKTable().isEnabledTable(fsSnapshot.getTable())) {
throw new UnsupportedOperationException("Table '" +
fsSnapshot.getTable() + "' must be disabled in order to perform a restore operation.");
}
// call coproc pre hook
if (cpHost != null) {
cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
}
restoreSnapshot(fsSnapshot, snapshotTableDesc);
LOG.info("Restore snapshot=" + fsSnapshot.getName() + " as table=" + tableName);
if (cpHost != null) {
cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
}
} else {
HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc,
Bytes.toBytes(tableName));
if (cpHost != null) {
cpHost.preCloneSnapshot(reqSnapshot, htd);
}
cloneSnapshot(fsSnapshot, htd);
LOG.info("Clone snapshot=" + fsSnapshot.getName() + " as table=" + tableName);
if (cpHost != null) {
cpHost.postCloneSnapshot(reqSnapshot, htd);
}
}
}
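// Editor's sketch, not part of this patch: a restore request. The table name on
// the request selects the target; restoreSnapshot() clones if the table is
// missing and restores in place otherwise. Names are illustrative only.
private void exampleRestoreSnapshot() throws IOException {
SnapshotDescription req = SnapshotDescription.newBuilder()
.setName("pre-upgrade").setTable("usertable").build();
restoreSnapshot(req);
}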
/**
* Restore the specified snapshot.
* The restore will fail if the destination table has a snapshot or restore in progress.
@ -241,10 +593,9 @@ public class SnapshotManager implements Stoppable {
* @param hTableDescriptor Table Descriptor
* @param waitTime timeout before considering the restore failed
*/
public synchronized void restoreSnapshot(final SnapshotDescription snapshot,
private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
String tableName = hTableDescriptor.getNameAsString();
cleanupRestoreSentinels();
// make sure we aren't running a snapshot on the same table
if (isTakingSnapshot(tableName)) {
@ -259,7 +610,7 @@ public class SnapshotManager implements Stoppable {
try {
RestoreSnapshotHandler handler =
new RestoreSnapshotHandler(master, snapshot, hTableDescriptor);
this.pool.submit(handler);
this.executorService.submit(handler);
restoreHandlers.put(hTableDescriptor.getNameAsString(), handler);
} catch (Exception e) {
String msg = "Couldn't restore the snapshot=" + snapshot + " on table=" + tableName;
@ -274,17 +625,59 @@ public class SnapshotManager implements Stoppable {
* @param tableName table under restore
* @return <tt>true</tt> if there is a restore in progress of the specified table.
*/
public boolean isRestoringTable(final String tableName) {
private boolean isRestoringTable(final String tableName) {
SnapshotSentinel sentinel = restoreHandlers.get(tableName);
return(sentinel != null && !sentinel.isFinished());
}
/**
* Returns status of a restore request, specifically comparing source snapshot and target table
* names. Throws exception if not a known snapshot.
* @param snapshot
* @return true if in progress, false if it is not.
* @throws UnknownSnapshotException if the specified source snapshot does not exist.
* @throws IOException if there was some sort of IO failure
*/
public boolean isRestoringTable(final SnapshotDescription snapshot) throws IOException {
// check to see if the snapshot is already on the fs
if (!isSnapshotCompleted(snapshot)) {
throw new UnknownSnapshotException("Snapshot:" + snapshot.getName()
+ " is not one of the known completed snapshots.");
}
SnapshotSentinel sentinel = getRestoreSnapshotSentinel(snapshot.getTable());
if (sentinel == null) {
// there is no sentinel so restore is not in progress.
return false;
}
if (!sentinel.getSnapshot().getName().equals(snapshot.getName())) {
// another handler is trying to restore to the table, but it isn't the same snapshot source.
return false;
}
LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
+ sentinel.getSnapshot().getName() + " table=" + snapshot.getTable());
ForeignException e = sentinel.getExceptionIfFailed();
if (e != null) throw e;
// check to see if we are done
if (sentinel.isFinished()) {
LOG.debug("Restore snapshot=" + snapshot + " has completed. Notifying the client.");
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Sentinel is not yet finished with restoring snapshot=" + snapshot);
}
return true;
}
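// Editor's sketch, not part of this patch: polling the restore status until the
// sentinel finishes. Any recorded failure is rethrown by isRestoringTable().
private void exampleWaitForRestore(SnapshotDescription snapshot)
throws IOException, InterruptedException {
while (isRestoringTable(snapshot)) {
Thread.sleep(100); // hypothetical poll interval
}
}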
/**
* Get the restore snapshot sentinel for the specified table
* @param tableName table under restore
* @return the restore snapshot handler
*/
public synchronized SnapshotSentinel getRestoreSnapshotSentinel(final String tableName) {
private synchronized SnapshotSentinel getRestoreSnapshotSentinel(final String tableName) {
try {
return restoreHandlers.get(tableName);
} finally {
@ -306,17 +699,22 @@ public class SnapshotManager implements Stoppable {
}
}
//
// Implementing Stoppable interface
//
@Override
public void stop(String why) {
// short circuit
if (this.stopped) return;
// make sure we mark ourselves as stopped
this.stopped = true;
// pass the stop onto all the listeners
if (this.handler != null) this.handler.stop(why);
// pass the stop onto take snapshot handlers
if (this.handler != null) this.handler.cancel(why);
// pass the stop onto all the restore handlers
for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
restoreHandler.stop(why);
restoreHandler.cancel(why);
}
}
@ -324,14 +722,4 @@ public class SnapshotManager implements Stoppable {
public boolean isStopped() {
return this.stopped;
}
/**
* Set the handler for the current snapshot
* <p>
* Exposed for TESTING
* @param handler handler the master should use
*/
public void setSnapshotHandlerForTesting(SnapshotSentinel handler) {
this.handler = handler;
}
}

View File

@ -2526,8 +2526,8 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
* Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
* with the provided current timestamp.
* Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the provided current
* timestamp.
*/
void updateKVTimestamps(
final Iterable<List<KeyValue>> keyLists, final byte[] now) {

View File

@ -1,44 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Check for errors to a given process.
* @param <E> Type of error that <tt>this</tt> throws if it finds an error
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ExceptionCheckable<E extends Exception> {
/**
* Checks to see if any process to which the exception checker is bound has created an error that
* would cause a failure.
* @throws E if there has been an error, allowing a fail-fast mechanism
*/
public void failOnError() throws E;
/**
* Non-exceptional form of {@link #failOnError()}. Checks to see if any process to which the
* exception checker is bound has created an error that would cause a failure.
* @return <tt>true</tt> if there has been an error, <tt>false</tt> otherwise
*/
public boolean checkForError();
}
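// Editor's sketch, not part of this patch: a worker that polls an
// ExceptionCheckable for fail-fast behavior; the work loop is hypothetical.
class ExampleCheckedWorker {
private final ExceptionCheckable<Exception> checker;
ExampleCheckedWorker(ExceptionCheckable<Exception> checker) {
this.checker = checker;
}
void runSteps(int steps) throws Exception {
for (int i = 0; i < steps; i++) {
checker.failOnError(); // throws if any bound process has failed
// ... do one unit of work ...
}
}
}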

View File

@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Listen for errors on a process or operation
* @param <E> Type of exception that is expected
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ExceptionListener<E extends Exception> {
/**
* Receive an error.
* <p>
* Implementers must ensure that this method is thread-safe.
* @param message reason for the error
* @param e exception causing the error
* @param info general information about the error
*/
public void receiveError(String message, E e, Object... info);
}
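// Editor's sketch, not part of this patch: a listener that records only the first
// error it receives. receiveError is synchronized to satisfy the thread-safety
// requirement stated above.
class ExampleRecordingListener implements ExceptionListener<Exception> {
private Exception first;
@Override
public synchronized void receiveError(String message, Exception e, Object... info) {
if (first == null) {
first = e;
}
}
}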

View File

@ -1,41 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionOrchestrator;
/**
* Simple visitor interface to update an error listener with an error notification
* @see ExceptionOrchestrator
* @param <T> Type of listener to update
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface ExceptionVisitor<T> {
/**
* Visit the listener with the given error, possibly transforming or ignoring the error
* @param listener listener to update
* @param message error message
* @param e exception that caused the error
* @param info general information about the error
*/
public void visit(T listener, String message, Exception e, Object... info);
}
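// Editor's sketch, not part of this patch: a visitor that simply forwards errors
// to a plain ExceptionListener, dropping the extra info arguments.
class ExampleForwardingVisitor implements ExceptionVisitor<ExceptionListener<Exception>> {
@Override
public void visit(ExceptionListener<Exception> listener, String message, Exception e,
Object... info) {
listener.receiveError(message, e); // info intentionally dropped for brevity
}
}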

View File

@ -1,51 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionOrchestratorFactory;
import org.apache.hadoop.hbase.util.Pair;
/**
* Inject faults when classes check to see if an error occurs.
* <p>
* Can be added to any monitoring via
* {@link ExceptionOrchestratorFactory#addFaultInjector(FaultInjector)}
* @see ExceptionListener
* @see ExceptionCheckable
* @param <E> Type of exception that the corresponding {@link ExceptionListener} is expecting
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface FaultInjector<E extends Exception> {
/**
* Called by the specified class whenever checking for process errors. Care needs to be taken when
* using fault injectors to pass back an array of the correct size, or the listener receiving the
* error may get the wrong number of arguments and throw an error.
* <p>
* Note that the fault injector does not need to inject a fault on every call; it should do so
* only when a fault is desired.
* @param trace full stack trace of the call to check for an error
* @return the information about the fault that should be returned if there was a fault (expected
* exception to throw and generic error information) or <tt>null</tt> if no fault should
* be injected.
*/
public Pair<E, Object[]> injectFault(StackTraceElement[] trace);
}
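// Editor's sketch, not part of this patch: an injector that fires at most once
// when a chosen class appears on the stack; the class name is illustrative.
class ExampleOneShotInjector implements FaultInjector<Exception> {
private boolean fired = false;
@Override
public Pair<Exception, Object[]> injectFault(StackTraceElement[] trace) {
for (StackTraceElement frame : trace) {
if (!fired && frame.getClassName().contains("SnapshotTask")) {
fired = true;
return new Pair<Exception, Object[]>(new Exception("injected fault"), new Object[0]);
}
}
return null; // no fault injected this time
}
}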

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Base class for an object with a name.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class Name {
private String name;
public Name(String name) {
this.name = name;
}
public void setName(String name) {
this.name = name;
}
/**
* Get the name of the class that should be used for logging
* @return {@link String} prefix for logging
*/
public String getNamePrefixForLog() {
return name != null ? "(" + name + ")" : "";
}
@Override
public String toString() {
return this.name;
}
}

View File

@ -1,129 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.exception.OperationAttemptTimeoutException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Time a given process/operation and report a failure if the elapsed time exceeds the max allowed
* time.
* <p>
* The timer won't start tracking time until calling {@link #start()}. If {@link #complete()} or
* {@link #trigger()} is called before {@link #start()}, calls to {@link #start()} will fail.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class OperationAttemptTimer {
private static final Log LOG = LogFactory.getLog(OperationAttemptTimer.class);
private final long maxTime;
private volatile boolean complete;
private final Timer timer;
private final TimerTask timerTask;
private long start = -1;
/**
* Create a generic timer for a task/process.
* @param listener listener to notify if the process times out
* @param maxTime max allowed running time for the process. Timer starts on calls to
* {@link #start()}
* @param info information about the process to pass along if the timer expires
*/
@SuppressWarnings("rawtypes")
public OperationAttemptTimer(final ExceptionListener listener, final long maxTime,
final Object... info) {
this.maxTime = maxTime;
timer = new Timer();
timerTask = new TimerTask() {
@SuppressWarnings("unchecked")
@Override
public void run() {
// ensure we don't run this task multiple times
synchronized (this) {
// quick exit if we already marked the task complete
if (OperationAttemptTimer.this.complete) return;
// mark the task is run, to avoid repeats
OperationAttemptTimer.this.complete = true;
}
long end = EnvironmentEdgeManager.currentTimeMillis();
listener.receiveError("Timeout elapsed!", new OperationAttemptTimeoutException(start, end,
maxTime), info);
}
};
}
/**
* For all time forward, do not throw an error because the process has completed.
*/
public void complete() {
// warn if the timer is already marked complete. This isn't going to be thread-safe, but should
// be good enough and it's not worth locking just for a warning.
if (this.complete) {
LOG.warn("Timer already marked completed, ignoring!");
return;
}
LOG.debug("Marking timer as complete - no error notifications will be received for this timer.");
synchronized (this.timerTask) {
this.complete = true;
}
this.timer.cancel();
}
/**
* Start a timer to fail a process if it takes longer than the expected time to complete.
* <p>
* Non-blocking.
* @throws IllegalStateException if the timer has already been marked done via {@link #complete()}
* or {@link #trigger()}
*/
public synchronized void start() throws IllegalStateException {
if (this.start >= 0) {
LOG.warn("Timer already started, can't be started again. Ignoring second request.");
return;
}
LOG.debug("Scheduling process timer to run in: " + maxTime + " ms");
timer.schedule(timerTask, maxTime);
this.start = EnvironmentEdgeManager.currentTimeMillis();
}
/**
* Trigger the timer immediately.
* <p>
* Exposed for testing.
*/
public void trigger() {
synchronized (timerTask) {
if (this.complete) {
LOG.warn("Timer already completed, not triggering.");
return;
}
LOG.debug("Triggering timer immediately!");
this.timer.cancel();
this.timerTask.run();
}
}
}
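// Editor's sketch, not part of this patch: typical timer lifecycle. The listener,
// timeout, and info argument are illustrative; complete() must run before the
// timeout to suppress the error notification.
class ExampleTimerUsage {
@SuppressWarnings("rawtypes")
void timedWork(ExceptionListener listener) {
OperationAttemptTimer timer = new OperationAttemptTimer(listener, 60000, "example-op");
timer.start();
try {
// ... do the bounded work ...
} finally {
timer.complete(); // no timeout error is reported once marked complete
}
}
}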

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.exception;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
/**
* Exception for a timeout of a task.
* @see OperationAttemptTimer
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("serial")
public class OperationAttemptTimeoutException extends Exception {
/**
* Exception indicating that an operation attempt has timed out
* @param start time the operation started (ms since epoch)
* @param end time the timeout was triggered (ms since epoch)
* @param allowed max allowed amount of time for the operation to complete (ms)
*/
public OperationAttemptTimeoutException(long start, long end, long allowed) {
super("Timeout elapsed! Start:" + start + ", End:" + end + ", diff:" + (end - start) + ", max:"
+ allowed + " ms");
}
}

View File

@ -1,32 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.exception;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionSnare;
/**
* Exception when an {@link ExceptionSnare} doesn't have an <tt>Exception</tt> when it receives an
* error.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("serial")
public class UnknownErrorException extends RuntimeException {
}

View File

@ -1,104 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
/**
* The dispatcher acts as a central point of control of error handling. Any exceptions from the
* dispatcher get passed directly to the listeners. Likewise, any errors from the listeners get
* passed to the dispatcher and then back to any listeners.
* <p>
* This is useful, for instance, for informing multiple processes in conjunction with an
* {@link Abortable}
* <p>
* This is different from an {@link ExceptionOrchestrator} as it will only propagate an error
* <i>once</i> to all listeners; it is single use, just like an {@link ExceptionSnare}. For example,
* if an error is passed to <tt>this</tt> then that error will be passed to all listeners, but a
* second error passed to {@link #receiveError(String, Exception, Object...)} will be ignored. This
* is particularly useful to help avoid accidentally having infinite loops when passing errors.
* <p>
* @param <T> generic exception listener type to update
* @param <E> Type of {@link Exception} to throw when calling {@link #failOnError()}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ExceptionDispatcher<T, E extends Exception> extends ExceptionOrchestrator<E> implements
ExceptionListener<E>, ExceptionCheckable<E> {
private static final Log LOG = LogFactory.getLog(ExceptionDispatcher.class);
protected final ExceptionVisitor<T> visitor;
private final ExceptionSnare<E> snare = new ExceptionSnare<E>();
public ExceptionDispatcher(String name, ExceptionVisitor<T> visitor) {
super(name);
this.visitor = visitor;
}
public ExceptionDispatcher(ExceptionVisitor<T> visitor) {
this("single-error-dispatcher", visitor);
}
public ExceptionDispatcher() {
this(null);
}
@Override
public synchronized void receiveError(String message, E e, Object... info) {
// if we already have an error, then ignore it
if (snare.checkForError()) return;
LOG.debug(name.getNamePrefixForLog() + "Accepting received error:" + message);
// mark that we got the error
snare.receiveError(message, e, info);
// notify all the listeners
super.receiveError(message, e, info);
}
@Override
public void failOnError() throws E {
snare.failOnError();
}
@Override
public boolean checkForError() {
return snare.checkForError();
}
public ExceptionVisitor<T> getDefaultVisitor() {
return this.visitor;
}
/**
* Add a typed error listener that will be visited by the {@link ExceptionVisitor}, passed in the
* constructor, when receiving errors.
* @param errorable listener for error notifications
*/
public void addErrorListener(T errorable) {
if (this.visitor == null) throw new UnsupportedOperationException("No error visitor for "
+ errorable + ", can't add it to the listeners");
addErrorListener(this.visitor, errorable);
}
}
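// Editor's sketch, not part of this patch: single-use dispatch semantics. Only
// the first error reaches the listeners; the second call is swallowed by the
// internal snare.
class ExampleDispatcherUsage {
void demo() {
ExceptionDispatcher<ExceptionListener<Exception>, Exception> dispatcher =
new ExceptionDispatcher<ExceptionListener<Exception>, Exception>();
dispatcher.receiveError("first failure", new Exception("boom"));
dispatcher.receiveError("second failure", new Exception("ignored")); // dropped
}
}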

View File

@ -1,57 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
/**
* Generic error dispatcher factory that just creates an error dispatcher on request (potentially
* wrapping with an error injector via the {@link ExceptionOrchestratorFactory}).
* @param <T> Type of generic error listener the dispatchers should handle
* @see ExceptionOrchestratorFactory
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ExceptionDispatcherFactory<T> extends
ExceptionOrchestratorFactory<ExceptionDispatcher<T, Exception>, T> {
/**
* @param visitor to use when building an error handler via {@link #createErrorHandler()}.
*/
public ExceptionDispatcherFactory(ExceptionVisitor<T> visitor) {
super(visitor);
}
@Override
protected ExceptionDispatcher<T, Exception> buildErrorHandler(ExceptionVisitor<T> visitor) {
return new ExceptionDispatcher<T, Exception>(visitor);
}
@Override
protected ExceptionDispatcher<T, Exception> wrapWithInjector(
ExceptionDispatcher<T, Exception> dispatcher,
List<FaultInjector<?>> injectors) {
return new InjectingExceptionDispatcher<ExceptionDispatcher<T, Exception>, T, Exception>(dispatcher,
injectors);
}
}

View File

@ -1,123 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.Name;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
/**
* The orchestrator acts as a central point of control of error handling. Any exceptions passed to
* <tt>this</tt> get passed directly to the listeners.
* <p>
* Any exception listener added will only be <b>weakly referenced</b>, so you must keep a reference
* to it if you want to use it in other places. This allows minimal effort error monitoring, allowing
* you to register an error listener and then not worry about having to unregister the listener.
* <p>
* A single {@link ExceptionOrchestrator} should be used for each set of operation attempts (e.g.
* one parent operation with child operations, potentially multiple levels deep) to monitor. This
* allows for a single source of truth for exception dispatch between all the interested operation
* attempts.
* @param <E> Type of {@link Exception} to expect when receiving errors
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ExceptionOrchestrator<E extends Exception> implements ExceptionListener<E> {
private static final Log LOG = LogFactory.getLog(ExceptionOrchestrator.class);
protected final Name name;
protected final ListMultimap<ExceptionVisitor<?>, WeakReference<?>> listeners = ArrayListMultimap
.create();
/** Error visitor for framework listeners */
public final ForwardingErrorVisitor genericVisitor = new ForwardingErrorVisitor();
public ExceptionOrchestrator() {
this("generic-error-dispatcher");
}
public ExceptionOrchestrator(String name) {
this.name = new Name(name);
}
public Name getName() {
return this.name;
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public synchronized void receiveError(String message, E e, Object... info) {
// update all the listeners with the passed error
LOG.debug(name.getNamePrefixForLog() + " Recieved error, notifying listeners...");
List<Pair<ExceptionVisitor<?>, WeakReference<?>>> toRemove = new ArrayList<Pair<ExceptionVisitor<?>, WeakReference<?>>>();
for (Entry<ExceptionVisitor<?>, WeakReference<?>> entry : listeners.entries()) {
Object o = entry.getValue().get();
if (o == null) {
// if the listener doesn't have a reference, then drop it from the list
// need to copy this over b/c guava is finicky with the entries
toRemove.add(new Pair<ExceptionVisitor<?>, WeakReference<?>>(entry.getKey(), entry
.getValue()));
continue;
}
// otherwise notify the listener that we had a failure
((ExceptionVisitor) entry.getKey()).visit(o, message, e, info);
}
// cleanup all visitors that aren't referenced anymore
if (toRemove.size() > 0) LOG.debug(name.getNamePrefixForLog() + " Cleaning up entries.");
for (Pair<ExceptionVisitor<?>, WeakReference<?>> entry : toRemove) {
this.listeners.remove(entry.getFirst(), entry.getSecond());
}
}
/**
* Listen for failures to a given process
* @param visitor pass error notifications onto the typed listener, possibly transforming or
* ignore the error notification
* @param errorable listener for the errors
*/
public synchronized <L> void addErrorListener(ExceptionVisitor<L> visitor, L errorable) {
this.listeners.put(visitor, new WeakReference<L>(errorable));
}
/**
* A simple error visitor that just forwards the received error to a generic listener.
*/
private class ForwardingErrorVisitor implements ExceptionVisitor<ExceptionListener<E>> {
@Override
@SuppressWarnings("unchecked")
public void visit(ExceptionListener<E> listener, String message, Exception e, Object... info) {
listener.receiveError(message, (E) e, info);
}
}
}
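// Editor's sketch, not part of this patch: registering a listener through the
// framework visitor. Because listeners are held weakly, the caller must keep a
// strong reference to 'listener' for as long as it should stay registered.
class ExampleOrchestratorUsage {
void demo() {
ExceptionOrchestrator<Exception> orchestrator = new ExceptionOrchestrator<Exception>("demo");
ExceptionListener<Exception> listener = new ExceptionListener<Exception>() {
@Override
public void receiveError(String message, Exception e, Object... info) {
// react to the failure
}
};
orchestrator.addErrorListener(orchestrator.genericVisitor, listener);
orchestrator.receiveError("something failed", new Exception("boom"));
}
}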

View File

@ -1,115 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
/**
* Error factory that produces an {@link ExceptionOrchestrator}, potentially wrapped with a
* {@link FaultInjector}.
* @param <D> type for {@link ExceptionOrchestrator} that should be used
* @param <T> Type of error listener that the dispatcher from this factory can communicate
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class ExceptionOrchestratorFactory<D extends ExceptionOrchestrator<?>, T> {
private static final List<FaultInjector<?>> faults = new ArrayList<FaultInjector<?>>();
/**
* Add a fault injector that will run on checks of the {@link ExceptionCheckable} generated by
* this factory. To ensure that faults are injected, this must be called before the handler is
* created via {@link #createErrorHandler()}.
* <p>
* Exposed for TESTING.
* @param injector fault injector to add
* @param <E> type of exception that will be thrown on checks of
* {@link ExceptionCheckable#failOnError()} from created exception monitors
*/
public static <E extends Exception> void addFaultInjector(FaultInjector<E> injector) {
faults.add(injector);
}
/**
* Complement to {@link #addFaultInjector(FaultInjector)} - removes any existing fault injectors
* set for the factory.
* <p>
* Exposed for TESTING.
*/
public static void clearFaults() {
faults.clear();
}
protected final ExceptionVisitor<T> visitor;
/**
* @param visitor to use when building an error handler via {@link #createErrorHandler()}.
*/
public ExceptionOrchestratorFactory(ExceptionVisitor<T> visitor) {
this.visitor = visitor;
}
/**
* Create a dispatcher with a specific visitor
* @param visitor visitor to pass on error notifications to bound error listeners
* @return an error dispatcher that passes on errors to all listening objects
*/
public final D createErrorHandler(ExceptionVisitor<T> visitor) {
D handler = buildErrorHandler(visitor);
// wrap with a fault injector, if we need to
if (faults.size() > 0) {
return wrapWithInjector(handler, faults);
}
return handler;
}
/**
* Create a dispatcher with the default visitor passed in the constructor.
* @return an error dispatcher that passes on errors to all listening objects
*/
public final D createErrorHandler() {
return createErrorHandler(this.visitor);
}
/**
* Build an error handler. This will be wrapped via
* {@link #wrapWithInjector(ExceptionOrchestrator, List)} if there are fault injectors present.
* @return an error handler
*/
protected abstract D buildErrorHandler(ExceptionVisitor<T> visitor);
/**
* Wrap the built error handler with an error injector. Subclasses should override if they need
* custom error injection. Generally, this will just wrap calls to &lt;D&gt; by first checking the
* {@link #faults} that were dynamically injected and then, if the {@link FaultInjector} didn't
* inject a fault, the actual methods are called.
* <p>
* This method will only be called if there are fault injectors present. Otherwise, the handler
* will just be built via {@link #buildErrorHandler(ExceptionVisitor)}.
* @param delegate built delegate to wrap with injector checking
* @param injectors injectors that should be checked
* @return a &lt;D&gt; that also does {@link FaultInjector} checking
*/
protected abstract D wrapWithInjector(D delegate, List<FaultInjector<?>> injectors);
}

View File

@ -1,96 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.Name;
import org.apache.hadoop.hbase.server.errorhandling.exception.UnknownErrorException;
/**
* Simple exception handler that keeps track of its failure state, and the exception that
* should be thrown based on the received error.
* <p>
* An exception is not propagated if an error has already been received, ensuring that
* you don't have infinite error propagation.
* <p>
* You can think of it like a 'one-time-use' {@link ExceptionCheckable}, that once it receives an
* error will not listen to any new error updates.
* <p>
* Thread-safe.
* @param <E> Type of exception to throw when calling {@link #failOnError()}
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ExceptionSnare<E extends Exception> implements ExceptionCheckable<E>,
ExceptionListener<E> {
private static final Log LOG = LogFactory.getLog(ExceptionSnare.class);
private boolean error = false;
protected E exception;
protected Name name;
/**
* Create an exception snare with a generic error name
*/
public ExceptionSnare() {
this.name = new Name("generic-error-snare");
}
@Override
public void failOnError() throws E {
if (checkForError()) {
if (exception == null) throw new UnknownErrorException();
throw exception;
}
}
@Override
public boolean checkForError() {
return this.error;
}
@Override
public void receiveError(String message, E e, Object... info) {
LOG.error(name.getNamePrefixForLog() + "Got an error:" + message + ", info:"
+ Arrays.toString(info));
receiveInternalError(e);
}
/**
* Receive an error notification from internal sources. Can be used by subclasses to set an error.
* <p>
* This method may be called concurrently, so precautions must be taken to not clobber yourself,
* either by making the method <tt>synchronized</tt> or by synchronizing on <tt>this</tt> when
* calling this method.
* @param e exception that caused the error (can be null).
*/
protected synchronized void receiveInternalError(E e) {
// if we already got the error or we received the error fail fast
if (this.error) return;
// store the error since we haven't seen it before
this.error = true;
this.exception = e;
}
}
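// Editor's sketch, not part of this patch: one-time-use semantics. Only the first
// error is retained; failOnError() then always throws that exception.
class ExampleSnareUsage {
void demo() {
ExceptionSnare<Exception> snare = new ExceptionSnare<Exception>();
snare.receiveError("first", new Exception("boom"));
snare.receiveError("second", new Exception("ignored")); // dropped by the snare
try {
snare.failOnError(); // throws the first exception ("boom")
} catch (Exception expected) {
// expected path in this sketch
}
}
}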

View File

@ -1,92 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.errorhandling.FaultInjector;
import org.apache.hadoop.hbase.server.errorhandling.impl.delegate.DelegatingExceptionDispatcher;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.jasper.compiler.ErrorDispatcher;
/**
* {@link ErrorDispatcher} that delegates calls for all methods, but wraps exception checking to
* allow the fault injectors to have a chance to inject a fault into the running process
* @param <D> {@link ExceptionOrchestrator} to wrap for fault checking
* @param <T> type of generic error listener that should be notified
* @param <E> exception to be thrown on checks of {@link #failOnError()}
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class InjectingExceptionDispatcher<D extends ExceptionDispatcher<T, E>, T, E extends Exception> extends
DelegatingExceptionDispatcher<D, T, E> {
private final List<FaultInjector<E>> faults;
/**
* Wrap an exception handler with one that will inject faults on calls to
* {@link #checkForError()}.
* @param delegate base exception handler to wrap
* @param faults injectors to run each time there is a check for an error
*/
@SuppressWarnings("unchecked")
public InjectingExceptionDispatcher(D delegate, List<FaultInjector<?>> faults) {
super(delegate);
// since we don't know the type of fault injector, we need to convert it.
// this is only used in tests, so throwing a class-cast here isn't too bad.
this.faults = new ArrayList<FaultInjector<E>>(faults.size());
for (FaultInjector<?> fault : faults) {
this.faults.add((FaultInjector<E>) fault);
}
}
@Override
public void failOnError() throws E {
// first fail if there is already an error
delegate.failOnError();
// then check for an error via the update mechanism
if (this.checkForError()) delegate.failOnError();
}
/**
* Use the injectors to possibly inject an error into the delegate. Callers should invoke
* {@link ExceptionCheckable#checkForError()} or {@link ExceptionCheckable#failOnError()} after
* this method returns <tt>true</tt>.
* @return <tt>true</tt> if an error found via injector or in the delegate, <tt>false</tt>
* otherwise
*/
@Override
public boolean checkForError() {
// if there are fault injectors, run them
if (faults.size() > 0) {
// get the caller of this method. Should be the direct calling class
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
for (FaultInjector<E> injector : faults) {
Pair<E, Object[]> info = injector.injectFault(trace);
if (info != null) {
delegate.receiveError("Injected fail", info.getFirst(), info.getSecond());
}
}
}
return delegate.checkForError();
}
}

View File

@ -1,71 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.errorhandling.impl.delegate;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionVisitor;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionDispatcher;
/**
* Helper class for exception handler factories.
* @param <D> Type of delegate to use
* @param <T> type of generic error listener to update
* @param <E> exception to expect for errors
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DelegatingExceptionDispatcher<D extends ExceptionDispatcher<T, E>, T, E extends Exception>
extends ExceptionDispatcher<T, E> {
protected final D delegate;
public DelegatingExceptionDispatcher(D delegate) {
super("delegate - " + delegate.getName(), delegate.getDefaultVisitor());
this.delegate = delegate;
}
@Override
public ExceptionVisitor<T> getDefaultVisitor() {
return delegate.getDefaultVisitor();
}
@Override
public void receiveError(String message, E e, Object... info) {
delegate.receiveError(message, e, info);
}
@Override
public <L> void addErrorListener(ExceptionVisitor<L> visitor, L errorable) {
delegate.addErrorListener(visitor, errorable);
}
@Override
public void failOnError() throws E {
delegate.failOnError();
}
@Override
public boolean checkForError() {
return delegate.checkForError();
}
@Override
public void addErrorListener(T errorable) {
delegate.addErrorListener(errorable);
}
}

View File

@ -32,19 +32,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.CorruptedSnapshotException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSUtils.DirFilter;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
@ -118,11 +116,11 @@ public class TakeSnapshotUtils {
* @return the timer to use to signal the start and end of the snapshot
*/
@SuppressWarnings("rawtypes")
public static OperationAttemptTimer getMasterTimerAndBindToMonitor(SnapshotDescription snapshot,
Configuration conf, ExceptionListener monitor) {
public static TimeoutExceptionInjector getMasterTimerAndBindToMonitor(SnapshotDescription snapshot,
Configuration conf, ForeignExceptionListener monitor) {
long maxTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(),
SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
return new OperationAttemptTimer(monitor, maxTime, snapshot);
return new TimeoutExceptionInjector(monitor, maxTime);
}
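A hedged sketch of how the returned timer is expected to be driven (the no-arg ForeignExceptionDispatcher constructor and the start()/complete() methods are assumed here; only the constructor call appears in this hunk):

// Bind the timer to a monitor, run the snapshot attempt inside the window,
// and let a timeout surface through the monitor as a ForeignException.
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher();
TimeoutExceptionInjector timer =
    TakeSnapshotUtils.getMasterTimerAndBindToMonitor(snapshot, conf, monitor);
timer.start();            // begin the countdown
try {
  // ... take the snapshot ...
} finally {
  timer.complete();       // finished in time; cancel the injector
}
monitor.rethrowException(); // throws if the timer fired before complete()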
/**

View File

@@ -1,71 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.snapshot.error;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.errorhandling.impl.ExceptionSnare;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.UnexpectedSnapshotException;
/**
* {@link ExceptionSnare} for snapshot exceptions, ensuring that only the first exception is
* retained and always returned via {@link #failOnError()}.
* <p>
* Ensures that any generic exceptions received via
* {@link #receiveError(String, HBaseSnapshotException, Object...)} are in fact propagated as
* {@link HBaseSnapshotException}.
*/
public class SnapshotExceptionSnare extends ExceptionSnare<HBaseSnapshotException> implements
SnapshotFailureListener {
private SnapshotDescription snapshot;
/**
* Create a snare that expects errors for the passed snapshot. Any untyped exceptions passed to
* {@link #receiveError(String, HBaseSnapshotException, Object...)} are wrapped as an
* {@link UnexpectedSnapshotException} with the passed {@link SnapshotDescription}.
* @param snapshot the snapshot for which errors are expected
*/
public SnapshotExceptionSnare(SnapshotDescription snapshot) {
this.snapshot = snapshot;
}
@Override
public void snapshotFailure(String reason, SnapshotDescription snapshot) {
this.receiveError(reason, null, snapshot);
}
@Override
public void snapshotFailure(String reason, SnapshotDescription snapshot, Exception t) {
this.receiveError(reason,
t instanceof HBaseSnapshotException ? (HBaseSnapshotException) t
: new UnexpectedSnapshotException(reason, t, snapshot), snapshot);
}
@Override
public void failOnError() throws HBaseSnapshotException {
try {
super.failOnError();
} catch (Exception e) {
if (e instanceof HBaseSnapshotException) {
throw (HBaseSnapshotException) e;
}
throw new UnexpectedSnapshotException(e.getMessage(), e, snapshot);
}
}
}

View File

@@ -1,41 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.snapshot.error;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
/**
* Generic running snapshot failure listener
*/
public interface SnapshotFailureListener {
/**
* Notification that a given snapshot failed because of an error on the local server
* @param reason explanation of why the snapshot failed
* @param snapshot snapshot that failed
*/
public void snapshotFailure(String reason, SnapshotDescription snapshot);
/**
* Notification that a given snapshot failed because of an error on the local server
* @param reason reason the snapshot failed
* @param snapshot the snapshot that failed
* @param t the exception that caused the failure
*/
public void snapshotFailure(String reason, SnapshotDescription snapshot, Exception t);
}
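Both removed types are subsumed by the ForeignException machinery; a rough sketch of the replacement pattern (the no-arg dispatcher constructor and the source name are illustrative):

// Where callers previously invoked snapshotFailure(reason, snapshot, e), they
// now wrap the cause in a ForeignException and hand it to the dispatcher.
ForeignExceptionDispatcher dispatcher = new ForeignExceptionDispatcher();
try {
  // ... snapshot work ...
} catch (Exception e) {
  dispatcher.receive(new ForeignException("region-server-A", e));
}
if (dispatcher.hasException()) {
  dispatcher.rethrowException(); // rethrows the first ForeignException received
}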

View File

@@ -28,9 +28,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
/**
* Copy over each of the files in a region's recovered.edits directory to the region's snapshot
@@ -56,18 +56,18 @@ public class CopyRecoveredEditsTask extends SnapshotTask {
* @param regionDir directory for the region to examine for edits
* @param snapshotRegionDir directory for the region in the snapshot
*/
public CopyRecoveredEditsTask(SnapshotDescription snapshot, SnapshotExceptionSnare monitor,
public CopyRecoveredEditsTask(SnapshotDescription snapshot, ForeignExceptionDispatcher monitor,
FileSystem fs, Path regionDir, Path snapshotRegionDir) {
super(snapshot, monitor, "Copy recovered.edits for region:" + regionDir.getName());
super(snapshot, monitor);
this.fs = fs;
this.regiondir = regionDir;
this.outputDir = HLogUtil.getRegionDirRecoveredEditsDir(snapshotRegionDir);
}
@Override
public void process() throws IOException {
public Void call() throws IOException {
NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(this.fs, regiondir);
if (files == null || files.size() == 0) return;
if (files == null || files.size() == 0) return null;
// copy over each file.
// this is really inefficient (could be trivially parallelized), but is
@@ -83,7 +83,8 @@ public class CopyRecoveredEditsTask extends SnapshotTask {
FileUtil.copy(fs, source, fs, out, true, fs.getConf());
// check for errors to the running operation after each file
this.failOnError();
this.rethrowException();
}
return null;
}
}
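Moving from Runnable.run() to Callable<Void>.call() lets callers submit the task to a thread pool and observe failures through the Future; a minimal sketch, assuming the constructor arguments shown above:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Submit the task; an IOException thrown by call() surfaces via Future.get().
ExecutorService pool = Executors.newSingleThreadExecutor();
Future<Void> result = pool.submit(
    new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir, snapshotRegionDir));
try {
  result.get(); // rethrows failures wrapped in ExecutionException
} catch (ExecutionException e) {
  monitor.receive(new ForeignException("copy-recovered-edits", e.getCause()));
} finally {
  pool.shutdown();
}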

View File

@@ -26,9 +26,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.TakeSnapshotUtils;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.util.FSUtils;
/**
@@ -54,8 +54,8 @@ public class ReferenceRegionHFilesTask extends SnapshotTask {
* @param regionSnapshotDir directory in the snapshot to store region files
*/
public ReferenceRegionHFilesTask(final SnapshotDescription snapshot,
SnapshotExceptionSnare monitor, Path regionDir, final FileSystem fs, Path regionSnapshotDir) {
super(snapshot, monitor, "Reference hfiles for region:" + regionDir.getName());
ForeignExceptionDispatcher monitor, Path regionDir, final FileSystem fs, Path regionSnapshotDir) {
super(snapshot, monitor);
this.regiondir = regionDir;
this.fs = fs;
@@ -76,14 +76,14 @@ public class ReferenceRegionHFilesTask extends SnapshotTask {
}
@Override
public void process() throws IOException {
public Void call() throws IOException {
FileStatus[] families = FSUtils.listStatus(fs, regiondir, new FSUtils.FamilyDirFilter(fs));
// if no families, then we are done again
if (families == null || families.length == 0) {
LOG.info("No families under region directory:" + regiondir
+ ", not attempting to add references.");
return;
return null;
}
// snapshot directories to store the hfile reference
@@ -109,6 +109,7 @@ public class ReferenceRegionHFilesTask extends SnapshotTask {
// create a reference for each hfile
for (FileStatus hfile : hfiles) {
// references are 0-length files, relying on file name.
Path referenceFile = new Path(snapshotFamilyDir, hfile.getPath().getName());
LOG.debug("Creating reference for:" + hfile.getPath() + " at " + referenceFile);
if (!fs.createNewFile(referenceFile)) {
@@ -122,5 +123,6 @@ public class ReferenceRegionHFilesTask extends SnapshotTask {
LOG.debug("and the snapshot directory:");
FSUtils.logFileSystemState(fs, snapshotDir, LOG);
}
return null;
}
}

View File

@@ -28,9 +28,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.TakeSnapshotUtils;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
@InterfaceStability.Evolving
public class ReferenceServerWALsTask extends SnapshotTask {
private static final Log LOG = LogFactory.getLog(ReferenceServerWALsTask.class);
// XXX does this need to be HasThread?
private final FileSystem fs;
private final Configuration conf;
private final String serverName;
@@ -53,23 +53,28 @@ public class ReferenceServerWALsTask extends SnapshotTask {
* propagate errors found while running the task
* @param logDir log directory for the server. Name of the directory is taken as the name of the
* server
* @param conf {@link Configuration} to extract fileystem information
* @param conf {@link Configuration} to extract filesystem information
* @param fs filesystem where the log files are stored and should be referenced
* @throws IOException
*/
public ReferenceServerWALsTask(SnapshotDescription snapshot,
SnapshotExceptionSnare failureListener, final Path logDir, final Configuration conf,
final FileSystem fs) throws IOException {
super(snapshot, failureListener, "Reference WALs for server:" + logDir.getName());
ForeignExceptionDispatcher failureListener, final Path logDir, final Configuration conf,
final FileSystem fs) {
super(snapshot, failureListener);
this.fs = fs;
this.conf = conf;
this.serverName = logDir.getName();
this.logDir = logDir;
}
/**
* Create reference files (empty files with the same path and file name as original).
* @throws IOException exception from hdfs or network problems
* @throws ForeignException exception from an external procedure
*/
@Override
public void process() throws IOException {
public Void call() throws IOException, ForeignException {
// TODO switch to using a single file to reference all required WAL files
// Iterate through each of the log files and add a reference to it.
// assumes that all the files under the server's logs directory are logs
FileStatus[] serverLogs = FSUtils.listStatus(fs, logDir, null);
@@ -80,12 +85,9 @@ public class ReferenceServerWALsTask extends SnapshotTask {
+ Arrays.toString(serverLogs));
for (FileStatus file : serverLogs) {
this.failOnError();
this.rethrowException();
// TODO - switch to using MonitoredTask
// add the reference to the file
// 0. Build a reference path based on the file name
// get the current snapshot directory
// add the reference to the file. ex: hbase/.snapshots/.logs/<serverName>/<hlog>
Path rootDir = FSUtils.getRootDir(conf);
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(this.snapshot, rootDir);
Path snapshotLogDir = TakeSnapshotUtils.getSnapshotHLogsDir(snapshotDir, serverName);
@@ -98,6 +100,8 @@
}
LOG.debug("Completed WAL referencing for: " + file.getPath() + " to " + ref);
}
LOG.debug("Successfully completed WAL referencing for ALL files");
return null;
}
}
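For orientation, the per-WAL reference assembly from the loop above, in isolation (a sketch; walFile stands in for one entry of serverLogs, and the createNewFile check mirrors the elided lines):

// Build the reference location for one WAL and create the 0-length marker.
Path rootDir = FSUtils.getRootDir(conf);
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
Path snapshotLogDir = TakeSnapshotUtils.getSnapshotHLogsDir(snapshotDir, serverName);
Path ref = new Path(snapshotLogDir, walFile.getPath().getName()); // reference keeps the WAL's name
if (!fs.createNewFile(ref)) {
  throw new IOException("Could not create reference " + ref + " for " + walFile.getPath());
}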

View File

@@ -17,65 +17,49 @@
*/
package org.apache.hadoop.hbase.server.snapshot.task;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionCheckable;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
/**
* General snapshot operation taken on a regionserver
*/
public abstract class SnapshotTask implements ExceptionCheckable<HBaseSnapshotException>, Runnable {
private static final Log LOG = LogFactory.getLog(SnapshotTask.class);
private final SnapshotExceptionSnare errorMonitor;
private final String desc;
public abstract class SnapshotTask implements ForeignExceptionSnare, Callable<Void> {
protected final SnapshotDescription snapshot;
protected final ForeignExceptionDispatcher errorMonitor;
/**
* @param snapshot Description of the snapshot we are going to operate on
* @param monitor listener interested in failures to the snapshot caused by this operation
* @param description description of the task being run, for logging
*/
public SnapshotTask(SnapshotDescription snapshot, SnapshotExceptionSnare monitor,
String description) {
public SnapshotTask(SnapshotDescription snapshot, ForeignExceptionDispatcher monitor) {
assert monitor != null : "ForeignExceptionDispatcher must not be null!";
assert snapshot != null : "SnapshotDescription must not be null!";
this.snapshot = snapshot;
this.errorMonitor = monitor;
this.desc = description;
}
protected final void snapshotFailure(String message, Exception e) {
this.errorMonitor.snapshotFailure(message, this.snapshot, e);
public void snapshotFailure(String message, Exception e) {
ForeignException ee = new ForeignException(message, e);
errorMonitor.receive(ee);
}
@Override
public void failOnError() throws HBaseSnapshotException {
this.errorMonitor.failOnError();
public void rethrowException() throws ForeignException {
this.errorMonitor.rethrowException();
}
@Override
public boolean checkForError() {
return this.errorMonitor.checkForError();
public boolean hasException() {
return this.errorMonitor.hasException();
}
@Override
public void run() {
try {
LOG.debug("Running: " + desc);
this.process();
} catch (Exception e) {
this.snapshotFailure("Failed to run " + this.desc, e);
}
public ForeignException getException() {
return this.errorMonitor.getException();
}
/**
* Run the task for the snapshot.
* @throws Exception if the task fails. Will be propagated to any other tasks watching the same
* {@link SnapshotErrorListener}.
*/
protected abstract void process() throws Exception;
}
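Under the new contract a concrete task does its work in call(), checks for remote failures with rethrowException(), and reports local ones via snapshotFailure(); a minimal hypothetical subclass:

// Hypothetical example of the new SnapshotTask contract.
public class NoopSnapshotTask extends SnapshotTask {
  public NoopSnapshotTask(SnapshotDescription snapshot, ForeignExceptionDispatcher monitor) {
    super(snapshot, monitor);
  }
  @Override
  public Void call() {
    try {
      rethrowException();   // bail out early if another task already failed
      // ... task-specific work would go here ...
    } catch (Exception e) {
      snapshotFailure("Noop task failed", e); // propagate to all listeners
    }
    return null;
  }
}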

View File

@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.server.snapshot.task;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -26,8 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -45,31 +43,31 @@ public class TableInfoCopyTask extends SnapshotTask {
/**
* Copy the table info for the given table into the snapshot
* @param failureListener listen for errors while running the snapshot
* @param monitor listen for errors while running the snapshot
* @param snapshot snapshot for which we are copying the table info
* @param fs {@link FileSystem} where the tableinfo is stored (and where the copy will be written)
* @param rootDir root of the {@link FileSystem} where the tableinfo is stored
*/
public TableInfoCopyTask(SnapshotExceptionSnare failureListener, SnapshotDescription snapshot,
FileSystem fs, Path rootDir) {
super(snapshot, failureListener, "Copy table info for table: " + snapshot.getTable());
public TableInfoCopyTask(ForeignExceptionDispatcher monitor,
SnapshotDescription snapshot, FileSystem fs, Path rootDir) {
super(snapshot, monitor);
this.rootDir = rootDir;
this.fs = fs;
}
@Override
public void process() throws IOException {
public Void call() throws Exception {
LOG.debug("Running table info copy.");
this.failOnError();
this.rethrowException();
LOG.debug("Attempting to copy table info for snapshot:" + this.snapshot);
// get the HTable descriptor
HTableDescriptor orig = FSTableDescriptors.getTableDescriptor(fs, rootDir,
Bytes.toBytes(this.snapshot.getTable()));
this.failOnError();
this.rethrowException();
// write a copy of descriptor to the snapshot directory
Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, snapshotDir, orig, false);
LOG.debug("Finished copying tableinfo.");
return null;
}
}

View File

@@ -20,24 +20,17 @@ package org.apache.hadoop.hbase.snapshot;
import java.io.IOException;
import java.util.HashSet;
import java.util.TreeMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;

View File

@@ -46,4 +46,11 @@ public class CorruptedSnapshotException extends HBaseSnapshotException {
public CorruptedSnapshotException(String message, SnapshotDescription snapshot) {
super(message, snapshot);
}
/**
* @param message message describing the exception
*/
public CorruptedSnapshotException(String message) {
super(message, (SnapshotDescription)null);
}
}
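The new single-argument constructor covers corruption detected before any SnapshotDescription is available; e.g. (snapshotInfoPath is illustrative):

// Fail fast when a snapshot's metadata file is missing from the filesystem.
if (!fs.exists(snapshotInfoPath)) {
  throw new CorruptedSnapshotException("Missing snapshot info file: " + snapshotInfoPath);
}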

View File

@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class HBaseSnapshotException extends HBaseIOException {
public class HBaseSnapshotException extends HBaseIOException {
private SnapshotDescription description;

View File

@@ -40,6 +40,6 @@ public class SnapshotDoesNotExistException extends HBaseSnapshotException {
* @param desc expected snapshot to find
*/
public SnapshotDoesNotExistException(SnapshotDescription desc) {
super("Snapshot doesn't exist on the filesystem", desc);
super("Snapshot '" + desc.getName() +"' doesn't exist on the filesystem", desc);
}
}

View File

@@ -28,11 +28,15 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Evolving
public class UnknownSnapshotException extends HBaseSnapshotException {
/**
* @param msg full infomration about the failure
* @param msg full information about the failure
*/
public UnknownSnapshotException(String msg) {
super(msg);
}
public UnknownSnapshotException(String msg, Exception e) {
super(msg, e);
}
}

View File

@@ -18,46 +18,35 @@
package org.apache.hadoop.hbase.snapshot.restore;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.TreeMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
@@ -110,7 +99,7 @@ public class RestoreSnapshotHelper {
private final Map<byte[], byte[]> regionsMap =
new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
private final SnapshotExceptionSnare monitor;
private final ForeignExceptionDispatcher monitor;
private final SnapshotDescription snapshotDesc;
private final Path snapshotDir;
@@ -126,7 +115,7 @@ public class RestoreSnapshotHelper {
final CatalogTracker catalogTracker,
final SnapshotDescription snapshotDescription, final Path snapshotDir,
final HTableDescriptor tableDescriptor, final Path tableDir,
final SnapshotExceptionSnare monitor)
final ForeignExceptionDispatcher monitor)
{
this.fs = fs;
this.conf = conf;
@@ -155,7 +144,7 @@
// NOTE: we rely upon the region name as: "table name, start key, end key"
List<HRegionInfo> tableRegions = getTableRegions();
if (tableRegions != null) {
monitor.failOnError();
monitor.rethrowException();
List<HRegionInfo> regionsToRestore = new LinkedList<HRegionInfo>();
List<HRegionInfo> regionsToRemove = new LinkedList<HRegionInfo>();
@@ -172,11 +161,11 @@
}
// Restore regions using the snapshot data
monitor.failOnError();
monitor.rethrowException();
restoreRegions(regionsToRestore);
// Remove regions from the current table
monitor.failOnError();
monitor.rethrowException();
ModifyRegionUtils.deleteRegions(fs, catalogTracker, regionsToRemove);
}
@@ -184,7 +173,7 @@
if (snapshotRegionNames.size() > 0) {
List<HRegionInfo> regionsToAdd = new LinkedList<HRegionInfo>();
monitor.failOnError();
monitor.rethrowException();
for (String regionName: snapshotRegionNames) {
LOG.info("region to add: " + regionName);
Path regionDir = new Path(snapshotDir, regionName);
@@ -192,12 +181,12 @@
}
// Create new regions cloning from the snapshot
monitor.failOnError();
monitor.rethrowException();
cloneRegions(regionsToAdd);
}
// Restore WALs
monitor.failOnError();
monitor.rethrowException();
restoreWALs();
}

View File

@@ -952,13 +952,14 @@ public abstract class FSUtils {
this.fs = fs;
}
@Override
public boolean accept(Path p) {
boolean isValid = false;
try {
if (HConstants.HBASE_NON_USER_TABLE_DIRS.contains(p.toString())) {
isValid = false;
} else {
isValid = this.fs.getFileStatus(p).isDir();
isValid = fs.getFileStatus(p).isDir();
}
} catch (IOException e) {
LOG.warn("An error occurred while verifying if [" + p.toString() +
@@ -968,6 +969,21 @@
}
}
/**
* A filter that accepts only visible directories: paths that are directories and whose
* names do not start with '.'.
*/
public static class VisibleDirectory extends DirFilter {
public VisibleDirectory(FileSystem fs) {
super(fs);
}
@Override
public boolean accept(Path file) {
return super.accept(file) && !file.getName().startsWith(".");
}
}
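A hedged usage sketch for the new filter (conf and rootDir are assumed to be in scope):

// List only visible (non-dot) directories under the HBase root, e.g. table dirs.
FileSystem fs = FileSystem.get(conf);
FileStatus[] visibleDirs = fs.listStatus(rootDir, new FSUtils.VisibleDirectory(fs));
for (FileStatus dir : visibleDirs) {
  System.out.println("visible directory: " + dir.getPath().getName());
}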
/**
* Heuristic to determine whether it is safe to open a file for append.
* Looks both for dfs.support.append and uses reflection to search
@@ -1306,19 +1322,6 @@
return fs.exists(path);
}
/**
* Log the current state of the filesystem from a certain root directory
* @param fs filesystem to investigate
* @param root root file/directory to start logging from
* @param LOG log to output information
* @throws IOException if an unexpected exception occurs
*/
public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
throws IOException {
LOG.debug("Current file system:");
logFSTree(LOG, fs, root, "|-");
}
/**
* Throw an exception if an action is not permitted by a user on a file.
*
@@ -1355,6 +1358,19 @@
return false;
}
/**
* Log the current state of the filesystem from a certain root directory
* @param fs filesystem to investigate
* @param root root file/directory to start logging from
* @param LOG log to output information
* @throws IOException if an unexpected exception occurs
*/
public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
throws IOException {
LOG.debug("Current file system:");
logFSTree(LOG, fs, root, "|-");
}
/**
* Recursive helper to log the state of the FS
*

View File

@@ -38,8 +38,6 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.DisabledTableSnapshotHandler;
import org.apache.hadoop.hbase.master.snapshot.SnapshotHFileCleaner;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
@@ -51,8 +49,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshot
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
import org.apache.hadoop.hbase.snapshot.exception.UnknownSnapshotException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -79,10 +75,11 @@ public class TestSnapshotFromMaster {
private static final int NUM_RS = 2;
private static Path rootDir;
private static Path snapshots;
private static Path archiveDir;
private static FileSystem fs;
private static HMaster master;
// for hfile archiving test.
private static Path archiveDir;
private static final String STRING_TABLE_NAME = "test";
private static final byte[] TEST_FAM = Bytes.toBytes("fam");
private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
@@ -118,7 +115,7 @@
conf.setInt("hbase.client.retries.number", 1);
// set the only HFile cleaner as the snapshot cleaner
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
SnapshotHFileCleaner.class.getCanonicalName());
SnapshotHFileCleaner.class.getCanonicalName());
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
}
@@ -130,7 +127,6 @@
@After
public void tearDown() throws Exception {
UTIL.deleteTable(TABLE_NAME);
// delete the archive directory, if it exists
@@ -186,7 +182,7 @@
// set a mock handler to simulate a snapshot
DisabledTableSnapshotHandler mockHandler = Mockito.mock(DisabledTableSnapshotHandler.class);
Mockito.when(mockHandler.getExceptionIfFailed()).thenReturn(null);
Mockito.when(mockHandler.getException()).thenReturn(null);
Mockito.when(mockHandler.getSnapshot()).thenReturn(desc);
Mockito.when(mockHandler.isFinished()).thenReturn(new Boolean(true));
@@ -218,15 +214,6 @@
builder.setSnapshot(desc);
response = master.isSnapshotDone(null, builder.build());
assertTrue("Completed, on-disk snapshot not found", response.getDone());
HBaseSnapshotException testException = new SnapshotCreationException("test fail", desc);
Mockito.when(mockHandler.getExceptionIfFailed()).thenReturn(testException);
try {
master.isSnapshotDone(null, builder.build());
fail("Master should have passed along snapshot error, but didn't");
}catch(ServiceException e) {
LOG.debug("Correctly got exception back from the master on failure: " + e.getMessage());
}
}
@Test

View File

@@ -18,27 +18,23 @@
package org.apache.hadoop.hbase.master.snapshot.manage;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.snapshot.DisabledTableSnapshotHandler;
import org.apache.hadoop.hbase.master.snapshot.TakeSnapshotHandler;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.exception.SnapshotCreationException;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -49,11 +45,9 @@ import org.mockito.Mockito;
*/
@Category(SmallTests.class)
public class TestSnapshotManager {
private static final Log LOG = LogFactory.getLog(TestSnapshotManager.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
MasterServices services = Mockito.mock(MasterServices.class);
ZooKeeperWatcher watcher = Mockito.mock(ZooKeeperWatcher.class);
ExecutorService pool = Mockito.mock(ExecutorService.class);
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
FileSystem fs;
@@ -63,23 +57,21 @@
} catch (IOException e) {
throw new RuntimeException("Couldn't get test filesystem", e);
}
}
private SnapshotManager getNewManager() throws KeeperException {
Mockito.reset(services, watcher, pool);
private SnapshotManager getNewManager() throws KeeperException, IOException {
Mockito.reset(services);
Mockito.when(services.getConfiguration()).thenReturn(UTIL.getConfiguration());
Mockito.when(services.getMasterFileSystem()).thenReturn(mfs);
Mockito.when(mfs.getFileSystem()).thenReturn(fs);
Mockito.when(mfs.getRootDir()).thenReturn(UTIL.getDataTestDir());
return new SnapshotManager(services, watcher, pool);
return new SnapshotManager(services);
}
@Test
public void testInProcess() throws KeeperException, SnapshotCreationException {
public void testInProcess() throws KeeperException, IOException {
SnapshotManager manager = getNewManager();
SnapshotSentinel handler = Mockito.mock(SnapshotSentinel.class);
TakeSnapshotHandler handler = Mockito.mock(TakeSnapshotHandler.class);
assertFalse("Manager is in process when there is no current handler", manager.isTakingSnapshot());
manager.setSnapshotHandlerForTesting(handler);
Mockito.when(handler.isFinished()).thenReturn(false);
@@ -87,46 +79,4 @@
Mockito.when(handler.isFinished()).thenReturn(true);
assertFalse("Manager is process when handler isn't running", manager.isTakingSnapshot());
}
/**
* Test that we stop the running disabled table snapshot by passing along an error to the error
* handler.
* @throws Exception
*/
@Test
public void testStopPropagation() throws Exception {
// create a new orchestrator and hook up a listener
SnapshotManager manager = getNewManager();
FSUtils.setRootDir(UTIL.getConfiguration(), UTIL.getDataTestDir());
// setup a mock snapshot to run
String tableName = "some table";
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("testAbort")
.setTable(tableName).build();
// mock out all the expected call to the master services
// this allows us to run much faster and without using a minicluster
// ensure the table exists when we ask for it
TableDescriptors tables = Mockito.mock(TableDescriptors.class);
Mockito.when(services.getTableDescriptors()).thenReturn(tables);
HTableDescriptor descriptor = Mockito.mock(HTableDescriptor.class);
Mockito.when(tables.get(Mockito.anyString())).thenReturn(descriptor);
// return the local file system as the backing to the MasterFileSystem
MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class);
Mockito.when(mfs.getFileSystem()).thenReturn(UTIL.getTestFileSystem());
Mockito.when(services.getMasterFileSystem()).thenReturn(mfs);
Mockito.when(services.getConfiguration()).thenReturn(UTIL.getConfiguration());
// create a new handler that we will check for errors
manager.snapshotDisabledTable(snapshot);
// make sure we submitted the handler, but because it's mocked, it doesn't run it.
Mockito.verify(pool, Mockito.times(1)).submit(Mockito.any(DisabledTableSnapshotHandler.class));
// pass along the stop notification
manager.stop("stopping for test");
SnapshotSentinel handler = manager.getCurrentSnapshotSentinel();
assertNotNull("Snare didn't receive error notification from snapshot manager.",
handler.getExceptionIfFailed());
}
}

View File

@@ -1,77 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.server.snapshot.error;
import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.errorhandling.ExceptionListener;
import org.apache.hadoop.hbase.server.errorhandling.OperationAttemptTimer;
import org.apache.hadoop.hbase.server.snapshot.TakeSnapshotUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test that the exception snare propagates errors as expected
*/
@Category(SmallTests.class)
public class TestSnapshotExceptionSnare {
private static final Log LOG = LogFactory.getLog(TestSnapshotExceptionSnare.class);
/**
* This test ensures that we only propagate snapshot exceptions, even if we don't get a snapshot
* exception
*/
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void testPropagatesOnlySnapshotException() {
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot").build();
ExceptionListener snare = new SnapshotExceptionSnare(snapshot);
snare.receiveError("Some message", new Exception());
try {
((SnapshotExceptionSnare) snare).failOnError();
fail("Snare didn't throw an exception");
} catch (HBaseSnapshotException e) {
LOG.error("Correctly got a snapshot exception" + e);
}
}
@Test
public void testPropatesTimerError() {
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot").build();
SnapshotExceptionSnare snare = new SnapshotExceptionSnare(snapshot);
Configuration conf = new Configuration();
// don't let the timer count down before we fire it off
conf.setLong(SnapshotDescriptionUtils.MASTER_WAIT_TIME_DISABLED_SNAPSHOT, Long.MAX_VALUE);
OperationAttemptTimer timer = TakeSnapshotUtils.getMasterTimerAndBindToMonitor(snapshot, conf,
snare);
timer.trigger();
try {
snare.failOnError();
} catch (HBaseSnapshotException e) {
LOG.info("Correctly failed from timer:" + e.getMessage());
}
}
}

View File

@@ -26,11 +26,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.exception.HBaseSnapshotException;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -48,7 +48,7 @@ public class TestCopyRecoveredEditsTask {
public void testCopyFiles() throws Exception {
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot").build();
SnapshotExceptionSnare monitor = Mockito.mock(SnapshotExceptionSnare.class);
ForeignExceptionDispatcher monitor = Mockito.mock(ForeignExceptionDispatcher.class);
FileSystem fs = UTIL.getTestFileSystem();
Path root = UTIL.getDataTestDir();
String regionName = "regionA";
@@ -75,7 +75,8 @@ public class TestCopyRecoveredEditsTask {
CopyRecoveredEditsTask task = new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir,
snapshotRegionDir);
task.run();
CopyRecoveredEditsTask taskSpy = Mockito.spy(task);
taskSpy.call();
Path snapshotEdits = HLogUtil.getRegionDirRecoveredEditsDir(snapshotRegionDir);
FileStatus[] snapshotEditFiles = FSUtils.listStatus(fs, snapshotEdits);
@@ -83,12 +84,10 @@
FileStatus file = snapshotEditFiles[0];
assertEquals("Didn't copy expected file", file1.getName(), file.getPath().getName());
Mockito.verify(monitor, Mockito.never()).receiveError(Mockito.anyString(),
Mockito.any(HBaseSnapshotException.class));
Mockito.verify(monitor, Mockito.never()).snapshotFailure(Mockito.anyString(),
Mockito.any(SnapshotDescription.class));
Mockito.verify(monitor, Mockito.never()).snapshotFailure(Mockito.anyString(),
Mockito.any(SnapshotDescription.class), Mockito.any(Exception.class));
Mockito.verify(monitor, Mockito.never()).receive(Mockito.any(ForeignException.class));
Mockito.verify(taskSpy, Mockito.never()).snapshotFailure(Mockito.anyString(),
Mockito.any(Exception.class));
} finally {
// cleanup the working directory
FSUtils.delete(fs, regionDir, true);
@@ -103,7 +102,7 @@
@Test
public void testNoEditsDir() throws Exception {
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot").build();
SnapshotExceptionSnare monitor = Mockito.mock(SnapshotExceptionSnare.class);
ForeignExceptionDispatcher monitor = Mockito.mock(ForeignExceptionDispatcher.class);
FileSystem fs = UTIL.getTestFileSystem();
Path root = UTIL.getDataTestDir();
String regionName = "regionA";
@@ -118,7 +117,7 @@
CopyRecoveredEditsTask task = new CopyRecoveredEditsTask(snapshot, monitor, fs, regionDir,
snapshotRegionDir);
task.run();
task.call();
} finally {
// cleanup the working directory
FSUtils.delete(fs, regionDir, true);

View File

@@ -29,8 +29,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -63,16 +63,15 @@ public class TestReferenceRegionHFilesTask {
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("name")
.setTable("table").build();
SnapshotExceptionSnare monitor = Mockito.mock(SnapshotExceptionSnare.class);
ForeignExceptionDispatcher monitor = Mockito.mock(ForeignExceptionDispatcher.class);
ReferenceRegionHFilesTask task = new ReferenceRegionHFilesTask(snapshot, monitor, regionDir,
fs, snapshotRegionDir);
task.run();
ReferenceRegionHFilesTask taskSpy = Mockito.spy(task);
taskSpy.call();
// make sure we never get an error
Mockito.verify(monitor, Mockito.never()).snapshotFailure(Mockito.anyString(),
Mockito.eq(snapshot));
Mockito.verify(monitor, Mockito.never()).snapshotFailure(Mockito.anyString(),
Mockito.eq(snapshot), Mockito.any(Exception.class));
Mockito.verify(taskSpy, Mockito.never()).snapshotFailure(Mockito.anyString(),
Mockito.any(Exception.class));
// verify that all the hfiles get referenced
List<String> hfiles = new ArrayList<String>(2);

View File

@@ -17,9 +17,15 @@
*/
package org.apache.hadoop.hbase.server.snapshot.task;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@@ -31,22 +37,21 @@ public class TestSnapshotTask {
* Check that errors from running the task get propagated back to the error listener.
*/
@Test
public void testErrorPropagationg() {
SnapshotExceptionSnare error = Mockito.mock(SnapshotExceptionSnare.class);
public void testErrorPropagation() throws Exception {
ForeignExceptionDispatcher error = mock(ForeignExceptionDispatcher.class);
SnapshotDescription snapshot = SnapshotDescription.newBuilder().setName("snapshot")
.setTable("table").build();
final Exception thrown = new Exception("Failed!");
SnapshotTask fail = new SnapshotTask(snapshot, error, "always fails") {
SnapshotTask fail = new SnapshotTask(snapshot, error) {
@Override
protected void process() throws Exception {
throw thrown;
public Void call() {
snapshotFailure("Injected failure", thrown);
return null;
}
};
fail.run();
fail.call();
Mockito.verify(error, Mockito.times(1)).snapshotFailure(Mockito.anyString(),
Mockito.eq(snapshot), Mockito.eq(thrown));
verify(error, Mockito.times(1)).receive(any(ForeignException.class));
}
}

View File

@@ -29,9 +29,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.server.snapshot.TakeSnapshotUtils;
import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Test;
@@ -76,16 +76,16 @@ public class TestWALReferenceTask {
FSUtils.setRootDir(conf, testDir);
SnapshotDescription snapshot = SnapshotDescription.newBuilder()
.setName("testWALReferenceSnapshot").build();
SnapshotExceptionSnare listener = Mockito.mock(SnapshotExceptionSnare.class);
ForeignExceptionDispatcher listener = Mockito.mock(ForeignExceptionDispatcher.class);
// reference all the files in the first server directory
ReferenceServerWALsTask task = new ReferenceServerWALsTask(snapshot, listener, server1Dir,
conf, fs);
task.run();
task.call();
// reference all the files in the second server directory
task = new ReferenceServerWALsTask(snapshot, listener, server2Dir, conf, fs);
task.run();
task.call();
// verify that we got everything
FSUtils.logFileSystemState(fs, testDir, LOG);
@@ -96,7 +96,7 @@
TakeSnapshotUtils.verifyAllLogsGotReferenced(fs, logDir, servers, snapshot, snapshotLogDir);
// make sure we never got an error
Mockito.verify(listener, Mockito.atLeastOnce()).failOnError();
Mockito.verify(listener, Mockito.atLeastOnce()).rethrowException();
Mockito.verifyNoMoreInteractions(listener);
}
}