HBASE-19969: Improve fault tolerance in backup Merge operation
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
19a396b9c2
commit
d5aaeee88b
@ -63,6 +63,25 @@ public final class HBackupFileSystem {
|
||||
+ Path.SEPARATOR;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get backup temporary directory
|
||||
* @param backupRootDir backup root
|
||||
* @return backup tmp directory path
|
||||
*/
|
||||
public static Path getBackupTmpDirPath(String backupRootDir) {
|
||||
return new Path(backupRootDir, ".tmp");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get backup tmp directory for backupId
|
||||
* @param backupRoot backup root
|
||||
* @param backupId backup id
|
||||
* @return backup tmp directory path
|
||||
*/
|
||||
public static Path getBackupTmpDirPathForBackupId(String backupRoot, String backupId) {
|
||||
return new Path(getBackupTmpDirPath(backupRoot), backupId);
|
||||
}
|
||||
|
||||
public static String getTableBackupDataDir(String backupRootDir, String backupId,
|
||||
TableName tableName) {
|
||||
return getTableBackupDir(backupRootDir, backupId, tableName) + Path.SEPARATOR + "data";
|
||||
|
@ -619,6 +619,7 @@ public class BackupAdminImpl implements BackupAdmin {
|
||||
public void mergeBackups(String[] backupIds) throws IOException {
|
||||
try (final BackupSystemTable sysTable = new BackupSystemTable(conn)) {
|
||||
checkIfValidForMerge(backupIds, sysTable);
|
||||
//TODO run job on remote cluster
|
||||
BackupMergeJob job = BackupRestoreFactory.getBackupMergeJob(conn.getConfiguration());
|
||||
job.run(backupIds);
|
||||
}
|
||||
|
@ -58,13 +58,13 @@ import org.apache.hadoop.hbase.backup.BackupRequest;
|
||||
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
|
||||
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
|
||||
import org.apache.hadoop.hbase.backup.BackupType;
|
||||
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
|
||||
import org.apache.hadoop.hbase.backup.util.BackupSet;
|
||||
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
@ -403,7 +403,7 @@ public final class BackupCommands {
|
||||
}
|
||||
}
|
||||
|
||||
private static class HelpCommand extends Command {
|
||||
public static class HelpCommand extends Command {
|
||||
HelpCommand(Configuration conf, CommandLine cmdline) {
|
||||
super(conf);
|
||||
this.cmdline = cmdline;
|
||||
@ -454,7 +454,7 @@ public final class BackupCommands {
|
||||
}
|
||||
}
|
||||
|
||||
private static class DescribeCommand extends Command {
|
||||
public static class DescribeCommand extends Command {
|
||||
DescribeCommand(Configuration conf, CommandLine cmdline) {
|
||||
super(conf);
|
||||
this.cmdline = cmdline;
|
||||
@ -492,7 +492,7 @@ public final class BackupCommands {
|
||||
}
|
||||
}
|
||||
|
||||
private static class ProgressCommand extends Command {
|
||||
public static class ProgressCommand extends Command {
|
||||
ProgressCommand(Configuration conf, CommandLine cmdline) {
|
||||
super(conf);
|
||||
this.cmdline = cmdline;
|
||||
@ -547,7 +547,7 @@ public final class BackupCommands {
|
||||
}
|
||||
}
|
||||
|
||||
private static class DeleteCommand extends Command {
|
||||
public static class DeleteCommand extends Command {
|
||||
DeleteCommand(Configuration conf, CommandLine cmdline) {
|
||||
super(conf);
|
||||
this.cmdline = cmdline;
|
||||
@ -586,7 +586,7 @@ public final class BackupCommands {
|
||||
}
|
||||
}
|
||||
|
||||
private static class RepairCommand extends Command {
|
||||
public static class RepairCommand extends Command {
|
||||
RepairCommand(Configuration conf, CommandLine cmdline) {
|
||||
super(conf);
|
||||
this.cmdline = cmdline;
|
||||
@ -661,8 +661,9 @@ public final class BackupCommands {
|
||||
System.out.println("DELETE operation finished OK: " + StringUtils.join(backupIds));
|
||||
}
|
||||
|
||||
private void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
|
||||
public static void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTable sysTable)
|
||||
throws IOException {
|
||||
|
||||
String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation();
|
||||
if (backupIds == null || backupIds.length == 0) {
|
||||
System.out.println("No failed backup MERGE operation found");
|
||||
@ -671,17 +672,52 @@ public final class BackupCommands {
|
||||
return;
|
||||
}
|
||||
System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds));
|
||||
System.out.println("Running MERGE again ...");
|
||||
// Check if backup .tmp exists
|
||||
BackupInfo bInfo = sysTable.readBackupInfo(backupIds[0]);
|
||||
String backupRoot = bInfo.getBackupRootDir();
|
||||
FileSystem fs = FileSystem.get(new Path(backupRoot).toUri(), new Configuration());
|
||||
String backupId = BackupUtils.findMostRecentBackupId(backupIds);
|
||||
Path tmpPath = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot, backupId);
|
||||
if (fs.exists(tmpPath)) {
|
||||
// Move data back
|
||||
Path destPath = HBackupFileSystem.getBackupPath(backupRoot, backupId);
|
||||
if (!fs.delete(destPath, true)) {
|
||||
System.out.println("Failed to delete " + destPath);
|
||||
}
|
||||
boolean res = fs.rename(tmpPath, destPath);
|
||||
if (!res) {
|
||||
throw new IOException("MERGE repair: failed to rename from "+ tmpPath+" to "+ destPath);
|
||||
}
|
||||
System.out.println("MERGE repair: renamed from "+ tmpPath+" to "+ destPath+" res="+ res);
|
||||
} else {
|
||||
checkRemoveBackupImages(fs, backupRoot, backupIds);
|
||||
}
|
||||
// Restore table from snapshot
|
||||
BackupSystemTable.restoreFromSnapshot(conn);
|
||||
// Unlock backupo system
|
||||
// Unlock backup system
|
||||
sysTable.finishBackupExclusiveOperation();
|
||||
// Finish previous failed session
|
||||
sysTable.finishMergeOperation();
|
||||
try (BackupAdmin admin = new BackupAdminImpl(conn)) {
|
||||
admin.mergeBackups(backupIds);
|
||||
|
||||
System.out.println("MERGE repair operation finished OK: " + StringUtils.join(backupIds));
|
||||
}
|
||||
|
||||
private static void checkRemoveBackupImages(FileSystem fs, String backupRoot,
|
||||
String[] backupIds) throws IOException {
|
||||
String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
|
||||
for (String backupId: backupIds) {
|
||||
if (backupId.equals(mergedBackupId)) {
|
||||
continue;
|
||||
}
|
||||
Path path = HBackupFileSystem.getBackupPath(backupRoot, backupId);
|
||||
if (fs.exists(path)) {
|
||||
if (!fs.delete(path, true)) {
|
||||
System.out.println("MERGE repair removing: "+ path +" - FAILED");
|
||||
} else {
|
||||
System.out.println("MERGE repair removing: "+ path +" - OK");
|
||||
}
|
||||
}
|
||||
}
|
||||
System.out.println("MERGE operation finished OK: " + StringUtils.join(backupIds));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -690,7 +726,7 @@ public final class BackupCommands {
|
||||
}
|
||||
}
|
||||
|
||||
private static class MergeCommand extends Command {
|
||||
public static class MergeCommand extends Command {
|
||||
MergeCommand(Configuration conf, CommandLine cmdline) {
|
||||
super(conf);
|
||||
this.cmdline = cmdline;
|
||||
@ -739,7 +775,7 @@ public final class BackupCommands {
|
||||
}
|
||||
}
|
||||
|
||||
private static class HistoryCommand extends Command {
|
||||
public static class HistoryCommand extends Command {
|
||||
private final static int DEFAULT_HISTORY_LENGTH = 10;
|
||||
|
||||
HistoryCommand(Configuration conf, CommandLine cmdline) {
|
||||
@ -862,7 +898,7 @@ public final class BackupCommands {
|
||||
}
|
||||
}
|
||||
|
||||
private static class BackupSetCommand extends Command {
|
||||
public static class BackupSetCommand extends Command {
|
||||
private final static String SET_ADD_CMD = "add";
|
||||
private final static String SET_REMOVE_CMD = "remove";
|
||||
private final static String SET_DELETE_CMD = "delete";
|
||||
|
@ -19,12 +19,13 @@ package org.apache.hadoop.hbase.backup.mapreduce;
|
||||
|
||||
import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Stack;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -36,7 +37,6 @@ import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.backup.BackupInfo;
|
||||
import org.apache.hadoop.hbase.backup.BackupMergeJob;
|
||||
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
|
||||
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
|
||||
@ -105,7 +105,7 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
|
||||
table.startMergeOperation(backupIds);
|
||||
|
||||
// Select most recent backup id
|
||||
String mergedBackupId = findMostRecentBackupId(backupIds);
|
||||
String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
|
||||
|
||||
TableName[] tableNames = getTableNamesInBackupImages(backupIds);
|
||||
|
||||
@ -146,15 +146,34 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
|
||||
table.updateProcessedTablesForMerge(tableList);
|
||||
finishedTables = true;
|
||||
|
||||
// Move data
|
||||
// PHASE 2 (modification of a backup file system)
|
||||
// Move existing mergedBackupId data into tmp directory
|
||||
// we will need it later in case of a failure
|
||||
Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
|
||||
mergedBackupId);
|
||||
Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
|
||||
|
||||
if (!fs.rename(backupDirPath, tmpBackupDir)) {
|
||||
throw new IOException("Failed to rename "+ backupDirPath +" to "+tmpBackupDir);
|
||||
} else {
|
||||
LOG.debug("Renamed "+ backupDirPath +" to "+ tmpBackupDir);
|
||||
}
|
||||
// Move new data into backup dest
|
||||
for (Pair<TableName, Path> tn : processedTableList) {
|
||||
moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
|
||||
}
|
||||
|
||||
// Delete old data and update manifest
|
||||
// Update backup manifest
|
||||
List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
|
||||
updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
|
||||
// Copy meta files back from tmp to backup dir
|
||||
copyMetaData(fs, tmpBackupDir, backupDirPath);
|
||||
// Delete tmp dir (Rename back during repair)
|
||||
if (!fs.delete(tmpBackupDir, true)) {
|
||||
// WARN and ignore
|
||||
LOG.warn("Could not delete tmp dir: "+ tmpBackupDir);
|
||||
}
|
||||
// Delete old data
|
||||
deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
|
||||
updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
|
||||
// Finish merge session
|
||||
table.finishMergeOperation();
|
||||
// Release lock
|
||||
@ -183,6 +202,80 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy meta data to of a backup session
|
||||
* @param fs file system
|
||||
* @param tmpBackupDir temp backup directory, where meta is locaed
|
||||
* @param backupDirPath new path for backup
|
||||
* @throws IOException exception
|
||||
*/
|
||||
protected void copyMetaData(FileSystem fs, Path tmpBackupDir, Path backupDirPath)
|
||||
throws IOException {
|
||||
RemoteIterator<LocatedFileStatus> it = fs.listFiles(tmpBackupDir, true);
|
||||
List<Path> toKeep = new ArrayList<Path>();
|
||||
while (it.hasNext()) {
|
||||
Path p = it.next().getPath();
|
||||
if (fs.isDirectory(p)) {
|
||||
continue;
|
||||
}
|
||||
// Keep meta
|
||||
String fileName = p.toString();
|
||||
if (fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0
|
||||
|| fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0) {
|
||||
toKeep.add(p);
|
||||
}
|
||||
}
|
||||
// Copy meta to destination
|
||||
for (Path p : toKeep) {
|
||||
Path newPath = convertToDest(p, backupDirPath);
|
||||
copyFile(fs, p, newPath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy file in DFS from p to newPath
|
||||
* @param fs file system
|
||||
* @param p old path
|
||||
* @param newPath new path
|
||||
* @throws IOException exception
|
||||
*/
|
||||
protected void copyFile(FileSystem fs, Path p, Path newPath) throws IOException {
|
||||
File f = File.createTempFile("data", "meta");
|
||||
Path localPath = new Path(f.getAbsolutePath());
|
||||
fs.copyToLocalFile(p, localPath);
|
||||
fs.copyFromLocalFile(localPath, newPath);
|
||||
boolean exists = fs.exists(newPath);
|
||||
if (!exists) {
|
||||
throw new IOException("Failed to copy meta file to: "+ newPath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts path before copying
|
||||
* @param p path
|
||||
* @param backupDirPath backup root
|
||||
* @return converted path
|
||||
*/
|
||||
protected Path convertToDest(Path p, Path backupDirPath) {
|
||||
String backupId = backupDirPath.getName();
|
||||
Stack<String> stack = new Stack<String>();
|
||||
String name = null;
|
||||
while (true) {
|
||||
name = p.getName();
|
||||
if (!name.equals(backupId)) {
|
||||
stack.push(name);
|
||||
p = p.getParent();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Path newPath = new Path(backupDirPath.toString());
|
||||
while (!stack.isEmpty()) {
|
||||
newPath = new Path(newPath, stack.pop());
|
||||
}
|
||||
return newPath;
|
||||
}
|
||||
|
||||
protected List<Path> toPathList(List<Pair<TableName, Path>> processedTableList) {
|
||||
ArrayList<Path> list = new ArrayList<>();
|
||||
for (Pair<TableName, Path> p : processedTableList) {
|
||||
@ -251,11 +344,6 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
|
||||
Path dest =
|
||||
new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));
|
||||
|
||||
// Delete all *data* files in dest
|
||||
if (!deleteData(fs, dest)) {
|
||||
throw new IOException("Could not delete " + dest);
|
||||
}
|
||||
|
||||
FileStatus[] fsts = fs.listStatus(bulkOutputPath);
|
||||
for (FileStatus fst : fsts) {
|
||||
if (fst.isDirectory()) {
|
||||
@ -265,56 +353,15 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
|
||||
if (!fs.delete(newDst, true)) {
|
||||
throw new IOException("failed to delete :"+ newDst);
|
||||
}
|
||||
} else {
|
||||
fs.mkdirs(dest);
|
||||
}
|
||||
fs.rename(fst.getPath(), dest);
|
||||
boolean result = fs.rename(fst.getPath(), dest);
|
||||
LOG.debug("MoveData from "+ fst.getPath() +" to "+ dest+" result="+ result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes only data files and keeps all META
|
||||
* @param fs file system instance
|
||||
* @param dest destination location
|
||||
* @return true, if success, false - otherwise
|
||||
* @throws FileNotFoundException exception
|
||||
* @throws IOException exception
|
||||
*/
|
||||
private boolean deleteData(FileSystem fs, Path dest) throws FileNotFoundException, IOException {
|
||||
RemoteIterator<LocatedFileStatus> it = fs.listFiles(dest, true);
|
||||
List<Path> toDelete = new ArrayList<Path>();
|
||||
while (it.hasNext()) {
|
||||
Path p = it.next().getPath();
|
||||
if (fs.isDirectory(p)) {
|
||||
continue;
|
||||
}
|
||||
// Keep meta
|
||||
String fileName = p.toString();
|
||||
if (fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0 ||
|
||||
fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0) {
|
||||
continue;
|
||||
}
|
||||
toDelete.add(p);
|
||||
}
|
||||
for (Path p : toDelete) {
|
||||
boolean result = fs.delete(p, false);
|
||||
if (!result) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected String findMostRecentBackupId(String[] backupIds) {
|
||||
long recentTimestamp = Long.MIN_VALUE;
|
||||
for (String backupId : backupIds) {
|
||||
long ts = Long.parseLong(backupId.split("_")[1]);
|
||||
if (ts > recentTimestamp) {
|
||||
recentTimestamp = ts;
|
||||
}
|
||||
}
|
||||
return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
|
||||
}
|
||||
|
||||
protected TableName[] getTableNamesInBackupImages(String[] backupIds) throws IOException {
|
||||
Set<TableName> allSet = new HashSet<>();
|
||||
|
||||
|
@ -740,4 +740,16 @@ public final class BackupUtils {
|
||||
}
|
||||
return loader;
|
||||
}
|
||||
|
||||
public static String findMostRecentBackupId(String[] backupIds) {
|
||||
long recentTimestamp = Long.MIN_VALUE;
|
||||
for (String backupId : backupIds) {
|
||||
long ts = Long.parseLong(backupId.split("_")[1]);
|
||||
if (ts > recentTimestamp) {
|
||||
recentTimestamp = ts;
|
||||
}
|
||||
}
|
||||
return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,132 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.backup;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
|
||||
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category(LargeTests.class)
|
||||
public class TestBackupMerge extends TestBackupBase {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestBackupMerge.class);
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestBackupMerge.class);
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void TestIncBackupMergeRestore() throws Exception {
|
||||
int ADD_ROWS = 99;
|
||||
// #1 - create full backup for all tables
|
||||
LOG.info("create full backup image for all tables");
|
||||
|
||||
List<TableName> tables = Lists.newArrayList(table1, table2);
|
||||
// Set custom Merge Job implementation
|
||||
|
||||
|
||||
Connection conn = ConnectionFactory.createConnection(conf1);
|
||||
|
||||
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
|
||||
BackupAdminImpl client = new BackupAdminImpl(conn);
|
||||
|
||||
BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
|
||||
String backupIdFull = client.backupTables(request);
|
||||
|
||||
assertTrue(checkSucceeded(backupIdFull));
|
||||
|
||||
// #2 - insert some data to table1
|
||||
HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
|
||||
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
|
||||
|
||||
Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
|
||||
t1.close();
|
||||
LOG.debug("written " + ADD_ROWS + " rows to " + table1);
|
||||
|
||||
HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
|
||||
|
||||
Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
|
||||
t2.close();
|
||||
LOG.debug("written " + ADD_ROWS + " rows to " + table2);
|
||||
|
||||
// #3 - incremental backup for multiple tables
|
||||
tables = Lists.newArrayList(table1, table2);
|
||||
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
|
||||
String backupIdIncMultiple = client.backupTables(request);
|
||||
|
||||
assertTrue(checkSucceeded(backupIdIncMultiple));
|
||||
|
||||
t1 = insertIntoTable(conn, table1, famName, 2, ADD_ROWS);
|
||||
t1.close();
|
||||
|
||||
t2 = insertIntoTable(conn, table2, famName, 2, ADD_ROWS);
|
||||
t2.close();
|
||||
|
||||
// #3 - incremental backup for multiple tables
|
||||
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
|
||||
String backupIdIncMultiple2 = client.backupTables(request);
|
||||
assertTrue(checkSucceeded(backupIdIncMultiple2));
|
||||
|
||||
try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
|
||||
String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
|
||||
bAdmin.mergeBackups(backups);
|
||||
}
|
||||
|
||||
// #6 - restore incremental backup for multiple tables, with overwrite
|
||||
TableName[] tablesRestoreIncMultiple = new TableName[] { table1, table2 };
|
||||
TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
|
||||
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
|
||||
tablesRestoreIncMultiple, tablesMapIncMultiple, true));
|
||||
|
||||
Table hTable = conn.getTable(table1_restore);
|
||||
LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
|
||||
int countRows = TEST_UTIL.countRows(hTable, famName);
|
||||
LOG.debug("f1 has " + countRows + " rows");
|
||||
Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows);
|
||||
|
||||
hTable.close();
|
||||
|
||||
hTable = conn.getTable(table2_restore);
|
||||
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
|
||||
hTable.close();
|
||||
|
||||
admin.close();
|
||||
conn.close();
|
||||
}
|
||||
}
|
@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupCommands;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
|
||||
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupMergeJob;
|
||||
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
|
||||
@ -113,7 +114,7 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
|
||||
table.startMergeOperation(backupIds);
|
||||
|
||||
// Select most recent backup id
|
||||
String mergedBackupId = findMostRecentBackupId(backupIds);
|
||||
String mergedBackupId = BackupUtils.findMostRecentBackupId(backupIds);
|
||||
|
||||
TableName[] tableNames = getTableNamesInBackupImages(backupIds);
|
||||
|
||||
@ -160,18 +161,38 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
|
||||
table.updateProcessedTablesForMerge(tableList);
|
||||
finishedTables = true;
|
||||
|
||||
// Move data
|
||||
// (modification of a backup file system)
|
||||
// Move existing mergedBackupId data into tmp directory
|
||||
// we will need it later in case of a failure
|
||||
Path tmpBackupDir = HBackupFileSystem.getBackupTmpDirPathForBackupId(backupRoot,
|
||||
mergedBackupId);
|
||||
Path backupDirPath = HBackupFileSystem.getBackupPath(backupRoot, mergedBackupId);
|
||||
if (!fs.rename(backupDirPath, tmpBackupDir)) {
|
||||
throw new IOException("Failed to rename "+ backupDirPath +" to "+tmpBackupDir);
|
||||
} else {
|
||||
LOG.debug("Renamed "+ backupDirPath +" to "+ tmpBackupDir);
|
||||
}
|
||||
// Move new data into backup dest
|
||||
for (Pair<TableName, Path> tn : processedTableList) {
|
||||
moveData(fs, backupRoot, tn.getSecond(), tn.getFirst(), mergedBackupId);
|
||||
}
|
||||
// PHASE 4
|
||||
checkFailure(FailurePhase.PHASE4);
|
||||
// Delete old data and update manifest
|
||||
// Update backup manifest
|
||||
List<String> backupsToDelete = getBackupIdsToDelete(backupIds, mergedBackupId);
|
||||
updateBackupManifest(tmpBackupDir.getParent().toString(), mergedBackupId, backupsToDelete);
|
||||
// Copy meta files back from tmp to backup dir
|
||||
copyMetaData(fs, tmpBackupDir, backupDirPath);
|
||||
// Delete tmp dir (Rename back during repair)
|
||||
if (!fs.delete(tmpBackupDir, true)) {
|
||||
// WARN and ignore
|
||||
LOG.warn("Could not delete tmp dir: "+ tmpBackupDir);
|
||||
}
|
||||
// Delete old data
|
||||
deleteBackupImages(backupsToDelete, conn, fs, backupRoot);
|
||||
updateBackupManifest(backupRoot, mergedBackupId, backupsToDelete);
|
||||
// Finish merge session
|
||||
table.finishMergeOperation();
|
||||
// Release lock
|
||||
table.finishBackupExclusiveOperation();
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
@ -285,8 +306,8 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
|
||||
Assert.fail("IOException is expected");
|
||||
} catch(IOException ee) {
|
||||
// Expected - clean up before proceeding
|
||||
table.finishMergeOperation();
|
||||
table.finishBackupExclusiveOperation();
|
||||
//table.finishMergeOperation();
|
||||
//table.finishBackupExclusiveOperation();
|
||||
}
|
||||
}
|
||||
table.close();
|
||||
@ -297,7 +318,10 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
|
||||
Configuration conf = conn.getConfiguration();
|
||||
conf.unset(FAILURE_PHASE_KEY);
|
||||
conf.unset(BackupRestoreFactory.HBASE_BACKUP_MERGE_IMPL_CLASS);
|
||||
|
||||
// Now run repair
|
||||
BackupSystemTable sysTable = new BackupSystemTable(conn);
|
||||
BackupCommands.RepairCommand.repairFailedBackupMergeIfAny(conn, sysTable);
|
||||
// Now repeat merge
|
||||
try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) {
|
||||
String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 };
|
||||
bAdmin.mergeBackups(backups);
|
||||
|
Loading…
x
Reference in New Issue
Block a user