HBASE-20579 Improve snapshot manifest copy in ExportSnapshot
Signed-off-by: tedyu <yuzhihong@gmail.com>
parent 0836b0719a
commit c9f8c3436f
ExportSnapshot.java

@@ -29,6 +29,11 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -36,7 +41,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -109,6 +113,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
   private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
   private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
   protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
+  private static final String CONF_COPY_MANIFEST_THREADS =
+      "snapshot.export.copy.references.threads";
+  private static final int DEFAULT_COPY_MANIFEST_THREADS =
+      Runtime.getRuntime().availableProcessors();
 
   static class Testing {
     static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
@@ -842,35 +850,52 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
   }
 
-  /**
-   * Set path ownership.
-   */
-  private void setOwner(final FileSystem fs, final Path path, final String user,
-      final String group, final boolean recursive) throws IOException {
-    if (user != null || group != null) {
-      if (recursive && fs.isDirectory(path)) {
-        for (FileStatus child : fs.listStatus(path)) {
-          setOwner(fs, child.getPath(), user, group, recursive);
-        }
-      }
-      fs.setOwner(path, user, group);
-    }
-  }
-
-  /**
-   * Set path permission.
-   */
-  private void setPermission(final FileSystem fs, final Path path, final short filesMode,
-      final boolean recursive) throws IOException {
-    if (filesMode > 0) {
-      FsPermission perm = new FsPermission(filesMode);
-      if (recursive && fs.isDirectory(path)) {
-        for (FileStatus child : fs.listStatus(path)) {
-          setPermission(fs, child.getPath(), filesMode, recursive);
-        }
-      }
-      fs.setPermission(path, perm);
-    }
-  }
+  private void setConfigParallel(FileSystem outputFs, List<Path> traversedPath,
+      BiConsumer<FileSystem, Path> task, Configuration conf) throws IOException {
+    ExecutorService pool = Executors
+        .newFixedThreadPool(conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
+    List<Future<Void>> futures = new ArrayList<>();
+    for (Path dstPath : traversedPath) {
+      Future<Void> future = (Future<Void>) pool.submit(() -> task.accept(outputFs, dstPath));
+      futures.add(future);
+    }
+    try {
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IOException(e);
+    } finally {
+      pool.shutdownNow();
+    }
+  }
+
+  private void setOwnerParallel(FileSystem outputFs, String filesUser, String filesGroup,
+      Configuration conf, List<Path> traversedPath) throws IOException {
+    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
+      try {
+        fs.setOwner(path, filesUser, filesGroup);
+      } catch (IOException e) {
+        throw new RuntimeException(
+          "set owner for file " + path + " to " + filesUser + ":" + filesGroup + " failed");
+      }
+    }, conf);
+  }
+
+  private void setPermissionParallel(final FileSystem outputFs, final short filesMode,
+      final List<Path> traversedPath, final Configuration conf) throws IOException {
+    if (filesMode <= 0) {
+      return;
+    }
+    FsPermission perm = new FsPermission(filesMode);
+    setConfigParallel(outputFs, traversedPath, (fs, path) -> {
+      try {
+        fs.setPermission(path, perm);
+      } catch (IOException e) {
+        throw new RuntimeException(
+          "set permission for file " + path + " to " + filesMode + " failed");
+      }
+    }, conf);
+  }
 
   private boolean verifyTarget = true;
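Note on the error handling in setConfigParallel: BiConsumer#accept cannot throw checked exceptions, so the per-path lambdas in setOwnerParallel and setPermissionParallel wrap IOException in a RuntimeException; the barrier loop over Future#get() then sees that failure as an ExecutionException and rewraps it as IOException for the caller. A minimal self-contained sketch of that handoff (illustrative names only, not code from this patch):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

public class CheckedExceptionHandoff {
  public static void main(String[] args) throws IOException {
    // The worker cannot throw IOException directly from accept(), so it wraps it.
    BiConsumer<String, String> task = (fs, path) -> {
      try {
        throw new IOException("simulated failure on " + path);
      } catch (IOException e) {
        throw new UncheckedIOException(e); // the patch uses a plain RuntimeException
      }
    };
    ExecutorService pool = Executors.newFixedThreadPool(1);
    try {
      Future<?> future = pool.submit(() -> task.accept("outputFs", "/some/path"));
      future.get(); // the wrapped failure re-surfaces here as ExecutionException
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException(e); // rewrapped for the caller, as in setConfigParallel
    } finally {
      pool.shutdownNow();
    }
  }
}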
@@ -1001,9 +1026,12 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
     // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot>
     // The snapshot references must be copied before the hfiles otherwise the cleaner
     // will remove them because they are unreferenced.
+    List<Path> travesedPaths = new ArrayList<>();
     try {
       LOG.info("Copy Snapshot Manifest");
-      FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
+      travesedPaths =
+          FSUtils.copyFilesParallel(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, conf,
+            conf.getInt(CONF_COPY_MANIFEST_THREADS, DEFAULT_COPY_MANIFEST_THREADS));
     } catch (IOException e) {
       throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
         snapshotDir + " to=" + initialOutputSnapshotDir, e);
@@ -1013,11 +1041,11 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
           + filesUser)
         + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to "
             + filesGroup));
-      setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true);
+      setOwnerParallel(outputFs, filesUser, filesGroup, conf, travesedPaths);
     }
     if (filesMode > 0) {
       LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode);
-      setPermission(outputFs, needSetOwnerDir, (short)filesMode, true);
+      setPermissionParallel(outputFs, (short)filesMode, travesedPaths, conf);
     }
   }
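For context, a sketch of driving the tool with the new knob from Java; the property string matches CONF_COPY_MANIFEST_THREADS above, while the snapshot name and destination URI are placeholders (-snapshot and -copy-to are the tool's existing options):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.util.ToolRunner;

public class ExportWithManifestThreads {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Default is Runtime.getRuntime().availableProcessors(); set explicitly to cap it.
    conf.setInt("snapshot.export.copy.references.threads", 8);
    int rc = ToolRunner.run(conf, new ExportSnapshot(),
      new String[] { "-snapshot", "mySnapshot", "-copy-to", "hdfs://backup:8020/hbase" });
    System.exit(rc);
  }
}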
FSUtils.java

@@ -43,6 +43,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -1741,4 +1743,45 @@ public abstract class FSUtils extends CommonFSUtils {
     }
   }
 
+  public static List<Path> copyFilesParallel(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
+      Configuration conf, int threads) throws IOException {
+    ExecutorService pool = Executors.newFixedThreadPool(threads);
+    List<Future<Void>> futures = new ArrayList<>();
+    List<Path> traversedPaths;
+    try {
+      traversedPaths = copyFiles(srcFS, src, dstFS, dst, conf, pool, futures);
+      for (Future<Void> future : futures) {
+        future.get();
+      }
+    } catch (ExecutionException | InterruptedException | IOException e) {
+      throw new IOException("copy snapshot reference files failed", e);
+    } finally {
+      pool.shutdownNow();
+    }
+    return traversedPaths;
+  }
+
+  private static List<Path> copyFiles(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
+      Configuration conf, ExecutorService pool, List<Future<Void>> futures) throws IOException {
+    List<Path> traversedPaths = new ArrayList<>();
+    traversedPaths.add(dst);
+    FileStatus currentFileStatus = srcFS.getFileStatus(src);
+    if (currentFileStatus.isDirectory()) {
+      if (!dstFS.mkdirs(dst)) {
+        throw new IOException("create dir failed: " + dst);
+      }
+      FileStatus[] subPaths = srcFS.listStatus(src);
+      for (FileStatus subPath : subPaths) {
+        traversedPaths.addAll(copyFiles(srcFS, subPath.getPath(), dstFS,
+          new Path(dst, subPath.getPath().getName()), conf, pool, futures));
+      }
+    } else {
+      Future<Void> future = pool.submit(() -> {
+        FileUtil.copy(srcFS, src, dstFS, dst, false, false, conf);
+        return null;
+      });
+      futures.add(future);
+    }
+    return traversedPaths;
+  }
 }
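The copyFilesParallel/copyFiles pair above follows a simple pattern: the tree is walked sequentially on the calling thread (directories are created inline, and every visited path, directories included, is recorded), each file copy is submitted to the pool, and the caller then blocks on the collected futures before returning the traversed paths, which ExportSnapshot reuses for the parallel chown/chmod pass. A standalone sketch of the same pattern with plain java.util.concurrent, using made-up names and an in-memory tree instead of a FileSystem:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelTreeCopy {
  // Stand-in for a directory tree; the real code walks FileStatus objects.
  static final class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
    boolean isDir() { return !children.isEmpty(); }
  }

  // Walk sequentially; submit leaf "copies" to the pool; record every path.
  static List<String> walk(Node node, String dst, ExecutorService pool,
      List<Future<Void>> futures) {
    List<String> traversed = new ArrayList<>();
    traversed.add(dst);
    if (node.isDir()) {
      for (Node child : node.children) {
        traversed.addAll(walk(child, dst + "/" + child.name, pool, futures));
      }
    } else {
      futures.add(pool.submit(() -> {
        // The real code calls FileUtil.copy(srcFS, src, dstFS, dst, ...) here.
        System.out.println("copying file to " + dst);
        return null;
      }));
    }
    return traversed;
  }

  public static void main(String[] args) throws IOException {
    Node src = new Node("src");
    Node sub = new Node("sub");
    src.children.add(new Node("a"));
    src.children.add(sub);
    sub.children.add(new Node("b"));

    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<Void>> futures = new ArrayList<>();
    try {
      List<String> traversed = walk(src, "/dst", pool, futures);
      for (Future<Void> f : futures) {
        f.get(); // barrier: any copy failure surfaces before we return
      }
      System.out.println("traversed " + traversed.size() + " paths"); // 4
    } catch (InterruptedException | ExecutionException e) {
      throw new IOException("parallel copy failed", e);
    } finally {
      pool.shutdownNow();
    }
  }
}

Two properties carry over to the real code: mkdirs runs on the walking thread, so a destination directory exists before any copy beneath it is submitted, and directories appear in the returned list, which is what lets ExportSnapshot chown/chmod them afterwards.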
TestFSUtils.java

@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
@@ -412,6 +413,32 @@ public class TestFSUtils {
     }
   }
 
+  @Test
+  public void testCopyFilesParallel() throws Exception {
+    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    Path src = new Path("/src");
+    fs.mkdirs(src);
+    for (int i = 0; i < 50; i++) {
+      WriteDataToHDFS(fs, new Path(src, String.valueOf(i)), 1024);
+    }
+    Path sub = new Path(src, "sub");
+    fs.mkdirs(sub);
+    for (int i = 0; i < 50; i++) {
+      WriteDataToHDFS(fs, new Path(sub, String.valueOf(i)), 1024);
+    }
+    Path dst = new Path("/dst");
+    List<Path> allFiles = FSUtils.copyFilesParallel(fs, src, fs, dst, conf, 4);
+
+    assertEquals(102, allFiles.size());
+    FileStatus[] list = fs.listStatus(dst);
+    assertEquals(51, list.length);
+    FileStatus[] sublist = fs.listStatus(new Path(dst, "sub"));
+    assertEquals(50, sublist.length);
+  }
+
   // Below is taken from TestPread over in HDFS.
   static final int blockSize = 4096;
   static final long seed = 0xDEADBEEFL;
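The expected counts follow from copyFiles recording directories as well as files: the traversal returns 1 (/dst) + 50 top-level files + 1 (/dst/sub) + 50 nested files = 102 paths; listStatus(dst) sees the 50 files plus the sub directory (51); and listStatus(dst/sub) sees the remaining 50.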