HBASE-17864: Implement async snapshot/cloneSnapshot/restoreSnapshot methods

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
huzheng 2017-04-20 18:59:43 +08:00 committed by Guanghao Zhang
parent 33dadc1a94
commit d39f40e787
5 changed files with 351 additions and 1 deletions

View File

@ -573,4 +573,92 @@ public interface AsyncAdmin {
* {@link CompletableFuture}.
*/
CompletableFuture<List<TableCFs>> listReplicatedTableCFs();
/**
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
* based on <b>the name of the snapshot</b>. Attempts to take a snapshot with the same name (even
* a different type or with different parameters) will fail with a
* {@link org.apache.hadoop.hbase.snapshot.SnapshotCreationException} indicating the duplicate
* naming. Snapshot names follow the same naming constraints as tables in HBase. See
* {@link org.apache.hadoop.hbase.TableName#isLegalFullyQualifiedTableName(byte[])}.
* @param snapshotName name of the snapshot to be created
* @param tableName name of the table for which snapshot is created
*/
CompletableFuture<Void> snapshot(String snapshotName, TableName tableName);
/**
* Create typed snapshot of the table. Snapshots are considered unique based on <b>the name of the
* snapshot</b>. Attempts to take a snapshot with the same name (even a different type or with
* different parameters) will fail with a
* {@link org.apache.hadoop.hbase.snapshot.SnapshotCreationException} indicating the duplicate
* naming. Snapshot names follow the same naming constraints as tables in HBase. See
* {@link org.apache.hadoop.hbase.TableName#isLegalFullyQualifiedTableName(byte[])}.
* @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other
* snapshots stored on the cluster
* @param tableName name of the table to snapshot
* @param type type of snapshot to take
*/
CompletableFuture<Void> snapshot(final String snapshotName, final TableName tableName,
SnapshotType type);
/**
* Take a snapshot and wait for the server to complete that snapshot asynchronously. Only a single
* snapshot should be taken at a time for an instance of HBase, or results may be undefined (you
* can tell multiple HBase clusters to snapshot at the same time, but only one at a time for a
* single cluster). Snapshots are considered unique based on <b>the name of the snapshot</b>.
* Attempts to take a snapshot with the same name (even a different type or with different
* parameters) will fail with a {@link org.apache.hadoop.hbase.snapshot.SnapshotCreationException}
* indicating the duplicate naming. Snapshot names follow the same naming constraints as tables in
* HBase. See {@link org.apache.hadoop.hbase.TableName#isLegalFullyQualifiedTableName(byte[])}.
* You should probably use {@link #snapshot(String, org.apache.hadoop.hbase.TableName)} unless you
* are sure about the type of snapshot that you want to take.
* @param snapshot snapshot to take
*/
CompletableFuture<Void> snapshot(SnapshotDescription snapshot);
/**
* Check the current state of the passed snapshot. There are three possible states:
* <ol>
* <li>running - returns <tt>false</tt></li>
* <li>finished - returns <tt>true</tt></li>
* <li>finished with error - throws the exception that caused the snapshot to fail</li>
* </ol>
* The cluster only knows about the most recent snapshot. Therefore, if another snapshot has been
* run/started since the snapshot your are checking, you will recieve an
* {@link org.apache.hadoop.hbase.snapshot.UnknownSnapshotException}.
* @param snapshot description of the snapshot to check
* @return <tt>true</tt> if the snapshot is completed, <tt>false</tt> if the snapshot is still
* running
*/
CompletableFuture<Boolean> isSnapshotFinished(final SnapshotDescription snapshot);
/**
* Restore the specified snapshot on the original table. (The table must be disabled) If the
* "hbase.snapshot.restore.take.failsafe.snapshot" configuration property is set to true, a
* snapshot of the current table is taken before executing the restore operation. In case of
* restore failure, the failsafe snapshot will be restored. If the restore completes without
* problem the failsafe snapshot is deleted.
* @param snapshotName name of the snapshot to restore
*/
CompletableFuture<Void> restoreSnapshot(String snapshotName);
/**
* Restore the specified snapshot on the original table. (The table must be disabled) If
* 'takeFailSafeSnapshot' is set to true, a snapshot of the current table is taken before
* executing the restore operation. In case of restore failure, the failsafe snapshot will be
* restored. If the restore completes without problem the failsafe snapshot is deleted. The
* failsafe snapshot name is configurable by using the property
* "hbase.snapshot.restore.failsafe.name".
* @param snapshotName name of the snapshot to restore
* @param takeFailSafeSnapshot true if the failsafe snapshot should be taken
*/
CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot);
/**
* Create a new table by cloning the snapshot content.
* @param snapshotName name of the snapshot to be cloned
* @param tableName name of the table where the snapshot will be restored
*/
CompletableFuture<Void> cloneSnapshot(final String snapshotName, final TableName tableName);
}

View File

@ -37,6 +37,8 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -50,6 +52,7 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.TableNotFoundException;
@ -77,6 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
@ -110,6 +114,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@ -123,10 +129,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
@ -145,7 +155,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
@ -1387,6 +1400,138 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
return future;
}
@Override
public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName) {
return snapshot(snapshotName, tableName, SnapshotType.FLUSH);
}
@Override
public CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
SnapshotType type) {
return snapshot(new SnapshotDescription(snapshotName, tableName, type));
}
@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
HBaseProtos.SnapshotDescription snapshot =
ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
try {
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
} catch (IllegalArgumentException e) {
return failedFuture(e);
}
CompletableFuture<Void> future = new CompletableFuture<>();
final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
this.<Long> newMasterCaller()
.action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(
controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
resp -> resp.getExpectedTimeout()))
.call().whenComplete((expectedTimeout, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
TimerTask pollingTask = new TimerTask() {
int tries = 0;
long startTime = EnvironmentEdgeManager.currentTime();
long endTime = startTime + expectedTimeout;
long maxPauseTime = expectedTimeout / maxAttempts;
@Override
public void run(Timeout timeout) throws Exception {
if (EnvironmentEdgeManager.currentTime() < endTime) {
isSnapshotFinished(snapshotDesc).whenComplete((done, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (done) {
future.complete(null);
} else {
// retry again after pauseTime.
long pauseTime = ConnectionUtils
.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
TimeUnit.MILLISECONDS);
}
});
} else {
future.completeExceptionally(new SnapshotCreationException(
"Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:"
+ expectedTimeout + " ms",
snapshotDesc));
}
}
};
AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
});
return future;
}
@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
return this.<Boolean> newMasterCaller()
.action((controller, stub) -> this
.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(controller, stub,
IsSnapshotDoneRequest.newBuilder()
.setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
(s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone()))
.call();
}
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName) {
boolean takeFailSafeSnapshot = this.connection.getConfiguration().getBoolean(
HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
}
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName,
boolean takeFailSafeSnapshot) {
// TODO It depend on listSnapshots() method.
return failedFuture(new UnsupportedOperationException("restoreSnapshot do not supported yet"));
}
@Override
public CompletableFuture<Void> cloneSnapshot(String snapshotName, TableName tableName) {
CompletableFuture<Void> future = new CompletableFuture<>();
tableExists(tableName).whenComplete((exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (exists) {
future.completeExceptionally(new TableExistsException(tableName));
} else {
internalRestoreSnapshot(snapshotName, tableName).whenComplete((ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
}
});
return future;
}
private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName,
TableName tableName) {
HBaseProtos.SnapshotDescription snapshot = HBaseProtos.SnapshotDescription.newBuilder()
.setName(snapshotName).setTable(tableName.getNameAsString()).build();
try {
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
} catch (IllegalArgumentException e) {
return failedFuture(e);
}
return this.<Void> newMasterCaller()
.action((controller, stub) -> this
.<RestoreSnapshotRequest, RestoreSnapshotResponse, Void> call(controller, stub,
RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(),
(s, c, req, done) -> s.restoreSnapshot(c, req, done), resp -> null))
.call();
}
private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
if (numRegions < 3) {
throw new IllegalArgumentException("Must create at least three regions");

View File

@ -2498,7 +2498,8 @@ public class HBaseAdmin implements Admin {
public void restoreSnapshot(final String snapshotName)
throws IOException, RestoreSnapshotException {
boolean takeFailSafeSnapshot =
conf.getBoolean("hbase.snapshot.restore.take.failsafe.snapshot", false);
conf.getBoolean(HConstants.SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT,
HConstants.DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT);
restoreSnapshot(snapshotName, takeFailSafeSnapshot);
}

View File

@ -1351,6 +1351,10 @@ public final class HConstants {
public static final String DEFAULT_TEMPORARY_HDFS_DIRECTORY = "/user/"
+ System.getProperty("user.name") + "/hbase-staging";
public static final String SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT =
"hbase.snapshot.restore.take.failsafe.snapshot";
public static final boolean DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT = false;
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Collections;
import java.util.List;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
@Test
public void testTakeSnapshot() throws Exception {
String snapshotName1 = "snapshotName1";
String snapshotName2 = "snapshotName2";
TableName tableName = TableName.valueOf("testTakeSnapshot");
Admin syncAdmin = TEST_UTIL.getAdmin();
try {
Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("f1"));
for (int i = 0; i < 3000; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(Bytes.toBytes("f1"), Bytes.toBytes("cq"),
Bytes.toBytes(i)));
}
admin.snapshot(snapshotName1, tableName).get();
admin.snapshot(snapshotName2, tableName).get();
List<SnapshotDescription> snapshots = syncAdmin.listSnapshots();
Collections.sort(snapshots, (snap1, snap2) -> {
Assert.assertNotNull(snap1);
Assert.assertNotNull(snap1.getName());
Assert.assertNotNull(snap2);
Assert.assertNotNull(snap2.getName());
return snap1.getName().compareTo(snap2.getName());
});
Assert.assertEquals(snapshotName1, snapshots.get(0).getName());
Assert.assertEquals(tableName, snapshots.get(0).getTableName());
Assert.assertEquals(SnapshotType.FLUSH, snapshots.get(0).getType());
Assert.assertEquals(snapshotName2, snapshots.get(1).getName());
Assert.assertEquals(tableName, snapshots.get(1).getTableName());
Assert.assertEquals(SnapshotType.FLUSH, snapshots.get(1).getType());
} finally {
syncAdmin.deleteSnapshot(snapshotName1);
syncAdmin.deleteSnapshot(snapshotName2);
TEST_UTIL.deleteTable(tableName);
}
}
@Test
public void testCloneSnapshot() throws Exception {
String snapshotName1 = "snapshotName1";
TableName tableName = TableName.valueOf("testCloneSnapshot");
TableName tableName2 = TableName.valueOf("testCloneSnapshot2");
Admin syncAdmin = TEST_UTIL.getAdmin();
try {
Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("f1"));
for (int i = 0; i < 3000; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(Bytes.toBytes("f1"), Bytes.toBytes("cq"),
Bytes.toBytes(i)));
}
admin.snapshot(snapshotName1, tableName).get();
List<SnapshotDescription> snapshots = syncAdmin.listSnapshots();
Assert.assertEquals(snapshots.size(), 1);
Assert.assertEquals(snapshotName1, snapshots.get(0).getName());
Assert.assertEquals(tableName, snapshots.get(0).getTableName());
Assert.assertEquals(SnapshotType.FLUSH, snapshots.get(0).getType());
// cloneSnapshot into a existed table.
boolean failed = false;
try {
admin.cloneSnapshot(snapshotName1, tableName).get();
} catch (Exception e) {
failed = true;
}
Assert.assertTrue(failed);
// cloneSnapshot into a new table.
Assert.assertTrue(!syncAdmin.tableExists(tableName2));
admin.cloneSnapshot(snapshotName1, tableName2).get();
syncAdmin.tableExists(tableName2);
} finally {
syncAdmin.deleteSnapshot(snapshotName1);
TEST_UTIL.deleteTable(tableName);
}
}
}