HBASE-14141 HBase Backup/Restore Phase 3: Filter WALs on backup to include only edits from backed up tables (Vladimir Rodionov)

Author: tedyu
Date: 2017-04-04 18:20:11 -07:00
Parent: e916b79db5
Commit: 910b68082c
11 changed files with 410 additions and 281 deletions

@@ -466,7 +466,7 @@ public class BackupManager implements Closeable {
   /**
    * Saves list of WAL files after incremental backup operation. These files will be stored until
-   * TTL expiration and are used by Backup Log Cleaner plugin to determine which WAL files can be
+   * TTL expiration and are used by Backup Log Cleaner plug-in to determine which WAL files can be
    * safely purged.
    */
   public void recordWALFiles(List<String> files) throws IOException {

@@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.backup.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
 import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -59,12 +60,10 @@ public class IncrementalBackupManager extends BackupManager {
   /**
    * Obtain the list of logs that need to be copied out for this incremental backup. The list is set
    * in BackupInfo.
-   * @param conn the Connection
-   * @param backupInfo backup info
-   * @return The new HashMap of RS log timestamps after the log roll for this incremental backup.
+   * @return The new HashMap of RS log time stamps after the log roll for this incremental backup.
    * @throws IOException exception
    */
-  public HashMap<String, Long> getIncrBackupLogFileList(Connection conn, BackupInfo backupInfo)
+  public HashMap<String, Long> getIncrBackupLogFileMap()
       throws IOException {
     List<String> logList;
     HashMap<String, Long> newTimestamps;
@@ -105,40 +104,84 @@ public class IncrementalBackupManager extends BackupManager {
     List<WALItem> logFromSystemTable =
         getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
             .getBackupRootDir());
-    addLogsFromBackupSystemToContext(logFromSystemTable);
 
     logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
     backupInfo.setIncrBackupFileList(logList);
 
     return newTimestamps;
   }
 
-  private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
-      List<WALItem> logFromSystemTable) {
-    List<String> backupedWALList = toWALList(logFromSystemTable);
-    logList.removeAll(backupedWALList);
+  /**
+   * Get list of WAL files eligible for incremental backup
+   * @return list of WAL files
+   * @throws IOException
+   */
+  public List<String> getIncrBackupLogFileList()
+      throws IOException {
+    List<String> logList;
+    HashMap<String, Long> newTimestamps;
+    HashMap<String, Long> previousTimestampMins;
+
+    String savedStartCode = readBackupStartCode();
+
+    // key: tableName
+    // value: <RegionServer,PreviousTimeStamp>
+    HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
+
+    previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
+    }
+    // get all new log files from .logs and .oldlogs after last TS and before new timestamp
+    if (savedStartCode == null || previousTimestampMins == null
+        || previousTimestampMins.isEmpty()) {
+      throw new IOException(
+          "Cannot read any previous back up timestamps from backup system table. "
+              + "In order to create an incremental backup, at least one full backup is needed.");
+    }
+
+    newTimestamps = readRegionServerLastLogRollResult();
+
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+    List<WALItem> logFromSystemTable =
+        getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
+            .getBackupRootDir());
+
+    logList = excludeAlreadyBackedUpWALs(logList, logFromSystemTable);
+    backupInfo.setIncrBackupFileList(logList);
     return logList;
   }
 
-  private List<String> toWALList(List<WALItem> logFromSystemTable) {
-    List<String> list = new ArrayList<String>(logFromSystemTable.size());
-    for (WALItem item : logFromSystemTable) {
-      list.add(item.getWalFile());
+  private List<String> excludeAlreadyBackedUpWALs(List<String> logList,
+      List<WALItem> logFromSystemTable) {
+
+    Set<String> walFileNameSet = convertToSet(logFromSystemTable);
+
+    List<String> list = new ArrayList<String>();
+    for (int i=0; i < logList.size(); i++) {
+      Path p = new Path(logList.get(i));
+      String name = p.getName();
+      if (walFileNameSet.contains(name)) continue;
+      list.add(logList.get(i));
     }
     return list;
   }
 
-  private void addLogsFromBackupSystemToContext(List<WALItem> logFromSystemTable) {
-    List<String> walFiles = new ArrayList<String>();
-    for (WALItem item : logFromSystemTable) {
-      Path p = new Path(item.getWalFile());
-      String walFileName = p.getName();
-      String backupId = item.getBackupId();
-      String relWALPath = backupId + Path.SEPARATOR + walFileName;
-      walFiles.add(relWALPath);
+  /**
+   * Create Set of WAL file names (not full path names)
+   * @param logFromSystemTable
+   * @return set of WAL file names
+   */
+  private Set<String> convertToSet(List<WALItem> logFromSystemTable) {
+
+    Set<String> set = new HashSet<String>();
+    for (int i=0; i < logFromSystemTable.size(); i++) {
+      WALItem item = logFromSystemTable.get(i);
+      set.add(item.walFile);
     }
+    return set;
   }
 
   /**
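
Note on the filtering above (an illustrative sketch, not part of the patch): excludeAlreadyBackedUpWALs() compares bare WAL file names rather than full paths, because a WAL recorded during the previous backup may since have moved from the WALs directory to oldWALs. A minimal, self-contained Java example of that name-based comparison, using hypothetical paths:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.fs.Path;

    public class WalNameFilterSketch {
      // Keep only candidate WALs whose file name is not already recorded as backed up.
      static List<String> exclude(List<String> candidates, Set<String> backedUpNames) {
        List<String> result = new ArrayList<String>();
        for (String wal : candidates) {
          if (!backedUpNames.contains(new Path(wal).getName())) {
            result.add(wal);
          }
        }
        return result;
      }

      public static void main(String[] args) {
        // Hypothetical locations: the same WAL name can appear under WALs/ or oldWALs/.
        List<String> candidates = Arrays.asList(
            "hdfs://nn/hbase/WALs/rs1,16020,1/rs1%2C16020%2C1.00001",
            "hdfs://nn/hbase/oldWALs/rs1%2C16020%2C1.00002");
        Set<String> backedUp = new HashSet<String>(Arrays.asList("rs1%2C16020%2C1.00001"));
        System.out.println(exclude(candidates, backedUp)); // prints only the .00002 path
      }
    }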

@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -34,7 +35,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -45,11 +45,15 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.util.Tool;
 
 /**
  * Incremental backup implementation.
@@ -69,7 +73,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     FileSystem fs = FileSystem.get(conf);
     List<String> list = new ArrayList<String>();
     for (String file : incrBackupFileList) {
-      if (fs.exists(new Path(file))) {
+      Path p = new Path(file);
+      if (fs.exists(p) || isActiveWalPath(p)) {
         list.add(file);
       } else {
         LOG.warn("Can't find file: " + file);
@@ -78,90 +83,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     return list;
   }
 
-  private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    List<String> list = new ArrayList<String>();
-    for (String file : incrBackupFileList) {
-      if (!fs.exists(new Path(file))) {
-        list.add(file);
-      }
-    }
-    return list;
-  }
-
   /**
-   * Do incremental copy.
-   * @param backupInfo backup info
+   * Check if a given path is belongs to active WAL directory
+   * @param p path
+   * @return true, if yes
    */
-  private void incrementalCopy(BackupInfo backupInfo) throws Exception {
-
-    LOG.info("Incremental copy is starting.");
-    // set overall backup phase: incremental_copy
-    backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
-    // get incremental backup file list and prepare parms for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
-    strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-
-    BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
-    int counter = 0;
-    int MAX_ITERAIONS = 2;
-    while (counter++ < MAX_ITERAIONS) {
-      // We run DistCp maximum 2 times
-      // If it fails on a second time, we throw Exception
-      int res =
-          copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
-      if (res != 0) {
-        LOG.error("Copy incremental log files failed with return code: " + res + ".");
-        throw new IOException("Failed of Hadoop Distributed Copy from "
-            + StringUtils.join(incrBackupFileList, ",") + " to "
-            + backupInfo.getHLogTargetDir());
-      }
-      List<String> missingFiles = getMissingFiles(incrBackupFileList);
-      if (missingFiles.isEmpty()) {
-        break;
-      } else {
-        // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
-        // update backupInfo and strAttr
-        if (counter == MAX_ITERAIONS) {
-          String msg =
-              "DistCp could not finish the following files: " + StringUtils.join(missingFiles, ",");
-          LOG.error(msg);
-          throw new IOException(msg);
-        }
-        List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
-        incrBackupFileList.removeAll(missingFiles);
-        incrBackupFileList.addAll(converted);
-        backupInfo.setIncrBackupFileList(incrBackupFileList);
-        // Run DistCp only for missing files (which have been moved from WALs to oldWALs
-        // during previous run)
-        strArr = converted.toArray(new String[converted.size() + 1]);
-        strArr[strArr.length - 1] = backupInfo.getHLogTargetDir();
-      }
-    }
-
-    LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
-        + backupInfo.getHLogTargetDir() + " finished.");
-  }
-
-  private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
-    List<String> list = new ArrayList<String>();
-    for (String path : missingFiles) {
-      if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
-        LOG.error("Copy incremental log files failed, file is missing : " + path);
-        throw new IOException("Failed of Hadoop Distributed Copy to "
-            + backupInfo.getHLogTargetDir() + ", file is missing " + path);
-      }
-      list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME, Path.SEPARATOR
-          + HConstants.HREGION_OLDLOGDIR_NAME));
-    }
-    return list;
+  private boolean isActiveWalPath(Path p) {
+    return !AbstractFSWALProvider.isArchivedLogFile(p);
   }
 
   static int getIndex(TableName tbl, List<TableName> sTableList) {
@@ -286,7 +214,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         + backupManager.getIncrementalBackupTableSet());
     try {
       newTimestamps =
-          ((IncrementalBackupManager) backupManager).getIncrBackupLogFileList(conn, backupInfo);
+          ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
     } catch (Exception e) {
       // fail the overall backup and return
       failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
@@ -297,13 +225,16 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     try {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
-      incrementalCopy(backupInfo);
+      // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
+      convertWALsToHFiles(backupInfo);
+      incrementalCopyHFiles(backupInfo);
       // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
     } catch (Exception e) {
       String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
       // fail the overall backup and return
       failBackup(conn, backupInfo, backupManager, e, msg, BackupType.INCREMENTAL, conf);
+      return;
     }
     // case INCR_BACKUP_COMPLETE:
     // set overall backup status: complete. Here we make sure to complete the backup.
@@ -323,8 +254,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
           backupManager.readLogTimestampMap();
 
       Long newStartCode =
-          BackupUtils.getMinValue(BackupUtils
-              .getRSLogTimestampMins(newTableSetTimestampMap));
+          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
       backupManager.writeBackupStartCode(newStartCode);
 
       handleBulkLoad(backupInfo.getTableNames());
@@ -337,4 +267,109 @@
     }
   }
 
+  private void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
+
+    try {
+      LOG.debug("Incremental copy HFiles is starting.");
+      // set overall backup phase: incremental_copy
+      backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
+      // get incremental backup file list and prepare parms for DistCp
+      List<String> incrBackupFileList = new ArrayList<String>();
+      // Add Bulk output
+      incrBackupFileList.add(getBulkOutputDir().toString());
+      String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
+      strArr[strArr.length - 1] = backupInfo.getBackupRootDir();
+
+      BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
+      int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
+      if (res != 0) {
+        LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
+        throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',')
+            + " to " + backupInfo.getHLogTargetDir());
+      }
+      LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',')
+          + " to " + backupInfo.getBackupRootDir() + " finished.");
+    } finally {
+      deleteBulkLoadDirectory();
+    }
+  }
+
+  private void deleteBulkLoadDirectory() throws IOException {
+    // delete original bulk load directory on method exit
+    Path path = getBulkOutputDir();
+    FileSystem fs = FileSystem.get(conf);
+    boolean result = fs.delete(path, true);
+    if (!result) {
+      LOG.warn("Could not delete " + path);
+    }
+  }
+
+  private void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
+    // get incremental backup file list and prepare parameters for DistCp
+    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+    // Get list of tables in incremental backup set
+    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+    // filter missing files out (they have been copied by previous backups)
+    incrBackupFileList = filterMissingFiles(incrBackupFileList);
+    for (TableName table : tableSet) {
+      // Check if table exists
+      if (tableExists(table, conn)) {
+        walToHFiles(incrBackupFileList, table);
+      } else {
+        LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+      }
+    }
+  }
+
+  private boolean tableExists(TableName table, Connection conn) throws IOException {
+    try (Admin admin = conn.getAdmin();) {
+      return admin.tableExists(table);
+    }
+  }
+
+  private void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
+
+    Tool player = new WALPlayer();
+
+    // Player reads all files in arbitrary directory structure and creates
+    // a Map task for each file. We use ';' as separator
+    // because WAL file names contains ','
+    String dirs = StringUtils.join(dirPaths, ';');
+
+    Path bulkOutputPath = getBulkOutputDirForTable(tableName);
+    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+    String[] playerArgs = { dirs, tableName.getNameAsString() };
+
+    try {
+      player.setConf(conf);
+      int result = player.run(playerArgs);
+      if(result != 0) {
+        throw new IOException("WAL Player failed");
+      }
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception ee) {
+      throw new IOException("Can not convert from directory " + dirs
+          + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+    }
+  }
+
+  private Path getBulkOutputDirForTable(TableName table) {
+    Path tablePath = getBulkOutputDir();
+    tablePath = new Path(tablePath, table.getNamespaceAsString());
+    tablePath = new Path(tablePath, table.getQualifierAsString());
+    return new Path(tablePath, "data");
+  }
+
+  private Path getBulkOutputDir() {
+    String backupId = backupInfo.getBackupId();
+    Path path = new Path(backupInfo.getBackupRootDir());
+    path = new Path(path, ".tmp");
+    path = new Path(path, backupId);
+    return path;
+  }
 }
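
The methods added above change the incremental backup flow: WAL edits are first rewritten into per-table HFiles by running WALPlayer with a bulk output directory under <backup root>/.tmp/<backup id>, and only those HFiles are then copied to the backup root by the copy job. A minimal driver sketch of that WALPlayer step, with hypothetical file names and table name (not the client code itself):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.util.ToolRunner;

    public class WalToHFilesSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Hypothetical inputs: WALs selected for this incremental backup and the
        // per-table bulk output directory under the backup root's .tmp area.
        List<String> walFiles = Arrays.asList(
            "hdfs://nn/hbase/oldWALs/rs1%2C16020%2C1.00001",
            "hdfs://nn/hbase/oldWALs/rs1%2C16020%2C1.00002");
        String bulkOutput = "hdfs://nn/backups/.tmp/backup_0001/default/t1/data";

        // Same keys the patch uses: ';' separates inputs because WAL names contain ','.
        conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutput);
        conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
        String dirs = StringUtils.join(walFiles, ';');

        // WALPlayer writes HFiles for table t1 into bulkOutput; a copy job then ships
        // them to the backup root, and the .tmp directory is deleted afterwards.
        int rc = ToolRunner.run(conf, new WALPlayer(), new String[] { dirs, "t1" });
        System.exit(rc);
      }
    }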

@@ -169,8 +169,9 @@
     // full backup path comes first
     for (int i = 1; i < images.length; i++) {
       BackupImage im = images[i];
-      String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
-      dirList.add(new Path(logBackupDir));
+      String fileBackupDir = HBackupFileSystem.getTableBackupDir(im.getRootDir(),
+        im.getBackupId(), sTable)+ Path.SEPARATOR+"data";
+      dirList.add(new Path(fileBackupDir));
     }
 
     String dirs = StringUtils.join(dirList, ",");

@@ -351,10 +351,6 @@
     // add and store the manifest for the backup
     addManifest(backupInfo, backupManager, type, conf);
 
-    // after major steps done and manifest persisted, do convert if needed for incremental backup
-    /* in-fly convert code here, provided by future jira */
-    LOG.debug("in-fly convert code here, provided by future jira");
-
     // compose the backup complete data
     String backupCompleteData =
         obtainBackupMetaDataStr(backupInfo) + ",startts=" + backupInfo.getStartTs()

@@ -107,11 +107,11 @@ public class HFileSplitterJob extends Configured implements Tool {
     String inputDirs = args[0];
     String tabName = args[1];
     conf.setStrings(TABLES_KEY, tabName);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job =
         Job.getInstance(conf,
           conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
     job.setJarByClass(HFileSplitterJob.class);
-    FileInputFormat.addInputPaths(job, inputDirs);
     job.setInputFormatClass(HFileInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

@@ -62,13 +62,8 @@ public class MapReduceRestoreJob implements RestoreJob {
     String bulkOutputConfKey;
 
-    if (fullBackupRestore) {
-      player = new HFileSplitterJob();
-      bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
-    } else {
-      player = new WALPlayer();
-      bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY;
-    }
+    player = new HFileSplitterJob();
+    bulkOutputConfKey = HFileSplitterJob.BULK_OUTPUT_CONF_KEY;
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file
     String dirs = StringUtils.join(dirPaths, ",");
@@ -88,7 +83,10 @@ public class MapReduceRestoreJob implements RestoreJob {
         Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i]));
         Configuration conf = getConf();
         conf.set(bulkOutputConfKey, bulkOutputPath.toString());
-        String[] playerArgs = { dirs, tableNames[i].getNameAsString() };
+        String[] playerArgs =
+          { dirs,
+            fullBackupRestore? newTableNames[i].getNameAsString():tableNames[i].getNameAsString()
+          };
 
         int result = 0;
         int loaderResult = 0;

@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -63,19 +61,13 @@ import org.apache.hadoop.hbase.util.FSTableDescriptors;
 public class RestoreTool {
 
   public static final Log LOG = LogFactory.getLog(BackupUtils.class);
 
-  private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
-
   private final static long TABLE_AVAILABILITY_WAIT_TIME = 180000;
+
+  private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
 
   protected Configuration conf = null;
   protected Path backupRootPath;
   protected String backupId;
   protected FileSystem fs;
-  private final Path restoreTmpPath;
 
   // store table name and snapshot dir mapping
   private final HashMap<TableName, Path> snapshotMap = new HashMap<>();
@@ -86,9 +78,6 @@
     this.backupRootPath = backupRootPath;
     this.backupId = backupId;
     this.fs = backupRootPath.getFileSystem(conf);
-    this.restoreTmpPath =
-        new Path(conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
-          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY), "restore");
   }
 
   /**
@@ -218,7 +207,7 @@
   public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName,
       TableName newTableName, boolean truncateIfExists, String lastIncrBackupId)
       throws IOException {
-    restoreTableAndCreate(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
+    createAndRestoreTable(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
       lastIncrBackupId);
   }
@@ -281,48 +270,6 @@
     return tableDescriptor;
   }
 
-  /**
-   * Duplicate the backup image if it's on local cluster
-   * @see HStore#bulkLoadHFile(org.apache.hadoop.hbase.regionserver.StoreFile)
-   * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
-   * @param tableArchivePath archive path
-   * @return the new tableArchivePath
-   * @throws IOException exception
-   */
-  Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
-    // Move the file if it's on local cluster
-    boolean isCopyNeeded = false;
-
-    FileSystem srcFs = tableArchivePath.getFileSystem(conf);
-    FileSystem desFs = FileSystem.get(conf);
-    if (tableArchivePath.getName().startsWith("/")) {
-      isCopyNeeded = true;
-    } else {
-      // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
-      // long)
-      if (srcFs.getUri().equals(desFs.getUri())) {
-        LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: "
-            + desFs.getUri());
-        isCopyNeeded = true;
-      }
-    }
-    if (isCopyNeeded) {
-      LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
-      if (desFs.exists(restoreTmpPath)) {
-        try {
-          desFs.delete(restoreTmpPath, true);
-        } catch (IOException e) {
-          LOG.debug("Failed to delete path: " + restoreTmpPath
-              + ", need to check whether restore target DFS cluster is healthy");
-        }
-      }
-      FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf);
-      LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath);
-      tableArchivePath = restoreTmpPath;
-    }
-    return tableArchivePath;
-  }
-
   private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
       String lastIncrBackupId) throws IOException {
     if (lastIncrBackupId != null) {
@@ -334,7 +281,7 @@
     return null;
   }
 
-  private void restoreTableAndCreate(Connection conn, TableName tableName, TableName newTableName,
+  private void createAndRestoreTable(Connection conn, TableName tableName, TableName newTableName,
       Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
     if (newTableName == null) {
       newTableName = tableName;
@@ -403,33 +350,13 @@
       // the regions in fine grain
       checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList,
         tableDescriptor, truncateIfExists);
-      if (tableArchivePath != null) {
-        // start real restore through bulkload
-        // if the backup target is on local cluster, special action needed
-        Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
-        if (tempTableArchivePath.equals(tableArchivePath)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
-          }
-        } else {
-          regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
-          }
-        }
-
-        LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
-        for (Path regionPath : regionPathList) {
-          String regionName = regionPath.toString();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Restoring HFiles from directory " + regionName);
-          }
-          String[] args = { regionName, newTableName.getNameAsString() };
-          loader.run(args);
-        }
-      }
-      // we do not recovered edits
+      RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
+      Path[] paths = new Path[regionPathList.size()];
+      regionPathList.toArray(paths);
+      restoreService.run(paths, new TableName[]{tableName}, new TableName[] {newTableName}, true);
+
     } catch (Exception e) {
+      LOG.error(e);
       throw new IllegalStateException("Cannot restore hbase table", e);
     }
   }
@@ -452,28 +379,6 @@
     return regionDirList;
   }
 
-  /**
-   * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
-   * backup.
-   * @return the {@link LoadIncrementalHFiles} instance
-   * @throws IOException exception
-   */
-  private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
-      throws IOException {
-
-    // By default, it is 32 and loader will fail if # of files in any region exceed this
-    // limit. Bad for snapshot restore.
-    this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
-    this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
-    LoadIncrementalHFiles loader = null;
-    try {
-      loader = new LoadIncrementalHFiles(this.conf);
-    } catch (Exception e1) {
-      throw new IOException(e1);
-    }
-    return loader;
-  }
-
   /**
    * Calculate region boundaries and add all the column families to the table descriptor
    * @param regionDirList region dir list
@@ -591,17 +496,18 @@
         // create table using table descriptor and region boundaries
         admin.createTable(htd, keys);
       }
-      long startTime = EnvironmentEdgeManager.currentTime();
-      while (!admin.isTableAvailable(targetTableName, keys)) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-        if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
-          throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table "
-              + targetTableName + " is still not available");
-        }
+    }
+    long startTime = EnvironmentEdgeManager.currentTime();
+    while (!admin.isTableAvailable(targetTableName)) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+      if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
+        throw new IOException("Time out " + TABLE_AVAILABILITY_WAIT_TIME + "ms expired, table "
+            + targetTableName + " is still not available");
       }
     }
   }
 }
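
With checkLocalAndBackup() and the manual LoadIncrementalHFiles loop removed above, the restore path now hands the backup image's region directories to the pluggable RestoreJob obtained from BackupRestoreFactory. A minimal sketch of that call, with hypothetical paths and table names (imports and package names assumed from the surrounding code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
    import org.apache.hadoop.hbase.backup.RestoreJob;

    public class RestoreJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        RestoreJob restore = BackupRestoreFactory.getRestoreJob(conf);
        // Hypothetical region directories of the full-backup image for table t1.
        Path[] regionDirs = { new Path("hdfs://nn/backups/backup_0001/default/t1/region-a") };
        // The last flag mirrors the patch: true when restoring a full backup image.
        restore.run(regionDirs, new TableName[] { TableName.valueOf("t1") },
            new TableName[] { TableName.valueOf("t1_restored") }, true);
      }
    }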

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -35,10 +36,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.Writable;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -142,56 +144,89 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
     Entry currentEntry = new Entry();
     private long startTime;
     private long endTime;
+    private Configuration conf;
+    private Path logFile;
+    private long currentPos;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
         throws IOException, InterruptedException {
       WALSplit hsplit = (WALSplit)split;
-      Path logFile = new Path(hsplit.getLogFileName());
-      Configuration conf = context.getConfiguration();
+      logFile = new Path(hsplit.getLogFileName());
+      conf = context.getConfiguration();
       LOG.info("Opening reader for "+split);
-      try {
-        this.reader = WALFactory.createReader(logFile.getFileSystem(conf), logFile, conf);
-      } catch (EOFException x) {
-        LOG.info("Ignoring corrupted WAL file: " + logFile
-            + " (This is normal when a RegionServer crashed.)");
-        this.reader = null;
-      }
+      openReader(logFile);
       this.startTime = hsplit.getStartTime();
       this.endTime = hsplit.getEndTime();
     }
 
+    private void openReader(Path path) throws IOException
+    {
+      closeReader();
+      reader = AbstractFSWALProvider.openReader(path, conf);
+      seek();
+      setCurrentPath(path);
+    }
+
+    private void setCurrentPath(Path path) {
+      this.logFile = path;
+    }
+
+    private void closeReader() throws IOException {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    private void seek() throws IOException {
+      if (currentPos != 0) {
+        reader.seek(currentPos);
+      }
+    }
+
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
       if (reader == null) return false;
-
+      this.currentPos = reader.getPosition();
       Entry temp;
       long i = -1;
-      do {
-        // skip older entries
-        try {
-          temp = reader.next(currentEntry);
-          i++;
-        } catch (EOFException x) {
-          LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
-              + " (This is normal when a RegionServer crashed.)");
+      try {
+        do {
+          // skip older entries
+          try {
+            temp = reader.next(currentEntry);
+            i++;
+          } catch (EOFException x) {
+            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
+                + " (This is normal when a RegionServer crashed.)");
+            return false;
+          }
+        } while (temp != null && temp.getKey().getWriteTime() < startTime);
+
+        if (temp == null) {
+          if (i > 0) LOG.info("Skipped " + i + " entries.");
+          LOG.info("Reached end of file.");
           return false;
+        } else if (i > 0) {
+          LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
+        }
+        boolean res = temp.getKey().getWriteTime() <= endTime;
+        if (!res) {
+          LOG.info("Reached ts: " + temp.getKey().getWriteTime()
+              + " ignoring the rest of the file.");
+        }
+        return res;
+      } catch (IOException e) {
+        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
+        if (logFile != archivedLog) {
+          openReader(archivedLog);
+          // Try call again in recursion
+          return nextKeyValue();
+        } else {
+          throw e;
         }
       }
-      while(temp != null && temp.getKey().getWriteTime() < startTime);
-
-      if (temp == null) {
-        if (i > 0) LOG.info("Skipped " + i + " entries.");
-        LOG.info("Reached end of file.");
-        return false;
-      } else if (i > 0) {
-        LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + ".");
-      }
-      boolean res = temp.getKey().getWriteTime() <= endTime;
-      if (!res) {
-        LOG.info("Reached ts: " + temp.getKey().getWriteTime() + " ignoring the rest of the file.");
-      }
-      return res;
     }
 
     @Override
@@ -235,6 +270,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
   List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
+    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
     Path[] inputPaths = getInputPaths(conf);
     long startTime = conf.getLong(startKey, Long.MIN_VALUE);
     long endTime = conf.getLong(endKey, Long.MAX_VALUE);
@@ -242,8 +278,16 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
     List<FileStatus> allFiles = new ArrayList<FileStatus>();
     for(Path inputPath: inputPaths){
       FileSystem fs = inputPath.getFileSystem(conf);
-      List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
-      allFiles.addAll(files);
+      try {
+        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
+        allFiles.addAll(files);
+      } catch (FileNotFoundException e) {
+        if (ignoreMissing) {
+          LOG.warn("File "+ inputPath +" is missing. Skipping it.");
+          continue;
+        }
+        throw e;
+      }
     }
     List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
     for (FileStatus file : allFiles) {
@@ -253,8 +297,9 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
   }
 
   private Path[] getInputPaths(Configuration conf) {
-    String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir");
-    return StringUtils.stringToPath(inpDirs.split(","));
+    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
+    return StringUtils.stringToPath(
+      inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
   }
 
   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
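
The retry path added above relies on the fact that a WAL can be archived while the job runs: on an IOException the reader recomputes the file's location and, if getArchivedLogPath() returned a different Path object, reopens from oldWALs (the reference comparison works because the original path is returned unchanged when no archived copy is found). A small sketch of that lookup, assuming the standard WALs/oldWALs layout:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class ArchivedWalLookupSketch {
      // Mirrors the idea of AbstractFSWALProvider.getArchivedLogPath() from this patch:
      // look the file up by name under oldWALs, otherwise keep the original path.
      static Path archivedOrSame(Path walPath, Configuration conf) throws IOException {
        Path oldLogDir = new Path(FSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME);
        Path candidate = new Path(oldLogDir, walPath.getName());
        FileSystem fs = FSUtils.getCurrentFileSystem(conf);
        return fs.exists(candidate) ? candidate : walPath;
      }
    }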

@@ -58,7 +58,7 @@ import org.apache.hadoop.util.ToolRunner;
 /**
  * A tool to replay WAL files as a M/R job.
  * The WAL can be replayed for a set of tables or all tables,
- * and a timerange can be provided (in milliseconds).
+ * and a time range can be provided (in milliseconds).
  * The WAL is filtered to the passed set of tables and the output
  * can optionally be mapped to another set of tables.
 *
@@ -73,6 +73,9 @@ public class WALPlayer extends Configured implements Tool {
   public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
   public final static String TABLES_KEY = "wal.input.tables";
   public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
+  public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
+  public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
 
   // This relies on Hadoop Configuration to handle warning about deprecated configs and
   // to set the correct non-deprecated configs when an old one shows up.
@@ -128,7 +131,9 @@ public class WALPlayer extends Configured implements Tool {
         throw new IOException("Exactly one table must be specified for bulk HFile case.");
       }
       table = Bytes.toBytes(tables[0]);
     }
   }
 
   /**
@@ -280,11 +285,10 @@ public class WALPlayer extends Configured implements Tool {
     }
     conf.setStrings(TABLES_KEY, tables);
     conf.setStrings(TABLE_MAP_KEY, tableMap);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
     job.setJarByClass(WALPlayer.class);
-    FileInputFormat.addInputPaths(job, inputDirs);
     job.setInputFormatClass(WALInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
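
The two configuration keys introduced above are meant for callers that feed explicit WAL file lists to WALPlayer, as the incremental backup client now does. A short usage sketch (example values, not defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;

    public class WalPlayerOptionsSketch {
      static void configure(Configuration conf) {
        // Use ';' to separate input files, because WAL file names contain ','.
        conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
        // While splits are calculated, skip inputs that no longer exist (for example,
        // WALs that were archived and cleaned) instead of failing the whole job.
        conf.setBoolean(WALPlayer.IGNORE_MISSING_FILES, true);
      }
    }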

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.wal;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -373,6 +377,103 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider
     return p.toString().contains(oldLog);
   }
 
+  /**
+   * Get the archived WAL file path
+   * @param path - active WAL file path
+   * @param conf - configuration
+   * @return archived path if exists, path - otherwise
+   * @throws IOException exception
+   */
+  public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archivedLogLocation = new Path(oldLogDir, path.getName());
+    final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    } else {
+      LOG.error("Couldn't locate log: " + path);
+      return path;
+    }
+  }
+
+  /**
+   * Opens WAL reader with retries and
+   * additional exception handling
+   * @param path path to WAL file
+   * @param conf configuration
+   * @return WAL Reader instance
+   * @throws IOException
+   */
+  public static org.apache.hadoop.hbase.wal.WAL.Reader
+      openReader(Path path, Configuration conf)
+          throws IOException
+  {
+    long retryInterval = 2000; // 2 sec
+    int maxAttempts = 30;
+    int attempt = 0;
+    Exception ee = null;
+    org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
+    while (reader == null && attempt++ < maxAttempts) {
+      try {
+        // Detect if this is a new file, if so get a new reader else
+        // reset the current reader so that we see the new data
+        reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
+        return reader;
+      } catch (FileNotFoundException fnfe) {
+        // If the log was archived, continue reading from there
+        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(path, conf);
+        if (path != archivedLog) {
+          return openReader(archivedLog, conf);
+        } else {
+          throw fnfe;
+        }
+      } catch (LeaseNotRecoveredException lnre) {
+        // HBASE-15019 the WAL was not closed due to some hiccup.
+        LOG.warn("Try to recover the WAL lease " + path, lnre);
+        recoverLease(conf, path);
+        reader = null;
+        ee = lnre;
+      } catch (NullPointerException npe) {
+        // Workaround for race condition in HDFS-4380
+        // which throws a NPE if we open a file before any data node has the most recent block
+        // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+        LOG.warn("Got NPE opening reader, will retry.");
+        reader = null;
+        ee = npe;
+      }
+      if (reader == null) {
+        // sleep before next attempt
+        try {
+          Thread.sleep(retryInterval);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+    throw new IOException("Could not open reader", ee);
+  }
+
+  // For HBASE-15019
+  private static void recoverLease(final Configuration conf, final Path path) {
+    try {
+      final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
+      FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+      fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
+        @Override
+        public boolean progress() {
+          LOG.debug("Still trying to recover WAL lease: " + path);
+          return true;
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("unable to recover lease for WAL: " + path, e);
+    }
+  }
+
   /**
    * Get prefix of the log from its name, assuming WAL name in format of
    * log_prefix.filenumber.log_suffix