HBASE-15521 Procedure V2 - RestoreSnapshot and CloneSnapshot (Stephen Yuan Jiang)
This commit is contained in:
parent
ff6a339582
commit
e1d5c3d269
|
@ -1329,6 +1329,22 @@ public interface Admin extends Abortable, Closeable {
|
|||
*/
|
||||
void restoreSnapshot(final String snapshotName) throws IOException, RestoreSnapshotException;
|
||||
|
||||
/**
|
||||
* Restore the specified snapshot on the original table. (The table must be disabled) If the
|
||||
* "hbase.snapshot.restore.take.failsafe.snapshot" configuration property is set to true, a
|
||||
* snapshot of the current table is taken before executing the restore operation. In case of
|
||||
* restore failure, the failsafe snapshot will be restored. If the restore completes without
|
||||
* problem the failsafe snapshot is deleted.
|
||||
*
|
||||
* @param snapshotName name of the snapshot to restore
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws RestoreSnapshotException if snapshot failed to be restored
|
||||
* @return the result of the async restore snapshot. You can use Future.get(long, TimeUnit)
|
||||
* to wait on the operation to complete.
|
||||
*/
|
||||
Future<Void> restoreSnapshotAsync(final String snapshotName)
|
||||
throws IOException, RestoreSnapshotException;
|
||||
|
||||
/**
|
||||
* Restore the specified snapshot on the original table. (The table must be disabled) If
|
||||
* 'takeFailSafeSnapshot' is set to true, a snapshot of the current table is taken before
|
||||
|
@ -1360,7 +1376,7 @@ public interface Admin extends Abortable, Closeable {
|
|||
* @throws RestoreSnapshotException if snapshot failed to be restored
|
||||
* @throws IllegalArgumentException if the restore request is formatted incorrectly
|
||||
*/
|
||||
void restoreSnapshot(final String snapshotName, boolean takeFailSafeSnapshot)
|
||||
void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot)
|
||||
throws IOException, RestoreSnapshotException;
|
||||
|
||||
/**
|
||||
|
@ -1389,6 +1405,24 @@ public interface Admin extends Abortable, Closeable {
|
|||
void cloneSnapshot(final String snapshotName, final TableName tableName)
|
||||
throws IOException, TableExistsException, RestoreSnapshotException;
|
||||
|
||||
/**
|
||||
* Create a new table by cloning the snapshot content, but does not block
|
||||
* and wait for it be completely cloned.
|
||||
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
|
||||
* It may throw ExecutionException if there was an error while executing the operation
|
||||
* or TimeoutException in case the wait timeout was not long enough to allow the
|
||||
* operation to complete.
|
||||
*
|
||||
* @param snapshotName name of the snapshot to be cloned
|
||||
* @param tableName name of the table where the snapshot will be restored
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws TableExistsException if table to be cloned already exists
|
||||
* @return the result of the async clone snapshot. You can use Future.get(long, TimeUnit)
|
||||
* to wait on the operation to complete.
|
||||
*/
|
||||
Future<Void> cloneSnapshotAsync(final String snapshotName, final TableName tableName)
|
||||
throws IOException, TableExistsException;
|
||||
|
||||
/**
|
||||
* Execute a distributed procedure on a cluster.
|
||||
*
|
||||
|
|
|
@ -1587,13 +1587,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return stub.restoreSnapshot(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterProtos.IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
|
||||
RpcController controller, MasterProtos.IsRestoreSnapshotDoneRequest request)
|
||||
throws ServiceException {
|
||||
return stub.isRestoreSnapshotDone(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterProtos.ExecProcedureResponse execProcedure(
|
||||
RpcController controller, MasterProtos.ExecProcedureRequest request)
|
||||
|
|
|
@ -127,8 +127,6 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescripto
|
|||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
|
||||
|
@ -693,47 +691,6 @@ public class HBaseAdmin implements Admin {
|
|||
get(enableTableAsync(tableName), syncWaitTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the table to be enabled and available
|
||||
* If enabling the table exceeds the retry period, an exception is thrown.
|
||||
* @param tableName name of the table
|
||||
* @throws IOException if a remote or network exception occurs or
|
||||
* table is not enabled after the retries period.
|
||||
*/
|
||||
private void waitUntilTableIsEnabled(final TableName tableName) throws IOException {
|
||||
boolean enabled = false;
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) {
|
||||
try {
|
||||
enabled = isTableEnabled(tableName);
|
||||
} catch (TableNotFoundException tnfe) {
|
||||
// wait for table to be created
|
||||
enabled = false;
|
||||
}
|
||||
enabled = enabled && isTableAvailable(tableName);
|
||||
if (enabled) {
|
||||
break;
|
||||
}
|
||||
long sleep = getPauseTime(tries);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleeping= " + sleep + "ms, waiting for all regions to be " +
|
||||
"enabled in " + tableName);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(sleep);
|
||||
} catch (InterruptedException e) {
|
||||
// Do this conversion rather than let it out because do not want to
|
||||
// change the method signature.
|
||||
throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
|
||||
}
|
||||
}
|
||||
if (!enabled) {
|
||||
long msec = EnvironmentEdgeManager.currentTime() - start;
|
||||
throw new IOException("Table '" + tableName +
|
||||
"' not yet enabled, after " + msec + "ms.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> enableTableAsync(final TableName tableName) throws IOException {
|
||||
TableName.isLegalFullyQualifiedTableName(tableName.getName());
|
||||
|
@ -2430,8 +2387,14 @@ public class HBaseAdmin implements Admin {
|
|||
restoreSnapshot(Bytes.toString(snapshotName), takeFailSafeSnapshot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreSnapshot(final String snapshotName, boolean takeFailSafeSnapshot)
|
||||
/*
|
||||
* Check whether the snapshot exists and contains disabled table
|
||||
*
|
||||
* @param snapshotName name of the snapshot to restore
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws RestoreSnapshotException if no valid snapshot is found
|
||||
*/
|
||||
private TableName getTableNameBeforeRestoreSnapshot(final String snapshotName)
|
||||
throws IOException, RestoreSnapshotException {
|
||||
TableName tableName = null;
|
||||
for (SnapshotDescription snapshotInfo: listSnapshots()) {
|
||||
|
@ -2445,6 +2408,13 @@ public class HBaseAdmin implements Admin {
|
|||
throw new RestoreSnapshotException(
|
||||
"Unable to find the table name for snapshot=" + snapshotName);
|
||||
}
|
||||
return tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreSnapshot(final String snapshotName, final boolean takeFailSafeSnapshot)
|
||||
throws IOException, RestoreSnapshotException {
|
||||
TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
|
||||
|
||||
// The table does not exists, switch to clone.
|
||||
if (!tableExists(tableName)) {
|
||||
|
@ -2472,13 +2442,19 @@ public class HBaseAdmin implements Admin {
|
|||
|
||||
try {
|
||||
// Restore snapshot
|
||||
internalRestoreSnapshot(snapshotName, tableName);
|
||||
get(
|
||||
internalRestoreSnapshotAsync(snapshotName, tableName),
|
||||
syncWaitTimeout,
|
||||
TimeUnit.MILLISECONDS);
|
||||
} catch (IOException e) {
|
||||
// Somthing went wrong during the restore...
|
||||
// if the pre-restore snapshot is available try to rollback
|
||||
if (takeFailSafeSnapshot) {
|
||||
try {
|
||||
internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName);
|
||||
get(
|
||||
internalRestoreSnapshotAsync(failSafeSnapshotSnapshotName, tableName),
|
||||
syncWaitTimeout,
|
||||
TimeUnit.MILLISECONDS);
|
||||
String msg = "Restore snapshot=" + snapshotName +
|
||||
" failed. Rollback to snapshot=" + failSafeSnapshotSnapshotName + " succeeded.";
|
||||
LOG.error(msg, e);
|
||||
|
@ -2504,6 +2480,24 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> restoreSnapshotAsync(final String snapshotName)
|
||||
throws IOException, RestoreSnapshotException {
|
||||
TableName tableName = getTableNameBeforeRestoreSnapshot(snapshotName);
|
||||
|
||||
// The table does not exists, switch to clone.
|
||||
if (!tableExists(tableName)) {
|
||||
return cloneSnapshotAsync(snapshotName, tableName);
|
||||
}
|
||||
|
||||
// Check if the table is disabled
|
||||
if (!isTableDisabled(tableName)) {
|
||||
throw new TableNotDisabledException(tableName);
|
||||
}
|
||||
|
||||
return internalRestoreSnapshotAsync(snapshotName, tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cloneSnapshot(final byte[] snapshotName, final TableName tableName)
|
||||
throws IOException, TableExistsException, RestoreSnapshotException {
|
||||
|
@ -2516,8 +2510,19 @@ public class HBaseAdmin implements Admin {
|
|||
if (tableExists(tableName)) {
|
||||
throw new TableExistsException(tableName);
|
||||
}
|
||||
internalRestoreSnapshot(snapshotName, tableName);
|
||||
waitUntilTableIsEnabled(tableName);
|
||||
get(
|
||||
internalRestoreSnapshotAsync(snapshotName, tableName),
|
||||
Integer.MAX_VALUE,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> cloneSnapshotAsync(final String snapshotName, final TableName tableName)
|
||||
throws IOException, TableExistsException {
|
||||
if (tableExists(tableName)) {
|
||||
throw new TableExistsException(tableName);
|
||||
}
|
||||
return internalRestoreSnapshotAsync(snapshotName, tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2632,73 +2637,59 @@ public class HBaseAdmin implements Admin {
|
|||
* @throws RestoreSnapshotException if snapshot failed to be restored
|
||||
* @throws IllegalArgumentException if the restore request is formatted incorrectly
|
||||
*/
|
||||
private void internalRestoreSnapshot(final String snapshotName, final TableName tableName)
|
||||
throws IOException, RestoreSnapshotException {
|
||||
SnapshotDescription snapshot = SnapshotDescription.newBuilder()
|
||||
private Future<Void> internalRestoreSnapshotAsync(
|
||||
final String snapshotName,
|
||||
final TableName tableName) throws IOException, RestoreSnapshotException {
|
||||
final SnapshotDescription snapshot = SnapshotDescription.newBuilder()
|
||||
.setName(snapshotName).setTable(tableName.getNameAsString()).build();
|
||||
|
||||
// actually restore the snapshot
|
||||
internalRestoreSnapshotAsync(snapshot);
|
||||
|
||||
final IsRestoreSnapshotDoneRequest request = IsRestoreSnapshotDoneRequest.newBuilder()
|
||||
.setSnapshot(snapshot).build();
|
||||
IsRestoreSnapshotDoneResponse done = IsRestoreSnapshotDoneResponse.newBuilder()
|
||||
.setDone(false).buildPartial();
|
||||
final long maxPauseTime = 5000;
|
||||
int tries = 0;
|
||||
while (!done.getDone()) {
|
||||
try {
|
||||
// sleep a backoff <= pauseTime amount
|
||||
long sleep = getPauseTime(tries++);
|
||||
sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
|
||||
LOG.debug(tries + ") Sleeping: " + sleep
|
||||
+ " ms while we wait for snapshot restore to complete.");
|
||||
Thread.sleep(sleep);
|
||||
} catch (InterruptedException e) {
|
||||
throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e);
|
||||
}
|
||||
LOG.debug("Getting current status of snapshot restore from master...");
|
||||
done = executeCallable(new MasterCallable<IsRestoreSnapshotDoneResponse>(
|
||||
getConnection()) {
|
||||
@Override
|
||||
public IsRestoreSnapshotDoneResponse call(int callTimeout) throws ServiceException {
|
||||
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setCallTimeout(callTimeout);
|
||||
return master.isRestoreSnapshotDone(controller, request);
|
||||
}
|
||||
});
|
||||
}
|
||||
if (!done.getDone()) {
|
||||
throw new RestoreSnapshotException("Snapshot '" + snapshot.getName() + "' wasn't restored.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute Restore/Clone snapshot and wait for the server to complete (asynchronous)
|
||||
* <p>
|
||||
* Only a single snapshot should be restored at a time, or results may be undefined.
|
||||
* @param snapshot snapshot to restore
|
||||
* @return response from the server indicating the max time to wait for the snapshot
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws RestoreSnapshotException if snapshot failed to be restored
|
||||
* @throws IllegalArgumentException if the restore request is formatted incorrectly
|
||||
*/
|
||||
private RestoreSnapshotResponse internalRestoreSnapshotAsync(final SnapshotDescription snapshot)
|
||||
throws IOException, RestoreSnapshotException {
|
||||
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
|
||||
|
||||
final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
|
||||
.build();
|
||||
|
||||
// run the snapshot restore on the master
|
||||
return executeCallable(new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
|
||||
RestoreSnapshotResponse response = executeCallable(
|
||||
new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
|
||||
@Override
|
||||
public RestoreSnapshotResponse call(int callTimeout) throws ServiceException {
|
||||
final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder()
|
||||
.setSnapshot(snapshot)
|
||||
.setNonceGroup(ng.getNonceGroup())
|
||||
.setNonce(ng.newNonce())
|
||||
.build();
|
||||
PayloadCarryingRpcController controller = rpcControllerFactory.newController();
|
||||
controller.setCallTimeout(callTimeout);
|
||||
return master.restoreSnapshot(controller, request);
|
||||
}
|
||||
});
|
||||
|
||||
return new RestoreSnapshotFuture(
|
||||
this, snapshot, TableName.valueOf(snapshot.getTable()), response);
|
||||
}
|
||||
|
||||
private static class RestoreSnapshotFuture extends TableFuture<Void> {
|
||||
public RestoreSnapshotFuture(
|
||||
final HBaseAdmin admin,
|
||||
final SnapshotDescription snapshot,
|
||||
final TableName tableName,
|
||||
final RestoreSnapshotResponse response) {
|
||||
super(admin, tableName,
|
||||
(response != null && response.hasProcId()) ? response.getProcId() : null);
|
||||
|
||||
if (response != null && !response.hasProcId()) {
|
||||
throw new UnsupportedOperationException("Client could not call old version of Server");
|
||||
}
|
||||
}
|
||||
|
||||
public RestoreSnapshotFuture(
|
||||
final HBaseAdmin admin,
|
||||
final TableName tableName,
|
||||
final Long procId) {
|
||||
super(admin, tableName, procId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOperationType() {
|
||||
return "MODIFY";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -694,7 +694,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* @return true if the procedure execution is finished, otherwise false.
|
||||
*/
|
||||
public boolean isFinished(final long procId) {
|
||||
return completed.containsKey(procId);
|
||||
return !procedures.containsKey(procId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -370,9 +370,12 @@ message DeleteSnapshotResponse {
|
|||
|
||||
message RestoreSnapshotRequest {
|
||||
required SnapshotDescription snapshot = 1;
|
||||
optional uint64 nonce_group = 2 [default = 0];
|
||||
optional uint64 nonce = 3 [default = 0];
|
||||
}
|
||||
|
||||
message RestoreSnapshotResponse {
|
||||
required uint64 proc_id = 1;
|
||||
}
|
||||
|
||||
/* if you don't send the snapshot, then you will get it back
|
||||
|
@ -734,11 +737,6 @@ service MasterService {
|
|||
*/
|
||||
rpc RestoreSnapshot(RestoreSnapshotRequest) returns(RestoreSnapshotResponse);
|
||||
|
||||
/**
|
||||
* Determine if the snapshot restore is done yet.
|
||||
*/
|
||||
rpc IsRestoreSnapshotDone(IsRestoreSnapshotDoneRequest) returns(IsRestoreSnapshotDoneResponse);
|
||||
|
||||
/**
|
||||
* Execute a distributed procedure.
|
||||
*/
|
||||
|
|
|
@ -222,6 +222,46 @@ message DisableTableStateData {
|
|||
required bool skip_table_state_check = 3;
|
||||
}
|
||||
|
||||
message RestoreParentToChildRegionsPair {
|
||||
required string parent_region_name = 1;
|
||||
required string child1_region_name = 2;
|
||||
required string child2_region_name = 3;
|
||||
}
|
||||
|
||||
enum CloneSnapshotState {
|
||||
CLONE_SNAPSHOT_PRE_OPERATION = 1;
|
||||
CLONE_SNAPSHOT_WRITE_FS_LAYOUT = 2;
|
||||
CLONE_SNAPSHOT_ADD_TO_META = 3;
|
||||
CLONE_SNAPSHOT_ASSIGN_REGIONS = 4;
|
||||
CLONE_SNAPSHOT_UPDATE_DESC_CACHE = 5;
|
||||
CLONE_SNAPSHOT_POST_OPERATION = 6;
|
||||
}
|
||||
|
||||
message CloneSnapshotStateData {
|
||||
required UserInformation user_info = 1;
|
||||
required SnapshotDescription snapshot = 2;
|
||||
required TableSchema table_schema = 3;
|
||||
repeated RegionInfo region_info = 4;
|
||||
repeated RestoreParentToChildRegionsPair parent_to_child_regions_pair_list = 5;
|
||||
}
|
||||
|
||||
enum RestoreSnapshotState {
|
||||
RESTORE_SNAPSHOT_PRE_OPERATION = 1;
|
||||
RESTORE_SNAPSHOT_UPDATE_TABLE_DESCRIPTOR = 2;
|
||||
RESTORE_SNAPSHOT_WRITE_FS_LAYOUT = 3;
|
||||
RESTORE_SNAPSHOT_UPDATE_META = 4;
|
||||
}
|
||||
|
||||
message RestoreSnapshotStateData {
|
||||
required UserInformation user_info = 1;
|
||||
required SnapshotDescription snapshot = 2;
|
||||
required TableSchema modified_table_schema = 3;
|
||||
repeated RegionInfo region_info_for_restore = 4;
|
||||
repeated RegionInfo region_info_for_remove = 5;
|
||||
repeated RegionInfo region_info_for_add = 6;
|
||||
repeated RestoreParentToChildRegionsPair parent_to_child_regions_pair_list = 7;
|
||||
}
|
||||
|
||||
message ServerCrashStateData {
|
||||
required ServerName server_name = 1;
|
||||
optional bool distributed_log_replay = 2;
|
||||
|
|
|
@ -915,33 +915,6 @@ public class MasterRpcServices extends RSRpcServices
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the status of the requested snapshot restore/clone operation.
|
||||
* This method is not exposed to the user, it is just used internally by HBaseAdmin
|
||||
* to verify if the restore is completed.
|
||||
*
|
||||
* No exceptions are thrown if the restore is not running, the result will be "done".
|
||||
*
|
||||
* @return done <tt>true</tt> if the restore/clone operation is completed.
|
||||
* @throws ServiceException if the operation failed.
|
||||
*/
|
||||
@Override
|
||||
public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller,
|
||||
IsRestoreSnapshotDoneRequest request) throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
SnapshotDescription snapshot = request.getSnapshot();
|
||||
IsRestoreSnapshotDoneResponse.Builder builder = IsRestoreSnapshotDoneResponse.newBuilder();
|
||||
boolean done = master.snapshotManager.isRestoreDone(snapshot);
|
||||
builder.setDone(done);
|
||||
return builder.build();
|
||||
} catch (ForeignException e) {
|
||||
throw new ServiceException(e.getCause());
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the specified snapshot is done.
|
||||
* @return true if the snapshot is in file system ready to use,
|
||||
|
@ -1215,8 +1188,9 @@ public class MasterRpcServices extends RSRpcServices
|
|||
TableName dstTable = TableName.valueOf(request.getSnapshot().getTable());
|
||||
master.getNamespace(dstTable.getNamespaceAsString());
|
||||
SnapshotDescription reqSnapshot = request.getSnapshot();
|
||||
master.snapshotManager.restoreSnapshot(reqSnapshot);
|
||||
return RestoreSnapshotResponse.newBuilder().build();
|
||||
long procId = master.snapshotManager.restoreOrCloneSnapshot(
|
||||
reqSnapshot, request.getNonceGroup(), request.getNonce());
|
||||
return RestoreSnapshotResponse.newBuilder().setProcId(procId).build();
|
||||
} catch (ForeignException e) {
|
||||
throw new ServiceException(e.getCause());
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -0,0 +1,522 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.master.MetricsSnapshot;
|
||||
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.CreateHdfsRegions;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CloneSnapshotState;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class CloneSnapshotProcedure
|
||||
extends StateMachineProcedure<MasterProcedureEnv, CloneSnapshotState>
|
||||
implements TableProcedureInterface {
|
||||
private static final Log LOG = LogFactory.getLog(CloneSnapshotProcedure.class);
|
||||
|
||||
private final AtomicBoolean aborted = new AtomicBoolean(false);
|
||||
|
||||
private UserGroupInformation user;
|
||||
private HTableDescriptor hTableDescriptor;
|
||||
private SnapshotDescription snapshot;
|
||||
private List<HRegionInfo> newRegions = null;
|
||||
private Map<String, Pair<String, String> > parentsToChildrenPairMap =
|
||||
new HashMap<String, Pair<String, String>>();
|
||||
|
||||
// Monitor
|
||||
private MonitoredTask monitorStatus = null;
|
||||
|
||||
private Boolean traceEnabled = null;
|
||||
|
||||
/**
|
||||
* Constructor (for failover)
|
||||
*/
|
||||
public CloneSnapshotProcedure() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param env MasterProcedureEnv
|
||||
* @param hTableDescriptor the table to operate on
|
||||
* @param snapshot snapshot to clone from
|
||||
* @throws IOException
|
||||
*/
|
||||
public CloneSnapshotProcedure(
|
||||
final MasterProcedureEnv env,
|
||||
final HTableDescriptor hTableDescriptor,
|
||||
final SnapshotDescription snapshot)
|
||||
throws IOException {
|
||||
this.hTableDescriptor = hTableDescriptor;
|
||||
this.snapshot = snapshot;
|
||||
this.user = env.getRequestUser().getUGI();
|
||||
this.setOwner(this.user.getShortUserName());
|
||||
|
||||
getMonitorStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up monitor status if it is not created.
|
||||
*/
|
||||
private MonitoredTask getMonitorStatus() {
|
||||
if (monitorStatus == null) {
|
||||
monitorStatus = TaskMonitor.get().createStatus("Cloning snapshot '" + snapshot.getName() +
|
||||
"' to table " + getTableName());
|
||||
}
|
||||
return monitorStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final CloneSnapshotState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
try {
|
||||
switch (state) {
|
||||
case CLONE_SNAPSHOT_PRE_OPERATION:
|
||||
// Verify if we can clone the table
|
||||
prepareClone(env);
|
||||
|
||||
preCloneSnapshot(env);
|
||||
setNextState(CloneSnapshotState.CLONE_SNAPSHOT_WRITE_FS_LAYOUT);
|
||||
break;
|
||||
case CLONE_SNAPSHOT_WRITE_FS_LAYOUT:
|
||||
newRegions = createFilesystemLayout(env, hTableDescriptor, newRegions);
|
||||
setNextState(CloneSnapshotState.CLONE_SNAPSHOT_ADD_TO_META);
|
||||
break;
|
||||
case CLONE_SNAPSHOT_ADD_TO_META:
|
||||
addRegionsToMeta(env);
|
||||
setNextState(CloneSnapshotState.CLONE_SNAPSHOT_ASSIGN_REGIONS);
|
||||
break;
|
||||
case CLONE_SNAPSHOT_ASSIGN_REGIONS:
|
||||
CreateTableProcedure.assignRegions(env, getTableName(), newRegions);
|
||||
setNextState(CloneSnapshotState.CLONE_SNAPSHOT_UPDATE_DESC_CACHE);
|
||||
break;
|
||||
case CLONE_SNAPSHOT_UPDATE_DESC_CACHE:
|
||||
CreateTableProcedure.updateTableDescCache(env, getTableName());
|
||||
setNextState(CloneSnapshotState.CLONE_SNAPSHOT_POST_OPERATION);
|
||||
break;
|
||||
case CLONE_SNAPSHOT_POST_OPERATION:
|
||||
postCloneSnapshot(env);
|
||||
|
||||
MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
|
||||
metricsSnapshot.addSnapshotClone(
|
||||
getMonitorStatus().getCompletionTimestamp() - getMonitorStatus().getStartTime());
|
||||
getMonitorStatus().markComplete("Clone snapshot '"+ snapshot.getName() +"' completed!");
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error trying to create table=" + getTableName() + " state=" + state, e);
|
||||
setFailure("master-create-table", e);
|
||||
}
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackState(final MasterProcedureEnv env, final CloneSnapshotState state)
|
||||
throws IOException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " rollback state=" + state);
|
||||
}
|
||||
try {
|
||||
switch (state) {
|
||||
case CLONE_SNAPSHOT_POST_OPERATION:
|
||||
// TODO-MAYBE: call the deleteTable coprocessor event?
|
||||
break;
|
||||
case CLONE_SNAPSHOT_UPDATE_DESC_CACHE:
|
||||
DeleteTableProcedure.deleteTableDescriptorCache(env, getTableName());
|
||||
break;
|
||||
case CLONE_SNAPSHOT_ASSIGN_REGIONS:
|
||||
DeleteTableProcedure.deleteAssignmentState(env, getTableName());
|
||||
break;
|
||||
case CLONE_SNAPSHOT_ADD_TO_META:
|
||||
DeleteTableProcedure.deleteFromMeta(env, getTableName(), newRegions);
|
||||
break;
|
||||
case CLONE_SNAPSHOT_WRITE_FS_LAYOUT:
|
||||
DeleteTableProcedure.deleteFromFs(env, getTableName(), newRegions, false);
|
||||
break;
|
||||
case CLONE_SNAPSHOT_PRE_OPERATION:
|
||||
DeleteTableProcedure.deleteTableStates(env, getTableName());
|
||||
// TODO-MAYBE: call the deleteTable coprocessor event?
|
||||
break;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// This will be retried. Unless there is a bug in the code,
|
||||
// this should be just a "temporary error" (e.g. network down)
|
||||
LOG.warn("Failed rollback attempt step=" + state + " table=" + getTableName(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CloneSnapshotState getState(final int stateId) {
|
||||
return CloneSnapshotState.valueOf(stateId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStateId(final CloneSnapshotState state) {
|
||||
return state.getNumber();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CloneSnapshotState getInitialState() {
|
||||
return CloneSnapshotState.CLONE_SNAPSHOT_PRE_OPERATION;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setNextState(final CloneSnapshotState state) {
|
||||
if (aborted.get()) {
|
||||
setAbortFailure("clone-snapshot", "abort requested");
|
||||
} else {
|
||||
super.setNextState(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return hTableDescriptor.getTableName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.CREATE; // Clone is creating a table
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean abort(final MasterProcedureEnv env) {
|
||||
aborted.set(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toStringClassDetails(StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append(" (table=");
|
||||
sb.append(getTableName());
|
||||
sb.append(" snapshot=");
|
||||
sb.append(snapshot);
|
||||
sb.append(")");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serializeStateData(final OutputStream stream) throws IOException {
|
||||
super.serializeStateData(stream);
|
||||
|
||||
MasterProcedureProtos.CloneSnapshotStateData.Builder cloneSnapshotMsg =
|
||||
MasterProcedureProtos.CloneSnapshotStateData.newBuilder()
|
||||
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user))
|
||||
.setSnapshot(this.snapshot)
|
||||
.setTableSchema(hTableDescriptor.convert());
|
||||
if (newRegions != null) {
|
||||
for (HRegionInfo hri: newRegions) {
|
||||
cloneSnapshotMsg.addRegionInfo(HRegionInfo.convert(hri));
|
||||
}
|
||||
}
|
||||
if (!parentsToChildrenPairMap.isEmpty()) {
|
||||
final Iterator<Map.Entry<String, Pair<String, String>>> it =
|
||||
parentsToChildrenPairMap.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
final Map.Entry<String, Pair<String, String>> entry = it.next();
|
||||
|
||||
MasterProcedureProtos.RestoreParentToChildRegionsPair.Builder parentToChildrenPair =
|
||||
MasterProcedureProtos.RestoreParentToChildRegionsPair.newBuilder()
|
||||
.setParentRegionName(entry.getKey())
|
||||
.setChild1RegionName(entry.getValue().getFirst())
|
||||
.setChild2RegionName(entry.getValue().getSecond());
|
||||
cloneSnapshotMsg.addParentToChildRegionsPairList(parentToChildrenPair);
|
||||
}
|
||||
}
|
||||
cloneSnapshotMsg.build().writeDelimitedTo(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deserializeStateData(final InputStream stream) throws IOException {
|
||||
super.deserializeStateData(stream);
|
||||
|
||||
MasterProcedureProtos.CloneSnapshotStateData cloneSnapshotMsg =
|
||||
MasterProcedureProtos.CloneSnapshotStateData.parseDelimitedFrom(stream);
|
||||
user = MasterProcedureUtil.toUserInfo(cloneSnapshotMsg.getUserInfo());
|
||||
snapshot = cloneSnapshotMsg.getSnapshot();
|
||||
hTableDescriptor = HTableDescriptor.convert(cloneSnapshotMsg.getTableSchema());
|
||||
if (cloneSnapshotMsg.getRegionInfoCount() == 0) {
|
||||
newRegions = null;
|
||||
} else {
|
||||
newRegions = new ArrayList<HRegionInfo>(cloneSnapshotMsg.getRegionInfoCount());
|
||||
for (HBaseProtos.RegionInfo hri: cloneSnapshotMsg.getRegionInfoList()) {
|
||||
newRegions.add(HRegionInfo.convert(hri));
|
||||
}
|
||||
}
|
||||
if (cloneSnapshotMsg.getParentToChildRegionsPairListCount() > 0) {
|
||||
parentsToChildrenPairMap = new HashMap<String, Pair<String, String>>();
|
||||
for (MasterProcedureProtos.RestoreParentToChildRegionsPair parentToChildrenPair:
|
||||
cloneSnapshotMsg.getParentToChildRegionsPairListList()) {
|
||||
parentsToChildrenPairMap.put(
|
||||
parentToChildrenPair.getParentRegionName(),
|
||||
new Pair<String, String>(
|
||||
parentToChildrenPair.getChild1RegionName(),
|
||||
parentToChildrenPair.getChild2RegionName()));
|
||||
}
|
||||
}
|
||||
// Make sure that the monitor status is set up
|
||||
getMonitorStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||
if (env.waitInitialized(this)) {
|
||||
return false;
|
||||
}
|
||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(final MasterProcedureEnv env) {
|
||||
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Action before any real action of cloning from snapshot.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private void prepareClone(final MasterProcedureEnv env) throws IOException {
|
||||
final TableName tableName = getTableName();
|
||||
if (MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
|
||||
throw new TableExistsException(getTableName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Action before cloning from snapshot.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private void preCloneSnapshot(final MasterProcedureEnv env)
|
||||
throws IOException, InterruptedException {
|
||||
if (!getTableName().isSystemTable()) {
|
||||
// Check and update namespace quota
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
|
||||
SnapshotManifest manifest = SnapshotManifest.open(
|
||||
env.getMasterConfiguration(),
|
||||
mfs.getFileSystem(),
|
||||
SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, mfs.getRootDir()),
|
||||
snapshot);
|
||||
|
||||
ProcedureSyncWait.getMasterQuotaManager(env)
|
||||
.checkNamespaceTableAndRegionQuota(getTableName(), manifest.getRegionManifestsMap().size());
|
||||
}
|
||||
|
||||
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
user.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
cpHost.preCreateTableHandler(hTableDescriptor, null);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Action after cloning from snapshot.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private void postCloneSnapshot(final MasterProcedureEnv env)
|
||||
throws IOException, InterruptedException {
|
||||
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
if (cpHost != null) {
|
||||
final HRegionInfo[] regions = (newRegions == null) ? null :
|
||||
newRegions.toArray(new HRegionInfo[newRegions.size()]);
|
||||
user.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
cpHost.postCreateTableHandler(hTableDescriptor, regions);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create regions in file system.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<HRegionInfo> createFilesystemLayout(
|
||||
final MasterProcedureEnv env,
|
||||
final HTableDescriptor hTableDescriptor,
|
||||
final List<HRegionInfo> newRegions) throws IOException {
|
||||
return createFsLayout(env, hTableDescriptor, newRegions, new CreateHdfsRegions() {
|
||||
@Override
|
||||
public List<HRegionInfo> createHdfsRegions(
|
||||
final MasterProcedureEnv env,
|
||||
final Path tableRootDir, final TableName tableName,
|
||||
final List<HRegionInfo> newRegions) throws IOException {
|
||||
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
final FileSystem fs = mfs.getFileSystem();
|
||||
final Path rootDir = mfs.getRootDir();
|
||||
final Configuration conf = env.getMasterConfiguration();
|
||||
final ForeignExceptionDispatcher monitorException = new ForeignExceptionDispatcher();
|
||||
|
||||
getMonitorStatus().setStatus("Clone snapshot - creating regions for table: " + tableName);
|
||||
|
||||
try {
|
||||
// 1. Execute the on-disk Clone
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
|
||||
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshot);
|
||||
RestoreSnapshotHelper restoreHelper = new RestoreSnapshotHelper(
|
||||
conf, fs, manifest, hTableDescriptor, tableRootDir, monitorException, monitorStatus);
|
||||
RestoreSnapshotHelper.RestoreMetaChanges metaChanges = restoreHelper.restoreHdfsRegions();
|
||||
|
||||
// Clone operation should not have stuff to restore or remove
|
||||
Preconditions.checkArgument(
|
||||
!metaChanges.hasRegionsToRestore(), "A clone should not have regions to restore");
|
||||
Preconditions.checkArgument(
|
||||
!metaChanges.hasRegionsToRemove(), "A clone should not have regions to remove");
|
||||
|
||||
// At this point the clone is complete. Next step is enabling the table.
|
||||
String msg =
|
||||
"Clone snapshot="+ snapshot.getName() +" on table=" + tableName + " completed!";
|
||||
LOG.info(msg);
|
||||
monitorStatus.setStatus(msg + " Waiting for table to be enabled...");
|
||||
|
||||
// 2. Let the next step to add the regions to meta
|
||||
return metaChanges.getRegionsToAdd();
|
||||
} catch (Exception e) {
|
||||
String msg = "clone snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
|
||||
" failed because " + e.getMessage();
|
||||
LOG.error(msg, e);
|
||||
IOException rse = new RestoreSnapshotException(msg, e, snapshot);
|
||||
|
||||
// these handlers aren't futures so we need to register the error here.
|
||||
monitorException.receive(new ForeignException("Master CloneSnapshotProcedure", rse));
|
||||
throw rse;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create region layout in file system.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<HRegionInfo> createFsLayout(
|
||||
final MasterProcedureEnv env,
|
||||
final HTableDescriptor hTableDescriptor,
|
||||
List<HRegionInfo> newRegions,
|
||||
final CreateHdfsRegions hdfsRegionHandler) throws IOException {
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
final Path tempdir = mfs.getTempDir();
|
||||
|
||||
// 1. Create Table Descriptor
|
||||
// using a copy of descriptor, table will be created enabling first
|
||||
TableDescriptor underConstruction = new TableDescriptor(hTableDescriptor);
|
||||
final Path tempTableDir = FSUtils.getTableDir(tempdir, hTableDescriptor.getTableName());
|
||||
((FSTableDescriptors)(env.getMasterServices().getTableDescriptors()))
|
||||
.createTableDescriptorForTableDirectory(tempTableDir, underConstruction, false);
|
||||
|
||||
// 2. Create Regions
|
||||
newRegions = hdfsRegionHandler.createHdfsRegions(
|
||||
env, tempdir, hTableDescriptor.getTableName(), newRegions);
|
||||
|
||||
// 3. Move Table temp directory to the hbase root location
|
||||
CreateTableProcedure.moveTempDirectoryToHBaseRoot(env, hTableDescriptor, tempTableDir);
|
||||
|
||||
return newRegions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add regions to hbase:meta table.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private void addRegionsToMeta(final MasterProcedureEnv env) throws IOException {
|
||||
newRegions = CreateTableProcedure.addTableToMeta(env, hTableDescriptor, newRegions);
|
||||
|
||||
RestoreSnapshotHelper.RestoreMetaChanges metaChanges =
|
||||
new RestoreSnapshotHelper.RestoreMetaChanges(
|
||||
hTableDescriptor, parentsToChildrenPairMap);
|
||||
metaChanges.updateMetaParentRegions(env.getMasterServices().getConnection(), newRegions);
|
||||
}
|
||||
|
||||
/**
|
||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
||||
* retrieve it.
|
||||
* @return traceEnabled
|
||||
*/
|
||||
private Boolean isTraceEnabled() {
|
||||
if (traceEnabled == null) {
|
||||
traceEnabled = LOG.isTraceEnabled();
|
||||
}
|
||||
return traceEnabled;
|
||||
}
|
||||
}
|
|
@ -299,7 +299,8 @@ public class CreateTableProcedure
|
|||
throws IOException, InterruptedException {
|
||||
if (!getTableName().isSystemTable()) {
|
||||
ProcedureSyncWait.getMasterQuotaManager(env)
|
||||
.checkNamespaceTableAndRegionQuota(getTableName(), newRegions.size());
|
||||
.checkNamespaceTableAndRegionQuota(
|
||||
getTableName(), (newRegions != null ? newRegions.size() : 0));
|
||||
}
|
||||
|
||||
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||
|
@ -373,6 +374,16 @@ public class CreateTableProcedure
|
|||
hTableDescriptor.getTableName(), newRegions);
|
||||
|
||||
// 3. Move Table temp directory to the hbase root location
|
||||
moveTempDirectoryToHBaseRoot(env, hTableDescriptor, tempTableDir);
|
||||
|
||||
return newRegions;
|
||||
}
|
||||
|
||||
protected static void moveTempDirectoryToHBaseRoot(
|
||||
final MasterProcedureEnv env,
|
||||
final HTableDescriptor hTableDescriptor,
|
||||
final Path tempTableDir) throws IOException {
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
final Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), hTableDescriptor.getTableName());
|
||||
FileSystem fs = mfs.getFileSystem();
|
||||
if (!fs.delete(tableDir, true) && fs.exists(tableDir)) {
|
||||
|
@ -382,7 +393,6 @@ public class CreateTableProcedure
|
|||
throw new IOException("Unable to move table from temp=" + tempTableDir +
|
||||
" to hbase root=" + tableDir);
|
||||
}
|
||||
return newRegions;
|
||||
}
|
||||
|
||||
protected static List<HRegionInfo> addTableToMeta(final MasterProcedureEnv env,
|
||||
|
@ -446,7 +456,7 @@ public class CreateTableProcedure
|
|||
/**
|
||||
* Add the specified set of regions to the hbase:meta table.
|
||||
*/
|
||||
protected static void addRegionsToMeta(final MasterProcedureEnv env,
|
||||
private static void addRegionsToMeta(final MasterProcedureEnv env,
|
||||
final HTableDescriptor hTableDescriptor,
|
||||
final List<HRegionInfo> regionInfos) throws IOException {
|
||||
MetaTableAccessor.addRegionsToMeta(env.getMasterServices().getConnection(),
|
||||
|
|
|
@ -165,4 +165,17 @@ public final class MasterDDLOperationHelper {
|
|||
}
|
||||
return done;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the region info list of a table from meta if it is not already known by the caller.
|
||||
**/
|
||||
public static List<HRegionInfo> getRegionInfoList(
|
||||
final MasterProcedureEnv env,
|
||||
final TableName tableName,
|
||||
List<HRegionInfo> regionInfoList) throws IOException {
|
||||
if (regionInfoList == null) {
|
||||
regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName);
|
||||
}
|
||||
return regionInfoList;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,526 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.master.MetricsSnapshot;
|
||||
import org.apache.hadoop.hbase.master.RegionStates;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.RestoreSnapshotState;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class RestoreSnapshotProcedure
|
||||
extends StateMachineProcedure<MasterProcedureEnv, RestoreSnapshotState>
|
||||
implements TableProcedureInterface {
|
||||
private static final Log LOG = LogFactory.getLog(RestoreSnapshotProcedure.class);
|
||||
|
||||
private final AtomicBoolean aborted = new AtomicBoolean(false);
|
||||
|
||||
private HTableDescriptor modifiedHTableDescriptor;
|
||||
private List<HRegionInfo> regionsToRestore = null;
|
||||
private List<HRegionInfo> regionsToRemove = null;
|
||||
private List<HRegionInfo> regionsToAdd = null;
|
||||
private Map<String, Pair<String, String>> parentsToChildrenPairMap =
|
||||
new HashMap<String, Pair<String, String>>();
|
||||
|
||||
private UserGroupInformation user;
|
||||
private SnapshotDescription snapshot;
|
||||
|
||||
// Monitor
|
||||
private MonitoredTask monitorStatus = null;
|
||||
|
||||
private Boolean traceEnabled = null;
|
||||
|
||||
/**
|
||||
* Constructor (for failover)
|
||||
*/
|
||||
public RestoreSnapshotProcedure() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param env MasterProcedureEnv
|
||||
* @param hTableDescriptor the table to operate on
|
||||
* @param snapshot snapshot to restore from
|
||||
* @throws IOException
|
||||
*/
|
||||
public RestoreSnapshotProcedure(
|
||||
final MasterProcedureEnv env,
|
||||
final HTableDescriptor hTableDescriptor,
|
||||
final SnapshotDescription snapshot)
|
||||
throws IOException {
|
||||
// This is the new schema we are going to write out as this modification.
|
||||
this.modifiedHTableDescriptor = hTableDescriptor;
|
||||
// Snapshot information
|
||||
this.snapshot = snapshot;
|
||||
// User and owner information
|
||||
this.user = env.getRequestUser().getUGI();
|
||||
this.setOwner(this.user.getShortUserName());
|
||||
|
||||
// Monitor
|
||||
getMonitorStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up monitor status if it is not created.
|
||||
*/
|
||||
private MonitoredTask getMonitorStatus() {
|
||||
if (monitorStatus == null) {
|
||||
monitorStatus = TaskMonitor.get().createStatus("Restoring snapshot '" + snapshot.getName()
|
||||
+ "' to table " + getTableName());
|
||||
}
|
||||
return monitorStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Flow executeFromState(final MasterProcedureEnv env, final RestoreSnapshotState state)
|
||||
throws InterruptedException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " execute state=" + state);
|
||||
}
|
||||
|
||||
// Make sure that the monitor status is set up
|
||||
getMonitorStatus();
|
||||
|
||||
try {
|
||||
switch (state) {
|
||||
case RESTORE_SNAPSHOT_PRE_OPERATION:
|
||||
// Verify if we can restore the table
|
||||
prepareRestore(env);
|
||||
setNextState(RestoreSnapshotState.RESTORE_SNAPSHOT_UPDATE_TABLE_DESCRIPTOR);
|
||||
break;
|
||||
case RESTORE_SNAPSHOT_UPDATE_TABLE_DESCRIPTOR:
|
||||
updateTableDescriptor(env);
|
||||
setNextState(RestoreSnapshotState.RESTORE_SNAPSHOT_WRITE_FS_LAYOUT);
|
||||
break;
|
||||
case RESTORE_SNAPSHOT_WRITE_FS_LAYOUT:
|
||||
restoreSnapshot(env);
|
||||
setNextState(RestoreSnapshotState.RESTORE_SNAPSHOT_UPDATE_META);
|
||||
break;
|
||||
case RESTORE_SNAPSHOT_UPDATE_META:
|
||||
updateMETA(env);
|
||||
return Flow.NO_MORE_STATE;
|
||||
default:
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error trying to restore snapshot=" + getTableName() + " state=" + state, e);
|
||||
setFailure("master-restore-snapshot", e);
|
||||
}
|
||||
return Flow.HAS_MORE_STATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackState(final MasterProcedureEnv env, final RestoreSnapshotState state)
|
||||
throws IOException {
|
||||
if (isTraceEnabled()) {
|
||||
LOG.trace(this + " rollback state=" + state);
|
||||
}
|
||||
|
||||
if (state == RestoreSnapshotState.RESTORE_SNAPSHOT_PRE_OPERATION) {
|
||||
// nothing to rollback
|
||||
return;
|
||||
}
|
||||
|
||||
// The restore snapshot doesn't have a rollback. The execution will succeed, at some point.
|
||||
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestoreSnapshotState getState(final int stateId) {
|
||||
return RestoreSnapshotState.valueOf(stateId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStateId(final RestoreSnapshotState state) {
|
||||
return state.getNumber();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestoreSnapshotState getInitialState() {
|
||||
return RestoreSnapshotState.RESTORE_SNAPSHOT_PRE_OPERATION;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setNextState(final RestoreSnapshotState state) {
|
||||
if (aborted.get()) {
|
||||
setAbortFailure("create-table", "abort requested");
|
||||
} else {
|
||||
super.setNextState(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return modifiedHTableDescriptor.getTableName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.EDIT; // Restore is modifying a table
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean abort(final MasterProcedureEnv env) {
|
||||
aborted.set(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toStringClassDetails(StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append(" (table=");
|
||||
sb.append(getTableName());
|
||||
sb.append(" snapshot=");
|
||||
sb.append(snapshot);
|
||||
sb.append(")");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serializeStateData(final OutputStream stream) throws IOException {
|
||||
super.serializeStateData(stream);
|
||||
|
||||
MasterProcedureProtos.RestoreSnapshotStateData.Builder restoreSnapshotMsg =
|
||||
MasterProcedureProtos.RestoreSnapshotStateData.newBuilder()
|
||||
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user))
|
||||
.setSnapshot(this.snapshot)
|
||||
.setModifiedTableSchema(modifiedHTableDescriptor.convert());
|
||||
|
||||
if (regionsToRestore != null) {
|
||||
for (HRegionInfo hri: regionsToRestore) {
|
||||
restoreSnapshotMsg.addRegionInfoForRestore(HRegionInfo.convert(hri));
|
||||
}
|
||||
}
|
||||
if (regionsToRemove != null) {
|
||||
for (HRegionInfo hri: regionsToRemove) {
|
||||
restoreSnapshotMsg.addRegionInfoForRemove(HRegionInfo.convert(hri));
|
||||
}
|
||||
}
|
||||
if (regionsToAdd != null) {
|
||||
for (HRegionInfo hri: regionsToAdd) {
|
||||
restoreSnapshotMsg.addRegionInfoForAdd(HRegionInfo.convert(hri));
|
||||
}
|
||||
}
|
||||
if (!parentsToChildrenPairMap.isEmpty()) {
|
||||
final Iterator<Map.Entry<String, Pair<String, String>>> it =
|
||||
parentsToChildrenPairMap.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
final Map.Entry<String, Pair<String, String>> entry = it.next();
|
||||
|
||||
MasterProcedureProtos.RestoreParentToChildRegionsPair.Builder parentToChildrenPair =
|
||||
MasterProcedureProtos.RestoreParentToChildRegionsPair.newBuilder()
|
||||
.setParentRegionName(entry.getKey())
|
||||
.setChild1RegionName(entry.getValue().getFirst())
|
||||
.setChild2RegionName(entry.getValue().getSecond());
|
||||
restoreSnapshotMsg.addParentToChildRegionsPairList (parentToChildrenPair);
|
||||
}
|
||||
}
|
||||
restoreSnapshotMsg.build().writeDelimitedTo(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deserializeStateData(final InputStream stream) throws IOException {
|
||||
super.deserializeStateData(stream);
|
||||
|
||||
MasterProcedureProtos.RestoreSnapshotStateData restoreSnapshotMsg =
|
||||
MasterProcedureProtos.RestoreSnapshotStateData.parseDelimitedFrom(stream);
|
||||
user = MasterProcedureUtil.toUserInfo(restoreSnapshotMsg.getUserInfo());
|
||||
snapshot = restoreSnapshotMsg.getSnapshot();
|
||||
modifiedHTableDescriptor =
|
||||
HTableDescriptor.convert(restoreSnapshotMsg.getModifiedTableSchema());
|
||||
|
||||
if (restoreSnapshotMsg.getRegionInfoForRestoreCount() == 0) {
|
||||
regionsToRestore = null;
|
||||
} else {
|
||||
regionsToRestore =
|
||||
new ArrayList<HRegionInfo>(restoreSnapshotMsg.getRegionInfoForRestoreCount());
|
||||
for (HBaseProtos.RegionInfo hri: restoreSnapshotMsg.getRegionInfoForRestoreList()) {
|
||||
regionsToRestore.add(HRegionInfo.convert(hri));
|
||||
}
|
||||
}
|
||||
if (restoreSnapshotMsg.getRegionInfoForRemoveCount() == 0) {
|
||||
regionsToRemove = null;
|
||||
} else {
|
||||
regionsToRemove =
|
||||
new ArrayList<HRegionInfo>(restoreSnapshotMsg.getRegionInfoForRemoveCount());
|
||||
for (HBaseProtos.RegionInfo hri: restoreSnapshotMsg.getRegionInfoForRemoveList()) {
|
||||
regionsToRemove.add(HRegionInfo.convert(hri));
|
||||
}
|
||||
}
|
||||
if (restoreSnapshotMsg.getRegionInfoForAddCount() == 0) {
|
||||
regionsToAdd = null;
|
||||
} else {
|
||||
regionsToAdd = new ArrayList<HRegionInfo>(restoreSnapshotMsg.getRegionInfoForAddCount());
|
||||
for (HBaseProtos.RegionInfo hri: restoreSnapshotMsg.getRegionInfoForAddList()) {
|
||||
regionsToAdd.add(HRegionInfo.convert(hri));
|
||||
}
|
||||
}
|
||||
if (restoreSnapshotMsg.getParentToChildRegionsPairListCount() > 0) {
|
||||
for (MasterProcedureProtos.RestoreParentToChildRegionsPair parentToChildrenPair:
|
||||
restoreSnapshotMsg.getParentToChildRegionsPairListList()) {
|
||||
parentsToChildrenPairMap.put(
|
||||
parentToChildrenPair.getParentRegionName(),
|
||||
new Pair<String, String>(
|
||||
parentToChildrenPair.getChild1RegionName(),
|
||||
parentToChildrenPair.getChild2RegionName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||
if (env.waitInitialized(this)) {
|
||||
return false;
|
||||
}
|
||||
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(final MasterProcedureEnv env) {
|
||||
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Action before any real action of restoring from snapshot.
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
*/
|
||||
private void prepareRestore(final MasterProcedureEnv env) throws IOException {
|
||||
final TableName tableName = getTableName();
|
||||
// Checks whether the table exists
|
||||
if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
|
||||
throw new TableNotFoundException(tableName);
|
||||
}
|
||||
|
||||
// Check whether table is disabled.
|
||||
env.getMasterServices().checkTableModifiable(tableName);
|
||||
|
||||
// Check that we have at least 1 CF
|
||||
if (modifiedHTableDescriptor.getColumnFamilies().length == 0) {
|
||||
throw new DoNotRetryIOException("Table " + getTableName().toString() +
|
||||
" should have at least one column family.");
|
||||
}
|
||||
|
||||
if (!getTableName().isSystemTable()) {
|
||||
// Table already exist. Check and update the region quota for this table namespace.
|
||||
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||
SnapshotManifest manifest = SnapshotManifest.open(
|
||||
env.getMasterConfiguration(),
|
||||
mfs.getFileSystem(),
|
||||
SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, mfs.getRootDir()),
|
||||
snapshot);
|
||||
int snapshotRegionCount = manifest.getRegionManifestsMap().size();
|
||||
int tableRegionCount =
|
||||
ProcedureSyncWait.getMasterQuotaManager(env).getRegionCountOfTable(tableName);
|
||||
|
||||
if (snapshotRegionCount > 0 && tableRegionCount != snapshotRegionCount) {
|
||||
ProcedureSyncWait.getMasterQuotaManager(env).checkAndUpdateNamespaceRegionQuota(
|
||||
tableName, snapshotRegionCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update descriptor
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
**/
|
||||
private void updateTableDescriptor(final MasterProcedureEnv env) throws IOException {
|
||||
env.getMasterServices().getTableDescriptors().add(modifiedHTableDescriptor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the on-disk Restore
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
**/
|
||||
private void restoreSnapshot(final MasterProcedureEnv env) throws IOException {
|
||||
MasterFileSystem fileSystemManager = env.getMasterServices().getMasterFileSystem();
|
||||
FileSystem fs = fileSystemManager.getFileSystem();
|
||||
Path rootDir = fileSystemManager.getRootDir();
|
||||
final ForeignExceptionDispatcher monitorException = new ForeignExceptionDispatcher();
|
||||
|
||||
LOG.info("Starting restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot));
|
||||
try {
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
|
||||
SnapshotManifest manifest = SnapshotManifest.open(
|
||||
env.getMasterServices().getConfiguration(), fs, snapshotDir, snapshot);
|
||||
RestoreSnapshotHelper restoreHelper = new RestoreSnapshotHelper(
|
||||
env.getMasterServices().getConfiguration(),
|
||||
fs,
|
||||
manifest,
|
||||
modifiedHTableDescriptor,
|
||||
rootDir,
|
||||
monitorException,
|
||||
getMonitorStatus());
|
||||
|
||||
RestoreSnapshotHelper.RestoreMetaChanges metaChanges = restoreHelper.restoreHdfsRegions();
|
||||
regionsToRestore = metaChanges.getRegionsToRestore();
|
||||
regionsToRemove = metaChanges.getRegionsToRemove();
|
||||
regionsToAdd = metaChanges.getRegionsToAdd();
|
||||
parentsToChildrenPairMap = metaChanges.getParentToChildrenPairMap();
|
||||
} catch (IOException e) {
|
||||
String msg = "restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot)
|
||||
+ " failed in on-disk restore. Try re-running the restore command.";
|
||||
LOG.error(msg, e);
|
||||
monitorException.receive(
|
||||
new ForeignException(env.getMasterServices().getServerName().toString(), e));
|
||||
throw new IOException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply changes to hbase:meta
|
||||
* @param env MasterProcedureEnv
|
||||
* @throws IOException
|
||||
**/
|
||||
private void updateMETA(final MasterProcedureEnv env) throws IOException {
|
||||
try {
|
||||
Connection conn = env.getMasterServices().getConnection();
|
||||
|
||||
// 1. Forces all the RegionStates to be offline
|
||||
//
|
||||
// The AssignmentManager keeps all the region states around
|
||||
// with no possibility to remove them, until the master is restarted.
|
||||
// This means that a region marked as SPLIT before the restore will never be assigned again.
|
||||
// To avoid having all states around all the regions are switched to the OFFLINE state,
|
||||
// which is the same state that the regions will be after a delete table.
|
||||
forceRegionsOffline(env, regionsToAdd);
|
||||
forceRegionsOffline(env, regionsToRestore);
|
||||
forceRegionsOffline(env, regionsToRemove);
|
||||
|
||||
getMonitorStatus().setStatus("Preparing to restore each region");
|
||||
|
||||
// 2. Applies changes to hbase:meta
|
||||
// (2.1). Removes the current set of regions from META
|
||||
//
|
||||
// By removing also the regions to restore (the ones present both in the snapshot
|
||||
// and in the current state) we ensure that no extra fields are present in META
|
||||
// e.g. with a simple add addRegionToMeta() the splitA and splitB attributes
|
||||
// not overwritten/removed, so you end up with old informations
|
||||
// that are not correct after the restore.
|
||||
if (regionsToRemove != null) {
|
||||
MetaTableAccessor.deleteRegions(conn, regionsToRemove);
|
||||
}
|
||||
|
||||
// (2.2). Add the new set of regions to META
|
||||
//
|
||||
// At this point the old regions are no longer present in META.
|
||||
// and the set of regions present in the snapshot will be written to META.
|
||||
// All the information in hbase:meta are coming from the .regioninfo of each region present
|
||||
// in the snapshot folder.
|
||||
if (regionsToAdd != null) {
|
||||
MetaTableAccessor.addRegionsToMeta(
|
||||
conn,
|
||||
regionsToAdd,
|
||||
modifiedHTableDescriptor.getRegionReplication());
|
||||
}
|
||||
|
||||
if (regionsToRestore != null) {
|
||||
MetaTableAccessor.overwriteRegions(
|
||||
conn,
|
||||
regionsToRestore,
|
||||
modifiedHTableDescriptor.getRegionReplication());
|
||||
}
|
||||
|
||||
RestoreSnapshotHelper.RestoreMetaChanges metaChanges =
|
||||
new RestoreSnapshotHelper.RestoreMetaChanges(
|
||||
modifiedHTableDescriptor, parentsToChildrenPairMap);
|
||||
metaChanges.updateMetaParentRegions(conn, regionsToAdd);
|
||||
|
||||
// At this point the restore is complete.
|
||||
LOG.info("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
|
||||
" on table=" + getTableName() + " completed!");
|
||||
} catch (IOException e) {
|
||||
final ForeignExceptionDispatcher monitorException = new ForeignExceptionDispatcher();
|
||||
String msg = "restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot)
|
||||
+ " failed in meta update. Try re-running the restore command.";
|
||||
LOG.error(msg, e);
|
||||
monitorException.receive(
|
||||
new ForeignException(env.getMasterServices().getServerName().toString(), e));
|
||||
throw new IOException(msg, e);
|
||||
}
|
||||
|
||||
monitorStatus.markComplete("Restore snapshot '"+ snapshot.getName() +"'!");
|
||||
MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
|
||||
metricsSnapshot.addSnapshotRestore(
|
||||
monitorStatus.getCompletionTimestamp() - monitorStatus.getStartTime());
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that region states of the region list is in OFFLINE state.
|
||||
* @param env MasterProcedureEnv
|
||||
* @param hris region info list
|
||||
**/
|
||||
private void forceRegionsOffline(final MasterProcedureEnv env, final List<HRegionInfo> hris) {
|
||||
RegionStates states = env.getMasterServices().getAssignmentManager().getRegionStates();
|
||||
if (hris != null) {
|
||||
for (HRegionInfo hri: hris) {
|
||||
states.regionOffline(hri);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
||||
* retrieve it.
|
||||
* @return traceEnabled
|
||||
*/
|
||||
private Boolean isTraceEnabled() {
|
||||
if (traceEnabled == null) {
|
||||
traceEnabled = LOG.isTraceEnabled();
|
||||
}
|
||||
return traceEnabled;
|
||||
}
|
||||
}
|
|
@ -1,195 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CancellationException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.MetricsSnapshot;
|
||||
import org.apache.hadoop.hbase.master.SnapshotSentinel;
|
||||
import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Handler to Clone a snapshot.
|
||||
*
|
||||
* <p>Uses {@link RestoreSnapshotHelper} to create a new table with the same
|
||||
* content of the specified snapshot.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CloneSnapshotHandler extends CreateTableHandler implements SnapshotSentinel {
|
||||
private static final Log LOG = LogFactory.getLog(CloneSnapshotHandler.class);
|
||||
|
||||
private final static String NAME = "Master CloneSnapshotHandler";
|
||||
|
||||
private final SnapshotDescription snapshot;
|
||||
|
||||
private final ForeignExceptionDispatcher monitor;
|
||||
private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
|
||||
private final MonitoredTask status;
|
||||
|
||||
private RestoreSnapshotHelper.RestoreMetaChanges metaChanges;
|
||||
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
public CloneSnapshotHandler(final MasterServices masterServices,
|
||||
final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor) {
|
||||
super(masterServices, masterServices.getMasterFileSystem(), hTableDescriptor,
|
||||
masterServices.getConfiguration(), null, masterServices);
|
||||
|
||||
// Snapshot information
|
||||
this.snapshot = snapshot;
|
||||
|
||||
// Monitor
|
||||
this.monitor = new ForeignExceptionDispatcher();
|
||||
this.status = TaskMonitor.get().createStatus("Cloning snapshot '" + snapshot.getName() +
|
||||
"' to table " + hTableDescriptor.getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloneSnapshotHandler prepare() throws NotAllMetaRegionsOnlineException,
|
||||
TableExistsException, IOException {
|
||||
return (CloneSnapshotHandler) super.prepare();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the on-disk regions, using the tableRootDir provided by the CreateTableHandler.
|
||||
* The cloned table will be created in a temp directory, and then the CreateTableHandler
|
||||
* will be responsible to add the regions returned by this method to hbase:meta and do the assignment.
|
||||
*/
|
||||
@Override
|
||||
protected List<HRegionInfo> handleCreateHdfsRegions(final Path tableRootDir,
|
||||
final TableName tableName) throws IOException {
|
||||
status.setStatus("Creating regions for table: " + tableName);
|
||||
FileSystem fs = fileSystemManager.getFileSystem();
|
||||
Path rootDir = fileSystemManager.getRootDir();
|
||||
|
||||
try {
|
||||
// 1. Execute the on-disk Clone
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
|
||||
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshot);
|
||||
RestoreSnapshotHelper restoreHelper = new RestoreSnapshotHelper(conf, fs,
|
||||
manifest, hTableDescriptor, tableRootDir, monitor, status);
|
||||
metaChanges = restoreHelper.restoreHdfsRegions();
|
||||
|
||||
// Clone operation should not have stuff to restore or remove
|
||||
Preconditions.checkArgument(!metaChanges.hasRegionsToRestore(),
|
||||
"A clone should not have regions to restore");
|
||||
Preconditions.checkArgument(!metaChanges.hasRegionsToRemove(),
|
||||
"A clone should not have regions to remove");
|
||||
|
||||
// At this point the clone is complete. Next step is enabling the table.
|
||||
String msg = "Clone snapshot="+ snapshot.getName() +" on table=" + tableName + " completed!";
|
||||
LOG.info(msg);
|
||||
status.setStatus(msg + " Waiting for table to be enabled...");
|
||||
|
||||
// 2. let the CreateTableHandler add the regions to meta
|
||||
return metaChanges.getRegionsToAdd();
|
||||
} catch (Exception e) {
|
||||
String msg = "clone snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
|
||||
" failed because " + e.getMessage();
|
||||
LOG.error(msg, e);
|
||||
IOException rse = new RestoreSnapshotException(msg, e, snapshot);
|
||||
|
||||
// these handlers aren't futures so we need to register the error here.
|
||||
this.monitor.receive(new ForeignException(NAME, rse));
|
||||
throw rse;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addRegionsToMeta(final List<HRegionInfo> regionInfos,
|
||||
int regionReplication)
|
||||
throws IOException {
|
||||
super.addRegionsToMeta(regionInfos, regionReplication);
|
||||
metaChanges.updateMetaParentRegions(this.server.getConnection(), regionInfos);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void completed(final Throwable exception) {
|
||||
this.stopped = true;
|
||||
if (exception != null) {
|
||||
status.abort("Snapshot '" + snapshot.getName() + "' clone failed because " +
|
||||
exception.getMessage());
|
||||
} else {
|
||||
status.markComplete("Snapshot '"+ snapshot.getName() +"' clone completed and table enabled!");
|
||||
}
|
||||
metricsSnapshot.addSnapshotClone(status.getCompletionTimestamp() - status.getStartTime());
|
||||
super.completed(exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinished() {
|
||||
return this.stopped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCompletionTimestamp() {
|
||||
return this.status.getCompletionTimestamp();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SnapshotDescription getSnapshot() {
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(String why) {
|
||||
if (this.stopped) return;
|
||||
this.stopped = true;
|
||||
String msg = "Stopping clone snapshot=" + snapshot + " because: " + why;
|
||||
LOG.info(msg);
|
||||
status.abort(msg);
|
||||
this.monitor.receive(new ForeignException(NAME, new CancellationException(why)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ForeignException getExceptionIfFailed() {
|
||||
return this.monitor.getException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rethrowExceptionIfFailed() throws ForeignException {
|
||||
monitor.rethrowException();
|
||||
}
|
||||
}
|
|
@ -1,245 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CancellationException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||
import org.apache.hadoop.hbase.executor.EventType;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.MetricsSnapshot;
|
||||
import org.apache.hadoop.hbase.master.RegionStates;
|
||||
import org.apache.hadoop.hbase.master.SnapshotSentinel;
|
||||
import org.apache.hadoop.hbase.master.handler.TableEventHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
|
||||
|
||||
/**
|
||||
* Handler to Restore a snapshot.
|
||||
*
|
||||
* <p>Uses {@link RestoreSnapshotHelper} to replace the table content with the
|
||||
* data available in the snapshot.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RestoreSnapshotHandler extends TableEventHandler implements SnapshotSentinel {
|
||||
private static final Log LOG = LogFactory.getLog(RestoreSnapshotHandler.class);
|
||||
|
||||
private final HTableDescriptor hTableDescriptor;
|
||||
private final SnapshotDescription snapshot;
|
||||
|
||||
private final ForeignExceptionDispatcher monitor;
|
||||
private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
|
||||
private final MonitoredTask status;
|
||||
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
public RestoreSnapshotHandler(final MasterServices masterServices,
|
||||
final SnapshotDescription snapshot, final HTableDescriptor htd) throws IOException {
|
||||
super(EventType.C_M_RESTORE_SNAPSHOT, htd.getTableName(), masterServices, masterServices);
|
||||
|
||||
// Snapshot information
|
||||
this.snapshot = snapshot;
|
||||
|
||||
// Monitor
|
||||
this.monitor = new ForeignExceptionDispatcher();
|
||||
|
||||
// Check table exists.
|
||||
getTableDescriptor();
|
||||
|
||||
// This is the new schema we are going to write out as this modification.
|
||||
this.hTableDescriptor = htd;
|
||||
|
||||
this.status = TaskMonitor.get().createStatus(
|
||||
"Restoring snapshot '" + snapshot.getName() + "' to table "
|
||||
+ hTableDescriptor.getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestoreSnapshotHandler prepare() throws IOException {
|
||||
return (RestoreSnapshotHandler) super.prepare();
|
||||
}
|
||||
|
||||
/**
|
||||
* The restore table is executed in place.
|
||||
* - The on-disk data will be restored - reference files are put in place without moving data
|
||||
* - [if something fail here: you need to delete the table and re-run the restore]
|
||||
* - hbase:meta will be updated
|
||||
* - [if something fail here: you need to run hbck to fix hbase:meta entries]
|
||||
* The passed in list gets changed in this method
|
||||
*/
|
||||
@Override
|
||||
protected void handleTableOperation(List<HRegionInfo> hris) throws IOException {
|
||||
MasterFileSystem fileSystemManager = masterServices.getMasterFileSystem();
|
||||
Connection conn = masterServices.getConnection();
|
||||
FileSystem fs = fileSystemManager.getFileSystem();
|
||||
Path rootDir = fileSystemManager.getRootDir();
|
||||
TableName tableName = hTableDescriptor.getTableName();
|
||||
|
||||
try {
|
||||
// 1. Update descriptor
|
||||
this.masterServices.getTableDescriptors().add(hTableDescriptor);
|
||||
|
||||
// 2. Execute the on-disk Restore
|
||||
LOG.debug("Starting restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot));
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
|
||||
SnapshotManifest manifest = SnapshotManifest.open(masterServices.getConfiguration(), fs,
|
||||
snapshotDir, snapshot);
|
||||
RestoreSnapshotHelper restoreHelper = new RestoreSnapshotHelper(
|
||||
masterServices.getConfiguration(), fs, manifest,
|
||||
this.hTableDescriptor, rootDir, monitor, status);
|
||||
RestoreSnapshotHelper.RestoreMetaChanges metaChanges = restoreHelper.restoreHdfsRegions();
|
||||
|
||||
// 3. Forces all the RegionStates to be offline
|
||||
//
|
||||
// The AssignmentManager keeps all the region states around
|
||||
// with no possibility to remove them, until the master is restarted.
|
||||
// This means that a region marked as SPLIT before the restore will never be assigned again.
|
||||
// To avoid having all states around all the regions are switched to the OFFLINE state,
|
||||
// which is the same state that the regions will be after a delete table.
|
||||
forceRegionsOffline(metaChanges);
|
||||
|
||||
// 4. Applies changes to hbase:meta
|
||||
status.setStatus("Preparing to restore each region");
|
||||
|
||||
// 4.1 Removes the current set of regions from META
|
||||
//
|
||||
// By removing also the regions to restore (the ones present both in the snapshot
|
||||
// and in the current state) we ensure that no extra fields are present in META
|
||||
// e.g. with a simple add addRegionToMeta() the splitA and splitB attributes
|
||||
// not overwritten/removed, so you end up with old informations
|
||||
// that are not correct after the restore.
|
||||
List<HRegionInfo> hrisToRemove = new LinkedList<HRegionInfo>();
|
||||
if (metaChanges.hasRegionsToRemove()) hrisToRemove.addAll(metaChanges.getRegionsToRemove());
|
||||
MetaTableAccessor.deleteRegions(conn, hrisToRemove);
|
||||
|
||||
// 4.2 Add the new set of regions to META
|
||||
//
|
||||
// At this point the old regions are no longer present in META.
|
||||
// and the set of regions present in the snapshot will be written to META.
|
||||
// All the information in hbase:meta are coming from the .regioninfo of each region present
|
||||
// in the snapshot folder.
|
||||
hris.clear();
|
||||
if (metaChanges.hasRegionsToAdd()) hris.addAll(metaChanges.getRegionsToAdd());
|
||||
MetaTableAccessor.addRegionsToMeta(conn, hris, hTableDescriptor.getRegionReplication());
|
||||
if (metaChanges.hasRegionsToRestore()) {
|
||||
MetaTableAccessor.overwriteRegions(conn, metaChanges.getRegionsToRestore(),
|
||||
hTableDescriptor.getRegionReplication());
|
||||
}
|
||||
metaChanges.updateMetaParentRegions(this.server.getConnection(), hris);
|
||||
|
||||
// At this point the restore is complete. Next step is enabling the table.
|
||||
LOG.info("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
|
||||
" on table=" + tableName + " completed!");
|
||||
} catch (IOException e) {
|
||||
String msg = "restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot)
|
||||
+ " failed. Try re-running the restore command.";
|
||||
LOG.error(msg, e);
|
||||
monitor.receive(new ForeignException(masterServices.getServerName().toString(), e));
|
||||
throw new RestoreSnapshotException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void forceRegionsOffline(final RestoreSnapshotHelper.RestoreMetaChanges metaChanges) {
|
||||
forceRegionsOffline(metaChanges.getRegionsToAdd());
|
||||
forceRegionsOffline(metaChanges.getRegionsToRestore());
|
||||
forceRegionsOffline(metaChanges.getRegionsToRemove());
|
||||
}
|
||||
|
||||
private void forceRegionsOffline(final List<HRegionInfo> hris) {
|
||||
AssignmentManager am = this.masterServices.getAssignmentManager();
|
||||
RegionStates states = am.getRegionStates();
|
||||
if (hris != null) {
|
||||
for (HRegionInfo hri: hris) {
|
||||
states.regionOffline(hri);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void completed(final Throwable exception) {
|
||||
this.stopped = true;
|
||||
if (exception != null) {
|
||||
status.abort("Restore snapshot '" + snapshot.getName() + "' failed because " +
|
||||
exception.getMessage());
|
||||
} else {
|
||||
status.markComplete("Restore snapshot '"+ snapshot.getName() +"'!");
|
||||
}
|
||||
metricsSnapshot.addSnapshotRestore(status.getCompletionTimestamp() - status.getStartTime());
|
||||
super.completed(exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinished() {
|
||||
return this.stopped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCompletionTimestamp() {
|
||||
return this.status.getCompletionTimestamp();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SnapshotDescription getSnapshot() {
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(String why) {
|
||||
if (this.stopped) return;
|
||||
this.stopped = true;
|
||||
String msg = "Stopping restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot)
|
||||
+ " because: " + why;
|
||||
LOG.info(msg);
|
||||
CancellationException ce = new CancellationException(why);
|
||||
this.monitor.receive(new ForeignException(masterServices.getServerName().toString(), ce));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ForeignException getExceptionIfFailed() {
|
||||
return this.monitor.getException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rethrowExceptionIfFailed() throws ForeignException {
|
||||
monitor.rethrowException();
|
||||
}
|
||||
}
|
|
@ -56,16 +56,19 @@ import org.apache.hadoop.hbase.master.MetricsMaster;
|
|||
import org.apache.hadoop.hbase.master.SnapshotSentinel;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
|
||||
import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
|
||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
|
||||
import org.apache.hadoop.hbase.procedure.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
|
||||
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
|
||||
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
|
||||
import org.apache.hadoop.hbase.security.AccessDeniedException;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
|
@ -145,12 +148,14 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
private Map<TableName, SnapshotSentinel> snapshotHandlers =
|
||||
new HashMap<TableName, SnapshotSentinel>();
|
||||
|
||||
// Restore Sentinels map, with table name as key.
|
||||
// Restore map, with table name as key, procedure ID as value.
|
||||
// The map is always accessed and modified under the object lock using synchronized.
|
||||
// restoreSnapshot()/cloneSnapshot() will insert an Handler in the table.
|
||||
// isRestoreDone() will remove the handler requested if the operation is finished.
|
||||
private Map<TableName, SnapshotSentinel> restoreHandlers =
|
||||
new HashMap<TableName, SnapshotSentinel>();
|
||||
// restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
|
||||
//
|
||||
// TODO: just as the Apache HBase 1.x implementation, this map would not survive master
|
||||
// restart/failover. This is just a stopgap implementation until implementation of taking
|
||||
// snapshot using Procedure-V2.
|
||||
private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<TableName, Long>();
|
||||
|
||||
private Path rootDir;
|
||||
private ExecutorService executorService;
|
||||
|
@ -426,11 +431,9 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
|
||||
// make sure we aren't running a restore on the same table
|
||||
if (isRestoringTable(snapshotTable)) {
|
||||
SnapshotSentinel handler = restoreHandlers.get(snapshotTable);
|
||||
throw new SnapshotCreationException("Rejected taking "
|
||||
+ ClientSnapshotDescriptionUtils.toString(snapshot)
|
||||
+ " because we are already have a restore in progress on the same snapshot "
|
||||
+ ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
|
||||
+ " because we are already have a restore in progress on the same snapshot.");
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -646,15 +649,62 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone the specified snapshot.
|
||||
* The clone will fail if the destination table has a snapshot or restore in progress.
|
||||
*
|
||||
* @param reqSnapshot Snapshot Descriptor from request
|
||||
* @param tableName table to clone
|
||||
* @param snapshot Snapshot Descriptor
|
||||
* @param snapshotTableDesc Table Descriptor
|
||||
* @param nonceGroup unique value to prevent duplicated RPC
|
||||
* @param nonce unique value to prevent duplicated RPC
|
||||
* @return procId the ID of the clone snapshot procedure
|
||||
* @throws IOException
|
||||
*/
|
||||
private long cloneSnapshot(
|
||||
final SnapshotDescription reqSnapshot,
|
||||
final TableName tableName,
|
||||
final SnapshotDescription snapshot,
|
||||
final HTableDescriptor snapshotTableDesc,
|
||||
final long nonceGroup,
|
||||
final long nonce) throws IOException {
|
||||
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName, snapshotTableDesc);
|
||||
if (cpHost != null) {
|
||||
cpHost.preCloneSnapshot(reqSnapshot, htd);
|
||||
}
|
||||
long procId;
|
||||
try {
|
||||
procId = cloneSnapshot(snapshot, htd, nonceGroup, nonce);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
|
||||
+ " as table " + tableName.getNameAsString(), e);
|
||||
throw e;
|
||||
}
|
||||
LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
|
||||
|
||||
if (cpHost != null) {
|
||||
cpHost.postCloneSnapshot(reqSnapshot, htd);
|
||||
}
|
||||
return procId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone the specified snapshot into a new table.
|
||||
* The operation will fail if the destination table has a snapshot or restore in progress.
|
||||
*
|
||||
* @param snapshot Snapshot Descriptor
|
||||
* @param hTableDescriptor Table Descriptor of the table to create
|
||||
* @param nonceGroup unique value to prevent duplicated RPC
|
||||
* @param nonce unique value to prevent duplicated RPC
|
||||
* @return procId the ID of the clone snapshot procedure
|
||||
*/
|
||||
synchronized void cloneSnapshot(final SnapshotDescription snapshot,
|
||||
final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
|
||||
synchronized long cloneSnapshot(
|
||||
final SnapshotDescription snapshot,
|
||||
final HTableDescriptor hTableDescriptor,
|
||||
final long nonceGroup,
|
||||
final long nonce) throws HBaseSnapshotException {
|
||||
TableName tableName = hTableDescriptor.getTableName();
|
||||
|
||||
// make sure we aren't running a snapshot on the same table
|
||||
|
@ -668,27 +718,34 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
}
|
||||
|
||||
try {
|
||||
CloneSnapshotHandler handler =
|
||||
new CloneSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
|
||||
this.executorService.submit(handler);
|
||||
this.restoreHandlers.put(tableName, handler);
|
||||
long procId = master.getMasterProcedureExecutor().submitProcedure(
|
||||
new CloneSnapshotProcedure(
|
||||
master.getMasterProcedureExecutor().getEnvironment(), hTableDescriptor, snapshot),
|
||||
nonceGroup,
|
||||
nonce);
|
||||
this.restoreTableToProcIdMap.put(tableName, procId);
|
||||
return procId;
|
||||
} catch (Exception e) {
|
||||
String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
|
||||
" on table=" + tableName;
|
||||
String msg = "Couldn't clone the snapshot="
|
||||
+ ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
|
||||
LOG.error(msg, e);
|
||||
throw new RestoreSnapshotException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Restore the specified snapshot
|
||||
* Restore or Clone the specified snapshot
|
||||
* @param reqSnapshot
|
||||
* @param nonceGroup unique value to prevent duplicated RPC
|
||||
* @param nonce unique value to prevent duplicated RPC
|
||||
* @throws IOException
|
||||
*/
|
||||
public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
|
||||
public long restoreOrCloneSnapshot(
|
||||
SnapshotDescription reqSnapshot,
|
||||
final long nonceGroup,
|
||||
final long nonce) throws IOException {
|
||||
FileSystem fs = master.getMasterFileSystem().getFileSystem();
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
|
||||
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
|
||||
|
||||
// check if the snapshot exists
|
||||
if (!fs.exists(snapshotDir)) {
|
||||
|
@ -712,109 +769,66 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
|
||||
|
||||
// Execute the restore/clone operation
|
||||
long procId;
|
||||
if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
|
||||
if (master.getTableStateManager().isTableState(
|
||||
TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
|
||||
throw new UnsupportedOperationException("Table '" +
|
||||
TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
|
||||
"perform a restore operation" +
|
||||
".");
|
||||
}
|
||||
|
||||
// call coproc pre hook
|
||||
if (cpHost != null) {
|
||||
cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
|
||||
}
|
||||
|
||||
int tableRegionCount = -1;
|
||||
try {
|
||||
// Table already exist. Check and update the region quota for this table namespace.
|
||||
// The region quota may not be updated correctly if there are concurrent restore snapshot
|
||||
// requests for the same table
|
||||
|
||||
tableRegionCount = getRegionCountOfTable(tableName);
|
||||
int snapshotRegionCount = manifest.getRegionManifestsMap().size();
|
||||
|
||||
// Update region quota when snapshotRegionCount is larger. If we updated the region count
|
||||
// to a smaller value before retoreSnapshot and the retoreSnapshot fails, we may fail to
|
||||
// reset the region count to its original value if the region quota is consumed by other
|
||||
// tables in the namespace
|
||||
if (tableRegionCount > 0 && tableRegionCount < snapshotRegionCount) {
|
||||
checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
|
||||
}
|
||||
restoreSnapshot(snapshot, snapshotTableDesc);
|
||||
// Update the region quota if snapshotRegionCount is smaller. This step should not fail
|
||||
// because we have reserved enough region quota before hand
|
||||
if (tableRegionCount > 0 && tableRegionCount > snapshotRegionCount) {
|
||||
checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
|
||||
}
|
||||
} catch (QuotaExceededException e) {
|
||||
LOG.error("Region quota exceeded while restoring the snapshot " + snapshot.getName()
|
||||
+ " as table " + tableName.getNameAsString(), e);
|
||||
// If QEE is thrown before restoreSnapshot, quota information is not updated, so we
|
||||
// should throw the exception directly. If QEE is thrown after restoreSnapshot, there
|
||||
// must be unexpected reasons, we also throw the exception directly
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
if (tableRegionCount > 0) {
|
||||
// reset the region count for table
|
||||
checkAndUpdateNamespaceRegionQuota(tableRegionCount, tableName);
|
||||
}
|
||||
LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
|
||||
+ " as table " + tableName.getNameAsString(), e);
|
||||
throw e;
|
||||
}
|
||||
LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
|
||||
|
||||
if (cpHost != null) {
|
||||
cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
|
||||
}
|
||||
procId = restoreSnapshot(
|
||||
reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceGroup, nonce);
|
||||
} else {
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName, snapshotTableDesc);
|
||||
if (cpHost != null) {
|
||||
cpHost.preCloneSnapshot(reqSnapshot, htd);
|
||||
}
|
||||
try {
|
||||
checkAndUpdateNamespaceQuota(manifest, tableName);
|
||||
cloneSnapshot(snapshot, htd);
|
||||
} catch (IOException e) {
|
||||
this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
|
||||
LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
|
||||
+ " as table " + tableName.getNameAsString(), e);
|
||||
throw e;
|
||||
}
|
||||
LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
|
||||
|
||||
if (cpHost != null) {
|
||||
cpHost.postCloneSnapshot(reqSnapshot, htd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkAndUpdateNamespaceQuota(SnapshotManifest manifest, TableName tableName)
|
||||
throws IOException {
|
||||
if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
|
||||
this.master.getMasterQuotaManager().checkNamespaceTableAndRegionQuota(tableName,
|
||||
manifest.getRegionManifestsMap().size());
|
||||
}
|
||||
}
|
||||
|
||||
private void checkAndUpdateNamespaceRegionQuota(int updatedRegionCount, TableName tableName)
|
||||
throws IOException {
|
||||
if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
|
||||
this.master.getMasterQuotaManager().checkAndUpdateNamespaceRegionQuota(tableName,
|
||||
updatedRegionCount);
|
||||
procId = cloneSnapshot(
|
||||
reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceGroup, nonce);
|
||||
}
|
||||
return procId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return cached region count, or -1 if quota manager is disabled or table status not found
|
||||
*/
|
||||
private int getRegionCountOfTable(TableName tableName) throws IOException {
|
||||
if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
|
||||
return this.master.getMasterQuotaManager().getRegionCountOfTable(tableName);
|
||||
* Restore the specified snapshot.
|
||||
* The restore will fail if the destination table has a snapshot or restore in progress.
|
||||
*
|
||||
* @param reqSnapshot Snapshot Descriptor from request
|
||||
* @param tableName table to restore
|
||||
* @param snapshot Snapshot Descriptor
|
||||
* @param snapshotTableDesc Table Descriptor
|
||||
* @param nonceGroup unique value to prevent duplicated RPC
|
||||
* @param nonce unique value to prevent duplicated RPC
|
||||
* @return procId the ID of the restore snapshot procedure
|
||||
* @throws IOException
|
||||
*/
|
||||
private long restoreSnapshot(
|
||||
final SnapshotDescription reqSnapshot,
|
||||
final TableName tableName,
|
||||
final SnapshotDescription snapshot,
|
||||
final HTableDescriptor snapshotTableDesc,
|
||||
final long nonceGroup,
|
||||
final long nonce) throws IOException {
|
||||
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
|
||||
|
||||
if (master.getTableStateManager().isTableState(
|
||||
TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
|
||||
throw new UnsupportedOperationException("Table '" +
|
||||
TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
|
||||
"perform a restore operation.");
|
||||
}
|
||||
return -1;
|
||||
|
||||
// call Coprocessor pre hook
|
||||
if (cpHost != null) {
|
||||
cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
|
||||
}
|
||||
|
||||
long procId;
|
||||
try {
|
||||
procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceGroup, nonce);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
|
||||
+ " as table " + tableName.getNameAsString(), e);
|
||||
throw e;
|
||||
}
|
||||
LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
|
||||
|
||||
if (cpHost != null) {
|
||||
cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
|
||||
}
|
||||
|
||||
return procId;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -823,9 +837,15 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
*
|
||||
* @param snapshot Snapshot Descriptor
|
||||
* @param hTableDescriptor Table Descriptor
|
||||
* @param nonceGroup unique value to prevent duplicated RPC
|
||||
* @param nonce unique value to prevent duplicated RPC
|
||||
* @return procId the ID of the restore snapshot procedure
|
||||
*/
|
||||
private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
|
||||
final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
|
||||
private synchronized long restoreSnapshot(
|
||||
final SnapshotDescription snapshot,
|
||||
final HTableDescriptor hTableDescriptor,
|
||||
final long nonceGroup,
|
||||
final long nonce) throws HBaseSnapshotException {
|
||||
TableName tableName = hTableDescriptor.getTableName();
|
||||
|
||||
// make sure we aren't running a snapshot on the same table
|
||||
|
@ -839,10 +859,13 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
}
|
||||
|
||||
try {
|
||||
RestoreSnapshotHandler handler =
|
||||
new RestoreSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
|
||||
this.executorService.submit(handler);
|
||||
restoreHandlers.put(tableName, handler);
|
||||
long procId = master.getMasterProcedureExecutor().submitProcedure(
|
||||
new RestoreSnapshotProcedure(
|
||||
master.getMasterProcedureExecutor().getEnvironment(), hTableDescriptor, snapshot),
|
||||
nonceGroup,
|
||||
nonce);
|
||||
this.restoreTableToProcIdMap.put(tableName, procId);
|
||||
return procId;
|
||||
} catch (Exception e) {
|
||||
String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
|
||||
snapshot) +
|
||||
|
@ -859,50 +882,18 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
* @return <tt>true</tt> if there is a restore in progress of the specified table.
|
||||
*/
|
||||
private synchronized boolean isRestoringTable(final TableName tableName) {
|
||||
SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
|
||||
return(sentinel != null && !sentinel.isFinished());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the status of a restore operation.
|
||||
* If the in-progress restore is failed throws the exception that caused the failure.
|
||||
*
|
||||
* @param snapshot
|
||||
* @return false if in progress, true if restore is completed or not requested.
|
||||
* @throws IOException if there was a failure during the restore
|
||||
*/
|
||||
public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
|
||||
// check to see if the sentinel exists,
|
||||
// and if the task is complete removes it from the in-progress restore map.
|
||||
SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
|
||||
|
||||
// stop tracking "abandoned" handlers
|
||||
cleanupSentinels();
|
||||
|
||||
if (sentinel == null) {
|
||||
// there is no sentinel so restore is not in progress.
|
||||
Long procId = this.restoreTableToProcIdMap.get(tableName);
|
||||
if (procId == null) {
|
||||
return false;
|
||||
}
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
|
||||
if (procExec.isRunning() && !procExec.isFinished(procId)) {
|
||||
return true;
|
||||
} else {
|
||||
this.restoreTableToProcIdMap.remove(tableName);
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
|
||||
+ sentinel.getSnapshot().getName() + " table=" +
|
||||
TableName.valueOf(snapshot.getTable()));
|
||||
|
||||
// If the restore is failed, rethrow the exception
|
||||
sentinel.rethrowExceptionIfFailed();
|
||||
|
||||
// check to see if we are done
|
||||
if (sentinel.isFinished()) {
|
||||
LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
|
||||
" has completed. Notifying the client.");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
|
||||
ClientSnapshotDescriptionUtils.toString(snapshot));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -947,7 +938,7 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
*/
|
||||
private void cleanupSentinels() {
|
||||
cleanupSentinels(this.snapshotHandlers);
|
||||
cleanupSentinels(this.restoreHandlers);
|
||||
cleanupCompletedRestoreInMap();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -970,6 +961,21 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the procedures that are marked as finished
|
||||
*/
|
||||
private synchronized void cleanupCompletedRestoreInMap() {
|
||||
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
|
||||
Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<TableName, Long> entry = it.next();
|
||||
Long procId = entry.getValue();
|
||||
if (procExec.isRunning() && procExec.isFinished(procId)) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Implementing Stoppable interface
|
||||
//
|
||||
|
@ -985,10 +991,6 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
|
|||
snapshotHandler.cancel(why);
|
||||
}
|
||||
|
||||
// pass the stop onto all the restore handlers
|
||||
for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
|
||||
restoreHandler.cancel(why);
|
||||
}
|
||||
try {
|
||||
if (coordinator != null) {
|
||||
coordinator.close();
|
||||
|
|
|
@ -175,7 +175,7 @@ public class RestoreSnapshotHelper {
|
|||
}
|
||||
|
||||
private RestoreMetaChanges restoreHdfsRegions(final ThreadPoolExecutor exec) throws IOException {
|
||||
LOG.debug("starting restore");
|
||||
LOG.info("starting restore table regions using snapshot=" + snapshotDesc);
|
||||
|
||||
Map<String, SnapshotRegionManifest> regionManifests = snapshotManifest.getRegionManifestsMap();
|
||||
if (regionManifests == null) {
|
||||
|
@ -251,6 +251,8 @@ public class RestoreSnapshotHelper {
|
|||
status.setStatus("Finished cloning regions.");
|
||||
}
|
||||
|
||||
LOG.info("finishing restore table regions using snapshot=" + snapshotDesc);
|
||||
|
||||
return metaChanges;
|
||||
}
|
||||
|
||||
|
@ -265,7 +267,7 @@ public class RestoreSnapshotHelper {
|
|||
private List<HRegionInfo> regionsToRemove = null;
|
||||
private List<HRegionInfo> regionsToAdd = null;
|
||||
|
||||
RestoreMetaChanges(HTableDescriptor htd, Map<String, Pair<String, String> > parentsMap) {
|
||||
public RestoreMetaChanges(HTableDescriptor htd, Map<String, Pair<String, String> > parentsMap) {
|
||||
this.parentsMap = parentsMap;
|
||||
this.htd = htd;
|
||||
}
|
||||
|
@ -274,6 +276,14 @@ public class RestoreSnapshotHelper {
|
|||
return htd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the map of parent-children_pair.
|
||||
* @return the map
|
||||
*/
|
||||
public Map<String, Pair<String, String>> getParentToChildrenPairMap() {
|
||||
return this.parentsMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if there're new regions
|
||||
*/
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
|
|
|
@ -129,8 +129,8 @@ public class MasterProcedureTestingUtility {
|
|||
assertEquals(family.length, htd.getFamilies().size());
|
||||
}
|
||||
|
||||
public static void validateTableDeletion(final HMaster master, final TableName tableName,
|
||||
final HRegionInfo[] regions, String... family) throws IOException {
|
||||
public static void validateTableDeletion(
|
||||
final HMaster master, final TableName tableName) throws IOException {
|
||||
// check filesystem
|
||||
final FileSystem fs = master.getMasterFileSystem().getFileSystem();
|
||||
final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
|
||||
|
|
|
@ -0,0 +1,239 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CloneSnapshotState;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestCloneSnapshotProcedure {
|
||||
private static final Log LOG = LogFactory.getLog(TestCloneSnapshotProcedure.class);
|
||||
|
||||
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
protected final byte[] CF = Bytes.toBytes("cf1");
|
||||
|
||||
private static long nonceGroup = HConstants.NO_NONCE;
|
||||
private static long nonce = HConstants.NO_NONCE;
|
||||
|
||||
private static SnapshotDescription snapshot = null;
|
||||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
setupConf(UTIL.getConfiguration());
|
||||
UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanupTest() throws Exception {
|
||||
try {
|
||||
UTIL.shutdownMiniCluster();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failure shutting down cluster", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
resetProcExecutorTestingKillFlag();
|
||||
nonceGroup =
|
||||
MasterProcedureTestingUtility.generateNonceGroup(UTIL.getHBaseCluster().getMaster());
|
||||
nonce = MasterProcedureTestingUtility.generateNonce(UTIL.getHBaseCluster().getMaster());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
resetProcExecutorTestingKillFlag();
|
||||
}
|
||||
|
||||
private void resetProcExecutorTestingKillFlag() {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
|
||||
assertTrue("expected executor to be running", procExec.isRunning());
|
||||
}
|
||||
|
||||
private SnapshotDescription getSnapshot() throws Exception {
|
||||
if (snapshot == null) {
|
||||
final TableName snapshotTableName = TableName.valueOf("testCloneSnapshot");
|
||||
long tid = System.currentTimeMillis();
|
||||
final byte[] snapshotName = Bytes.toBytes("snapshot-" + tid);
|
||||
|
||||
Admin admin = UTIL.getHBaseAdmin();
|
||||
// create Table
|
||||
SnapshotTestingUtils.createTable(UTIL, snapshotTableName, getNumReplicas(), CF);
|
||||
// Load data
|
||||
SnapshotTestingUtils.loadData(UTIL, snapshotTableName, 500, CF);
|
||||
admin.disableTable(snapshotTableName);
|
||||
// take a snapshot
|
||||
admin.snapshot(snapshotName, snapshotTableName);
|
||||
admin.enableTable(snapshotTableName);
|
||||
|
||||
List<SnapshotDescription> snapshotList = admin.listSnapshots();
|
||||
snapshot = snapshotList.get(0);
|
||||
}
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
private int getNumReplicas() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
public static HTableDescriptor createHTableDescriptor(
|
||||
final TableName tableName, final byte[] ... family) {
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
for (int i = 0; i < family.length; ++i) {
|
||||
htd.addFamily(new HColumnDescriptor(family[i]));
|
||||
}
|
||||
return htd;
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testCloneSnapshot() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
final TableName clonedTableName = TableName.valueOf("testCloneSnapshot2");
|
||||
final HTableDescriptor htd = createHTableDescriptor(clonedTableName, CF);
|
||||
|
||||
long procId = ProcedureTestingUtility.submitAndWait(
|
||||
procExec, new CloneSnapshotProcedure(procExec.getEnvironment(), htd, getSnapshot()));
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
|
||||
MasterProcedureTestingUtility.validateTableIsEnabled(
|
||||
UTIL.getHBaseCluster().getMaster(),
|
||||
clonedTableName);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testCloneSnapshotTwiceWithSameNonce() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
final TableName clonedTableName = TableName.valueOf("testCloneSnapshotTwiceWithSameNonce");
|
||||
final HTableDescriptor htd = createHTableDescriptor(clonedTableName, CF);
|
||||
|
||||
long procId1 = procExec.submitProcedure(
|
||||
new CloneSnapshotProcedure(procExec.getEnvironment(), htd, getSnapshot()), nonceGroup, nonce);
|
||||
long procId2 = procExec.submitProcedure(
|
||||
new CloneSnapshotProcedure(procExec.getEnvironment(), htd, getSnapshot()), nonceGroup, nonce);
|
||||
|
||||
// Wait the completion
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId1);
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId1);
|
||||
// The second proc should succeed too - because it is the same proc.
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId2);
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId2);
|
||||
assertTrue(procId1 == procId2);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testCloneSnapshotToSameTable() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
final TableName clonedTableName = TableName.valueOf(getSnapshot().getTable());
|
||||
final HTableDescriptor htd = createHTableDescriptor(clonedTableName, CF);
|
||||
|
||||
long procId = ProcedureTestingUtility.submitAndWait(
|
||||
procExec, new CloneSnapshotProcedure(procExec.getEnvironment(), htd, getSnapshot()));
|
||||
ProcedureInfo result = procExec.getResult(procId);
|
||||
assertTrue(result.isFailed());
|
||||
LOG.debug("Clone snapshot failed with exception: " + result.getExceptionFullMessage());
|
||||
assertTrue(
|
||||
ProcedureTestingUtility.getExceptionCause(result) instanceof TableExistsException);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testRecoveryAndDoubleExecution() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
final TableName clonedTableName = TableName.valueOf("testRecoveryAndDoubleExecution");
|
||||
final HTableDescriptor htd = createHTableDescriptor(clonedTableName, CF);
|
||||
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||
|
||||
// Start the Clone snapshot procedure && kill the executor
|
||||
long procId = procExec.submitProcedure(
|
||||
new CloneSnapshotProcedure(procExec.getEnvironment(), htd, getSnapshot()), nonceGroup, nonce);
|
||||
|
||||
// Restart the executor and execute the step twice
|
||||
int numberOfSteps = CloneSnapshotState.values().length;
|
||||
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(
|
||||
procExec,
|
||||
procId,
|
||||
numberOfSteps,
|
||||
CloneSnapshotState.values());
|
||||
|
||||
MasterProcedureTestingUtility.validateTableIsEnabled(
|
||||
UTIL.getHBaseCluster().getMaster(),
|
||||
clonedTableName);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRollbackAndDoubleExecution() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
final TableName clonedTableName = TableName.valueOf("testRollbackAndDoubleExecution");
|
||||
final HTableDescriptor htd = createHTableDescriptor(clonedTableName, CF);
|
||||
|
||||
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||
|
||||
// Start the Clone snapshot procedure && kill the executor
|
||||
long procId = procExec.submitProcedure(
|
||||
new CloneSnapshotProcedure(procExec.getEnvironment(), htd, getSnapshot()), nonceGroup, nonce);
|
||||
|
||||
int numberOfSteps = CloneSnapshotState.values().length - 2; // failing in the middle of proc
|
||||
MasterProcedureTestingUtility.testRollbackAndDoubleExecution(
|
||||
procExec,
|
||||
procId,
|
||||
numberOfSteps,
|
||||
CloneSnapshotState.values());
|
||||
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), clonedTableName);
|
||||
|
||||
}
|
||||
|
||||
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
|
||||
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
}
|
||||
}
|
|
@ -253,7 +253,7 @@ public class TestCreateTableProcedure {
|
|||
procExec, procId, 4, CreateTableState.values());
|
||||
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
|
||||
UTIL.getHBaseCluster().getMaster(), tableName);
|
||||
|
||||
// are we able to create the table after a rollback?
|
||||
resetProcExecutorTestingKillFlag();
|
||||
|
@ -310,7 +310,7 @@ public class TestCreateTableProcedure {
|
|||
procExec, procId, 4, CreateTableState.values());
|
||||
TableName tableName = htd.getTableName();
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
|
||||
UTIL.getHBaseCluster().getMaster(), tableName);
|
||||
|
||||
// are we able to create the table after a rollback?
|
||||
resetProcExecutorTestingKillFlag();
|
||||
|
|
|
@ -139,7 +139,7 @@ public class TestDeleteTableProcedure {
|
|||
// First delete should succeed
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId1);
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f");
|
||||
UTIL.getHBaseCluster().getMaster(), tableName);
|
||||
|
||||
// Second delete should fail with TableNotFound
|
||||
ProcedureInfo result = procExec.getResult(procId2);
|
||||
|
@ -171,7 +171,7 @@ public class TestDeleteTableProcedure {
|
|||
// First delete should succeed
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId1);
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f");
|
||||
UTIL.getHBaseCluster().getMaster(), tableName);
|
||||
|
||||
// Second delete should not fail, because it is the same delete
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId2);
|
||||
|
@ -205,7 +205,7 @@ public class TestDeleteTableProcedure {
|
|||
new DeleteTableProcedure(procExec.getEnvironment(), tableName));
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
|
||||
UTIL.getHBaseCluster().getMaster(), tableName);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
|
@ -233,7 +233,7 @@ public class TestDeleteTableProcedure {
|
|||
procExec, procId, 6, DeleteTableState.values());
|
||||
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
|
||||
UTIL.getHBaseCluster().getMaster(), tableName);
|
||||
}
|
||||
|
||||
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
|
||||
|
|
|
@ -301,7 +301,7 @@ public class TestMasterFailoverWithProcedures {
|
|||
testRecoveryAndDoubleExecution(UTIL, procId, step, DeleteTableState.values());
|
||||
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
|
||||
UTIL.getHBaseCluster().getMaster(), tableName);
|
||||
}
|
||||
|
||||
// ==========================================================================
|
||||
|
|
|
@ -142,7 +142,7 @@ public class TestProcedureAdmin {
|
|||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
|
||||
// Validate the delete table procedure was not aborted
|
||||
MasterProcedureTestingUtility.validateTableDeletion(
|
||||
UTIL.getHBaseCluster().getMaster(), tableName, regions, "f");
|
||||
UTIL.getHBaseCluster().getMaster(), tableName);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
|
|
|
@ -0,0 +1,291 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.RestoreSnapshotState;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestRestoreSnapshotProcedure {
|
||||
private static final Log LOG = LogFactory.getLog(TestRestoreSnapshotProcedure.class);
|
||||
|
||||
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
protected final TableName snapshotTableName = TableName.valueOf("testRestoreSnapshot");
|
||||
protected final byte[] CF1 = Bytes.toBytes("cf1");
|
||||
protected final byte[] CF2 = Bytes.toBytes("cf2");
|
||||
protected final byte[] CF3 = Bytes.toBytes("cf3");
|
||||
protected final byte[] CF4 = Bytes.toBytes("cf4");
|
||||
protected final int rowCountCF1 = 10;
|
||||
protected final int rowCountCF2 = 40;
|
||||
protected final int rowCountCF3 = 40;
|
||||
protected final int rowCountCF4 = 40;
|
||||
protected final int rowCountCF1addition = 10;
|
||||
|
||||
private static long nonceGroup = HConstants.NO_NONCE;
|
||||
private static long nonce = HConstants.NO_NONCE;
|
||||
|
||||
private SnapshotDescription snapshot = null;
|
||||
private HTableDescriptor snapshotHTD = null;
|
||||
|
||||
private static void setupConf(Configuration conf) {
|
||||
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setupCluster() throws Exception {
|
||||
setupConf(UTIL.getConfiguration());
|
||||
UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanupTest() throws Exception {
|
||||
try {
|
||||
UTIL.shutdownMiniCluster();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failure shutting down cluster", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
resetProcExecutorTestingKillFlag();
|
||||
nonceGroup =
|
||||
MasterProcedureTestingUtility.generateNonceGroup(UTIL.getHBaseCluster().getMaster());
|
||||
nonce = MasterProcedureTestingUtility.generateNonce(UTIL.getHBaseCluster().getMaster());
|
||||
|
||||
setupSnapshotAndUpdateTable();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
resetProcExecutorTestingKillFlag();
|
||||
UTIL.deleteTable(snapshotTableName);
|
||||
SnapshotTestingUtils.deleteAllSnapshots(UTIL.getHBaseAdmin());
|
||||
SnapshotTestingUtils.deleteArchiveDirectory(UTIL);
|
||||
}
|
||||
|
||||
private void resetProcExecutorTestingKillFlag() {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
|
||||
assertTrue("expected executor to be running", procExec.isRunning());
|
||||
}
|
||||
|
||||
private int getNumReplicas() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
private void setupSnapshotAndUpdateTable() throws Exception {
|
||||
long tid = System.currentTimeMillis();
|
||||
final byte[] snapshotName = Bytes.toBytes("snapshot-" + tid);
|
||||
Admin admin = UTIL.getHBaseAdmin();
|
||||
// create Table
|
||||
SnapshotTestingUtils.createTable(UTIL, snapshotTableName, getNumReplicas(), CF1, CF2);
|
||||
// Load data
|
||||
SnapshotTestingUtils.loadData(UTIL, snapshotTableName, rowCountCF1, CF1);
|
||||
SnapshotTestingUtils.loadData(UTIL, snapshotTableName, rowCountCF2, CF2);
|
||||
SnapshotTestingUtils.verifyRowCount(UTIL, snapshotTableName, rowCountCF1 + rowCountCF2);
|
||||
|
||||
snapshotHTD = admin.getTableDescriptor(snapshotTableName);
|
||||
|
||||
admin.disableTable(snapshotTableName);
|
||||
// take a snapshot
|
||||
admin.snapshot(snapshotName, snapshotTableName);
|
||||
|
||||
List<SnapshotDescription> snapshotList = admin.listSnapshots();
|
||||
snapshot = snapshotList.get(0);
|
||||
|
||||
// modify the table
|
||||
HColumnDescriptor columnFamilyDescriptor3 = new HColumnDescriptor(CF3);
|
||||
HColumnDescriptor columnFamilyDescriptor4 = new HColumnDescriptor(CF4);
|
||||
admin.addColumnFamily(snapshotTableName, columnFamilyDescriptor3);
|
||||
admin.addColumnFamily(snapshotTableName, columnFamilyDescriptor4);
|
||||
admin.deleteColumnFamily(snapshotTableName, CF2);
|
||||
// enable table and insert data
|
||||
admin.enableTable(snapshotTableName);
|
||||
SnapshotTestingUtils.loadData(UTIL, snapshotTableName, rowCountCF3, CF3);
|
||||
SnapshotTestingUtils.loadData(UTIL, snapshotTableName, rowCountCF4, CF4);
|
||||
SnapshotTestingUtils.loadData(UTIL, snapshotTableName, rowCountCF1addition, CF1);
|
||||
HTableDescriptor currentHTD = admin.getTableDescriptor(snapshotTableName);
|
||||
assertTrue(currentHTD.hasFamily(CF1));
|
||||
assertFalse(currentHTD.hasFamily(CF2));
|
||||
assertTrue(currentHTD.hasFamily(CF3));
|
||||
assertTrue(currentHTD.hasFamily(CF4));
|
||||
assertNotEquals(currentHTD.getFamiliesKeys().size(), snapshotHTD.getFamiliesKeys().size());
|
||||
SnapshotTestingUtils.verifyRowCount(
|
||||
UTIL, snapshotTableName, rowCountCF1 + rowCountCF3 + rowCountCF4 + rowCountCF1addition);
|
||||
admin.disableTable(snapshotTableName);
|
||||
}
|
||||
|
||||
private static HTableDescriptor createHTableDescriptor(
|
||||
final TableName tableName, final byte[] ... family) {
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
for (int i = 0; i < family.length; ++i) {
|
||||
htd.addFamily(new HColumnDescriptor(family[i]));
|
||||
}
|
||||
return htd;
|
||||
}
|
||||
|
||||
@Test(timeout=600000)
|
||||
public void testRestoreSnapshot() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
|
||||
long procId = ProcedureTestingUtility.submitAndWait(
|
||||
procExec,
|
||||
new RestoreSnapshotProcedure(procExec.getEnvironment(), snapshotHTD, snapshot));
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
|
||||
|
||||
validateSnapshotRestore();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRestoreSnapshotTwiceWithSameNonce() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
|
||||
long procId1 = procExec.submitProcedure(
|
||||
new RestoreSnapshotProcedure(procExec.getEnvironment(), snapshotHTD, snapshot),
|
||||
nonceGroup,
|
||||
nonce);
|
||||
long procId2 = procExec.submitProcedure(
|
||||
new RestoreSnapshotProcedure(procExec.getEnvironment(), snapshotHTD, snapshot),
|
||||
nonceGroup,
|
||||
nonce);
|
||||
|
||||
// Wait the completion
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId1);
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId1);
|
||||
// The second proc should succeed too - because it is the same proc.
|
||||
ProcedureTestingUtility.waitProcedure(procExec, procId2);
|
||||
ProcedureTestingUtility.assertProcNotFailed(procExec, procId2);
|
||||
assertTrue(procId1 == procId2);
|
||||
|
||||
validateSnapshotRestore();
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testRestoreSnapshotToDifferentTable() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
final TableName restoredTableName = TableName.valueOf("testRestoreSnapshotToDifferentTable");
|
||||
final HTableDescriptor newHTD = createHTableDescriptor(restoredTableName, CF1, CF2);
|
||||
|
||||
long procId = ProcedureTestingUtility.submitAndWait(
|
||||
procExec, new RestoreSnapshotProcedure(procExec.getEnvironment(), newHTD, snapshot));
|
||||
ProcedureInfo result = procExec.getResult(procId);
|
||||
assertTrue(result.isFailed());
|
||||
LOG.debug("Restore snapshot failed with exception: " + result.getExceptionFullMessage());
|
||||
assertTrue(
|
||||
ProcedureTestingUtility.getExceptionCause(result) instanceof TableNotFoundException);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testRestoreSnapshotToEnabledTable() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
|
||||
try {
|
||||
UTIL.getHBaseAdmin().enableTable(snapshotTableName);
|
||||
|
||||
long procId = ProcedureTestingUtility.submitAndWait(
|
||||
procExec,
|
||||
new RestoreSnapshotProcedure(procExec.getEnvironment(), snapshotHTD, snapshot));
|
||||
ProcedureInfo result = procExec.getResult(procId);
|
||||
assertTrue(result.isFailed());
|
||||
LOG.debug("Restore snapshot failed with exception: " + result.getExceptionFullMessage());
|
||||
assertTrue(
|
||||
ProcedureTestingUtility.getExceptionCause(result) instanceof TableNotDisabledException);
|
||||
} finally {
|
||||
UTIL.getHBaseAdmin().disableTable(snapshotTableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testRecoveryAndDoubleExecution() throws Exception {
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||
|
||||
// Start the Restore snapshot procedure && kill the executor
|
||||
long procId = procExec.submitProcedure(
|
||||
new RestoreSnapshotProcedure(procExec.getEnvironment(), snapshotHTD, snapshot),
|
||||
nonceGroup,
|
||||
nonce);
|
||||
|
||||
// Restart the executor and execute the step twice
|
||||
int numberOfSteps = RestoreSnapshotState.values().length;
|
||||
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(
|
||||
procExec,
|
||||
procId,
|
||||
numberOfSteps,
|
||||
RestoreSnapshotState.values());
|
||||
|
||||
resetProcExecutorTestingKillFlag();
|
||||
validateSnapshotRestore();
|
||||
}
|
||||
|
||||
private void validateSnapshotRestore() throws IOException {
|
||||
try {
|
||||
UTIL.getHBaseAdmin().enableTable(snapshotTableName);
|
||||
|
||||
HTableDescriptor currentHTD = UTIL.getHBaseAdmin().getTableDescriptor(snapshotTableName);
|
||||
assertTrue(currentHTD.hasFamily(CF1));
|
||||
assertTrue(currentHTD.hasFamily(CF2));
|
||||
assertFalse(currentHTD.hasFamily(CF3));
|
||||
assertFalse(currentHTD.hasFamily(CF4));
|
||||
assertEquals(currentHTD.getFamiliesKeys().size(), snapshotHTD.getFamiliesKeys().size());
|
||||
SnapshotTestingUtils.verifyRowCount(UTIL, snapshotTableName, rowCountCF1 + rowCountCF2);
|
||||
} finally {
|
||||
UTIL.getHBaseAdmin().disableTable(snapshotTableName);
|
||||
}
|
||||
}
|
||||
|
||||
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
|
||||
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue