diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java index a1a2075831b..c36335afc16 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java @@ -84,7 +84,7 @@ public class DistCp extends Configured implements Tool { if (context.shouldUseSnapshotDiff()) { // When "-diff" or "-rdiff" is passed, do sync() first, then // create copyListing based on snapshot diff. - DistCpSync distCpSync = new DistCpSync(context, getConf()); + DistCpSync distCpSync = new DistCpSync(context, job.getConfiguration()); if (distCpSync.sync()) { createInputFileListingWithDiff(job, distCpSync); } else { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java index a78320b05bf..35ef3e4ab77 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpSync.java @@ -57,10 +57,13 @@ class DistCpSync { // private EnumMap> diffMap; private DiffInfo[] renameDiffs; + private CopyFilter copyFilter; DistCpSync(DistCpContext context, Configuration conf) { this.context = context; this.conf = conf; + this.copyFilter = CopyFilter.getCopyFilter(conf); + this.copyFilter.initialize(); } private boolean isRdiff() { @@ -213,18 +216,32 @@ class DistCpSync { } SnapshotDiffReport.DiffType dt = entry.getType(); List list = diffMap.get(dt); + final Path source = + new Path(DFSUtilClient.bytes2String(entry.getSourcePath())); + final Path relativeSource = new Path(Path.SEPARATOR + source); if (dt == SnapshotDiffReport.DiffType.MODIFY || dt == SnapshotDiffReport.DiffType.CREATE || dt == SnapshotDiffReport.DiffType.DELETE) { - final Path source = - new Path(DFSUtilClient.bytes2String(entry.getSourcePath())); - list.add(new DiffInfo(source, null, dt)); + if (copyFilter.shouldCopy(relativeSource)) { + list.add(new DiffInfo(source, null, dt)); + } } else if (dt == SnapshotDiffReport.DiffType.RENAME) { - final Path source = - new Path(DFSUtilClient.bytes2String(entry.getSourcePath())); final Path target = - new Path(DFSUtilClient.bytes2String(entry.getTargetPath())); - list.add(new DiffInfo(source, target, dt)); + new Path(DFSUtilClient.bytes2String(entry.getTargetPath())); + final Path relativeTarget = new Path(Path.SEPARATOR + target); + if (copyFilter.shouldCopy(relativeSource)) { + if (copyFilter.shouldCopy(relativeTarget)) { + list.add(new DiffInfo(source, target, dt)); + } else { + list = diffMap.get(SnapshotDiffReport.DiffType.DELETE); + list.add(new DiffInfo(source, target, + SnapshotDiffReport.DiffType.DELETE)); + } + } else if (copyFilter.shouldCopy(relativeTarget)) { + list = diffMap.get(SnapshotDiffReport.DiffType.CREATE); + list.add(new DiffInfo(target, null, + SnapshotDiffReport.DiffType.CREATE)); + } } } return true; diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java index 717b2f08436..d6bbc25fdc7 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSync.java @@ -39,6 +39,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; + +import java.io.IOException; +import java.io.FileWriter; +import java.io.BufferedWriter; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -747,4 +754,157 @@ public class TestDistCpSync { } Assert.assertTrue(threwException); } + + private void initData10(Path dir) throws Exception { + final Path staging = new Path(dir, ".staging"); + final Path stagingF1 = new Path(staging, "f1"); + final Path data = new Path(dir, "data"); + final Path dataF1 = new Path(data, "f1"); + + DFSTestUtil.createFile(dfs, stagingF1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, dataF1, BLOCK_SIZE, DATA_NUM, 0L); + } + + private void changeData10(Path dir) throws Exception { + final Path staging = new Path(dir, ".staging"); + final Path prod = new Path(dir, "prod"); + dfs.rename(staging, prod); + } + + private java.nio.file.Path generateFilterFile(String fileName) + throws IOException { + java.nio.file.Path tmpFile = Files.createTempFile(fileName, "txt"); + String str = ".*\\.staging.*"; + try (BufferedWriter writer = new BufferedWriter( + new FileWriter(tmpFile.toString()))) { + writer.write(str); + } + return tmpFile; + } + + private void deleteFilterFile(java.nio.file.Path filePath) + throws IOException { + Files.delete(filePath); + } + + @Test + public void testSync10() throws Exception { + java.nio.file.Path filterFile = null; + try { + Path sourcePath = new Path(dfs.getWorkingDirectory(), "source"); + initData10(sourcePath); + dfs.allowSnapshot(sourcePath); + dfs.createSnapshot(sourcePath, "s1"); + filterFile = generateFilterFile("filters"); + final DistCpOptions.Builder builder = new DistCpOptions.Builder( + new ArrayList<>(Arrays.asList(sourcePath)), + target) + .withFiltersFile(filterFile.toString()) + .withSyncFolder(true); + new DistCp(conf, builder.build()).execute(); + verifySync(dfs.getFileStatus(sourcePath), + dfs.getFileStatus(target), false, ".staging"); + + dfs.allowSnapshot(target); + dfs.createSnapshot(target, "s1"); + changeData10(sourcePath); + dfs.createSnapshot(sourcePath, "s2"); + + final DistCpOptions.Builder diffBuilder = new DistCpOptions.Builder( + new ArrayList<>(Arrays.asList(sourcePath)), + target) + .withUseDiff("s1", "s2") + .withFiltersFile(filterFile.toString()) + .withSyncFolder(true); + new DistCp(conf, diffBuilder.build()).execute(); + verifyCopy(dfs.getFileStatus(sourcePath), + dfs.getFileStatus(target), false); + } finally { + deleteFilterFile(filterFile); + } + } + + private void initData11(Path dir) throws Exception { + final Path staging = new Path(dir, "prod"); + final Path stagingF1 = new Path(staging, "f1"); + final Path data = new Path(dir, "data"); + final Path dataF1 = new Path(data, "f1"); + + DFSTestUtil.createFile(dfs, stagingF1, BLOCK_SIZE, DATA_NUM, 0L); + DFSTestUtil.createFile(dfs, dataF1, BLOCK_SIZE, DATA_NUM, 0L); + } + + private void changeData11(Path dir) throws Exception { + final Path staging = new Path(dir, "prod"); + final Path prod = new Path(dir, ".staging"); + dfs.rename(staging, prod); + } + + private void verifySync(FileStatus s, FileStatus t, boolean compareName, + String deletedName) + throws Exception { + Assert.assertEquals(s.isDirectory(), t.isDirectory()); + if (compareName) { + Assert.assertEquals(s.getPath().getName(), t.getPath().getName()); + } + if (!s.isDirectory()) { + // verify the file content is the same + byte[] sbytes = DFSTestUtil.readFileBuffer(dfs, s.getPath()); + byte[] tbytes = DFSTestUtil.readFileBuffer(dfs, t.getPath()); + Assert.assertArrayEquals(sbytes, tbytes); + } else { + FileStatus[] slist = dfs.listStatus(s.getPath()); + FileStatus[] tlist = dfs.listStatus(t.getPath()); + int minFiles = tlist.length; + if (slist.length < tlist.length) { + minFiles = slist.length; + } + for (int i = 0; i < minFiles; i++) { + if (slist[i].getPath().getName().contains(deletedName)) { + if (tlist[i].getPath().getName().contains(deletedName)) { + throw new Exception("Target is not synced as per exclusion filter"); + } + continue; + } + verifySync(slist[i], tlist[i], true, deletedName); + } + } + } + + @Test + public void testSync11() throws Exception { + java.nio.file.Path filterFile = null; + try { + Path sourcePath = new Path(dfs.getWorkingDirectory(), "source"); + initData11(sourcePath); + dfs.allowSnapshot(sourcePath); + dfs.createSnapshot(sourcePath, "s1"); + filterFile = generateFilterFile("filters"); + final DistCpOptions.Builder builder = new DistCpOptions.Builder( + new ArrayList<>(Arrays.asList(sourcePath)), + target) + .withFiltersFile(filterFile.toString()) + .withSyncFolder(true); + new DistCp(conf, builder.build()).execute(); + verifyCopy(dfs.getFileStatus(sourcePath), + dfs.getFileStatus(target), false); + + dfs.allowSnapshot(target); + dfs.createSnapshot(target, "s1"); + changeData11(sourcePath); + dfs.createSnapshot(sourcePath, "s2"); + + final DistCpOptions.Builder diffBuilder = new DistCpOptions.Builder( + new ArrayList<>(Arrays.asList(sourcePath)), + target) + .withUseDiff("s1", "s2") + .withFiltersFile(filterFile.toString()) + .withSyncFolder(true); + new DistCp(conf, diffBuilder.build()).execute(); + verifySync(dfs.getFileStatus(sourcePath), + dfs.getFileStatus(target), false, ".staging"); + } finally { + deleteFilterFile(filterFile); + } + } }