HBASE-7858 cleanup before merging snapshots branch to trunk (Ted Yu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290v2@1447967 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8606f57f29
commit
c36b6fb32c
|
@ -749,7 +749,7 @@ public final class HConstants {
|
||||||
public static final String HFILE_ARCHIVE_DIRECTORY = ".archive";
|
public static final String HFILE_ARCHIVE_DIRECTORY = ".archive";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name of the directory to store snapshots all snapshots. See SnapshotDescriptionUtils for
|
* Name of the directory to store all snapshots. See SnapshotDescriptionUtils for
|
||||||
* remaining snapshot constants; this is here to keep HConstants dependencies at a minimum and
|
* remaining snapshot constants; this is here to keep HConstants dependencies at a minimum and
|
||||||
* uni-directional.
|
* uni-directional.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// This file contains protocol buffers that used to error handling
|
// This file contains protocol buffers that are used for error handling
|
||||||
|
|
||||||
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
|
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
|
||||||
option java_outer_classname = "ErrorHandlingProtos";
|
option java_outer_classname = "ErrorHandlingProtos";
|
||||||
|
|
|
@ -330,8 +330,8 @@ service MasterAdminService {
|
||||||
rpc execMasterService(CoprocessorServiceRequest)
|
rpc execMasterService(CoprocessorServiceRequest)
|
||||||
returns(CoprocessorServiceResponse);
|
returns(CoprocessorServiceResponse);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a snapshot for the given table.
|
* Create a snapshot for the given table.
|
||||||
* @param snapshot description of the snapshot to take
|
* @param snapshot description of the snapshot to take
|
||||||
*/
|
*/
|
||||||
rpc snapshot(TakeSnapshotRequest) returns(TakeSnapshotResponse);
|
rpc snapshot(TakeSnapshotRequest) returns(TakeSnapshotResponse);
|
||||||
|
@ -343,7 +343,7 @@ service MasterAdminService {
|
||||||
rpc listSnapshots(ListSnapshotRequest) returns(ListSnapshotResponse);
|
rpc listSnapshots(ListSnapshotRequest) returns(ListSnapshotResponse);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete an existing snapshot. This method can also be used to clean up a aborted snapshot.
|
* Delete an existing snapshot. This method can also be used to clean up an aborted snapshot.
|
||||||
* @param snapshotName snapshot to delete
|
* @param snapshotName snapshot to delete
|
||||||
*/
|
*/
|
||||||
rpc deleteSnapshot(DeleteSnapshotRequest) returns(DeleteSnapshotResponse);
|
rpc deleteSnapshot(DeleteSnapshotRequest) returns(DeleteSnapshotResponse);
|
||||||
|
|
|
@ -267,15 +267,15 @@ message NameInt64Pair {
|
||||||
* Description of the snapshot to take
|
* Description of the snapshot to take
|
||||||
*/
|
*/
|
||||||
message SnapshotDescription {
|
message SnapshotDescription {
|
||||||
required string name = 1;
|
required string name = 1;
|
||||||
optional string table = 2; // not needed for delete, but checked for in taking snapshot
|
optional string table = 2; // not needed for delete, but checked for in taking snapshot
|
||||||
optional int64 creationTime = 3 [default = 0];
|
optional int64 creationTime = 3 [default = 0];
|
||||||
enum Type {
|
enum Type {
|
||||||
DISABLED = 0;
|
DISABLED = 0;
|
||||||
FLUSH = 1;
|
FLUSH = 1;
|
||||||
}
|
}
|
||||||
optional Type type = 4 [default = FLUSH];
|
optional Type type = 4 [default = FLUSH];
|
||||||
optional int32 version = 5;
|
optional int32 version = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message EmptyMsg {
|
message EmptyMsg {
|
||||||
|
|
|
@ -154,7 +154,7 @@ public class MetaEditor {
|
||||||
* @param deletes Deletes to add to .META. This list should support #remove.
|
* @param deletes Deletes to add to .META. This list should support #remove.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
static void deleteFromMetaTable(final CatalogTracker ct, final List<Delete> deletes)
|
public static void deleteFromMetaTable(final CatalogTracker ct, final List<Delete> deletes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HTable t = MetaReader.getMetaHTable(ct);
|
HTable t = MetaReader.getMetaHTable(ct);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -2116,7 +2116,8 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a timestamp consistent snapshot for the given table.
|
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
|
||||||
|
* taken. If the table is disabled, an offline snapshot is taken.
|
||||||
* <p>
|
* <p>
|
||||||
* Snapshots are considered unique based on <b>the name of the snapshot</b>. Attempts to take a
|
* Snapshots are considered unique based on <b>the name of the snapshot</b>. Attempts to take a
|
||||||
* snapshot with the same name (even a different type or with different parameters) will fail with
|
* snapshot with the same name (even a different type or with different parameters) will fail with
|
||||||
|
@ -2206,9 +2207,6 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
*/
|
*/
|
||||||
public void snapshot(SnapshotDescription snapshot) throws IOException, SnapshotCreationException,
|
public void snapshot(SnapshotDescription snapshot) throws IOException, SnapshotCreationException,
|
||||||
IllegalArgumentException {
|
IllegalArgumentException {
|
||||||
// make sure the snapshot is valid
|
|
||||||
SnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
|
|
||||||
|
|
||||||
// actually take the snapshot
|
// actually take the snapshot
|
||||||
TakeSnapshotResponse response = takeSnapshotAsync(snapshot);
|
TakeSnapshotResponse response = takeSnapshotAsync(snapshot);
|
||||||
final IsSnapshotDoneRequest request = IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot)
|
final IsSnapshotDoneRequest request = IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot)
|
||||||
|
@ -2226,9 +2224,9 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
try {
|
try {
|
||||||
// sleep a backoff <= pauseTime amount
|
// sleep a backoff <= pauseTime amount
|
||||||
long sleep = getPauseTime(tries++);
|
long sleep = getPauseTime(tries++);
|
||||||
LOG.debug("Found sleep:" + sleep);
|
|
||||||
sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
|
sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
|
||||||
LOG.debug(tries + ") Sleeping: " + sleep + " ms while we wait for snapshot to complete.");
|
LOG.debug("(#" + tries + ") Sleeping: " + sleep +
|
||||||
|
"ms while waiting for snapshot completion.");
|
||||||
Thread.sleep(sleep);
|
Thread.sleep(sleep);
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -2242,8 +2240,7 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
return masterAdmin.isSnapshotDone(null, request);
|
return masterAdmin.isSnapshotDone(null, request);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
};
|
||||||
;
|
|
||||||
if (!done.getDone()) {
|
if (!done.getDone()) {
|
||||||
throw new SnapshotCreationException("Snapshot '" + snapshot.getName()
|
throw new SnapshotCreationException("Snapshot '" + snapshot.getName()
|
||||||
+ "' wasn't completed in expectedTime:" + max + " ms", snapshot);
|
+ "' wasn't completed in expectedTime:" + max + " ms", snapshot);
|
||||||
|
@ -2251,7 +2248,7 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Take a snapshot and wait for the server to complete that snapshot (asynchronous)
|
* Take a snapshot without waiting for the server to complete that snapshot (asynchronous)
|
||||||
* <p>
|
* <p>
|
||||||
* Only a single snapshot should be taken at a time, or results may be undefined.
|
* Only a single snapshot should be taken at a time, or results may be undefined.
|
||||||
* @param snapshot snapshot to take
|
* @param snapshot snapshot to take
|
||||||
|
@ -2309,7 +2306,7 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
/**
|
/**
|
||||||
* Restore the specified snapshot on the original table. (The table must be disabled)
|
* Restore the specified snapshot on the original table. (The table must be disabled)
|
||||||
* Before restoring the table, a new snapshot with the current table state is created.
|
* Before restoring the table, a new snapshot with the current table state is created.
|
||||||
* In case of failure, the table will be rolled back to the its original state.
|
* In case of failure, the table will be rolled back to its original state.
|
||||||
*
|
*
|
||||||
* @param snapshotName name of the snapshot to restore
|
* @param snapshotName name of the snapshot to restore
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
@ -2358,7 +2355,7 @@ public class HBaseAdmin implements Abortable, Closeable {
|
||||||
// Try to rollback
|
// Try to rollback
|
||||||
try {
|
try {
|
||||||
String msg = "Restore snapshot=" + snapshotName +
|
String msg = "Restore snapshot=" + snapshotName +
|
||||||
" failed. Rollback to snapshot=" + rollbackSnapshot + " succeded.";
|
" failed. Rollback to snapshot=" + rollbackSnapshot + " succeeded.";
|
||||||
LOG.error(msg, e);
|
LOG.error(msg, e);
|
||||||
internalRestoreSnapshot(rollbackSnapshot, tableName);
|
internalRestoreSnapshot(rollbackSnapshot, tableName);
|
||||||
throw new RestoreSnapshotException(msg, e);
|
throw new RestoreSnapshotException(msg, e);
|
||||||
|
|
|
@ -51,20 +51,6 @@ public class ForeignException extends IOException {
|
||||||
*/
|
*/
|
||||||
private final String source;
|
private final String source;
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new ForeignException that can be serialized. It is assumed that this came from a
|
|
||||||
* remote source.
|
|
||||||
* @param source
|
|
||||||
* @param cause
|
|
||||||
*/
|
|
||||||
private ForeignException(String source, String clazz, ProxyThrowable cause) {
|
|
||||||
super(cause);
|
|
||||||
assert source != null;
|
|
||||||
assert cause != null;
|
|
||||||
assert clazz != null;
|
|
||||||
this.source = source;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new ForeignException that can be serialized. It is assumed that this came form a
|
* Create a new ForeignException that can be serialized. It is assumed that this came form a
|
||||||
* local source.
|
* local source.
|
||||||
|
@ -114,7 +100,7 @@ public class ForeignException extends IOException {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a stack trace to list of {@link StackTraceElement}.
|
* Convert a stack trace to list of {@link StackTraceElement}.
|
||||||
* @param stackTrace the stack trace to convert to protobuf message
|
* @param trace the stack trace to convert to protobuf message
|
||||||
* @return <tt>null</tt> if the passed stack is <tt>null</tt>.
|
* @return <tt>null</tt> if the passed stack is <tt>null</tt>.
|
||||||
*/
|
*/
|
||||||
private static List<StackTraceElementMessage> toStackTraceElementMessages(
|
private static List<StackTraceElementMessage> toStackTraceElementMessages(
|
||||||
|
@ -146,10 +132,10 @@ public class ForeignException extends IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts an ForeignException to a array of bytes.
|
* Converts a ForeignException to an array of bytes.
|
||||||
* @param source the name of the external exception source
|
* @param source the name of the external exception source
|
||||||
* @param t the "local" external exception (local)
|
* @param t the "local" external exception (local)
|
||||||
* @return protobuf serialized version of ForeignThreadException
|
* @return protobuf serialized version of ForeignException
|
||||||
*/
|
*/
|
||||||
public static byte[] serialize(String source, Throwable t) {
|
public static byte[] serialize(String source, Throwable t) {
|
||||||
GenericExceptionMessage.Builder gemBuilder = GenericExceptionMessage.newBuilder();
|
GenericExceptionMessage.Builder gemBuilder = GenericExceptionMessage.newBuilder();
|
||||||
|
@ -158,7 +144,8 @@ public class ForeignException extends IOException {
|
||||||
gemBuilder.setMessage(t.getMessage());
|
gemBuilder.setMessage(t.getMessage());
|
||||||
}
|
}
|
||||||
// set the stack trace, if there is one
|
// set the stack trace, if there is one
|
||||||
List<StackTraceElementMessage> stack = ForeignException.toStackTraceElementMessages(t.getStackTrace());
|
List<StackTraceElementMessage> stack =
|
||||||
|
ForeignException.toStackTraceElementMessages(t.getStackTrace());
|
||||||
if (stack != null) {
|
if (stack != null) {
|
||||||
gemBuilder.addAllTrace(stack);
|
gemBuilder.addAllTrace(stack);
|
||||||
}
|
}
|
||||||
|
@ -172,16 +159,16 @@ public class ForeignException extends IOException {
|
||||||
/**
|
/**
|
||||||
* Takes a series of bytes and tries to generate an ForeignException instance for it.
|
* Takes a series of bytes and tries to generate an ForeignException instance for it.
|
||||||
* @param bytes
|
* @param bytes
|
||||||
* @return the ExternalExcpetion instance
|
* @return the ForeignExcpetion instance
|
||||||
* @throws InvalidProtocolBufferException if there was deserialization problem this is thrown.
|
* @throws InvalidProtocolBufferException if there was deserialization problem this is thrown.
|
||||||
*/
|
*/
|
||||||
public static ForeignException deserialize(byte[] bytes) throws InvalidProtocolBufferException {
|
public static ForeignException deserialize(byte[] bytes) throws InvalidProtocolBufferException {
|
||||||
// figure out the data we need to pass
|
// figure out the data we need to pass
|
||||||
ForeignExceptionMessage eem = ForeignExceptionMessage.parseFrom(bytes);
|
ForeignExceptionMessage eem = ForeignExceptionMessage.parseFrom(bytes);
|
||||||
GenericExceptionMessage gem = eem.getGenericException();
|
GenericExceptionMessage gem = eem.getGenericException();
|
||||||
StackTraceElement [] trace = ForeignException.toStack(gem.getTraceList());
|
StackTraceElement [] trace = ForeignException.toStackTrace(gem.getTraceList());
|
||||||
ProxyThrowable dfe = new ProxyThrowable(gem.getMessage(), trace);
|
ProxyThrowable dfe = new ProxyThrowable(gem.getMessage(), trace);
|
||||||
ForeignException e = new ForeignException(eem.getSource(), gem.getClassName(), dfe);
|
ForeignException e = new ForeignException(eem.getSource(), dfe);
|
||||||
return e;
|
return e;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,7 +179,7 @@ public class ForeignException extends IOException {
|
||||||
* @return the deserialized list or <tt>null</tt> if it couldn't be unwound (e.g. wasn't set on
|
* @return the deserialized list or <tt>null</tt> if it couldn't be unwound (e.g. wasn't set on
|
||||||
* the sender).
|
* the sender).
|
||||||
*/
|
*/
|
||||||
private static StackTraceElement[] toStack(List<StackTraceElementMessage> traceList) {
|
private static StackTraceElement[] toStackTrace(List<StackTraceElementMessage> traceList) {
|
||||||
if (traceList == null || traceList.size() == 0) {
|
if (traceList == null || traceList.size() == 0) {
|
||||||
return new StackTraceElement[0]; // empty array
|
return new StackTraceElement[0]; // empty array
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,8 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
public class ForeignExceptionDispatcher implements ForeignExceptionListener, ForeignExceptionSnare {
|
public class ForeignExceptionDispatcher implements ForeignExceptionListener, ForeignExceptionSnare {
|
||||||
public static final Log LOG = LogFactory.getLog(ForeignExceptionDispatcher.class);
|
public static final Log LOG = LogFactory.getLog(ForeignExceptionDispatcher.class);
|
||||||
protected final String name;
|
protected final String name;
|
||||||
protected final List<ForeignExceptionListener> listeners = new ArrayList<ForeignExceptionListener>();
|
protected final List<ForeignExceptionListener> listeners =
|
||||||
|
new ArrayList<ForeignExceptionListener>();
|
||||||
private ForeignException exception;
|
private ForeignException exception;
|
||||||
|
|
||||||
public ForeignExceptionDispatcher(String name) {
|
public ForeignExceptionDispatcher(String name) {
|
||||||
|
@ -69,7 +70,7 @@ public class ForeignExceptionDispatcher implements ForeignExceptionListener, For
|
||||||
if (e != null) {
|
if (e != null) {
|
||||||
exception = e;
|
exception = e;
|
||||||
} else {
|
} else {
|
||||||
exception = new ForeignException(name, e);
|
exception = new ForeignException(name, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify all the listeners
|
// notify all the listeners
|
||||||
|
@ -77,16 +78,16 @@ public class ForeignExceptionDispatcher implements ForeignExceptionListener, For
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void rethrowException() throws ForeignException {
|
public synchronized void rethrowException() throws ForeignException {
|
||||||
if (exception != null) {
|
if (exception != null) {
|
||||||
// This gets the stack where this is caused, (instead of where it was deserialized).
|
// This gets the stack where this is caused, (instead of where it was deserialized).
|
||||||
// This which is much more useful for debugging
|
// This is much more useful for debugging
|
||||||
throw new ForeignException(exception.getSource(), exception.getCause());
|
throw new ForeignException(exception.getSource(), exception.getCause());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasException() {
|
public synchronized boolean hasException() {
|
||||||
return exception != null;
|
return exception != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,7 +103,6 @@ public class ForeignExceptionDispatcher implements ForeignExceptionListener, For
|
||||||
*/
|
*/
|
||||||
private void dispatch(ForeignException e) {
|
private void dispatch(ForeignException e) {
|
||||||
// update all the listeners with the passed error
|
// update all the listeners with the passed error
|
||||||
LOG.debug(name + " Recieved error, notifying listeners...");
|
|
||||||
for (ForeignExceptionListener l: listeners) {
|
for (ForeignExceptionListener l: listeners) {
|
||||||
l.receive(e);
|
l.receive(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Exception for a timeout of a task.
|
* Exception for timeout of a task.
|
||||||
* @see TimeoutExceptionInjector
|
* @see TimeoutExceptionInjector
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class HFileLink extends FileLink {
|
||||||
HRegionInfo.ENCODED_REGION_NAME_REGEX, StoreFile.HFILE_NAME_REGEX));
|
HRegionInfo.ENCODED_REGION_NAME_REGEX, StoreFile.HFILE_NAME_REGEX));
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The link should be used for hfile and reference links
|
* The pattern should be used for hfile and reference links
|
||||||
* that can be found in /hbase/table/region/family/
|
* that can be found in /hbase/table/region/family/
|
||||||
*/
|
*/
|
||||||
private static final Pattern REF_OR_HFILE_LINK_PATTERN =
|
private static final Pattern REF_OR_HFILE_LINK_PATTERN =
|
||||||
|
|
|
@ -205,7 +205,7 @@ public class CreateTableHandler extends EventHandler {
|
||||||
try {
|
try {
|
||||||
assignmentManager.getZKTable().setEnabledTable(tableName);
|
assignmentManager.getZKTable().setEnabledTable(tableName);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new IOException("Unable to ensure that the table will be" +
|
throw new IOException("Unable to ensure that " + tableName + " will be" +
|
||||||
" enabled because of a ZooKeeper issue", e);
|
" enabled because of a ZooKeeper issue", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -216,7 +216,8 @@ public class CreateTableHandler extends EventHandler {
|
||||||
* @param tableName name of the table under construction
|
* @param tableName name of the table under construction
|
||||||
* @return the list of regions created
|
* @return the list of regions created
|
||||||
*/
|
*/
|
||||||
protected List<HRegionInfo> handleCreateHdfsRegions(final Path tableRootDir, final String tableName)
|
protected List<HRegionInfo> handleCreateHdfsRegions(final Path tableRootDir,
|
||||||
|
final String tableName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int regionNumber = newRegions.length;
|
int regionNumber = newRegions.length;
|
||||||
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(
|
ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(
|
||||||
|
|
|
@ -91,13 +91,15 @@ public class DeleteTableHandler extends TableEventHandler {
|
||||||
// 4. Delete regions from FS (temp directory)
|
// 4. Delete regions from FS (temp directory)
|
||||||
FileSystem fs = mfs.getFileSystem();
|
FileSystem fs = mfs.getFileSystem();
|
||||||
for (HRegionInfo hri: regions) {
|
for (HRegionInfo hri: regions) {
|
||||||
LOG.debug("Deleting region " + hri.getRegionNameAsString() + " from FS");
|
LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS");
|
||||||
HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
|
HFileArchiver.archiveRegion(fs, mfs.getRootDir(),
|
||||||
tempTableDir, new Path(tempTableDir, hri.getEncodedName()));
|
tempTableDir, new Path(tempTableDir, hri.getEncodedName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. Delete table from FS (temp directory)
|
// 5. Delete table from FS (temp directory)
|
||||||
fs.delete(tempTableDir, true);
|
if (!fs.delete(tempTableDir, true)) {
|
||||||
|
LOG.error("Couldn't delete " + tempTableDir);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// 6. Update table descriptor cache
|
// 6. Update table descriptor cache
|
||||||
this.masterServices.getTableDescriptors().remove(Bytes.toString(tableName));
|
this.masterServices.getTableDescriptors().remove(Bytes.toString(tableName));
|
||||||
|
|
|
@ -107,10 +107,11 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. write the table info to disk
|
// 3. write the table info to disk
|
||||||
LOG.info("Starting to copy tableinfo for offline snapshot: " + SnapshotDescriptionUtils.toString(snapshot));
|
LOG.info("Starting to copy tableinfo for offline snapshot: " +
|
||||||
TableInfoCopyTask tableInfo = new TableInfoCopyTask(this.monitor, snapshot, fs,
|
SnapshotDescriptionUtils.toString(snapshot));
|
||||||
|
TableInfoCopyTask tableInfoCopyTask = new TableInfoCopyTask(this.monitor, snapshot, fs,
|
||||||
FSUtils.getRootDir(conf));
|
FSUtils.getRootDir(conf));
|
||||||
tableInfo.call();
|
tableInfoCopyTask.call();
|
||||||
monitor.rethrowException();
|
monitor.rethrowException();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// make sure we capture the exception to propagate back to the client later
|
// make sure we capture the exception to propagate back to the client later
|
||||||
|
|
|
@ -39,7 +39,7 @@ import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle the master side of taking a snapshot of an online table, regardless of snapshot type.
|
* Handle the master side of taking a snapshot of an online table, regardless of snapshot type.
|
||||||
* Uses a {@link Procedure} to run the snapshot across all the involved regions.
|
* Uses a {@link Procedure} to run the snapshot across all the involved region servers.
|
||||||
* @see ProcedureCoordinator
|
* @see ProcedureCoordinator
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -84,7 +84,7 @@ public class EnabledTableSnapshotHandler extends TakeSnapshotHandler {
|
||||||
// wait for the snapshot to complete. A timer thread is kicked off that should cancel this
|
// wait for the snapshot to complete. A timer thread is kicked off that should cancel this
|
||||||
// if it takes too long.
|
// if it takes too long.
|
||||||
proc.waitForCompleted();
|
proc.waitForCompleted();
|
||||||
LOG.info("Done waiting - snapshot finished!");
|
LOG.info("Done waiting - snapshot for " + this.snapshot.getName() + " finished!");
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
ForeignException ee =
|
ForeignException ee =
|
||||||
new ForeignException("Interrupted while waiting for snapshot to finish", e);
|
new ForeignException("Interrupted while waiting for snapshot to finish", e);
|
||||||
|
|
|
@ -84,7 +84,6 @@ public final class MasterSnapshotVerifier {
|
||||||
private MasterServices services;
|
private MasterServices services;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build a util for the given snapshot
|
|
||||||
* @param services services for the master
|
* @param services services for the master
|
||||||
* @param snapshot snapshot to check
|
* @param snapshot snapshot to check
|
||||||
* @param rootDir root directory of the hbase installation.
|
* @param rootDir root directory of the hbase installation.
|
||||||
|
@ -137,7 +136,7 @@ public final class MasterSnapshotVerifier {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check that all the regions in the the snapshot are valid, and accounted for.
|
* Check that all the regions in the snapshot are valid, and accounted for.
|
||||||
* @param snapshotDir snapshot directory to check
|
* @param snapshotDir snapshot directory to check
|
||||||
* @throws IOException if we can't reach .META. or read the files from the FS
|
* @throws IOException if we can't reach .META. or read the files from the FS
|
||||||
*/
|
*/
|
||||||
|
@ -146,7 +145,7 @@ public final class MasterSnapshotVerifier {
|
||||||
Bytes.toBytes(tableName));
|
Bytes.toBytes(tableName));
|
||||||
for (HRegionInfo region : regions) {
|
for (HRegionInfo region : regions) {
|
||||||
// if offline split parent, skip it
|
// if offline split parent, skip it
|
||||||
if (region.isOffline() || region.isSplit() || region.isSplitParent()) {
|
if (region.isOffline() && (region.isSplit() || region.isSplitParent())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,6 +155,7 @@ public final class MasterSnapshotVerifier {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify that the region (regioninfo, hfiles) are valid
|
* Verify that the region (regioninfo, hfiles) are valid
|
||||||
|
* @param fs the FileSystem instance
|
||||||
* @param snapshotDir snapshot directory to check
|
* @param snapshotDir snapshot directory to check
|
||||||
* @param region the region to check
|
* @param region the region to check
|
||||||
*/
|
*/
|
||||||
|
@ -174,10 +174,15 @@ public final class MasterSnapshotVerifier {
|
||||||
throw new CorruptedSnapshotException("No region info found for region:" + region, snapshot);
|
throw new CorruptedSnapshotException("No region info found for region:" + region, snapshot);
|
||||||
}
|
}
|
||||||
FSDataInputStream in = fs.open(regionInfo);
|
FSDataInputStream in = fs.open(regionInfo);
|
||||||
HRegionInfo found = HRegionInfo.parseFrom(in);
|
HRegionInfo found;
|
||||||
if (!region.equals(found)) {
|
try {
|
||||||
throw new CorruptedSnapshotException("Found region info (" + found
|
found = HRegionInfo.parseFrom(in);
|
||||||
|
if (!region.equals(found)) {
|
||||||
|
throw new CorruptedSnapshotException("Found region info (" + found
|
||||||
+ ") doesn't match expected region:" + region, snapshot);
|
+ ") doesn't match expected region:" + region, snapshot);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
in.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure we have the expected recovered edits files
|
// make sure we have the expected recovered edits files
|
||||||
|
@ -225,20 +230,4 @@ public final class MasterSnapshotVerifier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check that the logs stored in the log directory for the snapshot are valid - it contains all
|
|
||||||
* the expected logs for all servers involved in the snapshot.
|
|
||||||
* @param snapshotDir snapshot directory to check
|
|
||||||
* @param snapshotServers list of the names of servers involved in the snapshot.
|
|
||||||
* @throws CorruptedSnapshotException if the hlogs in the snapshot are not correct
|
|
||||||
* @throws IOException if we can't reach the filesystem
|
|
||||||
*/
|
|
||||||
private void verifyLogs(Path snapshotDir, Set<String> snapshotServers)
|
|
||||||
throws CorruptedSnapshotException, IOException {
|
|
||||||
Path snapshotLogDir = new Path(snapshotDir, HConstants.HREGION_LOGDIR_NAME);
|
|
||||||
Path logsDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
|
|
||||||
TakeSnapshotUtils.verifyAllLogsGotReferenced(fs, logsDir, snapshotServers, snapshot,
|
|
||||||
snapshotLogDir);
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -80,10 +80,11 @@ public class RestoreSnapshotHandler extends TableEventHandler implements Snapsho
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The restore table is executed in place.
|
* The restore table is executed in place.
|
||||||
* - The on-disk data will be restored
|
* - The on-disk data will be restored - reference files are put in place without moving data
|
||||||
* - [if something fail here: you need to delete the table and re-run the restore]
|
* - [if something fail here: you need to delete the table and re-run the restore]
|
||||||
* - META will be updated
|
* - META will be updated
|
||||||
* - [if something fail here: you need to run hbck to fix META entries]
|
* - [if something fail here: you need to run hbck to fix META entries]
|
||||||
|
* The passed in list gets changed in this method
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected void handleTableOperation(List<HRegionInfo> hris) throws IOException {
|
protected void handleTableOperation(List<HRegionInfo> hris) throws IOException {
|
||||||
|
|
|
@ -58,10 +58,10 @@ import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
* Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
|
* Further, the cache is periodically refreshed ensure that files in snapshots that were deleted are
|
||||||
* also removed from the cache.
|
* also removed from the cache.
|
||||||
* <p>
|
* <p>
|
||||||
* A {@link SnapshotFileInspector} must be passed when creating <tt>this</tt> to allow extraction of files
|
* A {@link SnapshotFileInspector} must be passed when creating <tt>this</tt> to allow extraction
|
||||||
* under the /hbase/.snapshot/[snapshot name] directory, for each snapshot. This allows you to only
|
* of files under the /hbase/.snapshot/[snapshot name] directory, for each snapshot.
|
||||||
* cache files under, for instance, all the logs in the .logs directory or all the files under all
|
* This allows you to only cache files under, for instance, all the logs in the .logs directory or
|
||||||
* the regions.
|
* all the files under all the regions.
|
||||||
* <p>
|
* <p>
|
||||||
* <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
|
* <tt>this</tt> also considers all running snapshots (those under /hbase/.snapshot/.tmp) as valid
|
||||||
* snapshots and will attempt to cache files from those snapshots as well.
|
* snapshots and will attempt to cache files from those snapshots as well.
|
||||||
|
@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class SnapshotFileCache implements Stoppable {
|
public class SnapshotFileCache implements Stoppable {
|
||||||
public interface SnapshotFileInspector {
|
interface SnapshotFileInspector {
|
||||||
/**
|
/**
|
||||||
* Returns a collection of file names needed by the snapshot.
|
* Returns a collection of file names needed by the snapshot.
|
||||||
* @param snapshotDir {@link Path} to the snapshot directory to scan.
|
* @param snapshotDir {@link Path} to the snapshot directory to scan.
|
||||||
|
@ -90,7 +90,8 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
* This is a helper map of information about the snapshot directories so we don't need to rescan
|
* This is a helper map of information about the snapshot directories so we don't need to rescan
|
||||||
* them if they haven't changed since the last time we looked.
|
* them if they haven't changed since the last time we looked.
|
||||||
*/
|
*/
|
||||||
private final Map<String, SnapshotDirectoryInfo> snapshots = new HashMap<String, SnapshotDirectoryInfo>();
|
private final Map<String, SnapshotDirectoryInfo> snapshots =
|
||||||
|
new HashMap<String, SnapshotDirectoryInfo>();
|
||||||
private final Timer refreshTimer;
|
private final Timer refreshTimer;
|
||||||
|
|
||||||
private long lastModifiedTime = Long.MIN_VALUE;
|
private long lastModifiedTime = Long.MIN_VALUE;
|
||||||
|
@ -118,7 +119,7 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
* filesystem
|
* filesystem
|
||||||
* @param fs {@link FileSystem} where the snapshots are stored
|
* @param fs {@link FileSystem} where the snapshots are stored
|
||||||
* @param rootDir hbase root directory
|
* @param rootDir hbase root directory
|
||||||
* @param cacheRefreshPeriod frequency (ms) with which the cache should be refreshed
|
* @param cacheRefreshPeriod period (ms) with which the cache should be refreshed
|
||||||
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
|
* @param cacheRefreshDelay amount of time to wait for the cache to be refreshed
|
||||||
* @param refreshThreadName name of the cache refresh thread
|
* @param refreshThreadName name of the cache refresh thread
|
||||||
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
|
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
|
||||||
|
@ -143,8 +144,11 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
* Exposed for TESTING.
|
* Exposed for TESTING.
|
||||||
*/
|
*/
|
||||||
public void triggerCacheRefreshForTesting() {
|
public void triggerCacheRefreshForTesting() {
|
||||||
LOG.debug("Triggering cache refresh");
|
try {
|
||||||
new RefreshCacheTask().run();
|
SnapshotFileCache.this.refreshCache();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to refresh snapshot hfile cache!", e);
|
||||||
|
}
|
||||||
LOG.debug("Current cache:" + cache);
|
LOG.debug("Current cache:" + cache);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -184,7 +188,7 @@ public class SnapshotFileCache implements Stoppable {
|
||||||
try {
|
try {
|
||||||
status = fs.getFileStatus(snapshotDir);
|
status = fs.getFileStatus(snapshotDir);
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
LOG.warn("Snapshot directory: " + snapshotDir + " doesn't exist");
|
LOG.error("Snapshot directory: " + snapshotDir + " doesn't exist");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// if the snapshot directory wasn't modified since we last check, we are done
|
// if the snapshot directory wasn't modified since we last check, we are done
|
||||||
|
|
|
@ -44,7 +44,8 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
|
||||||
* Conf key for the frequency to attempt to refresh the cache of hfiles currently used in
|
* Conf key for the frequency to attempt to refresh the cache of hfiles currently used in
|
||||||
* snapshots (ms)
|
* snapshots (ms)
|
||||||
*/
|
*/
|
||||||
public static final String HFILE_CACHE_REFRESH_PERIOD_CONF_KEY = "hbase.master.hfilecleaner.plugins.snapshot.period";
|
public static final String HFILE_CACHE_REFRESH_PERIOD_CONF_KEY =
|
||||||
|
"hbase.master.hfilecleaner.plugins.snapshot.period";
|
||||||
|
|
||||||
/** Refresh cache, by default, every 5 minutes */
|
/** Refresh cache, by default, every 5 minutes */
|
||||||
private static final long DEFAULT_HFILE_CACHE_REFRESH_PERIOD = 300000;
|
private static final long DEFAULT_HFILE_CACHE_REFRESH_PERIOD = 300000;
|
||||||
|
|
|
@ -44,7 +44,8 @@ public class SnapshotLogCleaner extends BaseLogCleanerDelegate {
|
||||||
* Conf key for the frequency to attempt to refresh the cache of hfiles currently used in
|
* Conf key for the frequency to attempt to refresh the cache of hfiles currently used in
|
||||||
* snapshots (ms)
|
* snapshots (ms)
|
||||||
*/
|
*/
|
||||||
static final String HLOG_CACHE_REFRESH_PERIOD_CONF_KEY = "hbase.master.hlogcleaner.plugins.snapshot.period";
|
static final String HLOG_CACHE_REFRESH_PERIOD_CONF_KEY =
|
||||||
|
"hbase.master.hlogcleaner.plugins.snapshot.period";
|
||||||
|
|
||||||
/** Refresh cache, by default, every 5 minutes */
|
/** Refresh cache, by default, every 5 minutes */
|
||||||
private static final long DEFAULT_HLOG_CACHE_REFRESH_PERIOD = 300000;
|
private static final long DEFAULT_HLOG_CACHE_REFRESH_PERIOD = 300000;
|
||||||
|
@ -54,6 +55,7 @@ public class SnapshotLogCleaner extends BaseLogCleanerDelegate {
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean isFileDeletable(Path filePath) {
|
public synchronized boolean isFileDeletable(Path filePath) {
|
||||||
try {
|
try {
|
||||||
|
if (null == cache) return false;
|
||||||
return !cache.contains(filePath.getName());
|
return !cache.contains(filePath.getName());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Exception while checking if:" + filePath + " was valid, keeping it just in case.",
|
LOG.error("Exception while checking if:" + filePath + " was valid, keeping it just in case.",
|
||||||
|
|
|
@ -77,7 +77,7 @@ import org.apache.zookeeper.KeeperException;
|
||||||
* <p>
|
* <p>
|
||||||
* The class provides methods for monitoring in-progress snapshot actions.
|
* The class provides methods for monitoring in-progress snapshot actions.
|
||||||
* <p>
|
* <p>
|
||||||
* Note: Currently there can only one snapshot being taken at a time over the cluster. This is a
|
* Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
|
||||||
* simplification in the current implementation.
|
* simplification in the current implementation.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -86,7 +86,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
|
private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
|
||||||
|
|
||||||
/** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
|
/** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
|
||||||
public static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
|
private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
|
||||||
|
|
||||||
/** Enable or disable snapshot support */
|
/** Enable or disable snapshot support */
|
||||||
public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
|
public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
|
||||||
|
@ -95,16 +95,16 @@ public class SnapshotManager implements Stoppable {
|
||||||
* Conf key for # of ms elapsed between checks for snapshot errors while waiting for
|
* Conf key for # of ms elapsed between checks for snapshot errors while waiting for
|
||||||
* completion.
|
* completion.
|
||||||
*/
|
*/
|
||||||
public static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
|
private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
|
||||||
|
|
||||||
/** By default, check to see if the snapshot is complete (ms) */
|
/** By default, check to see if the snapshot is complete (ms) */
|
||||||
public static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;
|
private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
|
* Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
|
||||||
* completion.
|
* completion.
|
||||||
*/
|
*/
|
||||||
public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";
|
private static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";
|
||||||
|
|
||||||
/** Name of the operation to use in the controller */
|
/** Name of the operation to use in the controller */
|
||||||
public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
|
public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
|
||||||
|
@ -230,7 +230,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
void resetTempDir() throws IOException {
|
void resetTempDir() throws IOException {
|
||||||
// cleanup any existing snapshots.
|
// cleanup any existing snapshots.
|
||||||
Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
|
Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
|
||||||
if (master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
|
if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
|
||||||
LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
|
LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -277,7 +277,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
* @param snapshot
|
* @param snapshot
|
||||||
* @return null if doesn't match, else a live handler.
|
* @return null if doesn't match, else a live handler.
|
||||||
*/
|
*/
|
||||||
TakeSnapshotHandler getTakeSnapshotHandler(SnapshotDescription snapshot) {
|
private synchronized TakeSnapshotHandler getTakeSnapshotHandler(SnapshotDescription snapshot) {
|
||||||
TakeSnapshotHandler h = this.handler;
|
TakeSnapshotHandler h = this.handler;
|
||||||
if (h == null) {
|
if (h == null) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -397,7 +397,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// delete the working directory, since we aren't running the snapshot. Likely leftovers
|
// delete the working directory, since we aren't running the snapshot. Likely leftovers
|
||||||
// from a failed attempt.
|
// from a failed attempt.
|
||||||
fs.delete(workingDir, true);
|
fs.delete(workingDir, true);
|
||||||
|
|
||||||
|
@ -415,7 +415,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Take a snapshot of a enabled table.
|
* Take a snapshot of an enabled table.
|
||||||
* <p>
|
* <p>
|
||||||
* The thread limitation on the executorService's thread pool for snapshots ensures the
|
* The thread limitation on the executorService's thread pool for snapshots ensures the
|
||||||
* snapshot won't be started if there is another snapshot already running. Does
|
* snapshot won't be started if there is another snapshot already running. Does
|
||||||
|
@ -434,7 +434,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
// cleanup the working directory by trying to delete it from the fs.
|
// cleanup the working directory by trying to delete it from the fs.
|
||||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
||||||
try {
|
try {
|
||||||
if (this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
|
if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
|
||||||
LOG.warn("Couldn't delete working directory (" + workingDir + " for snapshot:"
|
LOG.warn("Couldn't delete working directory (" + workingDir + " for snapshot:"
|
||||||
+ SnapshotDescriptionUtils.toString(snapshot));
|
+ SnapshotDescriptionUtils.toString(snapshot));
|
||||||
}
|
}
|
||||||
|
@ -454,7 +454,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
* @throws HBaseSnapshotException when a snapshot specific exception occurs.
|
* @throws HBaseSnapshotException when a snapshot specific exception occurs.
|
||||||
* @throws IOException when some sort of generic IO exception occurs.
|
* @throws IOException when some sort of generic IO exception occurs.
|
||||||
*/
|
*/
|
||||||
public void takeSnapshot(SnapshotDescription snapshot) throws HBaseSnapshotException, IOException {
|
public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
|
||||||
// check to see if we already completed the snapshot
|
// check to see if we already completed the snapshot
|
||||||
if (isSnapshotCompleted(snapshot)) {
|
if (isSnapshotCompleted(snapshot)) {
|
||||||
throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
|
throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
|
||||||
|
@ -543,7 +543,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
// cleanup the working directory by trying to delete it from the fs.
|
// cleanup the working directory by trying to delete it from the fs.
|
||||||
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
||||||
try {
|
try {
|
||||||
if (this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
|
if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
|
||||||
LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
|
LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
|
||||||
SnapshotDescriptionUtils.toString(snapshot));
|
SnapshotDescriptionUtils.toString(snapshot));
|
||||||
}
|
}
|
||||||
|
@ -596,8 +596,8 @@ public class SnapshotManager implements Stoppable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restore the specified snapshot.
|
* Clone the specified snapshot into a new table.
|
||||||
* The restore will fail if the destination table has a snapshot or restore in progress.
|
* The operation will fail if the destination table has a snapshot or restore in progress.
|
||||||
*
|
*
|
||||||
* @param snapshot Snapshot Descriptor
|
* @param snapshot Snapshot Descriptor
|
||||||
* @param hTableDescriptor Table Descriptor of the table to create
|
* @param hTableDescriptor Table Descriptor of the table to create
|
||||||
|
@ -722,7 +722,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify if the the restore of the specified table is in progress.
|
* Verify if the restore of the specified table is in progress.
|
||||||
*
|
*
|
||||||
* @param tableName table under restore
|
* @param tableName table under restore
|
||||||
* @return <tt>true</tt> if there is a restore in progress of the specified table.
|
* @return <tt>true</tt> if there is a restore in progress of the specified table.
|
||||||
|
@ -736,7 +736,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
* Returns status of a restore request, specifically comparing source snapshot and target table
|
* Returns status of a restore request, specifically comparing source snapshot and target table
|
||||||
* names. Throws exception if not a known snapshot.
|
* names. Throws exception if not a known snapshot.
|
||||||
* @param snapshot
|
* @param snapshot
|
||||||
* @return true if in progress, false if is not.
|
* @return true if in progress, false if snapshot is completed.
|
||||||
* @throws UnknownSnapshotException if specified source snapshot does not exit.
|
* @throws UnknownSnapshotException if specified source snapshot does not exit.
|
||||||
* @throws IOException if there was some sort of IO failure
|
* @throws IOException if there was some sort of IO failure
|
||||||
*/
|
*/
|
||||||
|
@ -792,7 +792,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
/**
|
/**
|
||||||
* Scan the restore handlers and remove the finished ones.
|
* Scan the restore handlers and remove the finished ones.
|
||||||
*/
|
*/
|
||||||
private void cleanupRestoreSentinels() {
|
private synchronized void cleanupRestoreSentinels() {
|
||||||
Iterator<Map.Entry<String, SnapshotSentinel>> it = restoreHandlers.entrySet().iterator();
|
Iterator<Map.Entry<String, SnapshotSentinel>> it = restoreHandlers.entrySet().iterator();
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
Map.Entry<String, SnapshotSentinel> entry = it.next();
|
Map.Entry<String, SnapshotSentinel> entry = it.next();
|
||||||
|
@ -852,7 +852,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
*/
|
*/
|
||||||
private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
|
private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
|
||||||
throws IOException, UnsupportedOperationException {
|
throws IOException, UnsupportedOperationException {
|
||||||
// Verify if snapshot are disabled by the user
|
// Verify if snapshot is disabled by the user
|
||||||
String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
|
String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
|
||||||
boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
|
boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
|
||||||
boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
|
boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
|
||||||
|
@ -894,7 +894,7 @@ public class SnapshotManager implements Stoppable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark snapshot feature as enabled if cleaners are present and user as not disabled it.
|
// Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
|
||||||
this.isSnapshotSupported = snapshotEnabled && !userDisabled;
|
this.isSnapshotSupported = snapshotEnabled && !userDisabled;
|
||||||
|
|
||||||
// If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
|
// If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
|
||||||
|
|
|
@ -52,6 +52,8 @@ import org.apache.zookeeper.KeeperException;
|
||||||
* A handler for taking snapshots from the master.
|
* A handler for taking snapshots from the master.
|
||||||
*
|
*
|
||||||
* This is not a subclass of TableEventHandler because using that would incur an extra META scan.
|
* This is not a subclass of TableEventHandler because using that would incur an extra META scan.
|
||||||
|
*
|
||||||
|
* The {@link #snapshotRegions(List)} call should get implemented for each snapshot flavor.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public abstract class TakeSnapshotHandler extends EventHandler implements SnapshotSentinel,
|
public abstract class TakeSnapshotHandler extends EventHandler implements SnapshotSentinel,
|
||||||
|
@ -91,6 +93,8 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
||||||
this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
|
||||||
this.monitor = new ForeignExceptionDispatcher();
|
this.monitor = new ForeignExceptionDispatcher();
|
||||||
|
|
||||||
|
loadTableDescriptor(); // check that .tableinfo is present
|
||||||
|
|
||||||
// prepare the verify
|
// prepare the verify
|
||||||
this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, rootDir);
|
this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, rootDir);
|
||||||
}
|
}
|
||||||
|
@ -107,19 +111,13 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the core common portions of taking a snapshot. the {@link #snapshotRegions(List)}
|
* Execute the core common portions of taking a snapshot. The {@link #snapshotRegions(List)}
|
||||||
* call should get implemented for each snapshot flavor.
|
* call should get implemented for each snapshot flavor.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void process() {
|
public void process() {
|
||||||
LOG.info("Running table snapshot operation " + eventType + " on table " + snapshot.getTable());
|
LOG.info("Running table snapshot operation " + eventType + " on table " + snapshot.getTable());
|
||||||
try {
|
try {
|
||||||
loadTableDescriptor(); // check that .tableinfo is present
|
|
||||||
|
|
||||||
byte[] ssbytes = Bytes.toBytes(snapshot.getTable());
|
|
||||||
List<Pair<HRegionInfo, ServerName>> regionsAndLocations = MetaReader.getTableRegionsAndLocations(
|
|
||||||
this.server.getCatalogTracker(), ssbytes, true);
|
|
||||||
|
|
||||||
// If regions move after this meta scan, the region specific snapshot should fail, triggering
|
// If regions move after this meta scan, the region specific snapshot should fail, triggering
|
||||||
// an external exception that gets captured here.
|
// an external exception that gets captured here.
|
||||||
|
|
||||||
|
@ -128,6 +126,10 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
||||||
new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
|
new TableInfoCopyTask(monitor, snapshot, fs, rootDir).call();
|
||||||
monitor.rethrowException();
|
monitor.rethrowException();
|
||||||
|
|
||||||
|
List<Pair<HRegionInfo, ServerName>> regionsAndLocations =
|
||||||
|
MetaReader.getTableRegionsAndLocations(this.server.getCatalogTracker(),
|
||||||
|
Bytes.toBytes(snapshot.getTable()), true);
|
||||||
|
|
||||||
// run the snapshot
|
// run the snapshot
|
||||||
snapshotRegions(regionsAndLocations);
|
snapshotRegions(regionsAndLocations);
|
||||||
|
|
||||||
|
|
|
@ -165,8 +165,7 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a copy of the procedure members still trying to enter the barrier.
|
* @return String of the procedure members both trying to enter the barrier and already in barrier
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
public String getStatus() {
|
public String getStatus() {
|
||||||
String waiting, done;
|
String waiting, done;
|
||||||
|
@ -176,9 +175,9 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
|
||||||
}
|
}
|
||||||
return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
|
return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the ExternalErrorDispatcher
|
* Get the ForeignExceptionDispatcher
|
||||||
* @return the Procedure's monitor.
|
* @return the Procedure's monitor.
|
||||||
*/
|
*/
|
||||||
public ForeignExceptionDispatcher getErrorMonitor() {
|
public ForeignExceptionDispatcher getErrorMonitor() {
|
||||||
|
@ -306,7 +305,7 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
|
||||||
LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
|
LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
|
LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
|
||||||
" Continuting on.");
|
" Continuing on.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +344,7 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A callback that handles incoming ExternalExceptions.
|
* A callback that handles incoming ForeignExceptions.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void receive(ForeignException e) {
|
public void receive(ForeignException e) {
|
||||||
|
@ -371,7 +370,9 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
|
||||||
if (monitor != null) {
|
if (monitor != null) {
|
||||||
monitor.rethrowException();
|
monitor.rethrowException();
|
||||||
}
|
}
|
||||||
ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:" + wakeFrequency + " ms)");
|
/*
|
||||||
|
ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
|
||||||
|
+ wakeFrequency + " ms)"); */
|
||||||
released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
|
released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,7 +112,7 @@ public class ProcedureCoordinator {
|
||||||
}
|
}
|
||||||
String procName = proc.getName();
|
String procName = proc.getName();
|
||||||
|
|
||||||
// make sure we aren't already running an procedure of that name
|
// make sure we aren't already running a procedure of that name
|
||||||
synchronized (procedures) {
|
synchronized (procedures) {
|
||||||
Procedure oldProc = procedures.get(procName);
|
Procedure oldProc = procedures.get(procName);
|
||||||
if (oldProc != null) {
|
if (oldProc != null) {
|
||||||
|
@ -129,9 +129,9 @@ public class ProcedureCoordinator {
|
||||||
// kick off the procedure's execution in a separate thread
|
// kick off the procedure's execution in a separate thread
|
||||||
Future<Void> f = null;
|
Future<Void> f = null;
|
||||||
try {
|
try {
|
||||||
f = this.pool.submit(proc);
|
|
||||||
// if everything got started properly, we can add it known running procedures
|
|
||||||
synchronized (procedures) {
|
synchronized (procedures) {
|
||||||
|
f = this.pool.submit(proc);
|
||||||
|
// if everything got started properly, we can add it known running procedures
|
||||||
this.procedures.put(procName, proc);
|
this.procedures.put(procName, proc);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -54,8 +54,8 @@ public class ProcedureMember implements Closeable {
|
||||||
private final SubprocedureFactory builder;
|
private final SubprocedureFactory builder;
|
||||||
private final ProcedureMemberRpcs rpcs;
|
private final ProcedureMemberRpcs rpcs;
|
||||||
|
|
||||||
// private final WeakValueMapping<String, Subprocedure> subprocs = new WeakValueMapping<String, Subprocedure>();
|
private final ConcurrentMap<String,Subprocedure> subprocs =
|
||||||
private final ConcurrentMap<String,Subprocedure> subprocs = new MapMaker().concurrencyLevel(4).weakValues().makeMap();
|
new MapMaker().concurrencyLevel(4).weakValues().makeMap();
|
||||||
private final ExecutorService pool;
|
private final ExecutorService pool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -167,7 +167,8 @@ public class ProcedureMember implements Closeable {
|
||||||
public void receivedReachedGlobalBarrier(String procName) {
|
public void receivedReachedGlobalBarrier(String procName) {
|
||||||
Subprocedure subproc = subprocs.get(procName);
|
Subprocedure subproc = subprocs.get(procName);
|
||||||
if (subproc == null) {
|
if (subproc == null) {
|
||||||
LOG.warn("Unexpected reached glabal barrier message for Procedure '" + procName + "'");
|
LOG.warn("Unexpected reached glabal barrier message for Sub-Procedure '" + procName + "'");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
subproc.receiveReachedGlobalBarrier();
|
subproc.receiveReachedGlobalBarrier();
|
||||||
}
|
}
|
||||||
|
@ -187,7 +188,7 @@ public class ProcedureMember implements Closeable {
|
||||||
* @return true if successfully, false if bailed due to timeout.
|
* @return true if successfully, false if bailed due to timeout.
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public boolean closeAndWait(long timeoutMs) throws InterruptedException {
|
boolean closeAndWait(long timeoutMs) throws InterruptedException {
|
||||||
pool.shutdown();
|
pool.shutdown();
|
||||||
return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
|
return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
@ -204,9 +205,9 @@ public class ProcedureMember implements Closeable {
|
||||||
*/
|
*/
|
||||||
public void controllerConnectionFailure(final String message, final IOException cause) {
|
public void controllerConnectionFailure(final String message, final IOException cause) {
|
||||||
Collection<Subprocedure> toNotify = subprocs.values();
|
Collection<Subprocedure> toNotify = subprocs.values();
|
||||||
|
LOG.error(message, cause);
|
||||||
for (Subprocedure sub : toNotify) {
|
for (Subprocedure sub : toNotify) {
|
||||||
// TODO notify the elements, if they aren't null
|
// TODO notify the elements, if they aren't null
|
||||||
LOG.error(message, cause);
|
|
||||||
sub.cancel(message, cause);
|
sub.cancel(message, cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -181,7 +181,7 @@ abstract public class Subprocedure implements Callable<Void> {
|
||||||
insideBarrier();
|
insideBarrier();
|
||||||
LOG.debug("Subprocedure '" + barrierName + "' locally completed");
|
LOG.debug("Subprocedure '" + barrierName + "' locally completed");
|
||||||
|
|
||||||
// Ack that the member has executed and relased local barrier
|
// Ack that the member has executed and released local barrier
|
||||||
rpcs.sendMemberCompleted(this);
|
rpcs.sendMemberCompleted(this);
|
||||||
LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
|
LOG.debug("Subprocedure '" + barrierName + "' has notified controller of completion");
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,9 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
|
||||||
* appear, first acquire to relevant listener or sets watch waiting for notification of
|
* appear, first acquire to relevant listener or sets watch waiting for notification of
|
||||||
* the acquire node
|
* the acquire node
|
||||||
*
|
*
|
||||||
|
* @param proc the Procedure
|
||||||
|
* @param info data to be stored in the acquire node
|
||||||
|
* @param nodeNames children of the acquire phase
|
||||||
* @throws IOException if any failure occurs.
|
* @throws IOException if any failure occurs.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -79,12 +82,10 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
|
||||||
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
|
if (ZKUtil.watchAndCheckExists(zkProc.getWatcher(), abortNode)) {
|
||||||
abort(abortNode);
|
abort(abortNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we get an abort node watch triggered here, we'll go complete creating the acquired
|
// If we get an abort node watch triggered here, we'll go complete creating the acquired
|
||||||
// znode but then handle the acquire znode and bail out
|
// znode but then handle the acquire znode and bail out
|
||||||
|
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
LOG.error("Failed to create abort", e);
|
LOG.error("Failed to watch abort", e);
|
||||||
throw new IOException("Failed while watching abort node:" + abortNode, e);
|
throw new IOException("Failed while watching abort node:" + abortNode, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,11 +156,12 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
|
||||||
* Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
|
* Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
|
||||||
* @return true if succeed, false if encountered initialization errors.
|
* @return true if succeed, false if encountered initialization errors.
|
||||||
*/
|
*/
|
||||||
final public boolean start(final ProcedureCoordinator listener) {
|
final public boolean start(final ProcedureCoordinator coordinator) {
|
||||||
if (this.coordinator != null) {
|
if (this.coordinator != null) {
|
||||||
throw new IllegalStateException("ZKProcedureCoordinator already started and already has listener installed");
|
throw new IllegalStateException(
|
||||||
|
"ZKProcedureCoordinator already started and already has listener installed");
|
||||||
}
|
}
|
||||||
this.coordinator = listener;
|
this.coordinator = coordinator;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
|
this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
|
||||||
|
@ -170,15 +172,15 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
|
||||||
logZKTree(this.baseZNode);
|
logZKTree(this.baseZNode);
|
||||||
if (isAcquiredPathNode(path)) {
|
if (isAcquiredPathNode(path)) {
|
||||||
// node wasn't present when we created the watch so zk event triggers acquire
|
// node wasn't present when we created the watch so zk event triggers acquire
|
||||||
listener.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), ZKUtil.getNodeName(path));
|
coordinator.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
|
||||||
}
|
ZKUtil.getNodeName(path));
|
||||||
if (isReachedPathNode(path)) {
|
} else if (isReachedPathNode(path)) {
|
||||||
// node wasn't present when we created the watch so zk event triggers the finished barrier.
|
// node was absent when we created the watch so zk event triggers the finished barrier.
|
||||||
|
|
||||||
// TODO Nothing enforces that acquire and reached znodes from showing up in the wrong order.
|
// TODO Nothing enforces that acquire and reached znodes from showing up in wrong order.
|
||||||
listener.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), ZKUtil.getNodeName(path));
|
coordinator.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)),
|
||||||
}
|
ZKUtil.getNodeName(path));
|
||||||
if (isAbortPathNode(path)) {
|
} else if (isAbortPathNode(path)) {
|
||||||
abort(path);
|
abort(path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
private ZKProcedureUtil zkController;
|
private ZKProcedureUtil zkController;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Must call {@link #start(ProcedureMember)} before this is can be used.
|
* Must call {@link #start(ProcedureMember)} before this can be used.
|
||||||
* @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
|
* @param watcher {@link ZooKeeperWatcher} to be owned by <tt>this</tt>. Closed via
|
||||||
* {@link #close()}.
|
* {@link #close()}.
|
||||||
* @param procType name of the znode describing the procedure type
|
* @param procType name of the znode describing the procedure type
|
||||||
|
@ -120,12 +120,6 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
return zkController;
|
return zkController;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
|
||||||
LOG.debug("Starting the procedure member");
|
|
||||||
watchForAbortedProcedures();
|
|
||||||
waitForNewProcedures();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getMemberName() {
|
public String getMemberName() {
|
||||||
return memberName;
|
return memberName;
|
||||||
|
@ -145,7 +139,8 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
|
LOG.debug("Checking for aborted procedures on node: '" + zkController.getAbortZnode() + "'");
|
||||||
try {
|
try {
|
||||||
// this is the list of the currently aborted procedues
|
// this is the list of the currently aborted procedues
|
||||||
for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(), zkController.getAbortZnode())) {
|
for (String node : ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
|
||||||
|
zkController.getAbortZnode())) {
|
||||||
String abortNode = ZKUtil.joinZNode(zkController.getAbortZnode(), node);
|
String abortNode = ZKUtil.joinZNode(zkController.getAbortZnode(), node);
|
||||||
abort(abortNode);
|
abort(abortNode);
|
||||||
}
|
}
|
||||||
|
@ -157,11 +152,12 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
|
|
||||||
private void waitForNewProcedures() {
|
private void waitForNewProcedures() {
|
||||||
// watch for new procedues that we need to start subprocedures for
|
// watch for new procedues that we need to start subprocedures for
|
||||||
LOG.debug("Looking for new procedures under znode: '" + zkController.getAcquiredBarrier() + "'");
|
LOG.debug("Looking for new procedures under znode:'" + zkController.getAcquiredBarrier() + "'");
|
||||||
List<String> runningProcedure = null;
|
List<String> runningProcedures = null;
|
||||||
try {
|
try {
|
||||||
runningProcedure = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(), zkController.getAcquiredBarrier());
|
runningProcedures = ZKUtil.listChildrenAndWatchForNewChildren(zkController.getWatcher(),
|
||||||
if (runningProcedure == null) {
|
zkController.getAcquiredBarrier());
|
||||||
|
if (runningProcedures == null) {
|
||||||
LOG.debug("No running procedures.");
|
LOG.debug("No running procedures.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -169,7 +165,11 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
member.controllerConnectionFailure("General failure when watching for new procedures",
|
member.controllerConnectionFailure("General failure when watching for new procedures",
|
||||||
new IOException(e));
|
new IOException(e));
|
||||||
}
|
}
|
||||||
for (String procName : runningProcedure) {
|
if (runningProcedures == null) {
|
||||||
|
LOG.debug("No running procedures.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (String procName : runningProcedures) {
|
||||||
// then read in the procedure information
|
// then read in the procedure information
|
||||||
String path = ZKUtil.joinZNode(zkController.getAcquiredBarrier(), procName);
|
String path = ZKUtil.joinZNode(zkController.getAcquiredBarrier(), procName);
|
||||||
startNewSubprocedure(path);
|
startNewSubprocedure(path);
|
||||||
|
@ -177,7 +177,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Kick off a new procedure on the listener with the data stored in the passed znode.
|
* Kick off a new sub-procedure on the listener with the data stored in the passed znode.
|
||||||
* <p>
|
* <p>
|
||||||
* Will attempt to create the same procedure multiple times if an procedure znode with the same
|
* Will attempt to create the same procedure multiple times if an procedure znode with the same
|
||||||
* name is created. It is left up the coordinator to ensure this doesn't occur.
|
* name is created. It is left up the coordinator to ensure this doesn't occur.
|
||||||
|
@ -238,7 +238,8 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
try {
|
try {
|
||||||
LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
|
LOG.debug("Member: '" + memberName + "' joining acquired barrier for procedure (" + procName
|
||||||
+ ") in zk");
|
+ ") in zk");
|
||||||
String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(zkController, procName), memberName);
|
String acquiredZNode = ZKUtil.joinZNode(ZKProcedureUtil.getAcquireBarrierNode(
|
||||||
|
zkController, procName), memberName);
|
||||||
ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
|
ZKUtil.createAndFailSilent(zkController.getWatcher(), acquiredZNode);
|
||||||
|
|
||||||
// watch for the complete node for this snapshot
|
// watch for the complete node for this snapshot
|
||||||
|
@ -254,7 +255,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This acts as the ack for a completed
|
* This acts as the ack for a completed snapshot
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void sendMemberCompleted(Subprocedure sub) throws IOException {
|
public void sendMemberCompleted(Subprocedure sub) throws IOException {
|
||||||
|
@ -278,12 +279,12 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
|
public void sendMemberAborted(Subprocedure sub, ForeignException ee) {
|
||||||
if (sub == null) {
|
if (sub == null) {
|
||||||
LOG.error("Failed due to null subprocedure", ee);
|
LOG.error("Failed due to null subprocedure", ee);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
String procName = sub.getName();
|
String procName = sub.getName();
|
||||||
LOG.debug("Aborting procedure (" + procName + ") in zk");
|
LOG.debug("Aborting procedure (" + procName + ") in zk");
|
||||||
String procAbortZNode = zkController.getAbortZNode(procName);
|
String procAbortZNode = zkController.getAbortZNode(procName);
|
||||||
try {
|
try {
|
||||||
LOG.debug("Creating abort znode:" + procAbortZNode);
|
|
||||||
String source = (ee.getSource() == null) ? memberName: ee.getSource();
|
String source = (ee.getSource() == null) ? memberName: ee.getSource();
|
||||||
byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
|
byte[] errorInfo = ProtobufUtil.prependPBMagic(ForeignException.serialize(source, ee));
|
||||||
ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
|
ZKUtil.createAndFailSilent(zkController.getWatcher(), procAbortZNode, errorInfo);
|
||||||
|
@ -316,9 +317,10 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
// we got a remote exception, but we can't describe it so just return exn from here
|
// we got a remote exception, but we can't describe it so just return exn from here
|
||||||
ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
|
ee = new ForeignException(getMemberName(), new IllegalArgumentException(msg));
|
||||||
|
} else {
|
||||||
|
data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
|
||||||
|
ee = ForeignException.deserialize(data);
|
||||||
}
|
}
|
||||||
data = Arrays.copyOfRange(data, ProtobufUtil.lengthOfPBMagic(), data.length);
|
|
||||||
ee = ForeignException.deserialize(data);
|
|
||||||
} catch (InvalidProtocolBufferException e) {
|
} catch (InvalidProtocolBufferException e) {
|
||||||
LOG.warn("Got an error notification for op:" + opName
|
LOG.warn("Got an error notification for op:" + opName
|
||||||
+ " but we can't read the information. Killing the procedure.");
|
+ " but we can't read the information. Killing the procedure.");
|
||||||
|
@ -336,7 +338,8 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
public void start(ProcedureMember listener) {
|
public void start(ProcedureMember listener) {
|
||||||
LOG.debug("Starting procedure member '" + this.memberName + "'");
|
LOG.debug("Starting procedure member '" + this.memberName + "'");
|
||||||
this.member = listener;
|
this.member = listener;
|
||||||
this.start();
|
watchForAbortedProcedures();
|
||||||
|
waitForNewProcedures();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -234,7 +234,7 @@ public abstract class ZKProcedureUtil
|
||||||
* @param root name of the root directory in zk to print
|
* @param root name of the root directory in zk to print
|
||||||
* @throws KeeperException
|
* @throws KeeperException
|
||||||
*/
|
*/
|
||||||
public void logZKTree(String root) {
|
void logZKTree(String root) {
|
||||||
if (!LOG.isDebugEnabled()) return;
|
if (!LOG.isDebugEnabled()) return;
|
||||||
LOG.debug("Current zk system:");
|
LOG.debug("Current zk system:");
|
||||||
String prefix = "|-";
|
String prefix = "|-";
|
||||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager
|
||||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This online snapshot implementation forces uses the distributed procedure framework to force a
|
* This online snapshot implementation uses the distributed procedure framework to force a
|
||||||
* store flush and then records the hfiles. Its enter stage does nothing. Its leave stage then
|
* store flush and then records the hfiles. Its enter stage does nothing. Its leave stage then
|
||||||
* flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
|
* flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
|
||||||
* copies .regioninfos into the snapshot working directory. At the master side, there is an atomic
|
* copies .regioninfos into the snapshot working directory. At the master side, there is an atomic
|
||||||
|
@ -102,9 +102,9 @@ public class FlushSnapshotSubprocedure extends Subprocedure {
|
||||||
if (taskManager.hasTasks()) {
|
if (taskManager.hasTasks()) {
|
||||||
throw new IllegalStateException("Attempting to take snapshot "
|
throw new IllegalStateException("Attempting to take snapshot "
|
||||||
+ SnapshotDescriptionUtils.toString(snapshot)
|
+ SnapshotDescriptionUtils.toString(snapshot)
|
||||||
+ " but we have currently have outstanding tasks");
|
+ " but we currently have outstanding tasks");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add all hfiles already existing in region.
|
// Add all hfiles already existing in region.
|
||||||
for (HRegion region : regions) {
|
for (HRegion region : regions) {
|
||||||
// submit one task per region for parallelize by region.
|
// submit one task per region for parallelize by region.
|
||||||
|
|
|
@ -73,7 +73,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
public class RegionServerSnapshotManager {
|
public class RegionServerSnapshotManager {
|
||||||
private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
|
private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
|
||||||
|
|
||||||
/** Maximum number of concurrent snapshot region tasks that can run concurrently */
|
/** Maximum number of snapshot region tasks that can run concurrently */
|
||||||
private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
|
private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
|
||||||
private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
|
private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
|
||||||
|
|
||||||
|
@ -212,8 +212,8 @@ public class RegionServerSnapshotManager {
|
||||||
/**
|
/**
|
||||||
* Determine if the snapshot should be handled on this server
|
* Determine if the snapshot should be handled on this server
|
||||||
*
|
*
|
||||||
* NOTE: This is racy -- the master expects a list of regionservers, but the regions get the
|
* NOTE: This is racy -- the master expects a list of regionservers.
|
||||||
* regions. This means if a region moves somewhere between the calls we'll miss some regions.
|
* This means if a region moves somewhere between the calls we'll miss some regions.
|
||||||
* For example, a region move during a snapshot could result in a region to be skipped or done
|
* For example, a region move during a snapshot could result in a region to be skipped or done
|
||||||
* twice. This is manageable because the {@link MasterSnapshotVerifier} will double check the
|
* twice. This is manageable because the {@link MasterSnapshotVerifier} will double check the
|
||||||
* region lists after the online portion of the snapshot completes and will explicitly fail the
|
* region lists after the online portion of the snapshot completes and will explicitly fail the
|
||||||
|
@ -297,7 +297,7 @@ public class RegionServerSnapshotManager {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
|
* Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
|
||||||
* This *must* be called to after all tasks are submitted via submitTask.
|
* This *must* be called after all tasks are submitted via submitTask.
|
||||||
*
|
*
|
||||||
* @return <tt>true</tt> on success, <tt>false</tt> otherwise
|
* @return <tt>true</tt> on success, <tt>false</tt> otherwise
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
|
@ -313,7 +313,7 @@ public class RegionServerSnapshotManager {
|
||||||
Future<Void> f = taskPool.take();
|
Future<Void> f = taskPool.take();
|
||||||
f.get();
|
f.get();
|
||||||
if (!futures.remove(f)) {
|
if (!futures.remove(f)) {
|
||||||
LOG.warn("unexpected future");
|
LOG.warn("unexpected future" + f);
|
||||||
}
|
}
|
||||||
LOG.debug("Completed " + (i+1) + "/" + sz + " local region snapshots.");
|
LOG.debug("Completed " + (i+1) + "/" + sz + " local region snapshots.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -362,7 +362,8 @@ public final class ExportSnapshot extends Configured implements Tool {
|
||||||
* Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
|
* Extract the list of files (HFiles/HLogs) to copy using Map-Reduce.
|
||||||
* @return list of files referenced by the snapshot (pair of path and size)
|
* @return list of files referenced by the snapshot (pair of path and size)
|
||||||
*/
|
*/
|
||||||
private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir) throws IOException {
|
private List<Pair<Path, Long>> getSnapshotFiles(final FileSystem fs, final Path snapshotDir)
|
||||||
|
throws IOException {
|
||||||
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
|
SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
|
||||||
|
|
||||||
final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
|
final List<Pair<Path, Long>> files = new ArrayList<Pair<Path, Long>>();
|
||||||
|
|
|
@ -95,7 +95,7 @@ import org.apache.hadoop.io.IOUtils;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class RestoreSnapshotHelper {
|
public class RestoreSnapshotHelper {
|
||||||
private static final Log LOG = LogFactory.getLog(RestoreSnapshotHelper.class);
|
private static final Log LOG = LogFactory.getLog(RestoreSnapshotHelper.class);
|
||||||
|
|
||||||
private final Map<byte[], byte[]> regionsMap =
|
private final Map<byte[], byte[]> regionsMap =
|
||||||
new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
|
new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
|
||||||
|
@ -290,7 +290,7 @@ public class RestoreSnapshotHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restore region by removing files not it in the snapshot
|
* Restore region by removing files not in the snapshot
|
||||||
* and adding the missing ones from the snapshot.
|
* and adding the missing ones from the snapshot.
|
||||||
*/
|
*/
|
||||||
private void restoreRegion(HRegionInfo regionInfo) throws IOException {
|
private void restoreRegion(HRegionInfo regionInfo) throws IOException {
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.snapshot;
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
|
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
|
||||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||||
|
@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
|
||||||
/**
|
/**
|
||||||
* General snapshot operation taken on a regionserver
|
* General snapshot operation taken on a regionserver
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
public abstract class SnapshotTask implements ForeignExceptionSnare, Callable<Void>{
|
public abstract class SnapshotTask implements ForeignExceptionSnare, Callable<Void>{
|
||||||
|
|
||||||
protected final SnapshotDescription snapshot;
|
protected final SnapshotDescription snapshot;
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thrown if a table should be online/offline but is partial open
|
* Thrown if a table should be online/offline but is partially open
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.snapshot;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* General exception when an unexpected error occurs while running a snapshot.
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("serial")
|
|
||||||
@InterfaceAudience.Public
|
|
||||||
@InterfaceStability.Evolving
|
|
||||||
public class UnexpectedSnapshotException extends HBaseSnapshotException {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* General exception for some cause
|
|
||||||
* @param msg reason why the snapshot couldn't be completed
|
|
||||||
* @param cause root cause of the failure
|
|
||||||
* @param snapshot description of the snapshot attempted
|
|
||||||
*/
|
|
||||||
public UnexpectedSnapshotException(String msg, Exception cause, SnapshotDescription snapshot) {
|
|
||||||
super(msg, cause, snapshot);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -949,7 +949,7 @@ public abstract class FSUtils {
|
||||||
/**
|
/**
|
||||||
* A {@link PathFilter} that returns only regular files.
|
* A {@link PathFilter} that returns only regular files.
|
||||||
*/
|
*/
|
||||||
public static class FileFilter implements PathFilter {
|
static class FileFilter implements PathFilter {
|
||||||
private final FileSystem fs;
|
private final FileSystem fs;
|
||||||
|
|
||||||
public FileFilter(final FileSystem fs) {
|
public FileFilter(final FileSystem fs) {
|
||||||
|
@ -994,21 +994,6 @@ public abstract class FSUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Filter out paths that are hidden (start with '.') and are not directories.
|
|
||||||
*/
|
|
||||||
public static class VisibleDirectory extends DirFilter {
|
|
||||||
|
|
||||||
public VisibleDirectory(FileSystem fs) {
|
|
||||||
super(fs);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean accept(Path file) {
|
|
||||||
return super.accept(file) && !file.getName().startsWith(".");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Heuristic to determine whether is safe or not to open a file for append
|
* Heuristic to determine whether is safe or not to open a file for append
|
||||||
* Looks both for dfs.support.append and use reflection to search
|
* Looks both for dfs.support.append and use reflection to search
|
||||||
|
|
|
@ -118,7 +118,7 @@ public final class FSVisitor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Iterate over each region in the table the table and inform about recovered.edits
|
* Iterate over each region in the table and inform about recovered.edits
|
||||||
*
|
*
|
||||||
* @param fs {@link FileSystem}
|
* @param fs {@link FileSystem}
|
||||||
* @param tableDir {@link Path} to the table directory
|
* @param tableDir {@link Path} to the table directory
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.backup.HFileArchiver;
|
import org.apache.hadoop.hbase.backup.HFileArchiver;
|
||||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
|
||||||
|
@ -187,26 +188,14 @@ public abstract class ModifyRegionUtils {
|
||||||
public static void deleteRegions(final Configuration conf, final FileSystem fs,
|
public static void deleteRegions(final Configuration conf, final FileSystem fs,
|
||||||
final CatalogTracker catalogTracker, final List<HRegionInfo> regions) throws IOException {
|
final CatalogTracker catalogTracker, final List<HRegionInfo> regions) throws IOException {
|
||||||
if (regions != null && regions.size() > 0) {
|
if (regions != null && regions.size() > 0) {
|
||||||
|
List<Delete> deletes = new ArrayList<Delete>(regions.size());
|
||||||
for (HRegionInfo hri: regions) {
|
for (HRegionInfo hri: regions) {
|
||||||
deleteRegion(conf, fs, catalogTracker, hri);
|
deletes.add(new Delete(hri.getRegionName()));
|
||||||
|
|
||||||
|
// "Delete" region from FS
|
||||||
|
HFileArchiver.archiveRegion(conf, fs, hri);
|
||||||
}
|
}
|
||||||
|
MetaEditor.deleteFromMetaTable(catalogTracker, deletes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove region from file-system and .META.
|
|
||||||
* (The region must be offline).
|
|
||||||
*
|
|
||||||
* @param fs {@link FileSystem} on which to delete the region directory
|
|
||||||
* @param catalogTracker the catalog tracker
|
|
||||||
* @param regionInfo {@link HRegionInfo} to delete.
|
|
||||||
*/
|
|
||||||
public static void deleteRegion(final Configuration conf, final FileSystem fs,
|
|
||||||
final CatalogTracker catalogTracker, final HRegionInfo regionInfo) throws IOException {
|
|
||||||
// Remove region from .META.
|
|
||||||
MetaEditor.deleteRegion(catalogTracker, regionInfo);
|
|
||||||
|
|
||||||
// "Delete" region from FS
|
|
||||||
HFileArchiver.archiveRegion(conf, fs, regionInfo);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -964,7 +964,6 @@
|
||||||
The thread pool always has at least these number of threads so
|
The thread pool always has at least these number of threads so
|
||||||
the REST server is ready to serve incoming requests. The default
|
the REST server is ready to serve incoming requests. The default
|
||||||
is 2.
|
is 2.
|
||||||
>>>>>>> apache/trunk
|
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -21,10 +21,11 @@ module Shell
|
||||||
class CloneSnapshot < Command
|
class CloneSnapshot < Command
|
||||||
def help
|
def help
|
||||||
return <<-EOF
|
return <<-EOF
|
||||||
Create a new table by cloning the snapshot content. Examples:
|
Create a new table by cloning the snapshot content.
|
||||||
There're no copies of data involved.
|
There're no copies of data involved.
|
||||||
And writing on the newly created table will not influence the snapshot data.
|
And writing on the newly created table will not influence the snapshot data.
|
||||||
|
|
||||||
|
Examples:
|
||||||
hbase> clone_snapshot 'snapshotName', 'tableName'
|
hbase> clone_snapshot 'snapshotName', 'tableName'
|
||||||
EOF
|
EOF
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue