HBASE-17865: Implement async listSnapshot/deleteSnapshot methods.

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
huzheng 2017-04-24 11:51:36 +08:00 committed by Guanghao Zhang
parent b81e00f5ea
commit 4bc0eb31c3
5 changed files with 462 additions and 3 deletions

View File

@ -663,4 +663,82 @@ public interface AsyncAdmin {
* @param tableName name of the table where the snapshot will be restored
*/
CompletableFuture<Void> cloneSnapshot(final String snapshotName, final TableName tableName);
/**
* List completed snapshots.
* @return a list of snapshot descriptors for completed snapshots wrapped by a
* {@link CompletableFuture}
*/
CompletableFuture<List<SnapshotDescription>> listSnapshots();
/**
* List all the completed snapshots matching the given regular expression.
* @param regex The regular expression to match against
* @return - returns a List of SnapshotDescription wrapped by a {@link CompletableFuture}
*/
CompletableFuture<List<SnapshotDescription>> listSnapshots(String regex);
/**
* List all the completed snapshots matching the given pattern.
* @param pattern The compiled regular expression to match against
* @return - returns a List of SnapshotDescription wrapped by a {@link CompletableFuture}
*/
CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern);
/**
* List all the completed snapshots matching the given table name regular expression and snapshot
* name regular expression.
* @param tableNameRegex The table name regular expression to match against
* @param snapshotNameRegex The snapshot name regular expression to match against
* @return - returns a List of completed SnapshotDescription wrapped by a
* {@link CompletableFuture}
*/
CompletableFuture<List<SnapshotDescription>> listTableSnapshots(String tableNameRegex,
String snapshotNameRegex);
/**
* List all the completed snapshots matching the given table name regular expression and snapshot
* name regular expression.
* @param tableNamePattern The compiled table name regular expression to match against
* @param snapshotNamePattern The compiled snapshot name regular expression to match against
* @return - returns a List of completed SnapshotDescription wrapped by a
* {@link CompletableFuture}
*/
CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
Pattern snapshotNamePattern);
/**
* Delete an existing snapshot.
* @param snapshotName name of the snapshot
*/
CompletableFuture<Void> deleteSnapshot(String snapshotName);
/**
* Delete existing snapshots whose names match the pattern passed.
* @param regex The regular expression to match against
*/
CompletableFuture<Void> deleteSnapshots(String regex);
/**
* Delete existing snapshots whose names match the pattern passed.
* @param pattern pattern for names of the snapshot to match
*/
CompletableFuture<Void> deleteSnapshots(Pattern pattern);
/**
* Delete all existing snapshots matching the given table name regular expression and snapshot
* name regular expression.
* @param tableNameRegex The table name regular expression to match against
* @param snapshotNameRegex The snapshot name regular expression to match against
*/
CompletableFuture<Void> deleteTableSnapshots(String tableNameRegex, String snapshotNameRegex);
/**
* Delete all existing snapshots matching the given table name regular expression and snapshot
* name regular expression.
* @param tableNamePattern The compiled table name regular expression to match against
* @param snapshotNamePattern The compiled snapshot name regular expression to match against
*/
CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
Pattern snapshotNamePattern);
}

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -91,12 +93,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateName
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
@ -155,6 +161,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -1485,11 +1492,118 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
}
private CompletableFuture<Void> restoreSnapshotWithFailSafe(String snapshotName,
TableName tableName, boolean takeFailSafeSnapshot) {
if (takeFailSafeSnapshot) {
CompletableFuture<Void> future = new CompletableFuture<>();
// Step.1 Take a snapshot of the current state
String failSafeSnapshotSnapshotNameFormat =
this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
final String failSafeSnapshotSnapshotName =
failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName)
.replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
.replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime()));
LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
// Step.2 Restore snapshot
internalRestoreSnapshot(snapshotName, tableName).whenComplete((ret2, err2) -> {
if (err2 != null) {
// Step.3.a Something went wrong during the restore and try to rollback.
internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName)
.whenComplete((ret3, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
String msg =
"Restore snapshot=" + snapshotName + " failed. Rollback to snapshot="
+ failSafeSnapshotSnapshotName + " succeeded.";
future.completeExceptionally(new RestoreSnapshotException(msg));
}
});
} else {
// Step.3.b If the restore is succeeded, delete the pre-restore snapshot.
LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName);
deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete((ret3, err3) -> {
if (err3 != null) {
LOG.error(
"Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName,
err3);
future.completeExceptionally(err3);
} else {
future.complete(ret3);
}
});
}
});
}
});
return future;
} else {
return internalRestoreSnapshot(snapshotName, tableName);
}
}
@Override
public CompletableFuture<Void> restoreSnapshot(String snapshotName,
boolean takeFailSafeSnapshot) {
// TODO It depend on listSnapshots() method.
return failedFuture(new UnsupportedOperationException("restoreSnapshot do not supported yet"));
CompletableFuture<Void> future = new CompletableFuture<>();
listSnapshots(snapshotName).whenComplete((snapshotDescriptions, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
TableName tableName = null;
if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
for (SnapshotDescription snap : snapshotDescriptions) {
if (snap.getName().equals(snapshotName)) {
tableName = snap.getTableName();
break;
}
}
}
if (tableName == null) {
future.completeExceptionally(new RestoreSnapshotException(
"Unable to find the table name for snapshot=" + snapshotName));
return;
}
final TableName finalTableName = tableName;
tableExists(finalTableName).whenComplete((exists, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else if (!exists) {
// if table does not exist, then just clone snapshot into new table.
internalRestoreSnapshot(snapshotName, finalTableName).whenComplete((ret, err3) -> {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
future.complete(ret);
}
});
} else {
isTableDisabled(finalTableName).whenComplete((disabled, err4) -> {
if (err4 != null) {
future.completeExceptionally(err4);
} else if (!disabled) {
future.completeExceptionally(new TableNotDisabledException(finalTableName));
} else {
restoreSnapshotWithFailSafe(snapshotName, finalTableName, takeFailSafeSnapshot)
.whenComplete((ret, err5) -> {
if (err5 != null) {
future.completeExceptionally(err5);
} else {
future.complete(ret);
}
});
}
});
}
});
});
return future;
}
@Override
@ -1531,6 +1645,135 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
.call();
}
@Override
public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
return this.<List<SnapshotDescription>> newMasterCaller()
.action((controller, stub) -> this
.<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> call(
controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(),
(s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
resp -> resp.getSnapshotsList().stream().map(ProtobufUtil::createSnapshotDesc)
.collect(Collectors.toList())))
.call();
}
@Override
public CompletableFuture<List<SnapshotDescription>> listSnapshots(String regex) {
return listSnapshots(Pattern.compile(regex));
}
@Override
public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) {
CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
listSnapshots().whenComplete((snapshotDescList, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
if (snapshotDescList == null || snapshotDescList.isEmpty()) {
future.complete(Collections.emptyList());
return;
}
future.complete(snapshotDescList.stream()
.filter(snap -> pattern.matcher(snap.getName()).matches()).collect(Collectors.toList()));
});
return future;
}
@Override
public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(String tableNameRegex,
String snapshotNameRegex) {
return listTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex));
}
@Override
public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern,
Pattern snapshotNamePattern) {
CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>();
listTableNames(tableNamePattern, false).whenComplete((tableNames, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
if (tableNames == null || tableNames.length <= 0) {
future.complete(Collections.emptyList());
return;
}
List<TableName> tableNameList = Arrays.asList(tableNames);
listSnapshots(snapshotNamePattern).whenComplete((snapshotDescList, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
return;
}
if (snapshotDescList == null || snapshotDescList.isEmpty()) {
future.complete(Collections.emptyList());
return;
}
future.complete(snapshotDescList.stream()
.filter(snap -> (snap != null && tableNameList.contains(snap.getTableName())))
.collect(Collectors.toList()));
});
});
return future;
}
@Override
public CompletableFuture<Void> deleteSnapshot(String snapshotName) {
return internalDeleteSnapshot(new SnapshotDescription(snapshotName));
}
@Override
public CompletableFuture<Void> deleteSnapshots(String regex) {
return deleteSnapshots(Pattern.compile(regex));
}
@Override
public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
return deleteTableSnapshots(null, snapshotNamePattern);
}
@Override
public CompletableFuture<Void> deleteTableSnapshots(String tableNameRegex,
String snapshotNameRegex) {
return deleteTableSnapshots(Pattern.compile(tableNameRegex),
Pattern.compile(snapshotNameRegex));
}
@Override
public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
Pattern snapshotNamePattern) {
CompletableFuture<Void> future = new CompletableFuture<>();
listTableSnapshots(tableNamePattern, snapshotNamePattern)
.whenComplete(((snapshotDescriptions, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
future.complete(null);
return;
}
List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>();
snapshotDescriptions
.forEach(snapDesc -> deleteSnapshotFutures.add(internalDeleteSnapshot(snapDesc)));
CompletableFuture
.allOf(deleteSnapshotFutures
.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()]))
.thenAccept(v -> future.complete(v));
}));
return future;
}
private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) {
return this.<Void> newMasterCaller()
.action((controller, stub) -> this
.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(controller, stub,
DeleteSnapshotRequest.newBuilder()
.setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
(s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null))
.call();
}
private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
if (numRegions < 3) {
throw new IllegalArgumentException("Must create at least three regions");

View File

@ -2576,7 +2576,7 @@ public class HBaseAdmin implements Admin {
syncWaitTimeout,
TimeUnit.MILLISECONDS);
} catch (IOException e) {
// Somthing went wrong during the restore...
// Something went wrong during the restore...
// if the pre-restore snapshot is available try to rollback
if (takeFailSafeSnapshot) {
try {

View File

@ -1355,6 +1355,11 @@ public final class HConstants {
"hbase.snapshot.restore.take.failsafe.snapshot";
public static final boolean DEFAULT_SNAPSHOT_RESTORE_TAKE_FAILSAFE_SNAPSHOT = false;
public static final String SNAPSHOT_RESTORE_FAILSAFE_NAME =
"hbase.snapshot.restore.failsafe.name";
public static final String DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME =
"hbase-failsafe-{snapshot.name}-{restore.timestamp}";
private HConstants() {
// Can't be instantiated with this ctor.
}

View File

@ -26,8 +26,10 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
@ -109,4 +111,135 @@ public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
TEST_UTIL.deleteTable(tableName);
}
}
private void assertResult(TableName tableName, int expectedRowCount) throws IOException {
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
Scan scan = new Scan();
try (ResultScanner scanner = table.getScanner(scan)) {
Result result;
int rowCount = 0;
while ((result = scanner.next()) != null) {
Assert.assertArrayEquals(result.getRow(), Bytes.toBytes(rowCount));
Assert.assertArrayEquals(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("cq")),
Bytes.toBytes(rowCount));
rowCount += 1;
}
Assert.assertEquals(rowCount, expectedRowCount);
}
}
}
@Test
public void testRestoreSnapshot() throws Exception {
String snapshotName1 = "snapshotName1";
String snapshotName2 = "snapshotName2";
TableName tableName = TableName.valueOf("testRestoreSnapshot");
Admin syncAdmin = TEST_UTIL.getAdmin();
try {
Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("f1"));
for (int i = 0; i < 3000; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(Bytes.toBytes("f1"), Bytes.toBytes("cq"),
Bytes.toBytes(i)));
}
Assert.assertEquals(admin.listSnapshots().get().size(), 0);
admin.snapshot(snapshotName1, tableName).get();
admin.snapshot(snapshotName2, tableName).get();
Assert.assertEquals(admin.listSnapshots().get().size(), 2);
admin.disableTable(tableName).get();
admin.restoreSnapshot(snapshotName1, true).get();
admin.enableTable(tableName).get();
assertResult(tableName, 3000);
admin.disableTable(tableName).get();
admin.restoreSnapshot(snapshotName2, false).get();
admin.enableTable(tableName).get();
assertResult(tableName, 3000);
} finally {
syncAdmin.deleteSnapshot(snapshotName1);
syncAdmin.deleteSnapshot(snapshotName2);
TEST_UTIL.deleteTable(tableName);
}
}
@Test
public void testListSnapshots() throws Exception {
String snapshotName1 = "snapshotName1";
String snapshotName2 = "snapshotName2";
String snapshotName3 = "snapshotName3";
TableName tableName = TableName.valueOf("testListSnapshots");
Admin syncAdmin = TEST_UTIL.getAdmin();
try {
Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("f1"));
for (int i = 0; i < 3000; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(Bytes.toBytes("f1"), Bytes.toBytes("cq"),
Bytes.toBytes(i)));
}
Assert.assertEquals(admin.listSnapshots().get().size(), 0);
admin.snapshot(snapshotName1, tableName).get();
admin.snapshot(snapshotName2, tableName).get();
admin.snapshot(snapshotName3, tableName).get();
Assert.assertEquals(admin.listSnapshots().get().size(), 3);
Assert.assertEquals(admin.listSnapshots("(.*)").get().size(), 3);
Assert.assertEquals(admin.listSnapshots("snapshotName(\\d+)").get().size(), 3);
Assert.assertEquals(admin.listSnapshots("snapshotName[1|3]").get().size(), 2);
Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshot(.*)")).get().size(), 3);
Assert.assertEquals(admin.listTableSnapshots("testListSnapshots", "s(.*)").get().size(), 3);
Assert.assertEquals(admin.listTableSnapshots("fakeTableName", "snap(.*)").get().size(), 0);
Assert.assertEquals(admin.listTableSnapshots("test(.*)", "snap(.*)[1|3]").get().size(), 2);
} finally {
syncAdmin.deleteSnapshot(snapshotName1);
syncAdmin.deleteSnapshot(snapshotName2);
syncAdmin.deleteSnapshot(snapshotName3);
TEST_UTIL.deleteTable(tableName);
}
}
@Test
public void testDeleteSnapshots() throws Exception {
String snapshotName1 = "snapshotName1";
String snapshotName2 = "snapshotName2";
String snapshotName3 = "snapshotName3";
TableName tableName = TableName.valueOf("testDeleteSnapshots");
try {
Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("f1"));
for (int i = 0; i < 3000; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(Bytes.toBytes("f1"), Bytes.toBytes("cq"),
Bytes.toBytes(i)));
}
Assert.assertEquals(admin.listSnapshots().get().size(), 0);
admin.snapshot(snapshotName1, tableName).get();
admin.snapshot(snapshotName2, tableName).get();
admin.snapshot(snapshotName3, tableName).get();
Assert.assertEquals(admin.listSnapshots().get().size(), 3);
admin.deleteSnapshot(snapshotName1).get();
Assert.assertEquals(admin.listSnapshots().get().size(), 2);
admin.deleteSnapshots("(.*)abc").get();
Assert.assertEquals(admin.listSnapshots().get().size(), 2);
admin.deleteSnapshots("(.*)1").get();
Assert.assertEquals(admin.listSnapshots().get().size(), 2);
admin.deleteTableSnapshots("(.*)", "(.*)1").get();
Assert.assertEquals(admin.listSnapshots().get().size(), 2);
admin.deleteTableSnapshots("(.*)", "(.*)2").get();
Assert.assertEquals(admin.listSnapshots().get().size(), 1);
admin.deleteTableSnapshots("(.*)", "(.*)3").get();
Assert.assertEquals(admin.listSnapshots().get().size(), 0);
} finally {
TEST_UTIL.deleteTable(tableName);
}
}
}