HBASE-17938 General fault-tolerance framework for backup/restore operations (Vladimir Rodionov)

This commit is contained in:
tedyu 2017-05-12 09:27:58 -07:00
parent da68537ae6
commit 305ffcb040
11 changed files with 571 additions and 116 deletions

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient;
import org.apache.hadoop.hbase.backup.impl.TableBackupClient;
import org.apache.hadoop.hbase.client.Connection;
/**
 * Factory for {@link TableBackupClient} instances.
 *
 * <p>If the configuration key {@link TableBackupClient#BACKUP_CLIENT_IMPL_CLASS} is set,
 * the named class is loaded reflectively (it must extend {@code TableBackupClient} and
 * have a public no-arg constructor). Otherwise a {@link FullTableBackupClient} or
 * {@link IncrementalTableBackupClient} is chosen from the request's backup type.
 */
public class BackupClientFactory {

  /**
   * Creates a backup client for the given request.
   *
   * @param conn connection to the cluster; its configuration is consulted for a
   *          custom client implementation class
   * @param backupId identifier of the backup session
   * @param request backup request (carries the backup type)
   * @return an initialized {@code TableBackupClient}
   * @throws IOException if the custom implementation cannot be loaded,
   *           instantiated, or initialized
   */
  public static TableBackupClient create(Connection conn, String backupId, BackupRequest request)
      throws IOException {
    Configuration conf = conn.getConfiguration();
    try {
      String clsName = conf.get(TableBackupClient.BACKUP_CLIENT_IMPL_CLASS);
      if (clsName != null) {
        // asSubclass gives a clear ClassCastException message if the configured class
        // is not a TableBackupClient; getDeclaredConstructor().newInstance() replaces
        // the deprecated Class.newInstance(), which swallowed checked exceptions.
        TableBackupClient client = Class.forName(clsName)
            .asSubclass(TableBackupClient.class)
            .getDeclaredConstructor()
            .newInstance();
        client.init(conn, backupId, request);
        return client;
      }
    } catch (Exception e) {
      // Wrap any reflection/initialization failure so callers see a single IOException.
      throw new IOException(e);
    }
    BackupType type = request.getBackupType();
    if (type == BackupType.FULL) {
      return new FullTableBackupClient(conn, backupId, request);
    }
    return new IncrementalTableBackupClient(conn, backupId, request);
  }
}

View File

@ -134,6 +134,8 @@ public class BackupDriver extends AbstractHBaseTool {
return -1;
}
throw e;
} finally {
command.finish();
}
return 0;
}

View File

@ -45,8 +45,6 @@ public interface BackupRestoreConstants {
public static final String BACKUP_ATTEMPTS_PAUSE_MS_KEY = "hbase.backup.attempts.pause.ms";
public static final int DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS = 10000;
/*
* Drivers option list
*/

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupAdmin;
import org.apache.hadoop.hbase.backup.BackupClientFactory;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRequest;
@ -530,11 +531,16 @@ public class BackupAdminImpl implements BackupAdmin {
withTotalTasks(request.getTotalTasks()).
withBandwidthPerTasks((int)request.getBandwidth()).build();
if (type == BackupType.FULL) {
new FullTableBackupClient(conn, backupId, request).execute();
} else {
new IncrementalTableBackupClient(conn, backupId, request).execute();
TableBackupClient client =null;
try {
client = BackupClientFactory.create(conn, backupId, request);
} catch (IOException e) {
LOG.error("There is an active session already running");
throw e;
}
client.execute();
return backupId;
}

View File

@ -53,8 +53,8 @@ import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.backup.util.BackupSet;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -114,9 +114,12 @@ public final class BackupCommands {
public static abstract class Command extends Configured {
CommandLine cmdline;
Connection conn;
Command(Configuration conf) {
super(conf);
if (conf == null) {
conf = HBaseConfiguration.create();
}
setConf(conf);
}
public void execute() throws IOException {
@ -124,9 +127,40 @@ public final class BackupCommands {
printUsage();
throw new IOException(INCORRECT_USAGE);
}
// Create connection
conn = ConnectionFactory.createConnection(getConf());
if (requiresNoActiveSession()) {
// Check active session
try (BackupSystemTable table = new BackupSystemTable(conn);) {
List<BackupInfo> sessions = table.getBackupInfos(BackupState.RUNNING);
if(sessions.size() > 0) {
System.err.println("Found backup session in a RUNNING state: ");
System.err.println(sessions.get(0));
System.err.println("This may indicate that a previous session has failed abnormally.");
System.err.println("In this case, backup recovery is recommended.");
throw new IOException("Active session found, aborted command execution");
}
}
}
}
public void finish() throws IOException {
if (conn != null) {
conn.close();
}
}
protected abstract void printUsage();
/**
* The command can't be run if active backup session is in progress
* @return true if no active sessions are in progress
*/
protected boolean requiresNoActiveSession() {
return false;
}
}
private BackupCommands() {
@ -177,9 +211,13 @@ public final class BackupCommands {
this.cmdline = cmdline;
}
@Override
protected boolean requiresNoActiveSession() {
return true;
}
@Override
public void execute() throws IOException {
super.execute();
if (cmdline == null || cmdline.getArgs() == null) {
printUsage();
throw new IOException(INCORRECT_USAGE);
@ -202,8 +240,8 @@ public final class BackupCommands {
throw new IOException(INCORRECT_USAGE);
}
String tables = null;
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
// Check if we have both: backup set and list of tables
if (cmdline.hasOption(OPTION_TABLE) && cmdline.hasOption(OPTION_SET)) {
@ -212,12 +250,13 @@ public final class BackupCommands {
printUsage();
throw new IOException(INCORRECT_USAGE);
}
// Creates connection
super.execute();
// Check backup set
String setName = null;
if (cmdline.hasOption(OPTION_SET)) {
setName = cmdline.getOptionValue(OPTION_SET);
tables = getTablesForSet(setName, conf);
tables = getTablesForSet(setName, getConf());
if (tables == null) {
System.out.println("ERROR: Backup set '" + setName
@ -235,8 +274,7 @@ public final class BackupCommands {
cmdline.hasOption(OPTION_WORKERS) ? Integer.parseInt(cmdline
.getOptionValue(OPTION_WORKERS)) : -1;
try (Connection conn = ConnectionFactory.createConnection(getConf());
BackupAdminImpl admin = new BackupAdminImpl(conn);) {
try (BackupAdminImpl admin = new BackupAdminImpl(conn);) {
BackupRequest.Builder builder = new BackupRequest.Builder();
BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
@ -268,8 +306,7 @@ public final class BackupCommands {
}
private String getTablesForSet(String name, Configuration conf) throws IOException {
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupSystemTable table = new BackupSystemTable(conn)) {
try (final BackupSystemTable table = new BackupSystemTable(conn)) {
List<TableName> tables = table.describeBackupSet(name);
if (tables == null) return null;
return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND);
@ -304,7 +341,6 @@ public final class BackupCommands {
@Override
public void execute() throws IOException {
super.execute();
if (cmdline == null) {
printUsage();
throw new IOException(INCORRECT_USAGE);
@ -359,7 +395,6 @@ public final class BackupCommands {
@Override
public void execute() throws IOException {
super.execute();
if (cmdline == null || cmdline.getArgs() == null) {
printUsage();
throw new IOException(INCORRECT_USAGE);
@ -370,10 +405,10 @@ public final class BackupCommands {
throw new IOException(INCORRECT_USAGE);
}
super.execute();
String backupId = args[1];
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
BackupInfo info = sysTable.readBackupInfo(backupId);
if (info == null) {
System.out.println("ERROR: " + backupId + " does not exist");
@ -399,7 +434,6 @@ public final class BackupCommands {
@Override
public void execute() throws IOException {
super.execute();
if (cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length == 1) {
System.out.println("No backup id was specified, "
@ -412,10 +446,10 @@ public final class BackupCommands {
throw new IOException(INCORRECT_USAGE);
}
super.execute();
String backupId = (args == null || args.length <= 1) ? null : args[1];
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
BackupInfo info = null;
if (backupId != null) {
@ -455,20 +489,24 @@ public final class BackupCommands {
this.cmdline = cmdline;
}
@Override
protected boolean requiresNoActiveSession() {
return true;
}
@Override
public void execute() throws IOException {
super.execute();
if (cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length < 2) {
printUsage();
throw new IOException(INCORRECT_USAGE);
}
super.execute();
String[] args = cmdline.getArgs();
String[] backupIds = new String[args.length - 1];
System.arraycopy(args, 1, backupIds, 0, backupIds.length);
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
try (final Connection conn = ConnectionFactory.createConnection(conf);
BackupAdminImpl admin = new BackupAdminImpl(conn);) {
try (BackupAdminImpl admin = new BackupAdminImpl(conn);) {
int deleted = admin.deleteBackups(backupIds);
System.out.println("Deleted " + deleted + " backups. Total requested: " + args.length);
}
@ -512,7 +550,6 @@ public final class BackupCommands {
@Override
public void execute() throws IOException {
super.execute();
int n = parseHistoryLength();
final TableName tableName = getTableName();
@ -535,18 +572,16 @@ public final class BackupCommands {
};
Path backupRootPath = getBackupRootPath();
List<BackupInfo> history = null;
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
if (backupRootPath == null) {
// Load from backup system table
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
super.execute();
try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
history = sysTable.getBackupHistory(n, tableNameFilter, tableSetFilter);
}
} else {
// load from backup FS
history =
BackupUtils.getHistory(conf, n, backupRootPath, tableNameFilter, tableSetFilter);
BackupUtils.getHistory(getConf(), n, backupRootPath, tableNameFilter, tableSetFilter);
}
for (BackupInfo info : history) {
System.out.println(info.getShortDescription());
@ -627,7 +662,6 @@ public final class BackupCommands {
@Override
public void execute() throws IOException {
super.execute();
// Command-line must have at least one element
if (cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length < 2) {
printUsage();
@ -661,11 +695,11 @@ public final class BackupCommands {
}
private void processSetList(String[] args) throws IOException {
super.execute();
// List all backup set names
// does not expect any args
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
try (final Connection conn = ConnectionFactory.createConnection(conf);
BackupAdminImpl admin = new BackupAdminImpl(conn);) {
try (BackupAdminImpl admin = new BackupAdminImpl(conn);) {
List<BackupSet> list = admin.listBackupSets();
for (BackupSet bs : list) {
System.out.println(bs);
@ -678,10 +712,10 @@ public final class BackupCommands {
printUsage();
throw new IOException(INCORRECT_USAGE);
}
super.execute();
String setName = args[2];
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) {
List<TableName> tables = sysTable.describeBackupSet(setName);
BackupSet set = tables == null ? null : new BackupSet(setName, tables);
if (set == null) {
@ -697,10 +731,10 @@ public final class BackupCommands {
printUsage();
throw new IOException(INCORRECT_USAGE);
}
super.execute();
String setName = args[2];
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
try (final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
boolean result = admin.deleteBackupSet(setName);
if (result) {
System.out.println("Delete set " + setName + " OK.");
@ -715,13 +749,12 @@ public final class BackupCommands {
printUsage();
throw new IOException(INCORRECT_USAGE);
}
super.execute();
String setName = args[2];
String[] tables = args[3].split(",");
TableName[] tableNames = toTableNames(tables);
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
try (final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
admin.removeFromBackupSet(setName, tableNames);
}
}
@ -739,15 +772,15 @@ public final class BackupCommands {
printUsage();
throw new IOException(INCORRECT_USAGE);
}
super.execute();
String setName = args[2];
String[] tables = args[3].split(",");
TableName[] tableNames = new TableName[tables.length];
for (int i = 0; i < tables.length; i++) {
tableNames[i] = TableName.valueOf(tables[i]);
}
Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
try (final Connection conn = ConnectionFactory.createConnection(conf);
final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
try (final BackupAdminImpl admin = new BackupAdminImpl(conn);) {
admin.addToBackupSet(setName, tableNames);
}

View File

@ -366,6 +366,23 @@ public class BackupManager implements Closeable {
systemTable.updateBackupInfo(context);
}
/**
 * Starts a new backup session by marking the session row in the backup system table.
 * @throws IOException if active session already exists
 */
public void startBackupSession() throws IOException {
  this.systemTable.startBackupSession();
}
/**
 * Finishes the active backup session by clearing the session marker in the backup system table.
 * @throws IOException if no active session
 */
public void finishBackupSession() throws IOException {
  this.systemTable.finishBackupSession();
}
/**
* Read the last backup start code (timestamp) of last successful backup. Will return null if
* there is no startcode stored in backup system table or the value is of length 0. These two

View File

@ -133,6 +133,12 @@ public final class BackupSystemTable implements Closeable {
private final static String BACKUP_INFO_PREFIX = "session:";
private final static String START_CODE_ROW = "startcode:";
private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes();
private final static byte[] ACTIVE_SESSION_COL = "c".getBytes();
private final static byte[] ACTIVE_SESSION_YES = "yes".getBytes();
private final static byte[] ACTIVE_SESSION_NO = "no".getBytes();
private final static String INCR_BACKUP_SET = "incrbackupset:";
private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
private final static String RS_LOG_TS_PREFIX = "rslogts:";
@ -555,6 +561,50 @@ public final class BackupSystemTable implements Closeable {
}
}
/**
 * Atomically claims the single backup-session slot.
 *
 * <p>Uses two chained checkAndPut calls so that exactly one caller can flip the
 * session cell to "yes": first assuming the row has never been written (expected
 * value {@code null}), then assuming a previous session finished cleanly
 * (expected value ACTIVE_SESSION_NO). The order of the two attempts matters —
 * do not reorder.
 *
 * @throws IOException if another session already holds the slot
 */
public void startBackupSession() throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Start new backup session");
  }
  try (Table table = connection.getTable(tableName)) {
    Put put = createPutForStartBackupSession();
    // First try to put if row does not exist (cell has never been written).
    if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) {
      // Row exists, try to put if value == ACTIVE_SESSION_NO (previous session finished).
      if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
          ACTIVE_SESSION_NO, put)) {
        // Both CAS attempts failed: the cell currently reads "yes".
        throw new IOException("There is an active backup session");
      }
    }
  }
}
/**
 * Builds the mutation that marks the session cell as active ("yes").
 */
private Put createPutForStartBackupSession() {
  final Put marker = new Put(ACTIVE_SESSION_ROW);
  marker.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES);
  return marker;
}
/**
 * Releases the backup-session slot.
 *
 * <p>A single checkAndPut flips the session cell from "yes" to "no"; it only
 * succeeds if a session is actually active.
 *
 * @throws IOException if the session cell does not currently read "yes"
 */
public void finishBackupSession() throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Stop backup session");
  }
  try (Table table = connection.getTable(tableName)) {
    Put put = createPutForStopBackupSession();
    boolean released = table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL,
        ACTIVE_SESSION_YES, put);
    if (!released) {
      throw new IOException("There is no active backup session");
    }
  }
}
/**
 * Builds the mutation that marks the session cell as inactive ("no").
 */
private Put createPutForStopBackupSession() {
  final Put marker = new Put(ACTIVE_SESSION_ROW);
  marker.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO);
  return marker;
}
/**
* Get the Region Servers log information after the last log roll from backup system table.
* @param backupRoot root directory path to backup
@ -1302,9 +1352,9 @@ public final class BackupSystemTable implements Closeable {
return getTableName(conf).getNameAsString();
}
public static String getSnapshotName(Configuration conf) {
return "snapshot_"+getTableNameAsString(conf).replace(":", "_");
}
/**
* Creates Put operation for a given backup info object

View File

@ -53,6 +53,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class FullTableBackupClient extends TableBackupClient {
private static final Log LOG = LogFactory.getLog(FullTableBackupClient.class);
public FullTableBackupClient() {
}
public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request)
throws IOException {
super(conn, backupId, request);
@ -63,7 +66,7 @@ public class FullTableBackupClient extends TableBackupClient {
* @param backupInfo backup info
* @throws Exception exception
*/
private void snapshotCopy(BackupInfo backupInfo) throws Exception {
protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
LOG.info("Snapshot copy is starting.");
// set overall backup phase: snapshot_copy
@ -108,7 +111,6 @@ public class FullTableBackupClient extends TableBackupClient {
*/
@Override
public void execute() throws IOException {
try (Admin admin = conn.getAdmin();) {
// Begin BACKUP
@ -190,7 +192,8 @@ public class FullTableBackupClient extends TableBackupClient {
}
private void snapshotTable(Admin admin, TableName tableName, String snapshotName)
protected void snapshotTable(Admin admin, TableName tableName, String snapshotName)
throws IOException {
int maxAttempts =

View File

@ -69,7 +69,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
super(conn, backupId, request);
}
private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
protected List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
FileSystem fs = FileSystem.get(conf);
List<String> list = new ArrayList<String>();
for (String file : incrBackupFileList) {
@ -88,11 +88,11 @@ public class IncrementalTableBackupClient extends TableBackupClient {
* @param p path
* @return true, if yes
*/
private boolean isActiveWalPath(Path p) {
protected boolean isActiveWalPath(Path p) {
return !AbstractFSWALProvider.isArchivedLogFile(p);
}
static int getIndex(TableName tbl, List<TableName> sTableList) {
protected static int getIndex(TableName tbl, List<TableName> sTableList) {
if (sTableList == null) return 0;
for (int i = 0; i < sTableList.size(); i++) {
if (tbl.equals(sTableList.get(i))) {
@ -108,7 +108,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
* @param sTableList list of tables to be backed up
* @return map of table to List of files
*/
Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
backupManager.readBulkloadRows(sTableList);
@ -207,18 +207,19 @@ public class IncrementalTableBackupClient extends TableBackupClient {
@Override
public void execute() throws IOException {
try {
// case PREPARE_INCREMENTAL:
beginBackup(backupManager, backupInfo);
backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
LOG.debug("For incremental backup, current table set is "
+ backupManager.getIncrementalBackupTableSet());
try {
newTimestamps =
((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
} catch (Exception e) {
// fail the overall backup and return
failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
BackupType.INCREMENTAL, conf);
return;
}
// case INCREMENTAL_COPY:
@ -267,7 +268,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
private void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
protected void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
try {
LOG.debug("Incremental copy HFiles is starting.");
@ -293,7 +294,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
private void deleteBulkLoadDirectory() throws IOException {
protected void deleteBulkLoadDirectory() throws IOException {
// delete original bulk load directory on method exit
Path path = getBulkOutputDir();
FileSystem fs = FileSystem.get(conf);
@ -304,7 +305,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
private void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
protected void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
// get incremental backup file list and prepare parameters for DistCp
List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
// Get list of tables in incremental backup set
@ -322,13 +323,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
private boolean tableExists(TableName table, Connection conn) throws IOException {
protected boolean tableExists(TableName table, Connection conn) throws IOException {
try (Admin admin = conn.getAdmin();) {
return admin.tableExists(table);
}
}
private void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
protected void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
Tool player = new WALPlayer();
@ -357,14 +358,14 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
private Path getBulkOutputDirForTable(TableName table) {
protected Path getBulkOutputDirForTable(TableName table) {
Path tablePath = getBulkOutputDir();
tablePath = new Path(tablePath, table.getNamespaceAsString());
tablePath = new Path(tablePath, table.getQualifierAsString());
return new Path(tablePath, "data");
}
private Path getBulkOutputDir() {
protected Path getBulkOutputDir() {
String backupId = backupInfo.getBackupId();
Path path = new Path(backupInfo.getBackupRootDir());
path = new Path(path, ".tmp");

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
@ -51,6 +52,9 @@ import org.apache.hadoop.hbase.util.FSUtils;
*/
@InterfaceAudience.Private
public abstract class TableBackupClient {
public static final String BACKUP_CLIENT_IMPL_CLASS = "backup.client.impl.class";
private static final Log LOG = LogFactory.getLog(TableBackupClient.class);
protected Configuration conf;
@ -62,8 +66,17 @@ public abstract class TableBackupClient {
protected BackupManager backupManager;
protected BackupInfo backupInfo;
public TableBackupClient() {
}
public TableBackupClient(final Connection conn, final String backupId, BackupRequest request)
throws IOException {
init(conn, backupId, request);
}
public void init(final Connection conn, final String backupId, BackupRequest request)
throws IOException
{
if (request.getBackupType() == BackupType.FULL) {
backupManager = new BackupManager(conn, conn.getConfiguration());
} else {
@ -79,6 +92,8 @@ public abstract class TableBackupClient {
if (tableList == null || tableList.isEmpty()) {
this.tableList = new ArrayList<>(backupInfo.getTables());
}
// Start new session
backupManager.startBackupSession();
}
/**
@ -88,6 +103,8 @@ public abstract class TableBackupClient {
*/
protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
throws IOException {
snapshotBackupTable();
backupManager.setBackupInfo(backupInfo);
// set the start timestamp of the overall backup
long startTs = EnvironmentEdgeManager.currentTime();
@ -103,7 +120,7 @@ public abstract class TableBackupClient {
}
}
private String getMessage(Exception e) {
protected String getMessage(Exception e) {
String msg = e.getMessage();
if (msg == null || msg.equals("")) {
msg = e.getClass().getName();
@ -116,7 +133,7 @@ public abstract class TableBackupClient {
* @param backupInfo backup info
* @throws Exception exception
*/
private void deleteSnapshot(final Connection conn, BackupInfo backupInfo, Configuration conf)
protected static void deleteSnapshots(final Connection conn, BackupInfo backupInfo, Configuration conf)
throws IOException {
LOG.debug("Trying to delete snapshot for full backup.");
for (String snapshotName : backupInfo.getSnapshotNames()) {
@ -127,8 +144,6 @@ public abstract class TableBackupClient {
try (Admin admin = conn.getAdmin();) {
admin.deleteSnapshot(snapshotName);
} catch (IOException ioe) {
LOG.debug("when deleting snapshot " + snapshotName, ioe);
}
LOG.debug("Deleting the snapshot " + snapshotName + " for backup " + backupInfo.getBackupId()
+ " succeeded.");
@ -140,7 +155,7 @@ public abstract class TableBackupClient {
* snapshots.
* @throws IOException exception
*/
private void cleanupExportSnapshotLog(Configuration conf) throws IOException {
protected static void cleanupExportSnapshotLog(Configuration conf) throws IOException {
FileSystem fs = FSUtils.getCurrentFileSystem(conf);
Path stagingDir =
new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory()
@ -163,7 +178,7 @@ public abstract class TableBackupClient {
* Clean up the uncompleted data at target directory if the ongoing backup has already entered
* the copy phase.
*/
private void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
protected static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) {
try {
// clean up the uncompleted data at target directory if the ongoing backup has already entered
// the copy phase
@ -182,10 +197,10 @@ public abstract class TableBackupClient {
new Path(HBackupFileSystem.getTableBackupDir(backupInfo.getBackupRootDir(),
backupInfo.getBackupId(), table));
if (outputFs.delete(targetDirPath, true)) {
LOG.info("Cleaning up uncompleted backup data at " + targetDirPath.toString()
LOG.debug("Cleaning up uncompleted backup data at " + targetDirPath.toString()
+ " done.");
} else {
LOG.info("No data has been copied to " + targetDirPath.toString() + ".");
LOG.debug("No data has been copied to " + targetDirPath.toString() + ".");
}
Path tableDir = targetDirPath.getParent();
@ -211,39 +226,106 @@ public abstract class TableBackupClient {
*/
protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager backupManager,
    Exception e, String msg, BackupType type, Configuration conf) throws IOException {
  try {
    LOG.error(msg + getMessage(e), e);
    // If this is a cancel exception, then we've already cleaned.
    // set the failure timestamp of the overall backup
    backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime());
    // set failure message
    backupInfo.setFailedMsg(e.getMessage());
    // set overall backup status: failed
    backupInfo.setState(BackupState.FAILED);
    // compose the backup failed data
    String backupFailedData =
        "BackupId=" + backupInfo.getBackupId() + ",startts=" + backupInfo.getStartTs()
            + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + backupInfo.getPhase()
            + ",failedmessage=" + backupInfo.getFailedMsg();
    LOG.error(backupFailedData);
    // Roll the backup system table back to its pre-session snapshot and remove
    // partially-copied data; must happen BEFORE updateBackupInfo so the FAILED
    // record is written against the restored table state.
    cleanupAndRestoreBackupSystem(conn, backupInfo, conf);
    // If backup session is updated to FAILED state - means we
    // processed recovery already.
    backupManager.updateBackupInfo(backupInfo);
    // Release the active-session marker so a new backup can start.
    backupManager.finishBackupSession();
    LOG.error("Backup " + backupInfo.getBackupId() + " failed.");
  } catch (IOException ee) {
    // Recovery itself failed; the system table may be inconsistent. Surface the
    // error but tell the operator how to repair manually.
    LOG.error("Please run backup repair tool manually to restore backup system integrity");
    throw ee;
  }
}
/**
 * Recovers from a failed backup session: removes artifacts created during the
 * session and rolls the backup system table back to its pre-session snapshot.
 * Step order matters: snapshots/logs are cleaned first, then the system table is
 * restored, then its recovery snapshot is deleted, then target-dir data is removed.
 *
 * @param conn connection to the cluster
 * @param backupInfo info of the failed backup session
 * @param conf configuration
 * @throws IOException if any recovery step fails
 */
public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo,
    Configuration conf) throws IOException
{
  BackupType type = backupInfo.getType();
  // if full backup, then delete HBase snapshots if there already are snapshots taken
  // and also clean up export snapshot log files if exist
  if (type == BackupType.FULL) {
    deleteSnapshots(conn, backupInfo, conf);
    cleanupExportSnapshotLog(conf);
  }
  // Roll the backup system table back, then drop its recovery snapshot.
  restoreBackupTable(conn, conf);
  deleteBackupTableSnapshot(conn, conf);
  // clean up the uncompleted data at target directory if the ongoing backup has already entered
  // the copy phase
  // For incremental backup, DistCp logs will be cleaned with the targetDir.
  cleanupTargetDir(backupInfo, conf);
  LOG.info("Backup " + backupInfo.getBackupId() + " failed.");
}
/**
 * Takes a snapshot of the backup system table so it can be rolled back
 * if this backup session fails.
 * @throws IOException if the snapshot operation fails
 */
protected void snapshotBackupTable() throws IOException {
  final String snapshotName = BackupSystemTable.getSnapshotName(conf);
  try (Admin admin = conn.getAdmin()) {
    admin.snapshot(snapshotName, BackupSystemTable.getTableName(conf));
  }
}
/**
 * Restores the backup system table from the snapshot taken at the start of the
 * session (disable, restore, re-enable). If the snapshot is missing — e.g.
 * completeBackup already deleted it — a warning is logged and the method returns
 * without error.
 *
 * @param conn connection to the cluster
 * @param conf configuration
 * @throws IOException if the restore operation itself fails
 */
protected static void restoreBackupTable(Connection conn, Configuration conf)
    throws IOException {
  LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) +
      " from snapshot");
  try (Admin admin = conn.getAdmin();) {
    String snapshotName = BackupSystemTable.getSnapshotName(conf);
    if (snapshotExists(admin, snapshotName)) {
      // Restore requires the table to be disabled first.
      admin.disableTable(BackupSystemTable.getTableName(conf));
      admin.restoreSnapshot(snapshotName);
      admin.enableTable(BackupSystemTable.getTableName(conf));
      LOG.debug("Done restoring backup system table");
    } else {
      // Snapshot does not exist, i.e. completeBackup failed after
      // deleting backup system table snapshot.
      // In this case we log a warning and proceed (was LOG.error, contradicting
      // the stated intent of warn-and-proceed).
      LOG.warn("Could not restore backup system table. Snapshot " + snapshotName +
          " does not exist.");
    }
  }
}
/**
 * Checks whether a snapshot with the given name exists on the cluster.
 *
 * @param admin cluster admin used to list snapshots
 * @param snapshotName name to look for
 * @return true if a snapshot with that exact name exists
 * @throws IOException if listing snapshots fails
 */
protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
  for (SnapshotDescription candidate : admin.listSnapshots()) {
    if (candidate.getName().equals(snapshotName)) {
      return true;
    }
  }
  return false;
}
/**
 * Deletes the backup system table's recovery snapshot once it is no longer
 * needed. A missing snapshot is a benign condition (the session may have been
 * repaired already), so it is logged at WARN rather than ERROR.
 *
 * @param conn connection to the cluster
 * @param conf configuration
 * @throws IOException if deleting the snapshot fails
 */
protected static void deleteBackupTableSnapshot(Connection conn, Configuration conf)
    throws IOException {
  LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) +
      " from the system");
  try (Admin admin = conn.getAdmin();) {
    String snapshotName = BackupSystemTable.getSnapshotName(conf);
    if (snapshotExists(admin, snapshotName)) {
      admin.deleteSnapshot(snapshotName);
      LOG.debug("Done deleting backup system table snapshot");
    } else {
      // Not fatal: nothing to delete. (Was LOG.error with "does not exists".)
      LOG.warn("Snapshot " + snapshotName + " does not exist");
    }
  }
}
/**
@ -252,7 +334,7 @@ public abstract class TableBackupClient {
* @throws IOException exception
* @throws BackupException exception
*/
private void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type,
protected void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type,
Configuration conf) throws IOException, BackupException {
// set the overall backup phase : store manifest
backupInfo.setPhase(BackupPhase.STORE_MANIFEST);
@ -302,7 +384,7 @@ public abstract class TableBackupClient {
* @param backupInfo backup info
* @return meta data dir
*/
private String obtainBackupMetaDataStr(BackupInfo backupInfo) {
protected String obtainBackupMetaDataStr(BackupInfo backupInfo) {
StringBuffer sb = new StringBuffer();
sb.append("type=" + backupInfo.getType() + ",tablelist=");
for (TableName table : backupInfo.getTables()) {
@ -321,7 +403,7 @@ public abstract class TableBackupClient {
* hlogs.
* @throws IOException exception
*/
private void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException {
Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent();
FileSystem fs = FileSystem.get(rootPath.toUri(), conf);
FileStatus[] files = FSUtils.listStatus(fs, rootPath);
@ -366,11 +448,15 @@ public abstract class TableBackupClient {
// - clean up directories with prefix "exportSnapshot-", which are generated when exporting
// snapshots
if (type == BackupType.FULL) {
deleteSnapshot(conn, backupInfo, conf);
deleteSnapshots(conn, backupInfo, conf);
cleanupExportSnapshotLog(conf);
} else if (type == BackupType.INCREMENTAL) {
cleanupDistCpLog(backupInfo, conf);
}
deleteBackupTableSnapshot(conn, conf);
// Finish active session
backupManager.finishBackupSession();
LOG.info("Backup " + backupInfo.getBackupId() + " completed.");
}

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
import org.apache.hadoop.hbase.backup.impl.TableBackupClient;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.annotations.VisibleForTesting;
@Category(LargeTests.class)
public class TestFullBackupWithFailures extends TestBackupBase {
private static final Log LOG = LogFactory.getLog(TestFullBackupWithFailures.class);
/**
 * Test-only backup client that mirrors FullTableBackupClient#execute() but can
 * be configured (via {@link #BACKUP_TEST_MODE_STAGE}) to throw an IOException
 * at one of five checkpoints ("stages" 0-4), so the failure/rollback path of
 * the backup framework can be exercised deterministically.
 */
static class FullTableBackupClientForTest extends FullTableBackupClient
{
// Configuration key holding the stage id (0-4) at which execute() must fail.
public static final String BACKUP_TEST_MODE_STAGE = "backup.test.mode.stage";
// No-arg constructor: required because BackupClientFactory instantiates the
// configured client class reflectively via newInstance() + init().
public FullTableBackupClientForTest() {
}
public FullTableBackupClientForTest(Connection conn, String backupId, BackupRequest request)
throws IOException {
super(conn, backupId, request);
}
@Override
public void execute() throws IOException
{
// Get the stage ID to fail on
try (Admin admin = conn.getAdmin();) {
// Begin BACKUP
beginBackup(backupManager, backupInfo);
failStageIf(0);
String savedStartCode = null;
boolean firstBackup = false;
// do snapshot for full table backup
savedStartCode = backupManager.readBackupStartCode();
firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
if (firstBackup) {
// This is our first backup. Let's put some marker to system table so that we can hold the logs
// while we do the backup.
backupManager.writeBackupStartCode(0L);
}
failStageIf(1);
// We roll log here before we do the snapshot. It is possible there is duplicate data
// in the log that is already in the snapshot. But if we do it after the snapshot, we
// could have data loss.
// A better approach is to do the roll log on each RS in the same global procedure as
// the snapshot.
LOG.info("Execute roll log procedure for full backup ...");
Map<String, String> props = new HashMap<String, String>();
props.put("backupRoot", backupInfo.getBackupRootDir());
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
failStageIf(2);
newTimestamps = backupManager.readRegionServerLastLogRollResult();
if (firstBackup) {
// Updates registered log files
// We record ALL old WAL files as registered, because
// this is a first full backup in the system and these
// files are not needed for next incremental backup
List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
backupManager.recordWALFiles(logFiles);
}
// SNAPSHOT_TABLES:
backupInfo.setPhase(BackupPhase.SNAPSHOT);
for (TableName tableName : tableList) {
// Snapshot name embeds the current time plus namespace and qualifier,
// so each table gets a unique, identifiable snapshot per backup run.
String snapshotName =
"snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) + "_"
+ tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
snapshotTable(admin, tableName, snapshotName);
backupInfo.setSnapshotName(tableName, snapshotName);
}
failStageIf(3);
// SNAPSHOT_COPY:
// do snapshot copy
LOG.debug("snapshot copy for " + backupId);
snapshotCopy(backupInfo);
// Updates incremental backup table set
backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
// BACKUP_COMPLETE:
// set overall backup status: complete. Here we make sure to complete the backup.
// After this checkpoint, even if entering cancel process, will let the backup finished
backupInfo.setState(BackupState.COMPLETE);
// The table list in backupInfo is good for both full backup and incremental backup.
// For incremental backup, it contains the incremental backup table set.
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();
Long newStartCode =
BackupUtils.getMinValue(BackupUtils
.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);
failStageIf(4);
// backup complete
completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf);
} catch (Exception e) {
// Any stage failure lands here: mark the backup failed (which triggers the
// framework's cleanup/restore path), then rethrow so the driver reports it.
failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
BackupType.FULL, conf);
throw new IOException(e);
}
}
@VisibleForTesting
protected int getTestStageId() {
// Stage to fail at, as set by runBackupAndFailAtStage(); defaults to 0.
return conf.getInt(BACKUP_TEST_MODE_STAGE, 0);
}
@VisibleForTesting
protected void failStageIf(int stage) throws IOException {
// Throws only when the configured failure stage matches this checkpoint.
int current = getTestStageId();
if (current == stage) {
throw new IOException("Failed stage " + stage+" in testing");
}
}
}
/**
 * Installs the failing client implementation, then runs a full backup that is
 * forced to fail at a randomly chosen stage (0-4 inclusive) and verifies the
 * framework records the failure correctly.
 */
@Test
public void testFullBackupWithFailures() throws Exception {
conf1.set(TableBackupClient.BACKUP_CLIENT_IMPL_CLASS,
FullTableBackupClientForTest.class.getName());
int stage = (new Random()).nextInt(5);
// Fail random stage between 0 and 4 inclusive
LOG.info("Running stage " + stage);
runBackupAndFailAtStage(stage);
}
/**
 * Runs a full backup of table1 and table2 configured to fail at the given
 * stage, then asserts: the tool exits non-zero, exactly one new (failed)
 * history record is added, no backup in history reads as succeeded, and the
 * incremental backup table set was left empty (rolled back).
 */
public void runBackupAndFailAtStage(int stage) throws Exception {
conf1.setInt(FullTableBackupClientForTest.BACKUP_TEST_MODE_STAGE, stage);
try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
int before = table.getBackupHistory().size();
String[] args =
new String[] { "create", "full", BACKUP_ROOT_DIR, "-t",
table1.getNameAsString() + "," + table2.getNameAsString() };
// Run backup
int ret = ToolRunner.run(conf1, new BackupDriver(), args);
assertFalse(ret == 0);
List<BackupInfo> backups = table.getBackupHistory();
int after = table.getBackupHistory().size();
assertTrue(after == before +1);
for (BackupInfo data : backups) {
String backupId = data.getBackupId();
assertFalse(checkSucceeded(backupId));
}
Set<TableName> tables = table.getIncrementalBackupTableSet(BACKUP_ROOT_DIR);
assertTrue(tables.size() == 0);
}
}
}