HBASE-16392 Backup delete fault tolerance (Vladimir Rodionov)

parent da3c023635
commit 80e15aac21
@@ -97,21 +97,81 @@ public class BackupAdminImpl implements BackupAdmin {
     int totalDeleted = 0;
     Map<String, HashSet<TableName>> allTablesMap = new HashMap<String, HashSet<TableName>>();

+    boolean deleteSessionStarted = false;
+    boolean snapshotDone = false;
     try (final BackupSystemTable sysTable = new BackupSystemTable(conn)) {
-      for (int i = 0; i < backupIds.length; i++) {
-        BackupInfo info = sysTable.readBackupInfo(backupIds[i]);
-        if (info != null) {
-          String rootDir = info.getBackupRootDir();
-          HashSet<TableName> allTables = allTablesMap.get(rootDir);
-          if (allTables == null) {
-            allTables = new HashSet<TableName>();
-            allTablesMap.put(rootDir, allTables);
+      // Step 1: Make sure there is no active session
+      // is running by using startBackupSession API
+      // If there is an active session in progress, exception will be thrown
+      try {
+        sysTable.startBackupSession();
+        deleteSessionStarted = true;
+      } catch (IOException e) {
+        LOG.warn("You can not run delete command while active backup session is in progress. \n"
+            + "If there is no active backup session running, run backup repair utility to restore \n"
+            + "backup system integrity.");
+        return -1;
+      }
+
+      // Step 2: Make sure there is no failed session
+      List<BackupInfo> list = sysTable.getBackupInfos(BackupState.RUNNING);
+      if (list.size() != 0) {
+        // Failed sessions found
+        LOG.warn("Failed backup session found. Run backup repair tool first.");
+        return -1;
+      }
+
+      // Step 3: Record delete session
+      sysTable.startDeleteOperation(backupIds);
+      // Step 4: Snapshot backup system table
+      if (!BackupSystemTable.snapshotExists(conn)) {
+        BackupSystemTable.snapshot(conn);
+      } else {
+        LOG.warn("Backup system table snapshot exists");
+      }
+      snapshotDone = true;
+      try {
+        for (int i = 0; i < backupIds.length; i++) {
+          BackupInfo info = sysTable.readBackupInfo(backupIds[i]);
+          if (info != null) {
+            String rootDir = info.getBackupRootDir();
+            HashSet<TableName> allTables = allTablesMap.get(rootDir);
+            if (allTables == null) {
+              allTables = new HashSet<TableName>();
+              allTablesMap.put(rootDir, allTables);
+            }
+            allTables.addAll(info.getTableNames());
+            totalDeleted += deleteBackup(backupIds[i], sysTable);
+          }
-          allTables.addAll(info.getTableNames());
-          totalDeleted += deleteBackup(backupIds[i], sysTable);
         }
+        finalizeDelete(allTablesMap, sysTable);
+        // Finish
+        sysTable.finishDeleteOperation();
+        // delete snapshot
+        BackupSystemTable.deleteSnapshot(conn);
+      } catch (IOException e) {
+        // Fail delete operation
+        // Step 1
+        if (snapshotDone) {
+          if (BackupSystemTable.snapshotExists(conn)) {
+            BackupSystemTable.restoreFromSnapshot(conn);
+            // delete snapshot
+            BackupSystemTable.deleteSnapshot(conn);
+            // We still have record with unfinished delete operation
+            LOG.error("Delete operation failed, please run backup repair utility to restore "
+                + "backup system integrity", e);
+            throw e;
+          } else {
+            LOG.warn("Delete operation succeeded, there were some errors: ", e);
+          }
+        }
+      } finally {
+        if (deleteSessionStarted) {
+          sysTable.finishBackupSession();
+        }
+      }
-      finalizeDelete(allTablesMap, sysTable);
     }
     return totalDeleted;
   }
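
The hunk above turns delete into a guarded, repairable operation: record the delete intent in the system table (Step 3), snapshot the table (Step 4), perform the deletes, then clear the intent and drop the snapshot; on failure the snapshot is restored and the intent record survives so the repair tool can finish the job. A minimal sketch of that pattern, decoupled from HBase (all names below are illustrative, not from the patch; only the sequence mirrors the code above):

import java.io.IOException;
import java.util.function.ToIntFunction;

// Illustrative abstraction over the backup system table.
interface MetaStore {
  void recordDeleteIntent(String[] ids) throws IOException; // Step 3 in the hunk
  void clearDeleteIntent() throws IOException;
  void snapshot() throws IOException;                       // Step 4 in the hunk
  void restoreFromSnapshot() throws IOException;
  void deleteSnapshot() throws IOException;
}

final class FaultTolerantDelete {
  static int delete(MetaStore meta, String[] ids, ToIntFunction<String> doDelete)
      throws IOException {
    meta.recordDeleteIntent(ids); // durable marker; repair finds it after a crash
    meta.snapshot();              // point-in-time copy of the metadata
    int deleted = 0;
    try {
      for (String id : ids) {
        deleted += doDelete.applyAsInt(id);
      }
      meta.clearDeleteIntent();   // success: remove the marker ...
      meta.deleteSnapshot();      // ... and the snapshot
    } catch (IOException e) {
      meta.restoreFromSnapshot(); // roll the metadata back; the marker survives
      meta.deleteSnapshot();
      throw e;                    // caller is told to run the repair tool
    }
    return deleted;
  }
}
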
@@ -169,6 +229,7 @@ public class BackupAdminImpl implements BackupAdmin {
     int totalDeleted = 0;
     if (backupInfo != null) {
       LOG.info("Deleting backup " + backupInfo.getBackupId() + " ...");
+      // Step 1: clean up data for backup session (idempotent)
       BackupUtils.cleanupBackupData(backupInfo, conn.getConfiguration());
       // List of tables in this backup;
       List<TableName> tables = backupInfo.getTableNames();
@@ -179,7 +240,7 @@ public class BackupAdminImpl implements BackupAdmin {
         continue;
       }
       // else
-      List<BackupInfo> affectedBackups = getAffectedBackupInfos(backupInfo, tn, sysTable);
+      List<BackupInfo> affectedBackups = getAffectedBackupSessions(backupInfo, tn, sysTable);
       for (BackupInfo info : affectedBackups) {
         if (info.equals(backupInfo)) {
           continue;
@@ -189,7 +250,7 @@ public class BackupAdminImpl implements BackupAdmin {
       }
       Map<byte[], String> map = sysTable.readBulkLoadedFiles(backupId);
       FileSystem fs = FileSystem.get(conn.getConfiguration());
-      boolean succ = true;
+      boolean success = true;
       int numDeleted = 0;
       for (String f : map.values()) {
         Path p = new Path(f);
@@ -198,20 +259,20 @@ public class BackupAdminImpl implements BackupAdmin {
           if (!fs.delete(p)) {
             if (fs.exists(p)) {
               LOG.warn(f + " was not deleted");
-              succ = false;
+              success = false;
             }
           } else {
            numDeleted++;
          }
        } catch (IOException ioe) {
          LOG.warn(f + " was not deleted", ioe);
-          succ = false;
+          success = false;
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted");
      }
-      if (succ) {
+      if (success) {
        sysTable.deleteBulkLoadedFiles(map);
      }
@@ -236,17 +297,18 @@ public class BackupAdminImpl implements BackupAdmin {
           LOG.debug("Delete backup info " + info.getBackupId());

           sysTable.deleteBackupInfo(info.getBackupId());
+          // Idempotent operation
           BackupUtils.cleanupBackupData(info, conn.getConfiguration());
         } else {
           info.setTables(tables);
           sysTable.updateBackupInfo(info);
-          // Now, clean up directory for table
+          // Now, clean up directory for table (idempotent)
           cleanupBackupDir(info, tn, conn.getConfiguration());
         }
       }
     }

-  private List<BackupInfo> getAffectedBackupInfos(BackupInfo backupInfo, TableName tn,
+  private List<BackupInfo> getAffectedBackupSessions(BackupInfo backupInfo, TableName tn,
       BackupSystemTable table) throws IOException {
     LOG.debug("GetAffectedBackupInfos for: " + backupInfo.getBackupId() + " table=" + tn);
     long ts = backupInfo.getStartTs();

@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupAdmin;
 import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
 import org.apache.hadoop.hbase.backup.BackupRequest;
@@ -148,6 +149,18 @@ public final class BackupCommands {
         }
       }
     }
+    if (requiresConsistentState()) {
+      // Check failed delete
+      try (BackupSystemTable table = new BackupSystemTable(conn)) {
+        String[] ids = table.getListOfBackupIdsFromDeleteOperation();
+
+        if (ids != null && ids.length > 0) {
+          System.err.println("Found failed backup delete command.");
+          System.err.println("Backup system recovery is required.");
+          throw new IOException("Failed backup delete found, aborted command execution");
+        }
+      }
+    }
   }

   public void finish() throws IOException {
@@ -165,6 +178,15 @@ public final class BackupCommands {
     protected boolean requiresNoActiveSession() {
       return false;
     }
+    /**
+     * Command requires consistent state of a backup system.
+     * Backup system may become inconsistent because of an abnormal
+     * termination of a backup session or delete command.
+     * @return true, if yes
+     */
+    protected boolean requiresConsistentState() {
+      return false;
+    }
   }

   private BackupCommands() {
@@ -223,6 +245,11 @@ public final class BackupCommands {
       return true;
     }

+    @Override
+    protected boolean requiresConsistentState() {
+      return true;
+    }
+
     @Override
     public void execute() throws IOException {
       if (cmdline == null || cmdline.getArgs() == null) {
@@ -556,7 +583,9 @@ public final class BackupCommands {
       List<BackupInfo> list = sysTable.getBackupInfos(BackupState.RUNNING);
       if (list.size() == 0) {
         // No failed sessions found
-        System.out.println("REPAIR status: no failed sessions found.");
+        System.out.println("REPAIR status: no failed sessions found."
+            + " Checking failed delete backup operation ...");
+        repairFailedBackupDeletionIfAny(conn, sysTable);
         return;
       }
       backupInfo = list.get(0);
@@ -583,6 +612,29 @@ public final class BackupCommands {
       }
     }

+    private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable sysTable)
+        throws IOException {
+      String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation();
+      if (backupIds == null || backupIds.length == 0) {
+        System.out.println("No failed backup delete operation found");
+        // Delete backup table snapshot if exists
+        BackupSystemTable.deleteSnapshot(conn);
+        return;
+      }
+      System.out.println("Found failed delete operation for: " + StringUtils.join(backupIds));
+      System.out.println("Running delete again ...");
+      // Restore table from snapshot
+      BackupSystemTable.restoreFromSnapshot(conn);
+      // Finish previous failed session
+      sysTable.finishBackupSession();
+      try (BackupAdmin admin = new BackupAdminImpl(conn)) {
+        admin.deleteBackups(backupIds);
+      }
+      System.out.println("Delete operation finished OK: " + StringUtils.join(backupIds));
+    }
+
     @Override
     protected void printUsage() {
       System.out.println(REPAIR_CMD_USAGE);
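
The repair path added above is reachable through the backup CLI driver; the new tests later in this commit drive it programmatically, roughly as follows (a usage sketch: BackupDriver, ToolRunner and the "repair" argument come from this commit and its tests, while the wrapper class is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.BackupDriver;
import org.apache.hadoop.util.ToolRunner;

public class RepairExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // "repair" finds the delete marker row, restores the backup system table
    // from its snapshot and replays the interrupted delete.
    int ret = ToolRunner.run(conf, new BackupDriver(), new String[] { "repair" });
    System.exit(ret); // 0 on success
  }
}
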
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -145,6 +146,8 @@ public final class BackupSystemTable implements Closeable {

   private final static String BULK_LOAD_PREFIX = "bulk:";
   private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
+  private final static byte[] DELETE_OP_ROW = "delete_op_row".getBytes();
+
   final static byte[] TBL_COL = Bytes.toBytes("tbl");
   final static byte[] FAM_COL = Bytes.toBytes("fam");
   final static byte[] PATH_COL = Bytes.toBytes("path");
@@ -1602,6 +1605,69 @@ public final class BackupSystemTable implements Closeable {
     return puts;
   }

+  public static void snapshot(Connection conn) throws IOException {
+    try (Admin admin = conn.getAdmin()) {
+      Configuration conf = conn.getConfiguration();
+      admin.snapshot(BackupSystemTable.getSnapshotName(conf),
+        BackupSystemTable.getTableName(conf));
+    }
+  }
+
+  public static void restoreFromSnapshot(Connection conn) throws IOException {
+    Configuration conf = conn.getConfiguration();
+    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
+    try (Admin admin = conn.getAdmin()) {
+      String snapshotName = BackupSystemTable.getSnapshotName(conf);
+      if (snapshotExists(admin, snapshotName)) {
+        admin.disableTable(BackupSystemTable.getTableName(conf));
+        admin.restoreSnapshot(snapshotName);
+        admin.enableTable(BackupSystemTable.getTableName(conf));
+        LOG.debug("Done restoring backup system table");
+      } else {
+        // Snapshot does not exist, i.e. completeBackup failed after
+        // deleting backup system table snapshot
+        // In this case we log WARN and proceed
+        LOG.warn("Could not restore backup system table. Snapshot " + snapshotName
+            + " does not exist.");
+      }
+    }
+  }
+
+  protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
+    List<SnapshotDescription> list = admin.listSnapshots();
+    for (SnapshotDescription desc : list) {
+      if (desc.getName().equals(snapshotName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static boolean snapshotExists(Connection conn) throws IOException {
+    return snapshotExists(conn.getAdmin(), getSnapshotName(conn.getConfiguration()));
+  }
+
+  public static void deleteSnapshot(Connection conn) throws IOException {
+    Configuration conf = conn.getConfiguration();
+    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
+    try (Admin admin = conn.getAdmin()) {
+      String snapshotName = BackupSystemTable.getSnapshotName(conf);
+      if (snapshotExists(admin, snapshotName)) {
+        admin.deleteSnapshot(snapshotName);
+        LOG.debug("Done deleting backup system table snapshot");
+      } else {
+        LOG.error("Snapshot " + snapshotName + " does not exist");
+      }
+    }
+  }
+
   /*
    * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
    */
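
These static helpers carry the snapshot/restore cycle that deleteBackups and the repair command lean on. A condensed sketch of the call pattern (the wrapper class and method are hypothetical; the helper calls mirror the usage in BackupAdminImpl above):

import java.io.IOException;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;

class SnapshotGuardSketch {
  // Guard a backup-metadata mutation with the new static helpers.
  static void mutateWithSnapshotGuard(Connection conn) throws IOException {
    if (!BackupSystemTable.snapshotExists(conn)) {
      BackupSystemTable.snapshot(conn);            // point-in-time copy
    }
    try {
      // ... mutate backup metadata here ...
      BackupSystemTable.deleteSnapshot(conn);      // success: drop the copy
    } catch (IOException e) {
      BackupSystemTable.restoreFromSnapshot(conn); // failure: roll back
      BackupSystemTable.deleteSnapshot(conn);
      throw e;
    }
  }
}
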
@@ -1626,6 +1692,7 @@ public final class BackupSystemTable implements Closeable {
     }
     return puts;
   }

   public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
     List<Delete> lstDels = new ArrayList<>();
     for (TableName table : lst) {
@@ -1636,6 +1703,68 @@ public final class BackupSystemTable implements Closeable {
     return lstDels;
   }

+  private Put createPutForDeleteOperation(String[] backupIdList) {
+    byte[] value = Bytes.toBytes(StringUtils.join(backupIdList, ","));
+    Put put = new Put(DELETE_OP_ROW);
+    put.addColumn(META_FAMILY, FAM_COL, value);
+    return put;
+  }
+
+  private Delete createDeleteForBackupDeleteOperation() {
+    Delete delete = new Delete(DELETE_OP_ROW);
+    delete.addFamily(META_FAMILY);
+    return delete;
+  }
+
+  private Get createGetForDeleteOperation() {
+    Get get = new Get(DELETE_OP_ROW);
+    get.addFamily(META_FAMILY);
+    return get;
+  }
+
+  public void startDeleteOperation(String[] backupIdList) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Start delete operation for backups: " + StringUtils.join(backupIdList));
+    }
+    Put put = createPutForDeleteOperation(backupIdList);
+    try (Table table = connection.getTable(tableName)) {
+      table.put(put);
+    }
+  }
+
+  public void finishDeleteOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Finish delete operation for backup ids");
+    }
+    Delete delete = createDeleteForBackupDeleteOperation();
+    try (Table table = connection.getTable(tableName)) {
+      table.delete(delete);
+    }
+  }
+
+  public String[] getListOfBackupIdsFromDeleteOperation() throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Get delete operation for backup ids");
+    }
+    Get get = createGetForDeleteOperation();
+    try (Table table = connection.getTable(tableName)) {
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return null;
+      }
+      Cell cell = res.listCells().get(0);
+      byte[] val = CellUtil.cloneValue(cell);
+      if (val.length == 0) {
+        return null;
+      }
+      return new String(val).split(",");
+    }
+  }
+
   static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
     Scan scan = new Scan();
     byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
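
The DELETE_OP_ROW marker written by these methods is what makes an interrupted delete discoverable. Its lifecycle, sketched under the assumption of an open BackupSystemTable (the wrapper class and the backup id below are hypothetical):

import java.io.IOException;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;

class DeleteMarkerSketch {
  // Lifecycle of the DELETE_OP_ROW marker row.
  static void markerLifecycle(BackupSystemTable sysTable) throws IOException {
    String[] ids = new String[] { "backup_1490000000000" }; // hypothetical id
    sysTable.startDeleteOperation(ids);                     // write marker row
    String[] pending = sysTable.getListOfBackupIdsFromDeleteOperation();
    if (pending != null) {
      // a delete is in flight or died mid-way; repair re-runs it from here
    }
    sysTable.finishDeleteOperation();                       // clear marker row
  }
}
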
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;

@@ -109,7 +108,7 @@ public abstract class TableBackupClient {
   protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
       throws IOException {

-    snapshotBackupTable();
+    BackupSystemTable.snapshot(conn);
     backupManager.setBackupInfo(backupInfo);
     // set the start timestamp of the overall backup
     long startTs = EnvironmentEdgeManager.currentTime();
@@ -269,69 +268,15 @@ public abstract class TableBackupClient {
       deleteSnapshots(conn, backupInfo, conf);
       cleanupExportSnapshotLog(conf);
     }
-    restoreBackupTable(conn, conf);
-    deleteBackupTableSnapshot(conn, conf);
+    BackupSystemTable.restoreFromSnapshot(conn);
+    BackupSystemTable.deleteSnapshot(conn);
     // clean up the uncompleted data at target directory if the ongoing backup has already entered
     // the copy phase
     // For incremental backup, DistCp logs will be cleaned with the targetDir.
     cleanupTargetDir(backupInfo, conf);
   }

-  protected void snapshotBackupTable() throws IOException {
-    try (Admin admin = conn.getAdmin()) {
-      admin.snapshot(BackupSystemTable.getSnapshotName(conf),
-        BackupSystemTable.getTableName(conf));
-    }
-  }
-
-  protected static void restoreBackupTable(Connection conn, Configuration conf)
-      throws IOException {
-    LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + " from snapshot");
-    try (Admin admin = conn.getAdmin()) {
-      String snapshotName = BackupSystemTable.getSnapshotName(conf);
-      if (snapshotExists(admin, snapshotName)) {
-        admin.disableTable(BackupSystemTable.getTableName(conf));
-        admin.restoreSnapshot(snapshotName);
-        admin.enableTable(BackupSystemTable.getTableName(conf));
-        LOG.debug("Done restoring backup system table");
-      } else {
-        // Snapshot does not exist, i.e. completeBackup failed after
-        // deleting backup system table snapshot
-        // In this case we log WARN and proceed
-        LOG.error("Could not restore backup system table. Snapshot " + snapshotName
-            + " does not exist.");
-      }
-    }
-  }
-
-  protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
-    List<SnapshotDescription> list = admin.listSnapshots();
-    for (SnapshotDescription desc : list) {
-      if (desc.getName().equals(snapshotName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  protected static void deleteBackupTableSnapshot(Connection conn, Configuration conf)
-      throws IOException {
-    LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + " from the system");
-    try (Admin admin = conn.getAdmin()) {
-      String snapshotName = BackupSystemTable.getSnapshotName(conf);
-      if (snapshotExists(admin, snapshotName)) {
-        admin.deleteSnapshot(snapshotName);
-        LOG.debug("Done deleting backup system table snapshot");
-      } else {
-        LOG.error("Snapshot " + snapshotName + " does not exist");
-      }
-    }
-  }
-
   /**
    * Add manifest for the current backup. The manifest is stored within the table backup directory.
@@ -457,7 +402,7 @@ public abstract class TableBackupClient {
     } else if (type == BackupType.INCREMENTAL) {
       cleanupDistCpLog(backupInfo, conf);
     }
-    deleteBackupTableSnapshot(conn, conf);
+    BackupSystemTable.deleteSnapshot(conn);
     backupManager.updateBackupInfo(backupInfo);

     // Finish active session

@@ -66,9 +66,8 @@ import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;

 /**
  * This class is only a base for other integration-level backup tests. Do not add tests here.
@@ -79,11 +78,11 @@ public class TestBackupBase {

   private static final Log LOG = LogFactory.getLog(TestBackupBase.class);

-  protected static Configuration conf1;
+  protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static HBaseTestingUtility TEST_UTIL2;
+  protected static Configuration conf1 = TEST_UTIL.getConfiguration();
   protected static Configuration conf2;

-  protected static HBaseTestingUtility TEST_UTIL;
-  protected static HBaseTestingUtility TEST_UTIL2;
   protected static TableName table1 = TableName.valueOf("table1");
   protected static HTableDescriptor table1Desc;
   protected static TableName table2 = TableName.valueOf("table2");
@@ -105,6 +104,9 @@ public class TestBackupBase {
   protected static boolean secure = false;

   protected static boolean autoRestoreOnFailure = true;
+  protected static boolean setupIsDone = false;
+  protected static boolean useSecondCluster = false;
+

   static class IncrementalTableBackupClientForTest extends IncrementalTableBackupClient {
@@ -281,10 +283,11 @@ public class TestBackupBase {
   /**
    * @throws java.lang.Exception
    */
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL = new HBaseTestingUtility();
-    conf1 = TEST_UTIL.getConfiguration();
+  @Before
+  public void setUp() throws Exception {
+    if (setupIsDone) {
+      return;
+    }
     if (secure) {
       // set the always on security provider
       UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
@@ -301,24 +304,27 @@ public class TestBackupBase {
     conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
     // Set MultiWAL (with 2 default WAL files per RS)
     conf1.set(WALFactory.WAL_PROVIDER, provider);
     TEST_UTIL.startMiniZKCluster();
-    MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
-
-    conf2 = HBaseConfiguration.create(conf1);
-    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
-    TEST_UTIL2 = new HBaseTestingUtility(conf2);
-    TEST_UTIL2.setZkCluster(miniZK);
     TEST_UTIL.startMiniCluster();
-    TEST_UTIL2.startMiniCluster();

+    if (useSecondCluster) {
+      conf2 = HBaseConfiguration.create(conf1);
+      conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+      TEST_UTIL2 = new HBaseTestingUtility(conf2);
+      TEST_UTIL2.setZkCluster(TEST_UTIL.getZkCluster());
+      TEST_UTIL2.startMiniCluster();
+    }
+    conf1 = TEST_UTIL.getConfiguration();

     TEST_UTIL.startMiniMapReduceCluster();
     BACKUP_ROOT_DIR = TEST_UTIL.getConfiguration().get("fs.defaultFS") + "/backupUT";
     LOG.info("ROOTDIR " + BACKUP_ROOT_DIR);
-    BACKUP_REMOTE_ROOT_DIR = TEST_UTIL2.getConfiguration().get("fs.defaultFS") + "/backupUT";
-    LOG.info("REMOTE ROOTDIR " + BACKUP_REMOTE_ROOT_DIR);
+    if (useSecondCluster) {
+      BACKUP_REMOTE_ROOT_DIR = TEST_UTIL2.getConfiguration().get("fs.defaultFS") + "/backupUT";
+      LOG.info("REMOTE ROOTDIR " + BACKUP_REMOTE_ROOT_DIR);
+    }
     createTables();
     populateFromMasterConfig(TEST_UTIL.getHBaseCluster().getMaster().getConfiguration(), conf1);
+    setupIsDone = true;
   }

   private static void populateFromMasterConfig(Configuration masterConf, Configuration conf) {
@@ -333,10 +339,15 @@ public class TestBackupBase {
    * @throws java.lang.Exception
    */
   @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
+  public static void tearDown() throws Exception {
+    try {
+      SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getHBaseAdmin());
+    } catch (Exception e) {
+    }
     SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
-    TEST_UTIL2.shutdownMiniCluster();
+    if (useSecondCluster) {
+      TEST_UTIL2.shutdownMiniCluster();
+    }
     TEST_UTIL.shutdownMiniCluster();
     TEST_UTIL.shutdownMiniMapReduceCluster();
   }
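
With the second mini cluster now optional, a remote test opts in before the base class starts clusters; this is the pattern TestRemoteBackup and TestRemoteRestore adopt further down in this commit:

  @Override
  public void setUp() throws Exception {
    useSecondCluster = true; // must be set before super.setUp() starts the clusters
    super.setUp();
  }
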
@@ -0,0 +1,194 @@
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

/**
 * Tests fault tolerance of the backup delete operation: a master coprocessor
 * injects failures around the backup system table snapshot, and the repair
 * command is then used to restore backup system integrity.
 */
@Category(LargeTests.class)
public class TestBackupDeleteWithFailures extends TestBackupBase {

  private static final Log LOG = LogFactory.getLog(TestBackupDeleteWithFailures.class);

  public static enum Failure {
    NO_FAILURES,
    PRE_SNAPSHOT_FAILURE,
    PRE_DELETE_SNAPSHOT_FAILURE,
    POST_DELETE_SNAPSHOT_FAILURE
  }

  public static class MasterSnapshotObserver implements MasterObserver {

    List<Failure> failures = new ArrayList<Failure>();

    public void setFailures(Failure... f) {
      failures.clear();
      for (int i = 0; i < f.length; i++) {
        failures.add(f[i]);
      }
    }

    @Override
    public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
        final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
        throws IOException {
      if (failures.contains(Failure.PRE_SNAPSHOT_FAILURE)) {
        throw new IOException("preSnapshot");
      }
    }

    @Override
    public void preDeleteSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
        SnapshotDescription snapshot) throws IOException {
      if (failures.contains(Failure.PRE_DELETE_SNAPSHOT_FAILURE)) {
        throw new IOException("preDeleteSnapshot");
      }
    }

    @Override
    public void postDeleteSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
        SnapshotDescription snapshot) throws IOException {
      if (failures.contains(Failure.POST_DELETE_SNAPSHOT_FAILURE)) {
        throw new IOException("postDeleteSnapshot");
      }
    }
  }

  /**
   * @throws java.lang.Exception
   */
  @Override
  @Before
  public void setUp() throws Exception {
    conf1.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
        MasterSnapshotObserver.class.getName());
    conf1.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    super.setUp();
  }

  private MasterSnapshotObserver getMasterSnapshotObserver() {
    return (MasterSnapshotObserver) TEST_UTIL.getHBaseCluster().getMaster()
        .getMasterCoprocessorHost().findCoprocessor(MasterSnapshotObserver.class.getName());
  }

  @Test
  public void testBackupDeleteWithFailures() throws Exception {
    testBackupDeleteWithFailuresAfter(1, Failure.PRE_DELETE_SNAPSHOT_FAILURE);
    testBackupDeleteWithFailuresAfter(0, Failure.POST_DELETE_SNAPSHOT_FAILURE);
    testBackupDeleteWithFailuresAfter(1, Failure.PRE_SNAPSHOT_FAILURE);
  }

  private void testBackupDeleteWithFailuresAfter(int expected, Failure... failures)
      throws Exception {
    LOG.info("test repair backup delete on a single table with data and failures " + failures[0]);
    List<TableName> tableList = Lists.newArrayList(table1);
    String backupId = fullTableBackup(tableList);
    assertTrue(checkSucceeded(backupId));
    LOG.info("backup complete");
    String[] backupIds = new String[] { backupId };
    BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection());
    BackupInfo info = table.readBackupInfo(backupId);
    Path path = new Path(info.getBackupRootDir(), backupId);
    FileSystem fs = FileSystem.get(path.toUri(), conf1);
    assertTrue(fs.exists(path));

    Connection conn = TEST_UTIL.getConnection();
    Admin admin = conn.getAdmin();
    MasterSnapshotObserver observer = getMasterSnapshotObserver();

    observer.setFailures(failures);
    try {
      getBackupAdmin().deleteBackups(backupIds);
    } catch (IOException e) {
      if (expected != 1) assertTrue(false);
    }

    // Verify that history length == expected after delete failure
    assertTrue(table.getBackupHistory().size() == expected);

    String[] ids = table.getListOfBackupIdsFromDeleteOperation();

    // Verify that we still have delete record in backup system table
    if (expected == 1) {
      assertTrue(ids.length == 1);
      assertTrue(ids[0].equals(backupId));
    } else {
      assertNull(ids);
    }

    // Now run repair command to repair "failed" delete operation
    String[] args = new String[] { "repair" };

    observer.setFailures(Failure.NO_FAILURES);

    // Run repair
    int ret = ToolRunner.run(conf1, new BackupDriver(), args);
    assertTrue(ret == 0);
    // Verify that history length == 0
    assertTrue(table.getBackupHistory().size() == 0);
    ids = table.getListOfBackupIdsFromDeleteOperation();

    // Verify that we do not have delete record in backup system table
    assertNull(ids);

    table.close();
    admin.close();
  }
}

@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

@@ -54,7 +54,7 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase {

   // implement all test cases in 1 test since incremental backup/restore has dependencies
   @Test
-  public void TestIncBackupDeleteTable() throws Exception {
+  public void testIncBackupDeleteTable() throws Exception {
     // #1 - create full backup for all tables
     LOG.info("create full backup image for all tables");

@@ -69,7 +69,7 @@ public class TestIncrementalBackupWithFailures extends TestBackupBase {

   // implement all test cases in 1 test since incremental backup/restore has dependencies
   @Test
-  public void TestIncBackupRestore() throws Exception {
+  public void testIncBackupRestore() throws Exception {

     int ADD_ROWS = 99;
     // #1 - create full backup for all tables

@@ -42,6 +42,12 @@ public class TestRemoteBackup extends TestBackupBase {

   private static final Log LOG = LogFactory.getLog(TestRemoteBackup.class);

+  @Override
+  public void setUp() throws Exception {
+    useSecondCluster = true;
+    super.setUp();
+  }
+
   /**
    * Verify that a remote full backup is created on a single table with data correctly.
    * @throws Exception

@@ -27,6 +27,13 @@ public class TestRemoteRestore extends TestBackupBase {

   private static final Log LOG = LogFactory.getLog(TestRemoteRestore.class);

+  @Override
+  public void setUp() throws Exception {
+    useSecondCluster = true;
+    super.setUp();
+  }
+

   /**
    * Verify that a remote restore on a single table is successful.
    * @throws Exception

@@ -0,0 +1,93 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.backup;

import static org.junit.Assert.assertTrue;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

@Category(LargeTests.class)
public class TestRepairAfterFailedDelete extends TestBackupBase {

  private static final Log LOG = LogFactory.getLog(TestRepairAfterFailedDelete.class);

  @Test
  public void testRepairBackupDelete() throws Exception {
    LOG.info("test repair backup delete on a single table with data");
    List<TableName> tableList = Lists.newArrayList(table1);
    String backupId = fullTableBackup(tableList);
    assertTrue(checkSucceeded(backupId));
    LOG.info("backup complete");
    String[] backupIds = new String[] { backupId };
    BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection());
    BackupInfo info = table.readBackupInfo(backupId);
    Path path = new Path(info.getBackupRootDir(), backupId);
    FileSystem fs = FileSystem.get(path.toUri(), conf1);
    assertTrue(fs.exists(path));

    // Snapshot backup system table before delete
    String snapshotName = "snapshot-backup";
    Connection conn = TEST_UTIL.getConnection();
    Admin admin = conn.getAdmin();
    admin.snapshot(snapshotName, BackupSystemTable.getTableName(conf1));

    int deleted = getBackupAdmin().deleteBackups(backupIds);

    assertTrue(!fs.exists(path));
    assertTrue(fs.exists(new Path(info.getBackupRootDir())));
    assertTrue(1 == deleted);

    // Emulate delete failure
    // Restore backup system table
    admin.disableTable(BackupSystemTable.getTableName(conf1));
    admin.restoreSnapshot(snapshotName);
    admin.enableTable(BackupSystemTable.getTableName(conf1));
    // Start backup session
    table.startBackupSession();
    // Start delete operation
    table.startDeleteOperation(backupIds);

    // Now run repair command to repair "failed" delete operation
    String[] args = new String[] { "repair" };
    // Run repair
    int ret = ToolRunner.run(conf1, new BackupDriver(), args);
    assertTrue(ret == 0);
    // Verify that history length == 0
    assertTrue(table.getBackupHistory().size() == 0);
    table.close();
    admin.close();
  }
}
@@ -32,11 +32,10 @@ public class TestSystemTableSnapshot extends TestBackupBase {

   private static final Log LOG = LogFactory.getLog(TestSystemTableSnapshot.class);

   /**
-   * Verify that a single table is restored to a new table
+   * Verify backup system table snapshot
    * @throws Exception
    */
-  //@Test - Disabled until we get resolution on system table snapshots
+  // @Test
   public void _testBackupRestoreSystemTable() throws Exception {

     LOG.info("test snapshot system table");