From 910b68082c8f200f0ba6395a76b7ee1c8917e401 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 4 Apr 2017 18:20:11 -0700 Subject: [PATCH] HBASE-14141 HBase Backup/Restore Phase 3: Filter WALs on backup to include only edits from backed up tables (Vladimir Rodionov) --- .../hbase/backup/impl/BackupManager.java | 2 +- .../backup/impl/IncrementalBackupManager.java | 89 ++++++-- .../impl/IncrementalTableBackupClient.java | 211 ++++++++++-------- .../backup/impl/RestoreTablesClient.java | 5 +- .../hbase/backup/impl/TableBackupClient.java | 4 - .../backup/mapreduce/HFileSplitterJob.java | 2 +- .../backup/mapreduce/MapReduceRestoreJob.java | 14 +- .../hadoop/hbase/backup/util/RestoreTool.java | 134 ++--------- .../hbase/mapreduce/WALInputFormat.java | 119 +++++++--- .../hadoop/hbase/mapreduce/WALPlayer.java | 10 +- .../hbase/wal/AbstractFSWALProvider.java | 101 +++++++++ 11 files changed, 410 insertions(+), 281 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index c09ce481245..f09310f7e98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -466,7 +466,7 @@ public class BackupManager implements Closeable { /** * Saves list of WAL files after incremental backup operation. These files will be stored until - * TTL expiration and are used by Backup Log Cleaner plugin to determine which WAL files can be + * TTL expiration and are used by Backup Log Cleaner plug-in to determine which WAL files can be * safely purged. */ public void recordWALFiles(List files) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java index 0f1453ee90e..63308994e22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java @@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,7 +35,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; @@ -59,12 +60,10 @@ public class IncrementalBackupManager extends BackupManager { /** * Obtain the list of logs that need to be copied out for this incremental backup. The list is set * in BackupInfo. - * @param conn the Connection - * @param backupInfo backup info - * @return The new HashMap of RS log timestamps after the log roll for this incremental backup. + * @return The new HashMap of RS log time stamps after the log roll for this incremental backup. 
* @throws IOException exception */ - public HashMap getIncrBackupLogFileList(Connection conn, BackupInfo backupInfo) + public HashMap getIncrBackupLogFileMap() throws IOException { List logList; HashMap newTimestamps; @@ -105,40 +104,84 @@ public class IncrementalBackupManager extends BackupManager { List logFromSystemTable = getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo() .getBackupRootDir()); - addLogsFromBackupSystemToContext(logFromSystemTable); - logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable); backupInfo.setIncrBackupFileList(logList); return newTimestamps; } - private List excludeAlreadyBackedUpWALs(List logList, - List logFromSystemTable) { + /** + * Get list of WAL files eligible for incremental backup + * @return list of WAL files + * @throws IOException + */ + public List getIncrBackupLogFileList() + throws IOException { + List logList; + HashMap newTimestamps; + HashMap previousTimestampMins; + + String savedStartCode = readBackupStartCode(); + + // key: tableName + // value: + HashMap> previousTimestampMap = readLogTimestampMap(); + + previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap); + + if (LOG.isDebugEnabled()) { + LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId()); + } + // get all new log files from .logs and .oldlogs after last TS and before new timestamp + if (savedStartCode == null || previousTimestampMins == null + || previousTimestampMins.isEmpty()) { + throw new IOException( + "Cannot read any previous back up timestamps from backup system table. " + + "In order to create an incremental backup, at least one full backup is needed."); + } + + newTimestamps = readRegionServerLastLogRollResult(); + + logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); + List logFromSystemTable = + getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo() + .getBackupRootDir()); + + logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable); + backupInfo.setIncrBackupFileList(logList); - List backupedWALList = toWALList(logFromSystemTable); - logList.removeAll(backupedWALList); return logList; } - private List toWALList(List logFromSystemTable) { - List list = new ArrayList(logFromSystemTable.size()); - for (WALItem item : logFromSystemTable) { - list.add(item.getWalFile()); + private List excludeAlreadyBackedUpWALs(List logList, + List logFromSystemTable) { + + Set walFileNameSet = convertToSet(logFromSystemTable); + + List list = new ArrayList(); + for (int i=0; i < logList.size(); i++) { + Path p = new Path(logList.get(i)); + String name = p.getName(); + if (walFileNameSet.contains(name)) continue; + list.add(logList.get(i)); } return list; } - private void addLogsFromBackupSystemToContext(List logFromSystemTable) { - List walFiles = new ArrayList(); - for (WALItem item : logFromSystemTable) { - Path p = new Path(item.getWalFile()); - String walFileName = p.getName(); - String backupId = item.getBackupId(); - String relWALPath = backupId + Path.SEPARATOR + walFileName; - walFiles.add(relWALPath); + /** + * Create Set of WAL file names (not full path names) + * @param logFromSystemTable + * @return set of WAL file names + */ + private Set convertToSet(List logFromSystemTable) { + + Set set = new HashSet(); + for (int i=0; i < logFromSystemTable.size(); i++) { + WALItem item = logFromSystemTable.get(i); + set.add(item.walFile); } + return set; } /** diff --git 
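
The exclusion above compares WAL file names rather than full paths, since a WAL recorded in the backup system table may have moved from the active WAL directory to oldWALs in the meantime. A condensed, self-contained sketch of that name-based filtering (class and method names here are illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.fs.Path;

// Illustrative sketch: drop every candidate WAL whose file name is already
// recorded in the backup system table, regardless of which directory it lives in now.
public class WalNameFilterSketch {
  static List<String> excludeBackedUp(List<String> candidates, List<String> backedUpWals) {
    Set<String> backedUpNames = new HashSet<>();
    for (String wal : backedUpWals) {
      backedUpNames.add(new Path(wal).getName()); // compare by name, not by full path
    }
    List<String> result = new ArrayList<>();
    for (String candidate : candidates) {
      if (!backedUpNames.contains(new Path(candidate).getName())) {
        result.add(candidate);
      }
    }
    return result;
  }
}
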
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 8f6f2642499..3003c933b7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import org.apache.commons.lang.StringUtils; @@ -34,7 +35,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupCopyJob; import org.apache.hadoop.hbase.backup.BackupInfo; @@ -45,11 +45,15 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.util.Tool; /** * Incremental backup implementation. @@ -69,7 +73,8 @@ public class IncrementalTableBackupClient extends TableBackupClient { FileSystem fs = FileSystem.get(conf); List list = new ArrayList(); for (String file : incrBackupFileList) { - if (fs.exists(new Path(file))) { + Path p = new Path(file); + if (fs.exists(p) || isActiveWalPath(p)) { list.add(file); } else { LOG.warn("Can't find file: " + file); @@ -78,90 +83,13 @@ public class IncrementalTableBackupClient extends TableBackupClient { return list; } - private List getMissingFiles(List incrBackupFileList) throws IOException { - FileSystem fs = FileSystem.get(conf); - List list = new ArrayList(); - for (String file : incrBackupFileList) { - if (!fs.exists(new Path(file))) { - list.add(file); - } - } - return list; - - } - /** - * Do incremental copy. 
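
The new isActiveWalPath() check relies on the fact that archived WALs live under the oldWALs directory. A minimal sketch of that distinction, mirroring AbstractFSWALProvider.isArchivedLogFile() and assuming the standard HBase layout (class name illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;

// Illustrative sketch: a WAL path under oldWALs is archived; anything else is
// treated as still active and therefore kept in the incremental copy list.
public class ActiveWalCheckSketch {
  static boolean isArchived(Path walPath) {
    return walPath.toString().contains(
        Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME + Path.SEPARATOR);
  }

  static boolean isActive(Path walPath) {
    return !isArchived(walPath);
  }
}
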
- * @param backupInfo backup info + * Check if a given path is belongs to active WAL directory + * @param p path + * @return true, if yes */ - private void incrementalCopy(BackupInfo backupInfo) throws Exception { - - LOG.info("Incremental copy is starting."); - // set overall backup phase: incremental_copy - backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY); - // get incremental backup file list and prepare parms for DistCp - List incrBackupFileList = backupInfo.getIncrBackupFileList(); - // filter missing files out (they have been copied by previous backups) - incrBackupFileList = filterMissingFiles(incrBackupFileList); - String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]); - strArr[strArr.length - 1] = backupInfo.getHLogTargetDir(); - - BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); - int counter = 0; - int MAX_ITERAIONS = 2; - while (counter++ < MAX_ITERAIONS) { - // We run DistCp maximum 2 times - // If it fails on a second time, we throw Exception - int res = - copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr); - - if (res != 0) { - LOG.error("Copy incremental log files failed with return code: " + res + "."); - throw new IOException("Failed of Hadoop Distributed Copy from " - + StringUtils.join(incrBackupFileList, ",") + " to " - + backupInfo.getHLogTargetDir()); - } - List missingFiles = getMissingFiles(incrBackupFileList); - - if (missingFiles.isEmpty()) { - break; - } else { - // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run - // update backupInfo and strAttr - if (counter == MAX_ITERAIONS) { - String msg = - "DistCp could not finish the following files: " + StringUtils.join(missingFiles, ","); - LOG.error(msg); - throw new IOException(msg); - } - List converted = convertFilesFromWALtoOldWAL(missingFiles); - incrBackupFileList.removeAll(missingFiles); - incrBackupFileList.addAll(converted); - backupInfo.setIncrBackupFileList(incrBackupFileList); - - // Run DistCp only for missing files (which have been moved from WALs to oldWALs - // during previous run) - strArr = converted.toArray(new String[converted.size() + 1]); - strArr[strArr.length - 1] = backupInfo.getHLogTargetDir(); - } - } - - LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to " - + backupInfo.getHLogTargetDir() + " finished."); - } - - private List convertFilesFromWALtoOldWAL(List missingFiles) throws IOException { - List list = new ArrayList(); - for (String path : missingFiles) { - if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) { - LOG.error("Copy incremental log files failed, file is missing : " + path); - throw new IOException("Failed of Hadoop Distributed Copy to " - + backupInfo.getHLogTargetDir() + ", file is missing " + path); - } - list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR - + HConstants.HREGION_OLDLOGDIR_NAME)); - } - return list; + private boolean isActiveWalPath(Path p) { + return !AbstractFSWALProvider.isArchivedLogFile(p); } static int getIndex(TableName tbl, List sTableList) { @@ -286,7 +214,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { + backupManager.getIncrementalBackupTableSet()); try { newTimestamps = - ((IncrementalBackupManager) backupManager).getIncrBackupLogFileList(conn, backupInfo); + ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap(); } catch (Exception e) { // fail the overall backup and return failBackup(conn, 
backupInfo, backupManager, e, "Unexpected Exception : ", @@ -297,13 +225,16 @@ public class IncrementalTableBackupClient extends TableBackupClient { try { // copy out the table and region info files for each table BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); - incrementalCopy(backupInfo); + // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT + convertWALsToHFiles(backupInfo); + incrementalCopyHFiles(backupInfo); // Save list of WAL files copied backupManager.recordWALFiles(backupInfo.getIncrBackupFileList()); } catch (Exception e) { String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId; // fail the overall backup and return failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf); + return; } // case INCR_BACKUP_COMPLETE: // set overall backup status: complete. Here we make sure to complete the backup. @@ -323,8 +254,7 @@ public class IncrementalTableBackupClient extends TableBackupClient { backupManager.readLogTimestampMap(); Long newStartCode = - BackupUtils.getMinValue(BackupUtils - .getRSLogTimestampMins(newTableSetTimestampMap)); + BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap)); backupManager.writeBackupStartCode(newStartCode); handleBulkLoad(backupInfo.getTableNames()); @@ -337,4 +267,109 @@ public class IncrementalTableBackupClient extends TableBackupClient { } } + private void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception { + + try { + LOG.debug("Incremental copy HFiles is starting."); + // set overall backup phase: incremental_copy + backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY); + // get incremental backup file list and prepare parms for DistCp + List incrBackupFileList = new ArrayList(); + // Add Bulk output + incrBackupFileList.add(getBulkOutputDir().toString()); + String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]); + strArr[strArr.length - 1] = backupInfo.getBackupRootDir(); + BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf); + int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr); + if (res != 0) { + LOG.error("Copy incremental HFile files failed with return code: " + res + "."); + throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',') + + " to " + backupInfo.getHLogTargetDir()); + } + LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',') + + " to " + backupInfo.getBackupRootDir() + " finished."); + } finally { + deleteBulkLoadDirectory(); + } + } + + private void deleteBulkLoadDirectory() throws IOException { + // delete original bulk load directory on method exit + Path path = getBulkOutputDir(); + FileSystem fs = FileSystem.get(conf); + boolean result = fs.delete(path, true); + if (!result) { + LOG.warn("Could not delete " + path); + } + + } + + private void convertWALsToHFiles(BackupInfo backupInfo) throws IOException { + // get incremental backup file list and prepare parameters for DistCp + List incrBackupFileList = backupInfo.getIncrBackupFileList(); + // Get list of tables in incremental backup set + Set tableSet = backupManager.getIncrementalBackupTableSet(); + // filter missing files out (they have been copied by previous backups) + incrBackupFileList = filterMissingFiles(incrBackupFileList); + for (TableName table : tableSet) { + // Check if table exists + if (tableExists(table, conn)) { + walToHFiles(incrBackupFileList, table); + } else { + LOG.warn("Table " + table + 
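
The conversion step stages its HFiles in a temporary directory below the backup root before DistCp ships them out; the patch composes that path in the getBulkOutputDir helpers shown later in this file's hunk. A condensed sketch of the resulting layout, <backupRoot>/.tmp/<backupId>/<namespace>/<table>/data (class name illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;

// Illustrative sketch of the staging layout for the converted HFiles.
public class BulkOutputLayoutSketch {
  static Path bulkOutputDirForTable(String backupRootDir, String backupId, TableName table) {
    Path path = new Path(backupRootDir, ".tmp");
    path = new Path(path, backupId);
    path = new Path(path, table.getNamespaceAsString());
    path = new Path(path, table.getQualifierAsString());
    return new Path(path, "data");
  }
}
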
" does not exists. Skipping in WAL converter"); + } + } + } + + + private boolean tableExists(TableName table, Connection conn) throws IOException { + try (Admin admin = conn.getAdmin();) { + return admin.tableExists(table); + } + } + + private void walToHFiles(List dirPaths, TableName tableName) throws IOException { + + Tool player = new WALPlayer(); + + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file. We use ';' as separator + // because WAL file names contains ',' + String dirs = StringUtils.join(dirPaths, ';'); + + Path bulkOutputPath = getBulkOutputDirForTable(tableName); + conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";"); + String[] playerArgs = { dirs, tableName.getNameAsString() }; + + try { + player.setConf(conf); + int result = player.run(playerArgs); + if(result != 0) { + throw new IOException("WAL Player failed"); + } + conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY); + } catch (IOException e) { + throw e; + } catch (Exception ee) { + throw new IOException("Can not convert from directory " + dirs + + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee); + } + } + + private Path getBulkOutputDirForTable(TableName table) { + Path tablePath = getBulkOutputDir(); + tablePath = new Path(tablePath, table.getNamespaceAsString()); + tablePath = new Path(tablePath, table.getQualifierAsString()); + return new Path(tablePath, "data"); + } + + private Path getBulkOutputDir() { + String backupId = backupInfo.getBackupId(); + Path path = new Path(backupInfo.getBackupRootDir()); + path = new Path(path, ".tmp"); + path = new Path(path, backupId); + return path; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java index 2e4ecce6268..381e9b15dfa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java @@ -169,8 +169,9 @@ public class RestoreTablesClient { // full backup path comes first for (int i = 1; i < images.length; i++) { BackupImage im = images[i]; - String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId()); - dirList.add(new Path(logBackupDir)); + String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(), + im.getBackupId(), sTable)+ Path.SEPARATOR+"data"; + dirList.add(new Path(fileBackupDir)); } String dirs = StringUtils.join(dirList, ","); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java index 42a80766ab4..125b5da0864 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -351,10 +351,6 @@ public abstract class TableBackupClient { // add and store the manifest for the backup addManifest(backupInfo, backupManager, type, conf); - // after major steps done and manifest persisted, do convert if needed for incremental backup - /* in-fly convert code here, provided by future jira */ - LOG.debug("in-fly convert code here, provided by future jira"); - // compose the backup complete data String backupCompleteData = obtainBackupMetaDataStr(backupInfo) + ",startts=" + 
backupInfo.getStartTs() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java index 564172073fd..604e5021f5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitterJob.java @@ -107,11 +107,11 @@ public class HFileSplitterJob extends Configured implements Tool { String inputDirs = args[0]; String tabName = args[1]; conf.setStrings(TABLES_KEY, tabName); + conf.set(FileInputFormat.INPUT_DIR, inputDirs); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); job.setJarByClass(HFileSplitterJob.class); - FileInputFormat.addInputPaths(job, inputDirs); job.setInputFormatClass(HFileInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java index 9bafe12ee2f..4161ca9ccac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java @@ -62,13 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob { String bulkOutputConfKey; - if (fullBackupRestore) { - player = new HFileSplitterJob(); - bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY; - } else { - player = new WALPlayer(); - bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY; - } + player = new HFileSplitterJob(); + bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY; // Player reads all files in arbitrary directory structure and creates // a Map task for each file String dirs = StringUtils.join(dirPaths, ","); @@ -88,7 +83,10 @@ public class MapReduceRestoreJob implements RestoreJob { Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); Configuration conf = getConf(); conf.set(bulkOutputConfKey, bulkOutputPath.toString()); - String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; + String[] playerArgs = + { dirs, + fullBackupRestore? 
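
HFileSplitterJob above, and WALPlayer later in the patch, now put the raw input-directory string straight into the Configuration before the Job is created instead of going through FileInputFormat.addInputPaths(), which splits its argument on ','. That keeps comma-containing WAL paths and the custom ';' separator intact for the InputFormat to parse itself. A minimal sketch of the pattern (class and method names illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Illustrative sketch: store the input directories verbatim in the job
// configuration so the InputFormat can apply its own separator later.
public class InputDirConfSketch {
  static Job createJob(Configuration conf, String inputDirs, String jobName) throws IOException {
    conf.set(FileInputFormat.INPUT_DIR, inputDirs); // set before Job.getInstance copies the conf
    return Job.getInstance(conf, jobName);
  }
}
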
newTableNames[i].getNameAsString():tableNames[i].getNameAsString() + }; int result = 0; int loaderResult = 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java index 79adcabe326..d34701fb6fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java @@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; -import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; @@ -63,19 +61,13 @@ import org.apache.hadoop.hbase.util.FSTableDescriptors; public class RestoreTool { public static final Log LOG = LogFactory.getLog(BackupUtils.class); - - private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR }; - private final static long TABLE_AVAILABILITY_WAIT_TIME = 180000; + private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR }; protected Configuration conf = null; - protected Path backupRootPath; - protected String backupId; - protected FileSystem fs; - private final Path restoreTmpPath; // store table name and snapshot dir mapping private final HashMap snapshotMap = new HashMap<>(); @@ -86,9 +78,6 @@ public class RestoreTool { this.backupRootPath = backupRootPath; this.backupId = backupId; this.fs = backupRootPath.getFileSystem(conf); - this.restoreTmpPath = - new Path(conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY), "restore"); } /** @@ -218,7 +207,7 @@ public class RestoreTool { public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName, TableName newTableName, boolean truncateIfExists, String lastIncrBackupId) throws IOException { - restoreTableAndCreate(conn, tableName, newTableName, tableBackupPath, truncateIfExists, + createAndRestoreTable(conn, tableName, newTableName, tableBackupPath, truncateIfExists, lastIncrBackupId); } @@ -281,48 +270,6 @@ public class RestoreTool { return tableDescriptor; } - /** - * Duplicate the backup image if it's on local cluster - * @see HStore#bulkLoadHFile(org.apache.hadoop.hbase.regionserver.StoreFile) - * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum) - * @param tableArchivePath archive path - * @return the new tableArchivePath - * @throws IOException exception - */ - Path checkLocalAndBackup(Path tableArchivePath) throws IOException { - // Move the file if it's on local cluster - boolean isCopyNeeded = false; - - FileSystem srcFs = tableArchivePath.getFileSystem(conf); - FileSystem desFs = FileSystem.get(conf); - if (tableArchivePath.getName().startsWith("/")) { - isCopyNeeded = true; - } else { - // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path, - // long) - if (srcFs.getUri().equals(desFs.getUri())) { - LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: " - + desFs.getUri()); - isCopyNeeded = true; - } - } - if (isCopyNeeded) { - LOG.debug("File " + tableArchivePath + " 
on local cluster, back it up before restore"); - if (desFs.exists(restoreTmpPath)) { - try { - desFs.delete(restoreTmpPath, true); - } catch (IOException e) { - LOG.debug("Failed to delete path: " + restoreTmpPath - + ", need to check whether restore target DFS cluster is healthy"); - } - } - FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf); - LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath); - tableArchivePath = restoreTmpPath; - } - return tableArchivePath; - } - private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName, String lastIncrBackupId) throws IOException { if (lastIncrBackupId != null) { @@ -334,7 +281,7 @@ public class RestoreTool { return null; } - private void restoreTableAndCreate(Connection conn, TableName tableName, TableName newTableName, + private void createAndRestoreTable(Connection conn, TableName tableName, TableName newTableName, Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException { if (newTableName == null) { newTableName = tableName; @@ -403,33 +350,13 @@ public class RestoreTool { // the regions in fine grain checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList, tableDescriptor, truncateIfExists); - if (tableArchivePath != null) { - // start real restore through bulkload - // if the backup target is on local cluster, special action needed - Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath); - if (tempTableArchivePath.equals(tableArchivePath)) { - if (LOG.isDebugEnabled()) { - LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); - } - } else { - regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir - if (LOG.isDebugEnabled()) { - LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); - } - } + RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf); + Path[] paths = new Path[regionPathList.size()]; + regionPathList.toArray(paths); + restoreService.run(paths, new TableName[]{tableName}, new TableName[] {newTableName}, true); - LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); - for (Path regionPath : regionPathList) { - String regionName = regionPath.toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring HFiles from directory " + regionName); - } - String[] args = { regionName, newTableName.getNameAsString() }; - loader.run(args); - } - } - // we do not recovered edits } catch (Exception e) { + LOG.error(e); throw new IllegalStateException("Cannot restore hbase table", e); } } @@ -452,28 +379,6 @@ public class RestoreTool { return regionDirList; } - /** - * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full - * backup. - * @return the {@link LoadIncrementalHFiles} instance - * @throws IOException exception - */ - private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables) - throws IOException { - - // By default, it is 32 and loader will fail if # of files in any region exceed this - // limit. Bad for snapshot restore. 
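
With the LoadIncrementalHFiles-based path removed, full restore above hands the backed-up region directories to the pluggable RestoreJob instead. A condensed sketch of that call, assuming the RestoreJob/BackupRestoreFactory types used elsewhere in the patch (wrapper class and method names illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.RestoreJob;

// Illustrative sketch: bulk-load the backed-up region directories through the
// configured RestoreJob implementation (MapReduceRestoreJob by default).
public class FullRestoreSketch {
  static void restore(Configuration conf, Path[] regionDirs, TableName from, TableName to)
      throws Exception {
    RestoreJob restoreJob = BackupRestoreFactory.getRestoreJob(conf);
    restoreJob.run(regionDirs, new TableName[] { from }, new TableName[] { to },
        true /* full backup restore */);
  }
}
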
- this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); - this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); - LoadIncrementalHFiles loader = null; - try { - loader = new LoadIncrementalHFiles(this.conf); - } catch (Exception e1) { - throw new IOException(e1); - } - return loader; - } - /** * Calculate region boundaries and add all the column families to the table descriptor * @param regionDirList region dir list @@ -591,17 +496,18 @@ public class RestoreTool { // create table using table descriptor and region boundaries admin.createTable(htd, keys); } - long startTime = EnvironmentEdgeManager.currentTime(); - while (!admin.isTableAvailable(targetTableName, keys)) { - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) { - throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table " - + targetTableName + " is still not available"); - } + + } + long startTime = EnvironmentEdgeManager.currentTime(); + while (!admin.isTableAvailable(targetTableName)) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) { + throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table " + + targetTableName + " is still not available"); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 0ca78b42731..8b4e96731a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.DataInput; import java.io.DataOutput; import java.io.EOFException; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -35,10 +36,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; @@ -46,6 +47,7 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.StringUtils; /** @@ -142,56 +144,89 @@ public class WALInputFormat extends InputFormat { Entry currentEntry = new Entry(); private long startTime; private long endTime; + private Configuration conf; + private Path logFile; + private long currentPos; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WALSplit hsplit = (WALSplit)split; - Path logFile = new Path(hsplit.getLogFileName()); - 
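
The availability wait above now applies after both the create and truncate branches. A self-contained sketch of that bounded polling loop, with the timeout corresponding to the patch's TABLE_AVAILABILITY_WAIT_TIME of 180000 ms (class and method names illustrative):

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

// Illustrative sketch: poll isTableAvailable() until the table comes online or
// the timeout elapses, preserving the interrupt flag while sleeping.
public class TableAvailabilityWaitSketch {
  static void waitForTable(Admin admin, TableName table, long timeoutMs) throws IOException {
    long start = EnvironmentEdgeManager.currentTime();
    while (!admin.isTableAvailable(table)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
      if (EnvironmentEdgeManager.currentTime() - start > timeoutMs) {
        throw new IOException("Timed out after " + timeoutMs + " ms waiting for table "
            + table + " to become available");
      }
    }
  }
}
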
Configuration conf = context.getConfiguration(); + logFile = new Path(hsplit.getLogFileName()); + conf = context.getConfiguration(); LOG.info("Opening reader for "+split); - try { - this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf); - } catch (EOFException x) { - LOG.info("Ignoring corrupted WAL file: " + logFile - + " (This is normal when a RegionServer crashed.)"); - this.reader = null; - } + openReader(logFile); this.startTime = hsplit.getStartTime(); this.endTime = hsplit.getEndTime(); } + private void openReader(Path path) throws IOException + { + closeReader(); + reader = AbstractFSWALProvider.openReader(path, conf); + seek(); + setCurrentPath(path); + } + + private void setCurrentPath(Path path) { + this.logFile = path; + } + + private void closeReader() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + } + + private void seek() throws IOException { + if (currentPos != 0) { + reader.seek(currentPos); + } + } + @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (reader == null) return false; - + this.currentPos = reader.getPosition(); Entry temp; long i = -1; - do { - // skip older entries - try { - temp = reader.next(currentEntry); - i++; - } catch (EOFException x) { - LOG.warn("Corrupted entry detected. Ignoring the rest of the file." - + " (This is normal when a RegionServer crashed.)"); + try { + do { + // skip older entries + try { + temp = reader.next(currentEntry); + i++; + } catch (EOFException x) { + LOG.warn("Corrupted entry detected. Ignoring the rest of the file." + + " (This is normal when a RegionServer crashed.)"); + return false; + } + } while (temp != null && temp.getKey().getWriteTime() < startTime); + + if (temp == null) { + if (i > 0) LOG.info("Skipped " + i + " entries."); + LOG.info("Reached end of file."); return false; + } else if (i > 0) { + LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + "."); + } + boolean res = temp.getKey().getWriteTime() <= endTime; + if (!res) { + LOG.info("Reached ts: " + temp.getKey().getWriteTime() + + " ignoring the rest of the file."); + } + return res; + } catch (IOException e) { + Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); + if (logFile != archivedLog) { + openReader(archivedLog); + // Try call again in recursion + return nextKeyValue(); + } else { + throw e; } } - while(temp != null && temp.getKey().getWriteTime() < startTime); - - if (temp == null) { - if (i > 0) LOG.info("Skipped " + i + " entries."); - LOG.info("Reached end of file."); - return false; - } else if (i > 0) { - LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + "."); - } - boolean res = temp.getKey().getWriteTime() <= endTime; - if (!res) { - LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file."); - } - return res; } @Override @@ -235,6 +270,7 @@ public class WALInputFormat extends InputFormat { List getSplits(final JobContext context, final String startKey, final String endKey) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); + boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false); Path[] inputPaths = getInputPaths(conf); long startTime = conf.getLong(startKey, Long.MIN_VALUE); long endTime = conf.getLong(endKey, Long.MAX_VALUE); @@ -242,8 +278,16 @@ public class WALInputFormat extends InputFormat { List allFiles = new ArrayList(); for(Path inputPath: 
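
The record reader above remembers reader.getPosition() before each read; when an IOException indicates the WAL was archived mid-job, it reopens at the oldWALs location, seeks back and retries. A minimal sketch of that fallback, using the helpers this patch adds to AbstractFSWALProvider (class and method names illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;

// Illustrative sketch: resolve the archived location of a WAL that disappeared,
// reopen there and seek back to the last known position before resuming reads.
public class ArchivedWalFallbackSketch {
  static WAL.Reader reopenAtArchivedLocation(Path walPath, Configuration conf, long lastPos)
      throws IOException {
    Path archived = AbstractFSWALProvider.getArchivedLogPath(walPath, conf);
    if (archived.equals(walPath)) {
      throw new IOException("WAL " + walPath + " is gone and no archived copy was found");
    }
    WAL.Reader reader = AbstractFSWALProvider.openReader(archived, conf);
    if (lastPos > 0) {
      reader.seek(lastPos); // resume from where the failed reader stopped
    }
    return reader;
  }
}
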
inputPaths){ FileSystem fs = inputPath.getFileSystem(conf); - List files = getFiles(fs, inputPath, startTime, endTime); - allFiles.addAll(files); + try { + List files = getFiles(fs, inputPath, startTime, endTime); + allFiles.addAll(files); + } catch (FileNotFoundException e) { + if (ignoreMissing) { + LOG.warn("File "+ inputPath +" is missing. Skipping it."); + continue; + } + throw e; + } } List splits = new ArrayList(allFiles.size()); for (FileStatus file : allFiles) { @@ -253,8 +297,9 @@ public class WALInputFormat extends InputFormat { } private Path[] getInputPaths(Configuration conf) { - String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir"); - return StringUtils.stringToPath(inpDirs.split(",")); + String inpDirs = conf.get(FileInputFormat.INPUT_DIR); + return StringUtils.stringToPath( + inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); } private List getFiles(FileSystem fs, Path dir, long startTime, long endTime) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index d16dcf54b39..d15ffcf2803 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -58,7 +58,7 @@ import org.apache.hadoop.util.ToolRunner; /** * A tool to replay WAL files as a M/R job. * The WAL can be replayed for a set of tables or all tables, - * and a timerange can be provided (in milliseconds). + * and a time range can be provided (in milliseconds). * The WAL is filtered to the passed set of tables and the output * can optionally be mapped to another set of tables. * @@ -73,6 +73,9 @@ public class WALPlayer extends Configured implements Tool { public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output"; public final static String TABLES_KEY = "wal.input.tables"; public final static String TABLE_MAP_KEY = "wal.input.tablesmap"; + public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator"; + public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files"; + // This relies on Hadoop Configuration to handle warning about deprecated configs and // to set the correct non-deprecated configs when an old one shows up. 
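
For reference, a minimal usage sketch of the two knobs introduced here: a custom separator for the input-directory list and tolerance for input paths that no longer exist when splits are computed (class name illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;

// Illustrative sketch: configure WALPlayer/WALInputFormat for backup-style input.
public class WalPlayerKnobsSketch {
  static Configuration configure() {
    Configuration conf = HBaseConfiguration.create();
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");    // WAL file names contain ','
    conf.setBoolean(WALPlayer.IGNORE_MISSING_FILES, true); // skip paths that have disappeared
    return conf;
  }
}
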
@@ -128,7 +131,9 @@ public class WALPlayer extends Configured implements Tool { throw new IOException("Exactly one table must be specified for bulk HFile case."); } table = Bytes.toBytes(tables[0]); + } + } /** @@ -280,11 +285,10 @@ public class WALPlayer extends Configured implements Tool { } conf.setStrings(TABLES_KEY, tables); conf.setStrings(TABLE_MAP_KEY, tableMap); + conf.set(FileInputFormat.INPUT_DIR, inputDirs); Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis())); job.setJarByClass(WALPlayer.class); - FileInputFormat.addInputPaths(job, inputDirs); - job.setInputFormatClass(WALInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index bf1493342f7..28b7fda2859 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.wal; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -36,6 +37,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import com.google.common.annotations.VisibleForTesting; @@ -373,6 +377,103 @@ public abstract class AbstractFSWALProvider> implemen return p.toString().contains(oldLog); } + /** + * Get the archived WAL file path + * @param path - active WAL file path + * @param conf - configuration + * @return archived path if exists, path - otherwise + * @throws IOException exception + */ + public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path archivedLogLocation = new Path(oldLogDir, path.getName()); + final FileSystem fs = FSUtils.getCurrentFileSystem(conf); + + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } else { + LOG.error("Couldn't locate log: " + path); + return path; + } + } + + /** + * Opens WAL reader with retries and + * additional exception handling + * @param path path to WAL file + * @param conf configuration + * @return WAL Reader instance + * @throws IOException + */ + public static org.apache.hadoop.hbase.wal.WAL.Reader + openReader(Path path, Configuration conf) + throws IOException + + { + long retryInterval = 2000; // 2 sec + int maxAttempts = 30; + int attempt = 0; + Exception ee = null; + org.apache.hadoop.hbase.wal.WAL.Reader reader = null; + while (reader == null && attempt++ < maxAttempts) { + try { + // Detect if this is a new file, if so get a new reader else + // reset the current reader so that we see the new data + reader = WALFactory.createReader(path.getFileSystem(conf), path, conf); + return reader; + } catch (FileNotFoundException fnfe) { + // If the log was archived, continue reading from there + Path 
archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf); + if (path != archivedLog) { + return openReader(archivedLog, conf); + } else { + throw fnfe; + } + } catch (LeaseNotRecoveredException lnre) { + // HBASE-15019 the WAL was not closed due to some hiccup. + LOG.warn("Try to recover the WAL lease " + path, lnre); + recoverLease(conf, path); + reader = null; + ee = lnre; + } catch (NullPointerException npe) { + // Workaround for race condition in HDFS-4380 + // which throws a NPE if we open a file before any data node has the most recent block + // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. + LOG.warn("Got NPE opening reader, will retry."); + reader = null; + ee = npe; + } + if (reader == null) { + // sleep before next attempt + try { + Thread.sleep(retryInterval); + } catch (InterruptedException e) { + } + } + } + throw new IOException("Could not open reader", ee); + } + + // For HBASE-15019 + private static void recoverLease(final Configuration conf, final Path path) { + try { + final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); + FSUtils fsUtils = FSUtils.getInstance(dfs, conf); + fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + @Override + public boolean progress() { + LOG.debug("Still trying to recover WAL lease: " + path); + return true; + } + }); + } catch (IOException e) { + LOG.warn("unable to recover lease for WAL: " + path, e); + } + } + + /** * Get prefix of the log from its name, assuming WAL name in format of * log_prefix.filenumber.log_suffix
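
A short caller-side sketch of the new openReader() helper: it retries through the HDFS-4380 NPE race and HBASE-15019 lease hiccups and follows a WAL that has been moved to oldWALs, so the caller only sees a terminal failure (class and method names illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;

// Illustrative sketch: count the entries in a WAL, letting openReader() handle
// retries and the active-to-archived transition transparently.
public class OpenReaderUsageSketch {
  static long countEntries(Path walPath) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    long count = 0;
    try (WAL.Reader reader = AbstractFSWALProvider.openReader(walPath, conf)) {
      while (reader.next() != null) {
        count++;
      }
    }
    return count;
  }
}
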