diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java new file mode 100644 index 00000000000..21d73ccde4f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupClientFactory.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient; +import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient; +import org.apache.hadoop.hbase.backup.impl.TableBackupClient; +import org.apache.hadoop.hbase.client.Connection; + +public class BackupClientFactory { + + public static TableBackupClient create (Connection conn, String backupId, BackupRequest request) + throws IOException + { + Configuration conf = conn.getConfiguration(); + try { + String clsName = conf.get(TableBackupClient.BACKUP_CLIENT_IMPL_CLASS); + if (clsName != null) { + Class clientImpl = Class.forName(clsName); + TableBackupClient client = (TableBackupClient) clientImpl.newInstance(); + client.init(conn, backupId, request); + return client; + } + } catch (Exception e) { + throw new IOException(e); + } + + BackupType type = request.getBackupType(); + if (type == BackupType.FULL) { + return new FullTableBackupClient(conn, backupId, request); + } else { + return new IncrementalTableBackupClient(conn, backupId, request); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java index 5794fcea999..cc5cc956c0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java @@ -134,6 +134,8 @@ public class BackupDriver extends AbstractHBaseTool { return -1; } throw e; + } finally { + command.finish(); } return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java index d1ab2464821..80f022f0d18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java @@ -45,8 +45,6 @@ public interface BackupRestoreConstants { public static final String BACKUP_ATTEMPTS_PAUSE_MS_KEY = "hbase.backup.attempts.pause.ms"; public static final int DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS = 10000; - - /* * Drivers option list */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index eb60860e5b6..3a54e208100 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupAdmin; +import org.apache.hadoop.hbase.backup.BackupClientFactory; import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; import org.apache.hadoop.hbase.backup.BackupRequest; @@ -530,11 +531,16 @@ public class BackupAdminImpl implements BackupAdmin { withTotalTasks(request.getTotalTasks()). withBandwidthPerTasks((int)request.getBandwidth()).build(); - if (type == BackupType.FULL) { - new FullTableBackupClient(conn, backupId, request).execute(); - } else { - new IncrementalTableBackupClient(conn, backupId, request).execute(); + TableBackupClient client =null; + try { + client = BackupClientFactory.create(conn, backupId, request); + } catch (IOException e) { + LOG.error("There is an active session already running"); + throw e; } + + client.execute(); + return backupId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index 75e0ab7a27e..211a7067b49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -53,8 +53,8 @@ import org.apache.hadoop.hbase.backup.BackupRequest; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand; import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.BackupSet; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -114,9 +114,12 @@ public final class BackupCommands { public static abstract class Command extends Configured { CommandLine cmdline; - + Connection conn; Command(Configuration conf) { - super(conf); + if (conf == null) { + conf = HBaseConfiguration.create(); + } + setConf(conf); } public void execute() throws IOException { @@ -124,9 +127,40 @@ public final class BackupCommands { printUsage(); throw new IOException(INCORRECT_USAGE); } + + // Create connection + conn = ConnectionFactory.createConnection(getConf()); + if (requiresNoActiveSession()) { + // Check active session + try (BackupSystemTable table = new BackupSystemTable(conn);) { + List sessions = table.getBackupInfos(BackupState.RUNNING); + + if(sessions.size() > 0) { + System.err.println("Found backup session in a RUNNING state: "); + System.err.println(sessions.get(0)); + System.err.println("This may indicate that a previous session has failed abnormally."); + System.err.println("In this case, backup recovery is recommended."); + throw new IOException("Active session found, aborted command execution"); + } + } + } + } + + public void finish() throws IOException { + if (conn != null) { + conn.close(); + } } protected abstract void printUsage(); + + /** + * The command can't be run if active backup session is in progress + * @return true if no active sessions are in progress + */ + protected boolean requiresNoActiveSession() { + return false; + } } private BackupCommands() { @@ -177,9 +211,13 @@ public final class BackupCommands { this.cmdline = cmdline; } + @Override + protected boolean requiresNoActiveSession() { + return true; + } + @Override public void execute() throws IOException { - super.execute(); if (cmdline == null || cmdline.getArgs() == null) { printUsage(); throw new IOException(INCORRECT_USAGE); @@ -202,8 +240,8 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } + String tables = null; - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); // Check if we have both: backup set and list of tables if (cmdline.hasOption(OPTION_TABLE) && cmdline.hasOption(OPTION_SET)) { @@ -212,12 +250,13 @@ public final class BackupCommands { printUsage(); throw new IOException(INCORRECT_USAGE); } - + // Creates connection + super.execute(); // Check backup set String setName = null; if (cmdline.hasOption(OPTION_SET)) { setName = cmdline.getOptionValue(OPTION_SET); - tables = getTablesForSet(setName, conf); + tables = getTablesForSet(setName, getConf()); if (tables == null) { System.out.println("ERROR: Backup set '" + setName @@ -235,8 +274,7 @@ public final class BackupCommands { cmdline.hasOption(OPTION_WORKERS) ? Integer.parseInt(cmdline .getOptionValue(OPTION_WORKERS)) : -1; - try (Connection conn = ConnectionFactory.createConnection(getConf()); - BackupAdminImpl admin = new BackupAdminImpl(conn);) { + try (BackupAdminImpl admin = new BackupAdminImpl(conn);) { BackupRequest.Builder builder = new BackupRequest.Builder(); BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase())) @@ -268,8 +306,7 @@ public final class BackupCommands { } private String getTablesForSet(String name, Configuration conf) throws IOException { - try (final Connection conn = ConnectionFactory.createConnection(conf); - final BackupSystemTable table = new BackupSystemTable(conn)) { + try (final BackupSystemTable table = new BackupSystemTable(conn)) { List tables = table.describeBackupSet(name); if (tables == null) return null; return StringUtils.join(tables, BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND); @@ -304,7 +341,6 @@ public final class BackupCommands { @Override public void execute() throws IOException { - super.execute(); if (cmdline == null) { printUsage(); throw new IOException(INCORRECT_USAGE); @@ -359,7 +395,6 @@ public final class BackupCommands { @Override public void execute() throws IOException { - super.execute(); if (cmdline == null || cmdline.getArgs() == null) { printUsage(); throw new IOException(INCORRECT_USAGE); @@ -370,10 +405,10 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } + super.execute(); + String backupId = args[1]; - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); - try (final Connection conn = ConnectionFactory.createConnection(conf); - final BackupSystemTable sysTable = new BackupSystemTable(conn);) { + try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) { BackupInfo info = sysTable.readBackupInfo(backupId); if (info == null) { System.out.println("ERROR: " + backupId + " does not exist"); @@ -399,7 +434,6 @@ public final class BackupCommands { @Override public void execute() throws IOException { - super.execute(); if (cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length == 1) { System.out.println("No backup id was specified, " @@ -412,10 +446,10 @@ public final class BackupCommands { throw new IOException(INCORRECT_USAGE); } + super.execute(); + String backupId = (args == null || args.length <= 1) ? null : args[1]; - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); - try (final Connection conn = ConnectionFactory.createConnection(conf); - final BackupSystemTable sysTable = new BackupSystemTable(conn);) { + try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) { BackupInfo info = null; if (backupId != null) { @@ -455,20 +489,24 @@ public final class BackupCommands { this.cmdline = cmdline; } + @Override + protected boolean requiresNoActiveSession() { + return true; + } + @Override public void execute() throws IOException { - super.execute(); if (cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length < 2) { printUsage(); throw new IOException(INCORRECT_USAGE); } + super.execute(); + String[] args = cmdline.getArgs(); String[] backupIds = new String[args.length - 1]; System.arraycopy(args, 1, backupIds, 0, backupIds.length); - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); - try (final Connection conn = ConnectionFactory.createConnection(conf); - BackupAdminImpl admin = new BackupAdminImpl(conn);) { + try (BackupAdminImpl admin = new BackupAdminImpl(conn);) { int deleted = admin.deleteBackups(backupIds); System.out.println("Deleted " + deleted + " backups. Total requested: " + args.length); } @@ -512,7 +550,6 @@ public final class BackupCommands { @Override public void execute() throws IOException { - super.execute(); int n = parseHistoryLength(); final TableName tableName = getTableName(); @@ -535,18 +572,16 @@ public final class BackupCommands { }; Path backupRootPath = getBackupRootPath(); List history = null; - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); if (backupRootPath == null) { // Load from backup system table - try (final Connection conn = ConnectionFactory.createConnection(conf); - final BackupSystemTable sysTable = new BackupSystemTable(conn);) { - + super.execute(); + try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) { history = sysTable.getBackupHistory(n, tableNameFilter, tableSetFilter); } } else { // load from backup FS history = - BackupUtils.getHistory(conf, n, backupRootPath, tableNameFilter, tableSetFilter); + BackupUtils.getHistory(getConf(), n, backupRootPath, tableNameFilter, tableSetFilter); } for (BackupInfo info : history) { System.out.println(info.getShortDescription()); @@ -627,7 +662,6 @@ public final class BackupCommands { @Override public void execute() throws IOException { - super.execute(); // Command-line must have at least one element if (cmdline == null || cmdline.getArgs() == null || cmdline.getArgs().length < 2) { printUsage(); @@ -661,11 +695,11 @@ public final class BackupCommands { } private void processSetList(String[] args) throws IOException { + super.execute(); + // List all backup set names // does not expect any args - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); - try (final Connection conn = ConnectionFactory.createConnection(conf); - BackupAdminImpl admin = new BackupAdminImpl(conn);) { + try (BackupAdminImpl admin = new BackupAdminImpl(conn);) { List list = admin.listBackupSets(); for (BackupSet bs : list) { System.out.println(bs); @@ -678,10 +712,10 @@ public final class BackupCommands { printUsage(); throw new IOException(INCORRECT_USAGE); } + super.execute(); + String setName = args[2]; - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); - try (final Connection conn = ConnectionFactory.createConnection(conf); - final BackupSystemTable sysTable = new BackupSystemTable(conn);) { + try (final BackupSystemTable sysTable = new BackupSystemTable(conn);) { List tables = sysTable.describeBackupSet(setName); BackupSet set = tables == null ? null : new BackupSet(setName, tables); if (set == null) { @@ -697,10 +731,10 @@ public final class BackupCommands { printUsage(); throw new IOException(INCORRECT_USAGE); } + super.execute(); + String setName = args[2]; - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); - try (final Connection conn = ConnectionFactory.createConnection(conf); - final BackupAdminImpl admin = new BackupAdminImpl(conn);) { + try (final BackupAdminImpl admin = new BackupAdminImpl(conn);) { boolean result = admin.deleteBackupSet(setName); if (result) { System.out.println("Delete set " + setName + " OK."); @@ -715,13 +749,12 @@ public final class BackupCommands { printUsage(); throw new IOException(INCORRECT_USAGE); } + super.execute(); String setName = args[2]; String[] tables = args[3].split(","); TableName[] tableNames = toTableNames(tables); - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); - try (final Connection conn = ConnectionFactory.createConnection(conf); - final BackupAdminImpl admin = new BackupAdminImpl(conn);) { + try (final BackupAdminImpl admin = new BackupAdminImpl(conn);) { admin.removeFromBackupSet(setName, tableNames); } } @@ -739,15 +772,15 @@ public final class BackupCommands { printUsage(); throw new IOException(INCORRECT_USAGE); } + super.execute(); + String setName = args[2]; String[] tables = args[3].split(","); TableName[] tableNames = new TableName[tables.length]; for (int i = 0; i < tables.length; i++) { tableNames[i] = TableName.valueOf(tables[i]); } - Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create(); - try (final Connection conn = ConnectionFactory.createConnection(conf); - final BackupAdminImpl admin = new BackupAdminImpl(conn);) { + try (final BackupAdminImpl admin = new BackupAdminImpl(conn);) { admin.addToBackupSet(setName, tableNames); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index f09310f7e98..a929700a989 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -366,6 +366,23 @@ public class BackupManager implements Closeable { systemTable.updateBackupInfo(context); } + /** + * Starts new backup session + * @throws IOException if active session already exists + */ + public void startBackupSession() throws IOException { + systemTable.startBackupSession(); + } + + /** + * Finishes active backup session + * @throws IOException if no active session + */ + public void finishBackupSession() throws IOException { + systemTable.finishBackupSession(); + } + + /** * Read the last backup start code (timestamp) of last successful backup. Will return null if * there is no startcode stored in backup system table or the value is of length 0. These two diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 217e750deb8..2a0815f99d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -133,6 +133,12 @@ public final class BackupSystemTable implements Closeable { private final static String BACKUP_INFO_PREFIX = "session:"; private final static String START_CODE_ROW = "startcode:"; + private final static byte[] ACTIVE_SESSION_ROW = "activesession:".getBytes(); + private final static byte[] ACTIVE_SESSION_COL = "c".getBytes(); + + private final static byte[] ACTIVE_SESSION_YES = "yes".getBytes(); + private final static byte[] ACTIVE_SESSION_NO = "no".getBytes(); + private final static String INCR_BACKUP_SET = "incrbackupset:"; private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; private final static String RS_LOG_TS_PREFIX = "rslogts:"; @@ -555,6 +561,50 @@ public final class BackupSystemTable implements Closeable { } } + public void startBackupSession() throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace("Start new backup session"); + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForStartBackupSession(); + //First try to put if row does not exist + if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, null, put)) { + // Row exists, try to put if value == ACTIVE_SESSION_NO + if (!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, + ACTIVE_SESSION_NO, put)) { + throw new IOException("There is an active backup session"); + } + } + } + } + + private Put createPutForStartBackupSession() { + Put put = new Put(ACTIVE_SESSION_ROW); + put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); + return put; + } + + public void finishBackupSession() throws IOException + { + if (LOG.isTraceEnabled()) { + LOG.trace("Stop backup session"); + } + try (Table table = connection.getTable(tableName)) { + Put put = createPutForStopBackupSession(); + if(!table.checkAndPut(ACTIVE_SESSION_ROW, SESSIONS_FAMILY, ACTIVE_SESSION_COL, + ACTIVE_SESSION_YES, put)) + { + throw new IOException("There is no active backup session"); + } + } + } + + private Put createPutForStopBackupSession() { + Put put = new Put(ACTIVE_SESSION_ROW); + put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); + return put; + } + /** * Get the Region Servers log information after the last log roll from backup system table. * @param backupRoot root directory path to backup @@ -1302,9 +1352,9 @@ public final class BackupSystemTable implements Closeable { return getTableName(conf).getNameAsString(); } - - - + public static String getSnapshotName(Configuration conf) { + return "snapshot_"+getTableNameAsString(conf).replace(":", "_"); + } /** * Creates Put operation for a given backup info object diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index ee7a84140eb..e323e96a450 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -53,6 +53,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; public class FullTableBackupClient extends TableBackupClient { private static final Log LOG = LogFactory.getLog(FullTableBackupClient.class); + public FullTableBackupClient() { + } + public FullTableBackupClient(final Connection conn, final String backupId, BackupRequest request) throws IOException { super(conn, backupId, request); @@ -63,7 +66,7 @@ public class FullTableBackupClient extends TableBackupClient { * @param backupInfo backup info * @throws Exception exception */ - private void snapshotCopy(BackupInfo backupInfo) throws Exception { + protected void snapshotCopy(BackupInfo backupInfo) throws Exception { LOG.info("Snapshot copy is starting."); // set overall backup phase: snapshot_copy @@ -108,7 +111,6 @@ public class FullTableBackupClient extends TableBackupClient { */ @Override public void execute() throws IOException { - try (Admin admin = conn.getAdmin();) { // Begin BACKUP @@ -190,7 +192,8 @@ public class FullTableBackupClient extends TableBackupClient { } - private void snapshotTable(Admin admin, TableName tableName, String snapshotName) + + protected void snapshotTable(Admin admin, TableName tableName, String snapshotName) throws IOException { int maxAttempts = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 3003c933b7a..eb8490a1d8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -69,7 +69,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { super(conn, backupId, request); } - private List filterMissingFiles(List incrBackupFileList) throws IOException { + protected List filterMissingFiles(List incrBackupFileList) throws IOException { FileSystem fs = FileSystem.get(conf); List list = new ArrayList(); for (String file : incrBackupFileList) { @@ -88,11 +88,11 @@ public class IncrementalTableBackupClient extends TableBackupClient { * @param p path * @return true, if yes */ - private boolean isActiveWalPath(Path p) { + protected boolean isActiveWalPath(Path p) { return !AbstractFSWALProvider.isArchivedLogFile(p); } - static int getIndex(TableName tbl, List sTableList) { + protected static int getIndex(TableName tbl, List sTableList) { if (sTableList == null) return 0; for (int i = 0; i < sTableList.size(); i++) { if (tbl.equals(sTableList.get(i))) { @@ -108,7 +108,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { * @param sTableList list of tables to be backed up * @return map of table to List of files */ - Map>[] handleBulkLoad(List sTableList) throws IOException { + protected Map>[] handleBulkLoad(List sTableList) throws IOException { Map>[] mapForSrc = new Map[sTableList.size()]; Pair>>>>, List> pair = backupManager.readBulkloadRows(sTableList); @@ -207,18 +207,19 @@ public class IncrementalTableBackupClient extends TableBackupClient { @Override public void execute() throws IOException { - // case PREPARE_INCREMENTAL: - beginBackup(backupManager, backupInfo); - backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL); - LOG.debug("For incremental backup, current table set is " - + backupManager.getIncrementalBackupTableSet()); try { + // case PREPARE_INCREMENTAL: + beginBackup(backupManager, backupInfo); + backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL); + LOG.debug("For incremental backup, current table set is " + + backupManager.getIncrementalBackupTableSet()); newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap(); } catch (Exception e) { // fail the overall backup and return failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ", BackupType.INCREMENTAL, conf); + return; } // case INCREMENTAL_COPY: @@ -267,7 +268,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { } } - private void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception { + protected void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception { try { LOG.debug("Incremental copy HFiles is starting."); @@ -293,7 +294,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { } } - private void deleteBulkLoadDirectory() throws IOException { + protected void deleteBulkLoadDirectory() throws IOException { // delete original bulk load directory on method exit Path path = getBulkOutputDir(); FileSystem fs = FileSystem.get(conf); @@ -304,7 +305,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { } - private void convertWALsToHFiles(BackupInfo backupInfo) throws IOException { + protected void convertWALsToHFiles(BackupInfo backupInfo) throws IOException { // get incremental backup file list and prepare parameters for DistCp List incrBackupFileList = backupInfo.getIncrBackupFileList(); // Get list of tables in incremental backup set @@ -322,13 +323,13 @@ public class IncrementalTableBackupClient extends TableBackupClient { } - private boolean tableExists(TableName table, Connection conn) throws IOException { + protected boolean tableExists(TableName table, Connection conn) throws IOException { try (Admin admin = conn.getAdmin();) { return admin.tableExists(table); } } - private void walToHFiles(List dirPaths, TableName tableName) throws IOException { + protected void walToHFiles(List dirPaths, TableName tableName) throws IOException { Tool player = new WALPlayer(); @@ -357,14 +358,14 @@ public class IncrementalTableBackupClient extends TableBackupClient { } } - private Path getBulkOutputDirForTable(TableName table) { + protected Path getBulkOutputDirForTable(TableName table) { Path tablePath = getBulkOutputDir(); tablePath = new Path(tablePath, table.getNamespaceAsString()); tablePath = new Path(tablePath, table.getQualifierAsString()); return new Path(tablePath, "data"); } - private Path getBulkOutputDir() { + protected Path getBulkOutputDir() { String backupId = backupInfo.getBackupId(); Path path = new Path(backupInfo.getBackupRootDir()); path = new Path(path, ".tmp"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java index 125b5da0864..1673e5e6b9d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; @@ -51,6 +52,9 @@ import org.apache.hadoop.hbase.util.FSUtils; */ @InterfaceAudience.Private public abstract class TableBackupClient { + + public static final String BACKUP_CLIENT_IMPL_CLASS = "backup.client.impl.class"; + private static final Log LOG = LogFactory.getLog(TableBackupClient.class); protected Configuration conf; @@ -62,8 +66,17 @@ public abstract class TableBackupClient { protected BackupManager backupManager; protected BackupInfo backupInfo; + public TableBackupClient() { + } + public TableBackupClient(final Connection conn, final String backupId, BackupRequest request) throws IOException { + init(conn, backupId, request); + } + + public void init(final Connection conn, final String backupId, BackupRequest request) + throws IOException + { if (request.getBackupType() == BackupType.FULL) { backupManager = new BackupManager(conn, conn.getConfiguration()); } else { @@ -79,6 +92,8 @@ public abstract class TableBackupClient { if (tableList == null || tableList.isEmpty()) { this.tableList = new ArrayList<>(backupInfo.getTables()); } + // Start new session + backupManager.startBackupSession(); } /** @@ -88,6 +103,8 @@ public abstract class TableBackupClient { */ protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo) throws IOException { + + snapshotBackupTable(); backupManager.setBackupInfo(backupInfo); // set the start timestamp of the overall backup long startTs = EnvironmentEdgeManager.currentTime(); @@ -103,7 +120,7 @@ public abstract class TableBackupClient { } } - private String getMessage(Exception e) { + protected String getMessage(Exception e) { String msg = e.getMessage(); if (msg == null || msg.equals("")) { msg = e.getClass().getName(); @@ -116,7 +133,7 @@ public abstract class TableBackupClient { * @param backupInfo backup info * @throws Exception exception */ - private void deleteSnapshot(final Connection conn, BackupInfo backupInfo, Configuration conf) + protected static void deleteSnapshots(final Connection conn, BackupInfo backupInfo, Configuration conf) throws IOException { LOG.debug("Trying to delete snapshot for full backup."); for (String snapshotName : backupInfo.getSnapshotNames()) { @@ -127,8 +144,6 @@ public abstract class TableBackupClient { try (Admin admin = conn.getAdmin();) { admin.deleteSnapshot(snapshotName); - } catch (IOException ioe) { - LOG.debug("when deleting snapshot " + snapshotName, ioe); } LOG.debug("Deleting the snapshot " + snapshotName + " for backup " + backupInfo.getBackupId() + " succeeded."); @@ -140,7 +155,7 @@ public abstract class TableBackupClient { * snapshots. * @throws IOException exception */ - private void cleanupExportSnapshotLog(Configuration conf) throws IOException { + protected static void cleanupExportSnapshotLog(Configuration conf) throws IOException { FileSystem fs = FSUtils.getCurrentFileSystem(conf); Path stagingDir = new Path(conf.get(BackupRestoreConstants.CONF_STAGING_ROOT, fs.getWorkingDirectory() @@ -163,7 +178,7 @@ public abstract class TableBackupClient { * Clean up the uncompleted data at target directory if the ongoing backup has already entered * the copy phase. */ - private void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { + protected static void cleanupTargetDir(BackupInfo backupInfo, Configuration conf) { try { // clean up the uncompleted data at target directory if the ongoing backup has already entered // the copy phase @@ -182,10 +197,10 @@ public abstract class TableBackupClient { new Path(HBackupFileSystem.getTableBackupDir(backupInfo.getBackupRootDir(), backupInfo.getBackupId(), table)); if (outputFs.delete(targetDirPath, true)) { - LOG.info("Cleaning up uncompleted backup data at " + targetDirPath.toString() + LOG.debug("Cleaning up uncompleted backup data at " + targetDirPath.toString() + " done."); } else { - LOG.info("No data has been copied to " + targetDirPath.toString() + "."); + LOG.debug("No data has been copied to " + targetDirPath.toString() + "."); } Path tableDir = targetDirPath.getParent(); @@ -211,39 +226,106 @@ public abstract class TableBackupClient { */ protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager backupManager, Exception e, String msg, BackupType type, Configuration conf) throws IOException { - LOG.error(msg + getMessage(e), e); - // If this is a cancel exception, then we've already cleaned. - // set the failure timestamp of the overall backup - backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime()); - - // set failure message - backupInfo.setFailedMsg(e.getMessage()); - - // set overall backup status: failed - backupInfo.setState(BackupState.FAILED); - - // compose the backup failed data - String backupFailedData = - "BackupId=" + backupInfo.getBackupId() + ",startts=" + backupInfo.getStartTs() - + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + backupInfo.getPhase() - + ",failedmessage=" + backupInfo.getFailedMsg(); - LOG.error(backupFailedData); - - backupManager.updateBackupInfo(backupInfo); - - // if full backup, then delete HBase snapshots if there already are snapshots taken - // and also clean up export snapshot log files if exist - if (type == BackupType.FULL) { - deleteSnapshot(conn, backupInfo, conf); - cleanupExportSnapshotLog(conf); + try { + LOG.error(msg + getMessage(e), e); + // If this is a cancel exception, then we've already cleaned. + // set the failure timestamp of the overall backup + backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime()); + // set failure message + backupInfo.setFailedMsg(e.getMessage()); + // set overall backup status: failed + backupInfo.setState(BackupState.FAILED); + // compose the backup failed data + String backupFailedData = + "BackupId=" + backupInfo.getBackupId() + ",startts=" + backupInfo.getStartTs() + + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + backupInfo.getPhase() + + ",failedmessage=" + backupInfo.getFailedMsg(); + LOG.error(backupFailedData); + cleanupAndRestoreBackupSystem(conn, backupInfo, conf); + // If backup session is updated to FAILED state - means we + // processed recovery already. + backupManager.updateBackupInfo(backupInfo); + backupManager.finishBackupSession(); + LOG.error("Backup " + backupInfo.getBackupId() + " failed."); + } catch (IOException ee) { + LOG.error("Please run backup repair tool manually to restore backup system integrity"); + throw ee; } + } - // clean up the uncompleted data at target directory if the ongoing backup has already entered - // the copy phase - // For incremental backup, DistCp logs will be cleaned with the targetDir. - cleanupTargetDir(backupInfo, conf); - LOG.info("Backup " + backupInfo.getBackupId() + " failed."); + public static void cleanupAndRestoreBackupSystem (Connection conn, BackupInfo backupInfo, + Configuration conf) throws IOException + { + BackupType type = backupInfo.getType(); + // if full backup, then delete HBase snapshots if there already are snapshots taken + // and also clean up export snapshot log files if exist + if (type == BackupType.FULL) { + deleteSnapshots(conn, backupInfo, conf); + cleanupExportSnapshotLog(conf); + } + restoreBackupTable(conn, conf); + deleteBackupTableSnapshot(conn, conf); + // clean up the uncompleted data at target directory if the ongoing backup has already entered + // the copy phase + // For incremental backup, DistCp logs will be cleaned with the targetDir. + cleanupTargetDir(backupInfo, conf); + } + + protected void snapshotBackupTable() throws IOException { + + try (Admin admin = conn.getAdmin();){ + admin.snapshot(BackupSystemTable.getSnapshotName(conf), + BackupSystemTable.getTableName(conf)); + } + } + + protected static void restoreBackupTable(Connection conn, Configuration conf) + throws IOException { + + LOG.debug("Restoring " + BackupSystemTable.getTableNameAsString(conf) + + " from snapshot"); + try (Admin admin = conn.getAdmin();) { + String snapshotName = BackupSystemTable.getSnapshotName(conf); + if (snapshotExists(admin, snapshotName)) { + admin.disableTable(BackupSystemTable.getTableName(conf)); + admin.restoreSnapshot(snapshotName); + admin.enableTable(BackupSystemTable.getTableName(conf)); + LOG.debug("Done restoring backup system table"); + } else { + // Snapshot does not exists, i.e completeBackup failed after + // deleting backup system table snapshot + // In this case we log WARN and proceed + LOG.error("Could not restore backup system table. Snapshot " + snapshotName+ + " does not exists."); + } + } + } + + protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException { + + List list = admin.listSnapshots(); + for (SnapshotDescription desc: list) { + if (desc.getName().equals(snapshotName)) { + return true; + } + } + return false; + } + + protected static void deleteBackupTableSnapshot(Connection conn, Configuration conf) + throws IOException { + LOG.debug("Deleting " + BackupSystemTable.getSnapshotName(conf) + + " from the system"); + try (Admin admin = conn.getAdmin();) { + String snapshotName = BackupSystemTable.getSnapshotName(conf); + if (snapshotExists(admin, snapshotName)) { + admin.deleteSnapshot(snapshotName); + LOG.debug("Done deleting backup system table snapshot"); + } else { + LOG.error("Snapshot "+snapshotName+" does not exists"); + } + } } /** @@ -252,7 +334,7 @@ public abstract class TableBackupClient { * @throws IOException exception * @throws BackupException exception */ - private void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type, + protected void addManifest(BackupInfo backupInfo, BackupManager backupManager, BackupType type, Configuration conf) throws IOException, BackupException { // set the overall backup phase : store manifest backupInfo.setPhase(BackupPhase.STORE_MANIFEST); @@ -302,7 +384,7 @@ public abstract class TableBackupClient { * @param backupInfo backup info * @return meta data dir */ - private String obtainBackupMetaDataStr(BackupInfo backupInfo) { + protected String obtainBackupMetaDataStr(BackupInfo backupInfo) { StringBuffer sb = new StringBuffer(); sb.append("type=" + backupInfo.getType() + ",tablelist="); for (TableName table : backupInfo.getTables()) { @@ -321,7 +403,7 @@ public abstract class TableBackupClient { * hlogs. * @throws IOException exception */ - private void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException { + protected void cleanupDistCpLog(BackupInfo backupInfo, Configuration conf) throws IOException { Path rootPath = new Path(backupInfo.getHLogTargetDir()).getParent(); FileSystem fs = FileSystem.get(rootPath.toUri(), conf); FileStatus[] files = FSUtils.listStatus(fs, rootPath); @@ -366,11 +448,15 @@ public abstract class TableBackupClient { // - clean up directories with prefix "exportSnapshot-", which are generated when exporting // snapshots if (type == BackupType.FULL) { - deleteSnapshot(conn, backupInfo, conf); + deleteSnapshots(conn, backupInfo, conf); cleanupExportSnapshotLog(conf); } else if (type == BackupType.INCREMENTAL) { cleanupDistCpLog(backupInfo, conf); } + deleteBackupTableSnapshot(conn, conf); + // Finish active session + backupManager.finishBackupSession(); + LOG.info("Backup " + backupInfo.getBackupId() + " completed."); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestFullBackupWithFailures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestFullBackupWithFailures.java new file mode 100644 index 00000000000..955577373e2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestFullBackupWithFailures.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase; +import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient; +import org.apache.hadoop.hbase.backup.impl.TableBackupClient; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.annotations.VisibleForTesting; + +@Category(LargeTests.class) +public class TestFullBackupWithFailures extends TestBackupBase { + + private static final Log LOG = LogFactory.getLog(TestFullBackupWithFailures.class); + + static class FullTableBackupClientForTest extends FullTableBackupClient + { + public static final String BACKUP_TEST_MODE_STAGE = "backup.test.mode.stage"; + + public FullTableBackupClientForTest() { + } + + public FullTableBackupClientForTest(Connection conn, String backupId, BackupRequest request) + throws IOException { + super(conn, backupId, request); + } + + @Override + public void execute() throws IOException + { + // Get the stage ID to fail on + try (Admin admin = conn.getAdmin();) { + // Begin BACKUP + beginBackup(backupManager, backupInfo); + failStageIf(0); + String savedStartCode = null; + boolean firstBackup = false; + // do snapshot for full table backup + savedStartCode = backupManager.readBackupStartCode(); + firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L; + if (firstBackup) { + // This is our first backup. Let's put some marker to system table so that we can hold the logs + // while we do the backup. + backupManager.writeBackupStartCode(0L); + } + failStageIf(1); + // We roll log here before we do the snapshot. It is possible there is duplicate data + // in the log that is already in the snapshot. But if we do it after the snapshot, we + // could have data loss. + // A better approach is to do the roll log on each RS in the same global procedure as + // the snapshot. + LOG.info("Execute roll log procedure for full backup ..."); + + Map props = new HashMap(); + props.put("backupRoot", backupInfo.getBackupRootDir()); + admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, + LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); + failStageIf(2); + newTimestamps = backupManager.readRegionServerLastLogRollResult(); + if (firstBackup) { + // Updates registered log files + // We record ALL old WAL files as registered, because + // this is a first full backup in the system and these + // files are not needed for next incremental backup + List logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps); + backupManager.recordWALFiles(logFiles); + } + + // SNAPSHOT_TABLES: + backupInfo.setPhase(BackupPhase.SNAPSHOT); + for (TableName tableName : tableList) { + String snapshotName = + "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime()) + "_" + + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString(); + + snapshotTable(admin, tableName, snapshotName); + backupInfo.setSnapshotName(tableName, snapshotName); + } + failStageIf(3); + // SNAPSHOT_COPY: + // do snapshot copy + LOG.debug("snapshot copy for " + backupId); + snapshotCopy(backupInfo); + // Updates incremental backup table set + backupManager.addIncrementalBackupTableSet(backupInfo.getTables()); + + // BACKUP_COMPLETE: + // set overall backup status: complete. Here we make sure to complete the backup. + // After this checkpoint, even if entering cancel process, will let the backup finished + backupInfo.setState(BackupState.COMPLETE); + // The table list in backupInfo is good for both full backup and incremental backup. + // For incremental backup, it contains the incremental backup table set. + backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps); + + HashMap> newTableSetTimestampMap = + backupManager.readLogTimestampMap(); + + Long newStartCode = + BackupUtils.getMinValue(BackupUtils + .getRSLogTimestampMins(newTableSetTimestampMap)); + backupManager.writeBackupStartCode(newStartCode); + failStageIf(4); + // backup complete + completeBackup(conn, backupInfo, backupManager, BackupType.FULL, conf); + + } catch (Exception e) { + failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ", + BackupType.FULL, conf); + throw new IOException(e); + } + + } + + + + @VisibleForTesting + protected int getTestStageId() { + return conf.getInt(BACKUP_TEST_MODE_STAGE, 0); + } + + @VisibleForTesting + + protected void failStageIf(int stage) throws IOException { + int current = getTestStageId(); + if (current == stage) { + throw new IOException("Failed stage " + stage+" in testing"); + } + } + + } + + @Test + public void testFullBackupWithFailures() throws Exception { + conf1.set(TableBackupClient.BACKUP_CLIENT_IMPL_CLASS, + FullTableBackupClientForTest.class.getName()); + int stage = (new Random()).nextInt(5); + // Fail random stage between 0 and 4 inclusive + LOG.info("Running stage " + stage); + runBackupAndFailAtStage(stage); + } + + public void runBackupAndFailAtStage(int stage) throws Exception { + + conf1.setInt(FullTableBackupClientForTest.BACKUP_TEST_MODE_STAGE, stage); + try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { + int before = table.getBackupHistory().size(); + String[] args = + new String[] { "create", "full", BACKUP_ROOT_DIR, "-t", + table1.getNameAsString() + "," + table2.getNameAsString() }; + // Run backup + int ret = ToolRunner.run(conf1, new BackupDriver(), args); + assertFalse(ret == 0); + List backups = table.getBackupHistory(); + int after = table.getBackupHistory().size(); + + assertTrue(after == before +1); + for (BackupInfo data : backups) { + String backupId = data.getBackupId(); + assertFalse(checkSucceeded(backupId)); + } + Set tables = table.getIncrementalBackupTableSet(BACKUP_ROOT_DIR); + assertTrue(tables.size() == 0); + } + } + + +} \ No newline at end of file