HADOOP-18596. Distcp -update to use modification time while checking for file skip. (#5387)

Adding toggleable support for modification time during distcp -update between two stores with incompatible checksum comparison.

Contributed by: Mehakmeet Singh <mehakmeet.singh.behl@gmail.com>
Mehakmeet Singh 2023-02-14 15:17:27 +05:30 committed by GitHub
parent d437571fe2
commit a2ceb09323
6 changed files with 297 additions and 25 deletions


@ -140,6 +140,26 @@ public final class DistCpConstants {
"distcp.blocks.per.chunk"; "distcp.blocks.per.chunk";
public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator"; public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator";
/**
* Enables {@code distcp -update} to use the modification times of the
* source and target files when deciding whether to copy a file that has
* the same size but possibly different content.
*
* If the target file is older than the source file, the source has been
* updated since the last sync and is a newer version than what was copied,
* so the copy should not be skipped.
* {@value}
*/
public static final String CONF_LABEL_UPDATE_MOD_TIME =
"distcp.update.modification.time";
/**
* Default value for 'distcp.update.modification.time' configuration.
*/
public static final boolean CONF_LABEL_UPDATE_MOD_TIME_DEFAULT =
true;
/**
* Constants for DistCp return code to shell / consumer of ToolRunner's run
*/
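For context, here is a minimal sketch (not part of this patch) of reading the new toggle from a `Configuration` the same way the mapper does, with the shipped default of true; the class name is hypothetical:

```java
import org.apache.hadoop.conf.Configuration;

// Hypothetical illustration class, not part of the patch.
public class ModTimeToggleSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The toggle ships enabled; setting it to false restores the previous
    // "skip on equal size when checksums cannot be compared" behaviour.
    boolean useModTime = conf.getBoolean(
        "distcp.update.modification.time", true);
    System.out.println("modification time check enabled: " + useModTime);
  }
}
```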


@ -41,6 +41,8 @@ import org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.CopyReadException
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME_DEFAULT;
/**
 * Mapper class that executes the DistCp copy operation.
 * Implements the o.a.h.mapreduce.Mapper interface.
@ -74,6 +76,15 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
OVERWRITE, // Overwrite the whole file
}
/**
* Indicates the checksum comparison result.
*/
public enum ChecksumComparison {
TRUE, // checksum comparison is compatible and true.
FALSE, // checksum comparison is compatible and false.
INCOMPATIBLE, // checksum comparison is not compatible.
}
private static Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
private Configuration conf;
@ -85,6 +96,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
private boolean append = false;
private boolean verboseLog = false;
private boolean directWrite = false;
private boolean useModTimeToUpdate;
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
private FileSystem targetFS = null;
@ -114,6 +126,9 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
    PRESERVE_STATUS.getConfigLabel()));
directWrite = conf.getBoolean(
    DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false);
useModTimeToUpdate =
    conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME,
        CONF_LABEL_UPDATE_MOD_TIME_DEFAULT);
targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
Path targetFinalPath = new Path(conf.get(
@ -350,13 +365,65 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
boolean sameLength = target.getLen() == source.getLen();
boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
    || !preserve.contains(FileAttribute.BLOCKSIZE);
// Skip the copy if a 0 size file is being copied.
if (sameLength && source.getLen() == 0) {
return true;
}
// If the source and target file have the same size and block size, check
// the skipCrc flag. If skipCrc is enabled and the modification time
// comparison is enabled, skip the copy only when the target file is at
// least as new as the source file; a source that was modified after the
// last sync must be copied again.
// If skipCrc is disabled, run the checksum comparison, which returns one
// of three values. If it returns INCOMPATIBLE, fall back to the
// modification time check; otherwise return the result of the (compatible)
// checksum comparison, i.e. TRUE or FALSE.
//
// Note: different object stores can use different checksum algorithms,
// in which case no checksum comparison is possible and the check would
// always return true. Enabling the modification time check helps avoid
// incorrectly skipping a copy in these scenarios. Refer: HADOOP-18596.
if (sameLength && sameBlockSize) {
  if (skipCrc) {
    return maybeUseModTimeToCompare(source, target);
  } else {
    ChecksumComparison checksumComparison = DistCpUtils
        .checksumsAreEqual(sourceFS, source.getPath(), null,
            targetFS, target.getPath(), source.getLen());
    LOG.debug("Result of checksum comparison between src {} and target "
        + "{} : {}", source, target, checksumComparison);
    if (checksumComparison.equals(ChecksumComparison.INCOMPATIBLE)) {
      return maybeUseModTimeToCompare(source, target);
    }
    // if skipCrc is disabled and checksumComparison is compatible we
    // need not check the mod time.
    return checksumComparison.equals(ChecksumComparison.TRUE);
  }
}
return false;
}
/**
* If the modification time comparison is enabled, compare the modification
* times; otherwise return true (skip the copy).
* Comparison: if the target file's modification time is greater than or
* equal to that of the source file (i.e. the target is at least as new),
* assume that no new change has occurred in the source since the last sync
* and return true to skip the copy of the file.
*
* @param source Source fileStatus.
* @param target Target fileStatus.
* @return boolean representing result of modTime check.
*/
private boolean maybeUseModTimeToCompare(
CopyListingFileStatus source, FileStatus target) {
if (useModTimeToUpdate) {
return source.getModificationTime() <= target.getModificationTime();
}
// if we cannot check mod time, return true (skip the copy).
return true;
}
@Override
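To make the branching above easier to follow, here is a hedged, self-contained sketch of the skip decision (block-size handling omitted); the enum values mirror the patch, while the helper class, method, and sample values are hypothetical:

```java
// Hypothetical illustration of the skip logic in canSkip()/maybeUseModTimeToCompare().
public class SkipDecisionSketch {

  enum ChecksumComparison { TRUE, FALSE, INCOMPATIBLE }

  static boolean canSkip(long srcLen, long dstLen,
      long srcModTime, long dstModTime,
      boolean skipCrc, boolean useModTimeToUpdate,
      ChecksumComparison checksums) {
    if (srcLen != dstLen) {
      return false;               // different lengths: always copy
    }
    if (srcLen == 0) {
      return true;                // both files empty: skip
    }
    if (!skipCrc && checksums != ChecksumComparison.INCOMPATIBLE) {
      return checksums == ChecksumComparison.TRUE;   // trust the checksums
    }
    // Checksums skipped or incompatible: fall back to modification times.
    // Skip only when the target is at least as new as the source.
    return !useModTimeToUpdate || srcModTime <= dstModTime;
  }

  public static void main(String[] args) {
    // Source modified after the last sync: do not skip the copy.
    System.out.println(canSkip(10, 10, 2_000L, 1_000L,
        true, true, ChecksumComparison.INCOMPATIBLE));   // prints false
  }
}
```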


@ -41,6 +41,7 @@ import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpContext;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.UniformSizeInputFormat;
import org.apache.hadoop.util.StringUtils;
@ -568,7 +569,9 @@ public class DistCpUtils {
* and false otherwise.
* @throws IOException if there's an exception while retrieving checksums.
*/
public static CopyMapper.ChecksumComparison checksumsAreEqual(
    FileSystem sourceFS,
    Path source,
    FileChecksum sourceChecksum,
    FileSystem targetFS,
    Path target, long sourceLen)
@ -585,8 +588,15 @@ public class DistCpUtils {
} catch (IOException e) {
  LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
}
// If the source or target checksum is null, that means there is no
// comparison that took place and return not compatible.
// else if matched, return compatible with the matched result.
if (sourceChecksum == null || targetChecksum == null) {
  return CopyMapper.ChecksumComparison.INCOMPATIBLE;
} else if (sourceChecksum.equals(targetChecksum)) {
  return CopyMapper.ChecksumComparison.TRUE;
}
return CopyMapper.ChecksumComparison.FALSE;
}
/**
@ -613,8 +623,12 @@ public class DistCpUtils {
//At this point, src & dest lengths are same. if length==0, we skip checksum
if ((srcLen != 0) && (!skipCrc)) {
  CopyMapper.ChecksumComparison checksumComparison =
      checksumsAreEqual(sourceFS, source, sourceChecksum,
          targetFS, target, srcLen);
  // If Checksum comparison is false set it to false, else set to true.
  boolean checksumResult =
      !checksumComparison.equals(CopyMapper.ChecksumComparison.FALSE);
  if (!checksumResult) {
    StringBuilder errorMessage =
        new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)
            .append(source).append(" and ").append(target).append(".");
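As a hedged illustration of how a caller can collapse the new tri-state result back onto the old boolean contract (the way `compareFileLengthsAndChecksums` above now treats only an explicit `FALSE` as a mismatch); only the enum values come from the patch, the class and method names are hypothetical:

```java
// Hypothetical illustration; only the enum values mirror the patch.
public class ChecksumResultSketch {

  enum ChecksumComparison { TRUE, FALSE, INCOMPATIBLE }

  // Only an explicit FALSE (both checksums retrieved and different) should
  // surface as a checksum-mismatch error; INCOMPATIBLE means "could not
  // compare", not "different".
  static boolean mismatch(ChecksumComparison comparison) {
    return comparison == ChecksumComparison.FALSE;
  }

  public static void main(String[] args) {
    for (ChecksumComparison c : ChecksumComparison.values()) {
      System.out.println(c + " -> mismatch error? " + mismatch(c));
    }
  }
}
```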


@ -630,14 +630,37 @@ hadoop distcp -update -numListstatusThreads 20 \
Because object stores are slow to list files, consider setting the `-numListstatusThreads` option when performing a `-update` operation
on a large directory tree (the limit is 40 threads).
When `DistCp -update` is used with object stores, generally only the
modification time and length of the individual files are compared, not any
checksums if the checksum algorithm between the two stores is different.

* The `distcp -update` between two object stores with different checksum
  algorithms compares the modification times of the source and target files
  along with the file size to determine whether to skip the file copy. The
  behavior is controlled by the property `distcp.update.modification.time`,
  which is set to true by default. If the source file is more recently
  modified than the target file, it is assumed that the content has changed,
  and the file should be updated.
  We need to ensure that there is no clock skew between the machines.
  The fact that most object stores do have valid timestamps for directories
  is irrelevant; only the file timestamps are compared. However, it is
  important to have the clock of the client computers close to that of the
  infrastructure, so that timestamps are consistent between the client/HDFS
  cluster and that of the object store. Otherwise, changed files may be
  missed/copied too often.

* `distcp.update.modification.time` is only used if either of the two stores
  does not have checksum validation, resulting in an incompatible checksum
  comparison between the two. Even if the property is set to true, it will
  not be used if there is a valid checksum comparison between the two stores.
To turn off the modification time check, set this in your core-site.xml
```xml
<property>
<name>distcp.update.modification.time</name>
<value>false</value>
</property>
```
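The same property can also be set programmatically for a single run. A minimal sketch, assuming the standard `DistCpOptions.Builder` and `DistCp.execute()` APIs; the source and destination URIs below are placeholders:

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

public class DistCpNoModTimeCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Disable the modification time fallback for this job only.
    conf.setBoolean("distcp.update.modification.time", false);

    DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("s3a://example-bucket/src")),
        new Path("abfs://data@example.dfs.core.windows.net/dest"))
        .withSyncFolder(true)   // equivalent to -update
        .build();

    Job job = new DistCp(conf, options).execute();
    System.exit(job.isSuccessful() ? 0 : 1);
  }
}
```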
**Notes**


@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@ -72,6 +73,9 @@ public abstract class AbstractContractDistCpTest
private static final Logger LOG =
    LoggerFactory.getLogger(AbstractContractDistCpTest.class);
/** Using offset to change modification time in tests. */
private static final long MODIFICATION_TIME_OFFSET = 10000;
public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
    = "scale.test.distcp.file.size.kb";
@ -354,6 +358,29 @@ public abstract class AbstractContractDistCpTest
    .withOverwrite(false)));
}
/**
* Run distcp -update srcDir destDir.
* @param srcDir source directory.
* @param destDir destination directory.
* @param sourceFs filesystem of the source directory.
* @param targetFs filesystem of the destination directory.
* @return the completed job
* @throws Exception any failure.
*/
private Job distCpUpdateWithFs(final Path srcDir, final Path destDir,
FileSystem sourceFs, FileSystem targetFs)
throws Exception {
describe("\nDistcp -update from " + srcDir + " to " + destDir);
lsR("Source Fs to update", sourceFs, srcDir);
lsR("Target Fs before update", targetFs, destDir);
return runDistCp(buildWithStandardOptions(
new DistCpOptions.Builder(
Collections.singletonList(srcDir), destDir)
.withDeleteMissing(true)
.withSyncFolder(true)
.withSkipCRC(false)
.withDirectWrite(shouldUseDirectWrite())
.withOverwrite(false)));
}
/**
 * Update the source directories as various tests expect,
 * including adding a new file.
@ -857,4 +884,122 @@ public abstract class AbstractContractDistCpTest
verifyFileContents(localFS, dest, block);
}
@Test
public void testDistCpUpdateCheckFileSkip() throws Exception {
describe("Distcp update to check file skips.");
Path source = new Path(remoteDir, "file");
Path dest = new Path(localDir, "file");
Path source0byte = new Path(remoteDir, "file_0byte");
Path dest0byte = new Path(localDir, "file_0byte");
dest = localFS.makeQualified(dest);
dest0byte = localFS.makeQualified(dest0byte);
// Create a source file with a certain dataset.
byte[] sourceBlock = dataset(10, 'a', 'z');
// Write the dataset and also create the target path.
ContractTestUtils.createFile(localFS, dest, true, sourceBlock);
ContractTestUtils
.writeDataset(remoteFS, source, sourceBlock, sourceBlock.length,
1024, true);
// Create 0 byte source and target files.
ContractTestUtils.createFile(remoteFS, source0byte, true, new byte[0]);
ContractTestUtils.createFile(localFS, dest0byte, true, new byte[0]);
// Execute the distcp -update job.
Job job = distCpUpdateWithFs(remoteDir, localDir, remoteFS, localFS);
// First distcp -update would normally copy the source to dest.
verifyFileContents(localFS, dest, sourceBlock);
// Verify 1 file was skipped in the distcp -update (The 0 byte file).
// Verify 1 file was copied in the distcp -update (The new source file).
verifySkipAndCopyCounter(job, 1, 1);
// Remove the source file and replace with a file with same name and size
// but different content.
remoteFS.delete(source, false);
Path updatedSource = new Path(remoteDir, "file");
byte[] updatedSourceBlock = dataset(10, 'b', 'z');
ContractTestUtils.writeDataset(remoteFS, updatedSource,
updatedSourceBlock, updatedSourceBlock.length, 1024, true);
// For testing purposes, take the modification time of the updated source
// file and set the target file's modification time to that value plus or
// minus an offset. This lets the test emulate both scenarios: a source
// that was changed more recently than the last sync (so the copy takes
// place) and a target that was changed more recently than the source (so
// the copy is skipped because the source has no newer update).
FileStatus fsSourceUpd = remoteFS.getFileStatus(updatedSource);
long modTimeSourceUpd = fsSourceUpd.getModificationTime();
// Add an offset to leave enough of a gap for the test not to fail due to
// race conditions.
long newTargetModTimeNew = modTimeSourceUpd + MODIFICATION_TIME_OFFSET;
localFS.setTimes(dest, newTargetModTimeNew, -1);
// Execute the distcp -update job.
Job updatedSourceJobOldSrc =
distCpUpdateWithFs(remoteDir, localDir, remoteFS,
localFS);
// File contents should remain the same since the mod time of the target is
// newer than that of the updatedSource, which indicates that the sync
// happened more recently and there is no update.
verifyFileContents(localFS, dest, sourceBlock);
// Both the 0 byte file and the source file are skipped (since the mod time
// of the target is newer than the source, the source is perceived as an
// older version and its copy can be skipped).
verifySkipAndCopyCounter(updatedSourceJobOldSrc, 2, 0);
// Subtract an offset to leave enough of a gap for the test not to fail
// due to race conditions.
long newTargetModTimeOld =
Math.min(modTimeSourceUpd - MODIFICATION_TIME_OFFSET, 0);
localFS.setTimes(dest, newTargetModTimeOld, -1);
// Execute the distcp -update job.
Job updatedSourceJobNewSrc = distCpUpdateWithFs(remoteDir, localDir,
remoteFS,
localFS);
// Verify that the target directory has both the 0 byte file and the
// content file.
Assertions
.assertThat(RemoteIterators.toList(localFS.listFiles(localDir, true)))
.hasSize(2);
// Now the copy should take place and the file contents should change
// since the mod time for target is older than the source file indicating
// that there was an update to the source after the last sync took place.
verifyFileContents(localFS, dest, updatedSourceBlock);
// Verify that we skipped the 0 byte file and copied the updated source
// file (since the modification time of the target is now older than that
// of the new source).
verifySkipAndCopyCounter(updatedSourceJobNewSrc, 1, 1);
}
/**
* Method to check the skipped and copied counters of a distcp job.
*
* @param job job to check.
* @param skipExpectedValue expected skip counter value.
* @param copyExpectedValue expected copy counter value.
* @throws IOException thrown in case of failures.
*/
private void verifySkipAndCopyCounter(Job job,
int skipExpectedValue, int copyExpectedValue) throws IOException {
// get the skip and copy counters from the job.
long skipActualValue = job.getCounters()
.findCounter(CopyMapper.Counter.SKIP).getValue();
long copyActualValue = job.getCounters()
.findCounter(CopyMapper.Counter.COPY).getValue();
// Verify if the actual values equals the expected ones.
assertEquals("Mismatch in COPY counter value", copyExpectedValue,
copyActualValue);
assertEquals("Mismatch in SKIP counter value", skipExpectedValue,
skipActualValue);
}
}


@ -563,9 +563,12 @@ public class TestCopyCommitter {
Path sourcePath = new Path(sourceBase + srcFilename);
CopyListingFileStatus sourceCurrStatus =
    new CopyListingFileStatus(fs.getFileStatus(sourcePath));
Assert.assertEquals("Checksum should not be equal",
    CopyMapper.ChecksumComparison.FALSE,
    DistCpUtils.checksumsAreEqual(
        fs, new Path(sourceBase + srcFilename), null,
        fs, new Path(targetBase + srcFilename),
        sourceCurrStatus.getLen()));
} catch(IOException exception) {
  if (skipCrc) {
    LOG.error("Unexpected exception is found", exception);