HBASE-20579 Improve snapshot manifest copy in ExportSnapshot

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
jingyuntian 2018-05-17 11:32:49 +08:00 committed by tedyu
parent d06673cf3e
commit c3c9a4a595
3 changed files with 125 additions and 27 deletions

View File

@ -29,6 +29,11 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@ -36,7 +41,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -109,6 +113,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
private static final String CONF_COPY_MANIFEST_THREADS =
"snapshot.export.copy.references.threads";
private static final int DEFAULT_COPY_MANIFEST_THREADS =
Runtime.getRuntime().availableProcessors();
static class Testing {
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
@ -842,35 +850,52 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
}
/**
* Set path ownership.
*/
private void setOwner(final FileSystem fs, final Path path, final String user,
final String group, final boolean recursive) throws IOException {
if (user != null || group != null) {
if (recursive && fs.isDirectory(path)) {
for (FileStatus child : fs.listStatus(path)) {
setOwner(fs, child.getPath(), user, group, recursive);
}
private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
ExecutorService pool = Executors
.newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
List<Future<Void>> futures = new ArrayList<>();
for (Path dstPath : traversedPath) {
Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
futures.add(future);
}
try {
for (Future<Void> future : futures) {
future.get();
}
fs.setOwner(path, user, group);
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
} finally {
pool.shutdownNow();
}
}
/**
* Set path permission.
*/
private void setPermission(final FileSystem fs, final Path path, final short filesMode,
final boolean recursive) throws IOException {
if (filesMode > 0) {
FsPermission perm = new FsPermission(filesMode);
if (recursive && fs.isDirectory(path)) {
for (FileStatus child : fs.listStatus(path)) {
setPermission(fs, child.getPath(), filesMode, recursive);
}
private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
Configuration conf, List<Path> traversedPath) throws IOException {
setConfigParallel(outputFs, traversedPath, (fs, path) -> {
try {
fs.setOwner(path, filesUser, filesGroup);
} catch (IOException e) {
throw new RuntimeException(
"set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed");
}
fs.setPermission(path, perm);
}, conf);
}
private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
final List<Path> traversedPath, final Configuration conf) throws IOException {
if (filesMode <= 0) {
return;
}
FsPermission perm = new FsPermission(filesMode);
setConfigParallel(outputFs, traversedPath, (fs, path) -> {
try {
fs.setPermission(path, perm);
} catch (IOException e) {
throw new RuntimeException(
"set permission for file " + path + " to " + filesMode + " failed");
}
}, conf);
}
private boolean verifyTarget = true;
@ -1001,9 +1026,12 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
// Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
// The snapshot references must be copied before the hfiles otherwise the cleaner
// will remove them because they are unreferenced.
List<Path> travesedPaths = new ArrayList<>();
try {
LOG.info("Copy Snapshot Manifest");
FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
travesedPaths =
FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
} catch (IOException e) {
throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
snapshotDir + " to=" + initialOutputSnapshotDir, e);
@ -1013,11 +1041,11 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
+ filesUser)
+ (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
+ filesGroup));
setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
}
if (filesMode > 0) {
LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf);
}
}

View File

@ -43,6 +43,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
@ -1741,4 +1743,45 @@ public abstract class FSUtils extends CommonFSUtils {
}
}
public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
Configuration conf, int threads) throws IOException {
ExecutorService pool = Executors.newFixedThreadPool(threads);
List<Future<Void>> futures = new ArrayList<>();
List<Path> traversedPaths;
try {
traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
for (Future<Void> future : futures) {
future.get();
}
} catch (ExecutionException | InterruptedException | IOException e) {
throw new IOException("copy snapshot reference files failed", e);
} finally {
pool.shutdownNow();
}
return traversedPaths;
}
private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
List<Path> traversedPaths = new ArrayList<>();
traversedPaths.add(dst);
FileStatus currentFileStatus = srcFS.getFileStatus(src);
if (currentFileStatus.isDirectory()) {
if (!dstFS.mkdirs(dst)) {
throw new IOException("create dir failed: " + dst);
}
FileStatus[] subPaths = srcFS.listStatus(src);
for (FileStatus subPath : subPaths) {
traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
new Path(dst, subPath.getPath().getName()), conf, pool, futures));
}
} else {
Future<Void> future = pool.submit(() -> {
FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
return null;
});
futures.add(future);
}
return traversedPaths;
}
}

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
@ -412,6 +413,32 @@ public class TestFSUtils {
}
}
@Test
public void testCopyFilesParallel() throws Exception {
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path src = new Path("/src");
fs.mkdirs(src);
for (int i = 0; i < 50; i++) {
WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
}
Path sub = new Path(src, "sub");
fs.mkdirs(sub);
for (int i = 0; i < 50; i++) {
WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
}
Path dst = new Path("/dst");
List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);
assertEquals(102, allFiles.size());
FileStatus[] list = fs.listStatus(dst);
assertEquals(51, list.length);
FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
assertEquals(50, sublist.length);
}
// Below is taken from TestPread over in HDFS.
static final int blockSize = 4096;
static final long seed = 0xDEADBEEFL;