diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index ef67b7bd824..4af7dfbb8e6 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -29,6 +29,11 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -36,7 +41,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -109,6 +113,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
   private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
+  private static final String CONF_COPY_MANIFEST_THREADS =
+      "snapshot.export.copy.references.threads";
+  private static final int DEFAULT_COPY_MANIFEST_THREADS =
+      Runtime.getRuntime().availableProcessors();
 
   static class Testing {
     static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
@@ -842,35 +850,52 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
   }
 
-  /**
-   * Set path ownership.
-   */
-  private void setOwner(final FileSystem fs, final Path path, final String user,
-      final String group, final boolean recursive) throws IOException {
-    if (user != null || group != null) {
-      if (recursive && fs.isDirectory(path)) {
-        for (FileStatus child : fs.listStatus(path)) {
-          setOwner(fs, child.getPath(), user, group, recursive);
-        }
+  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
+      BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
+    ExecutorService pool = Executors
+      .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
+    List<Future<Void>> futures = new ArrayList<>();
+    for (Path dstPath : traversedPath) {
+      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
+      futures.add(future);
+    }
+    try {
+      for (Future<Void> future : futures) {
+        future.get();
       }
-      fs.setOwner(path, user, group);
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IOException(e);
+    } finally {
+      pool.shutdownNow();
     }
   }
 
-  /**
-   * Set path permission.
-   */
-  private void setPermission(final FileSystem fs, final Path path, final short filesMode,
-      final boolean recursive) throws IOException {
-    if (filesMode > 0) {
-      FsPermission perm = new FsPermission(filesMode);
-      if (recursive && fs.isDirectory(path)) {
-        for (FileStatus child : fs.listStatus(path)) {
-          setPermission(fs, child.getPath(), filesMode, recursive);
-        }
+  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
+      Configuration conf, List<Path> traversedPath) throws IOException {
+    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
+      try {
+        fs.setOwner(path, filesUser, filesGroup);
+      } catch (IOException e) {
+        throw new RuntimeException(
+          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed");
       }
-      fs.setPermission(path, perm);
+    }, conf);
+  }
+
+  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
+      final List<Path> traversedPath, final Configuration conf) throws IOException {
+    if (filesMode <= 0) {
+      return;
     }
+    FsPermission perm = new FsPermission(filesMode);
+    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
+      try {
+        fs.setPermission(path, perm);
+      } catch (IOException e) {
+        throw new RuntimeException(
+          "set permission for file " + path + " to " + filesMode + " failed");
+      }
+    }, conf);
   }
 
   private boolean verifyTarget = true;
@@ -1001,9 +1026,12 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
     // The snapshot references must be copied before the hfiles otherwise the cleaner
     // will remove them because they are unreferenced.
+    List<Path> travesedPaths = new ArrayList<>();
     try {
       LOG.info("Copy Snapshot Manifest");
-      FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
+      travesedPaths =
+        FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
+          conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
     } catch (IOException e) {
       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
         snapshotDir + " to=" + initialOutputSnapshotDir, e);
@@ -1013,11 +1041,11 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
           + filesUser)
           + (filesGroup == null ?
           "" : ", Change the group of " + needSetOwnerDir + " to " + filesGroup));
-        setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
+        setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
       }
       if (filesMode > 0) {
         LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
-        setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
+        setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf);
       }
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index b106a316ef8..53db140b28d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -1741,4 +1743,45 @@
     }
   }
 
+  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
+      Configuration conf, int threads) throws IOException {
+    ExecutorService pool = Executors.newFixedThreadPool(threads);
+    List<Future<Void>> futures = new ArrayList<>();
+    List<Path> traversedPaths;
+    try {
+      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+    } catch (ExecutionException | InterruptedException | IOException e) {
+      throw new IOException("copy snapshot reference files failed", e);
+    } finally {
+      pool.shutdownNow();
+    }
+    return traversedPaths;
+  }
+
+  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
+      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
+    List<Path> traversedPaths = new ArrayList<>();
+    traversedPaths.add(dst);
+    FileStatus currentFileStatus = srcFS.getFileStatus(src);
+    if (currentFileStatus.isDirectory()) {
+      if (!dstFS.mkdirs(dst)) {
+        throw new IOException("create dir failed: " + dst);
+      }
+      FileStatus[] subPaths = srcFS.listStatus(src);
+      for (FileStatus subPath : subPaths) {
+        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
+          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
+      }
+    } else {
+      Future<Void> future = pool.submit(() -> {
+        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
+        return null;
+      });
+      futures.add(future);
+    }
+    return traversedPaths;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 2718120084b..a862c8c4506 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
@@ -412,6 +413,32 @@ public class TestFSUtils {
     }
   }
 
+
+  @Test
+  public void testCopyFilesParallel() throws Exception {
+    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    Path src = new Path("/src");
+    fs.mkdirs(src);
+    for (int i = 0; i < 50; i++) {
+      WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
+    }
+    Path sub = new Path(src, "sub");
+    fs.mkdirs(sub);
+    for (int i = 0; i < 50; i++) {
+      WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
+    }
+    Path dst = new Path("/dst");
+    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);
+
+    assertEquals(102, allFiles.size());
+    FileStatus[] list = fs.listStatus(dst);
+    assertEquals(51, list.length);
+    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
+    assertEquals(50, sublist.length);
+  }
+
   // Below is taken from TestPread over in HDFS.
   static final int blockSize = 4096;
   static final long seed = 0xDEADBEEFL;
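
For reference, a minimal caller sketch of the new FSUtils.copyFilesParallel API introduced above; the class name, paths, and thread count are illustrative only and are not part of the patch. Inside ExportSnapshot itself the thread count is taken from snapshot.export.copy.references.threads, defaulting to the number of available processors.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;

public class CopyFilesParallelExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Source and destination are the same FileSystem here, mirroring the new unit test.
    FileSystem fs = FileSystem.get(conf);

    // Recursively copy /src to /dst, fanning the individual file copies out over 4 threads.
    // The returned list holds the destination root plus every directory and file created
    // beneath it, which ExportSnapshot later reuses for the parallel chown/chmod pass.
    List<Path> copied = FSUtils.copyFilesParallel(fs, new Path("/src"), fs, new Path("/dst"),
      conf, 4);
    System.out.println("copied " + copied.size() + " paths");
  }
}

When exporting a snapshot with the usual hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot <name> -copy-to <dest> invocation, the new thread count can presumably be overridden with a generic -D snapshot.export.copy.references.threads=<n> option.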