From a4bc3c6db7bad8832c953d809e5a09c94297860f Mon Sep 17 00:00:00 2001
From: tedyu
Date: Fri, 13 Oct 2017 15:05:43 -0700
Subject: [PATCH] HBASE-18843 Add DistCp support to incremental backup with bulk loading (Vladimir Rodionov)

---
 .../impl/IncrementalTableBackupClient.java    | 84 +++++++++++--------
 .../mapreduce/MapReduceBackupCopyJob.java     | 83 +++++++++++++++++-
 .../hadoop/hbase/backup/TestBackupBase.java   |  5 +-
 3 files changed, 134 insertions(+), 38 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 52f6b5ca751..566b77d77f1 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hbase.backup.impl;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -33,17 +32,15 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
-import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
 import org.apache.hadoop.hbase.backup.BackupRequest;
 import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -53,6 +50,7 @@ import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.util.Tool;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Incremental backup implementation.
@@ -112,6 +110,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    */
   protected Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
     Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
+    List<String> activeFiles = new ArrayList<String>();
+    List<String> archiveFiles = new ArrayList<String>();
     Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
         backupManager.readBulkloadRows(sTableList);
     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
@@ -127,6 +127,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
       map.entrySet()) {
       TableName srcTable = tblEntry.getKey();
+
       int srcIdx = getIndex(srcTable, sTableList);
       if (srcIdx < 0) {
         LOG.warn("Couldn't find " + srcTable + " in source table List");
@@ -162,7 +163,6 @@
           }
           for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
             String file = fileWithState.getFirst();
-            boolean raw = fileWithState.getSecond();
             int idx = file.lastIndexOf("/");
             String filename = file;
             if (idx > 0) {
@@ -175,37 +175,53 @@
               if (LOG.isTraceEnabled()) {
                 LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
               }
-              try {
               if (LOG.isTraceEnabled()) {
                 LOG.trace("copying " + p + " to " + tgt);
               }
-                FileUtil.copy(fs, p, tgtFs, tgt, false,conf);
-              } catch (FileNotFoundException e) {
-                LOG.debug("copying archive " + archive + " to " + tgt);
-                try {
-                  FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
-                } catch (FileNotFoundException fnfe) {
-                  if (!raw) throw fnfe;
-                }
-              }
-            } else {
+              activeFiles.add(p.toString());
+            } else if (fs.exists(archive)){
               LOG.debug("copying archive " + archive + " to " + tgt);
-              try {
-                FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
-              } catch (FileNotFoundException fnfe) {
-                if (!raw) throw fnfe;
-              }
+              archiveFiles.add(archive.toString());
             }
             files.add(tgt);
           }
         }
       }
     }
+
+    copyBulkLoadedFiles(activeFiles, archiveFiles);
+
     backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
     backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
     return mapForSrc;
   }
 
+  private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
+    throws IOException
+  {
+
+    try {
+      // Enable special mode of BackupDistCp
+      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 5);
+      // Copy active files
+      String tgtDest = backupInfo.getBackupRootDir() + Path.SEPARATOR + backupInfo.getBackupId();
+      if (activeFiles.size() > 0) {
+        String[] toCopy = new String[activeFiles.size()];
+        activeFiles.toArray(toCopy);
+        incrementalCopyHFiles(toCopy, tgtDest);
+      }
+      if (archiveFiles.size() > 0) {
+        String[] toCopy = new String[archiveFiles.size()];
+        archiveFiles.toArray(toCopy);
+        incrementalCopyHFiles(toCopy, tgtDest);
+      }
+    } finally {
+      // Disable special mode of BackupDistCp
+      conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+    }
+
+  }
+
   @Override
   public void execute() throws IOException {
 
@@ -229,8 +245,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles(backupInfo);
-      incrementalCopyHFiles(backupInfo);
+      convertWALsToHFiles();
+      incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()}, backupInfo.getBackupRootDir());
       // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
     } catch (Exception e) {
@@ -270,27 +286,25 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       }
     }
 
-  protected void incrementalCopyHFiles(BackupInfo backupInfo) throws Exception {
+  protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
     try {
-      LOG.debug("Incremental copy HFiles is starting.");
+      LOG.debug("Incremental copy HFiles is starting. dest="+backupDest);
       // set overall backup phase: incremental_copy
       backupInfo.setPhase(BackupPhase.INCREMENTAL_COPY);
       // get incremental backup file list and prepare parms for DistCp
-      List<String> incrBackupFileList = new ArrayList<String>();
-      // Add Bulk output
-      incrBackupFileList.add(getBulkOutputDir().toString());
-      String[] strArr = incrBackupFileList.toArray(new String[incrBackupFileList.size() + 1]);
-      strArr[strArr.length - 1] = backupInfo.getBackupRootDir();
+      String[] strArr = new String[files.length + 1];
+      System.arraycopy(files, 0, strArr, 0, files.length);
+      strArr[strArr.length - 1] = backupDest;
 
       BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
       int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
       if (res != 0) {
         LOG.error("Copy incremental HFile files failed with return code: " + res + ".");
-        throw new IOException("Failed copy from " + StringUtils.join(incrBackupFileList, ',')
-            + " to " + backupInfo.getHLogTargetDir());
+        throw new IOException("Failed copy from " + StringUtils.join(files, ',')
+          + " to " + backupDest);
       }
-      LOG.debug("Incremental copy HFiles from " + StringUtils.join(incrBackupFileList, ',')
-          + " to " + backupInfo.getBackupRootDir() + " finished.");
+      LOG.debug("Incremental copy HFiles from " + StringUtils.join(files, ',')
+        + " to " + backupDest + " finished.");
     } finally {
       deleteBulkLoadDirectory();
     }
@@ -307,7 +321,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
 
   }
 
-  protected void convertWALsToHFiles(BackupInfo backupInfo) throws IOException {
+  protected void convertWALsToHFiles() throws IOException {
     // get incremental backup file list and prepare parameters for DistCp
     List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
     // Get list of tables in incremental backup set
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
index 07e9fcce457..90e844264ee 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
@@ -37,10 +38,13 @@ import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.tools.CopyListingFileStatus;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
@@ -54,6 +58,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
  */
 @InterfaceAudience.Private
 public class MapReduceBackupCopyJob implements BackupCopyJob {
+  public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
   private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyJob.class);
 
   private Configuration conf;
@@ -143,6 +148,7 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
    * Only the argument "src1, [src2, [...]] dst" is supported,
    * no more DistCp options.
    */
+
   class BackupDistCp extends DistCp {
 
     private BackupInfo backupInfo;
@@ -156,6 +162,7 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
 
     }
 
+
     @Override
     public Job execute() throws Exception {
 
@@ -226,7 +233,7 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
           LOG.debug("Backup progress data updated to backup system table: \"Progress: "
               + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
         } catch (Throwable t) {
-          LOG.error("distcp " + job == null ? "" : job.getJobID() + " encountered error", t);
+          LOG.error(t);
           throw t;
         }
 
@@ -281,6 +288,80 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
       }
     }
 
+
+    @Override
+    protected Path createInputFileListing(Job job) throws IOException {
+
+      if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
+        return super.createInputFileListing(job);
+      }
+      long totalBytesExpected = 0;
+      int totalRecords = 0;
+      Path fileListingPath = getFileListingPath();
+      try (SequenceFile.Writer writer = getWriter(fileListingPath);) {
+        List<Path> srcFiles = getSourceFiles();
+        if (srcFiles.size() == 0) {
+          return fileListingPath;
+        }
+        totalRecords = srcFiles.size();
+        FileSystem fs = srcFiles.get(0).getFileSystem(conf);
+        for (Path path : srcFiles) {
+          FileStatus fst = fs.getFileStatus(path);
+          totalBytesExpected += fst.getLen();
+          Text key = getKey(path);
+          writer.append(key, new CopyListingFileStatus(fst));
+        }
+        writer.close();
+
+        // update jobs configuration
+
+        Configuration cfg = job.getConfiguration();
+        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
+        cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
+        cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
+      } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
+          | IllegalAccessException | NoSuchMethodException | ClassNotFoundException
+          | InvocationTargetException e) {
+        throw new IOException(e);
+      }
+      return fileListingPath;
+    }
+
+    private Text getKey(Path path) {
+      int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
+      int count = 0;
+      String relPath = "";
+      while (count++ < level) {
+        relPath = Path.SEPARATOR + path.getName() + relPath;
+        path = path.getParent();
+      }
+      return new Text(relPath);
+    }
+
+    private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
+        IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
+        ClassNotFoundException, InvocationTargetException, IOException {
+      Field options = null;
+      try {
+        options = DistCp.class.getDeclaredField("inputOptions");
+      } catch (NoSuchFieldException | SecurityException e) {
+        options = DistCp.class.getDeclaredField("context");
+      }
+      options.setAccessible(true);
+      return getSourcePaths(options);
+    }
+
+
+
+    private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
+      FileSystem fs = pathToListFile.getFileSystem(conf);
+      fs.delete(pathToListFile, false);
+      return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
+        SequenceFile.Writer.keyClass(Text.class),
+        SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
+        SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
+    }
+
   }
 
   /**
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 69db342bf3f..c44efbdfeef 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -136,8 +136,9 @@ public class TestBackupBase {
       // copy out the table and region info files for each table
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles(backupInfo);
-      incrementalCopyHFiles(backupInfo);
+      convertWALsToHFiles();
+      incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
+        backupInfo.getBackupRootDir());
       failStageIf(Stage.stage_2);
       // Save list of WAL files copied
       backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
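
Reviewer note, not part of the patch: the sketch below is a standalone re-implementation of the relative-key logic that BackupDistCp.getKey() applies when MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY is set. copyBulkLoadedFiles() sets the key to 5, which appears intended to keep the trailing namespace/table/region/family/file components of each bulk-loaded HFile path as its DistCp copy-listing key, so the files land under the backup destination with that layout preserved. The class name and the sample path are hypothetical; only hadoop-common is needed on the classpath.

// Illustrative sketch only -- mirrors BackupDistCp.getKey(); not shipped in this patch.
import org.apache.hadoop.fs.Path;

public class PreserveLevelsSketch {
  // Keep the trailing 'levels' components of 'path' as the DistCp listing key.
  static String relativeKey(Path path, int levels) {
    String relPath = "";
    int count = 0;
    while (count++ < levels) {
      relPath = Path.SEPARATOR + path.getName() + relPath;
      path = path.getParent();
    }
    return relPath;
  }

  public static void main(String[] args) {
    // Hypothetical source path of a bulk-loaded HFile.
    Path hfile = new Path("hdfs://nn/hbase/data/default/t1/region1/f1/hfile1");
    // With 5 levels preserved this prints: /default/t1/region1/f1/hfile1
    System.out.println(relativeKey(hfile, 5));
  }
}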