From 3671a5e16fbddbe5a0516289ce98e1305e02291c Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Thu, 22 May 2014 18:17:11 +0000 Subject: [PATCH] MAPREDUCE-5899. Support incremental data copy in DistCp. Contributed by Jing Zhao. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1596931 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/hadoop/fs/FileSystem.java | 14 +- .../apache/hadoop/fs/FilterFileSystem.java | 8 +- .../org/apache/hadoop/fs/HarFileSystem.java | 2 +- .../apache/hadoop/fs/TestHarFileSystem.java | 11 +- .../main/java/org/apache/hadoop/fs/Hdfs.java | 2 +- .../org/apache/hadoop/hdfs/DFSClient.java | 35 ++-- .../hadoop/hdfs/DistributedFileSystem.java | 31 ++- .../hdfs/server/datanode/DataXceiver.java | 77 +++++-- .../web/resources/DatanodeWebHdfsMethods.java | 3 +- .../hadoop/hdfs/TestGetFileChecksum.java | 75 +++++++ hadoop-mapreduce-project/CHANGES.txt | 2 + .../apache/hadoop/tools/DistCpConstants.java | 1 + .../hadoop/tools/DistCpOptionSwitch.java | 4 + .../apache/hadoop/tools/DistCpOptions.java | 28 +++ .../apache/hadoop/tools/OptionsParser.java | 4 + .../hadoop/tools/mapred/CopyMapper.java | 105 ++++++---- .../mapred/RetriableFileCopyCommand.java | 120 ++++++----- .../tools/util/ThrottledInputStream.java | 24 +++ .../hadoop/tools/TestOptionsParser.java | 44 ++++ .../hadoop/tools/mapred/TestCopyMapper.java | 195 ++++++++++++------ .../mapred/TestRetriableFileCopyCommand.java | 5 +- 21 files changed, 597 insertions(+), 193 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index d2f38cbdf47..b79196eed52 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -2140,9 +2140,21 @@ public abstract class FileSystem extends Configured implements Closeable { * in the corresponding FileSystem. */ public FileChecksum getFileChecksum(Path f) throws IOException { + return getFileChecksum(f, Long.MAX_VALUE); + } + + /** + * Get the checksum of a file, from the beginning of the file till the + * specific length. + * @param f The file path + * @param length The length of the file range for checksum calculation + * @return The file checksum. + */ + public FileChecksum getFileChecksum(Path f, final long length) + throws IOException { return null; } - + /** * Set the verify checksum flag. This is only applicable if the * corresponding FileSystem supports checksum. By default doesn't do anything. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index ac094272dd9..b98ff40ab56 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.util.Progressable; @@ -428,7 +427,12 @@ public class FilterFileSystem extends FileSystem { public FileChecksum getFileChecksum(Path f) throws IOException { return fs.getFileChecksum(f); } - + + @Override + public FileChecksum getFileChecksum(Path f, long length) throws IOException { + return fs.getFileChecksum(f, length); + } + @Override public void setVerifyChecksum(boolean verifyChecksum) { fs.setVerifyChecksum(verifyChecksum); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java index 04780d0911e..3ba6de1f9a3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java @@ -687,7 +687,7 @@ public class HarFileSystem extends FileSystem { * @return null since no checksum algorithm is implemented. */ @Override - public FileChecksum getFileChecksum(Path f) { + public FileChecksum getFileChecksum(Path f, long length) { return null; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index 1614d6b1c6d..16db5b11160 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -139,6 +139,7 @@ public class TestHarFileSystem { public int getDefaultPort(); public String getCanonicalServiceName(); public Token getDelegationToken(String renewer) throws IOException; + public FileChecksum getFileChecksum(Path f) throws IOException; public boolean deleteOnExit(Path f) throws IOException; public boolean cancelDeleteOnExit(Path f) throws IOException; public Token[] addDelegationTokens(String renewer, Credentials creds) @@ -223,10 +224,16 @@ public class TestHarFileSystem { } @Test - public void testFileChecksum() { + public void testFileChecksum() throws Exception { final Path p = new Path("har://file-localhost/foo.har/file1"); final HarFileSystem harfs = new HarFileSystem(); - Assert.assertEquals(null, harfs.getFileChecksum(p)); + try { + Assert.assertEquals(null, harfs.getFileChecksum(p)); + } finally { + if (harfs != null) { + harfs.close(); + } + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java index 4f51fc1f7e3..e308a966f55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -116,7 +116,7 @@ public class Hdfs extends AbstractFileSystem { @Override public FileChecksum getFileChecksum(Path f) throws IOException, UnresolvedLinkException { - return dfs.getFileChecksum(getUriPath(f)); + return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 38eca806d1c..5c7095931ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1801,15 +1801,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { } /** - * Get the checksum of a file. + * Get the checksum of the whole file of a range of the file. Note that the + * range always starts from the beginning of the file. * @param src The file path + * @param length The length of the range * @return The checksum * @see DistributedFileSystem#getFileChecksum(Path) */ - public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException { + public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) + throws IOException { checkOpen(); - return getFileChecksum(src, clientName, namenode, socketFactory, - dfsClientConf.socketTimeout, getDataEncryptionKey(), + Preconditions.checkArgument(length >= 0); + return getFileChecksum(src, length, clientName, namenode, + socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(), dfsClientConf.connectToDnViaHostname); } @@ -1850,8 +1854,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { } /** - * Get the checksum of a file. + * Get the checksum of the whole file or a range of the file. * @param src The file path + * @param length the length of the range, i.e., the range is [0, length] * @param clientName the name of the client requesting the checksum. * @param namenode the RPC proxy for the namenode * @param socketFactory to create sockets to connect to DNs @@ -1861,12 +1866,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { * @return The checksum */ private static MD5MD5CRC32FileChecksum getFileChecksum(String src, - String clientName, - ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout, + long length, String clientName, ClientProtocol namenode, + SocketFactory socketFactory, int socketTimeout, DataEncryptionKey encryptionKey, boolean connectToDnViaHostname) throws IOException { - //get all block locations - LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE); + //get block locations for the file range + LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, + length); if (null == blockLocations) { throw new FileNotFoundException("File does not exist: " + src); } @@ -1878,10 +1884,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { boolean refetchBlocks = false; int lastRetriedIndex = -1; - //get block checksum for each block - for(int i = 0; i < locatedblocks.size(); i++) { + // get block checksum for each block + long remaining = length; + for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) { if (refetchBlocks) { // refetch to get fresh tokens - blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE); + blockLocations = callGetBlockLocations(namenode, src, 0, length); if (null == blockLocations) { throw new FileNotFoundException("File does not exist: " + src); } @@ -1890,6 +1897,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory { } LocatedBlock lb = locatedblocks.get(i); final ExtendedBlock block = lb.getBlock(); + if (remaining < block.getNumBytes()) { + block.setNumBytes(remaining); + } + remaining -= block.getNumBytes(); final DatanodeInfo[] datanodes = lb.getLocations(); //try each datanode location of the block diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 38bb95a163f..1f577271b44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -68,14 +68,12 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; @@ -85,7 +83,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; @@ -1142,7 +1139,7 @@ public class DistributedFileSystem extends FileSystem { @Override public FileChecksum doCall(final Path p) throws IOException, UnresolvedLinkException { - return dfs.getFileChecksum(getPathName(p)); + return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE); } @Override @@ -1153,6 +1150,32 @@ public class DistributedFileSystem extends FileSystem { }.resolve(this, absF); } + @Override + public FileChecksum getFileChecksum(Path f, final long length) + throws IOException { + statistics.incrementReadOps(1); + Path absF = fixRelativePart(f); + return new FileSystemLinkResolver() { + @Override + public FileChecksum doCall(final Path p) + throws IOException, UnresolvedLinkException { + return dfs.getFileChecksum(getPathName(p), length); + } + + @Override + public FileChecksum next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + return ((DistributedFileSystem) fs).getFileChecksum(p, length); + } else { + throw new UnsupportedFileSystemException( + "getFileChecksum(Path, long) is not supported by " + + fs.getClass().getSimpleName()); + } + } + }.resolve(this, absF); + } + @Override public void setPermission(Path p, final FsPermission permission ) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index a0de9fa83ef..a118fcb1ff3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -42,6 +42,7 @@ import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; import java.nio.channels.ClosedChannelException; +import java.security.MessageDigest; import java.util.Arrays; import org.apache.commons.logging.Log; @@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import com.google.common.base.Preconditions; import com.google.common.net.InetAddresses; import com.google.protobuf.ByteString; @@ -802,7 +804,44 @@ class DataXceiver extends Receiver implements Runnable { IOUtils.closeStream(out); } } - + + private MD5Hash calcPartialBlockChecksum(ExtendedBlock block, + long requestLength, DataChecksum checksum, DataInputStream checksumIn) + throws IOException { + final int bytesPerCRC = checksum.getBytesPerChecksum(); + final int csize = checksum.getChecksumSize(); + final byte[] buffer = new byte[4*1024]; + MessageDigest digester = MD5Hash.getDigester(); + + long remaining = requestLength / bytesPerCRC * csize; + for (int toDigest = 0; remaining > 0; remaining -= toDigest) { + toDigest = checksumIn.read(buffer, 0, + (int) Math.min(remaining, buffer.length)); + if (toDigest < 0) { + break; + } + digester.update(buffer, 0, toDigest); + } + + int partialLength = (int) (requestLength % bytesPerCRC); + if (partialLength > 0) { + byte[] buf = new byte[partialLength]; + final InputStream blockIn = datanode.data.getBlockInputStream(block, + requestLength - partialLength); + try { + // Get the CRC of the partialLength. + IOUtils.readFully(blockIn, buf, 0, partialLength); + } finally { + IOUtils.closeStream(blockIn); + } + checksum.update(buf, 0, partialLength); + byte[] partialCrc = new byte[csize]; + checksum.writeValue(partialCrc, 0, true); + digester.update(partialCrc); + } + return new MD5Hash(digester.digest()); + } + @Override public void blockChecksum(final ExtendedBlock block, final Token blockToken) throws IOException { @@ -810,25 +849,32 @@ class DataXceiver extends Receiver implements Runnable { getOutputStream()); checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); - updateCurrentThreadName("Reading metadata for block " + block); - final LengthInputStream metadataIn = - datanode.data.getMetaDataInputStream(block); - final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream( - metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); + // client side now can specify a range of the block for checksum + long requestLength = block.getNumBytes(); + Preconditions.checkArgument(requestLength >= 0); + long visibleLength = datanode.data.getReplicaVisibleLength(block); + boolean partialBlk = requestLength < visibleLength; + updateCurrentThreadName("Reading metadata for block " + block); + final LengthInputStream metadataIn = datanode.data + .getMetaDataInputStream(block); + + final DataInputStream checksumIn = new DataInputStream( + new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); updateCurrentThreadName("Getting checksum for block " + block); try { //read metadata file - final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); - final DataChecksum checksum = header.getChecksum(); + final BlockMetadataHeader header = BlockMetadataHeader + .readHeader(checksumIn); + final DataChecksum checksum = header.getChecksum(); + final int csize = checksum.getChecksumSize(); final int bytesPerCRC = checksum.getBytesPerChecksum(); - final long crcPerBlock = checksum.getChecksumSize() > 0 - ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize() - : 0; - - //compute block checksum - final MD5Hash md5 = MD5Hash.digest(checksumIn); + final long crcPerBlock = csize <= 0 ? 0 : + (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize; + final MD5Hash md5 = partialBlk && crcPerBlock > 0 ? + calcPartialBlockChecksum(block, requestLength, checksum, checksumIn) + : MD5Hash.digest(checksumIn); if (LOG.isDebugEnabled()) { LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5); @@ -841,8 +887,7 @@ class DataXceiver extends Receiver implements Runnable { .setBytesPerCrc(bytesPerCRC) .setCrcPerBlock(crcPerBlock) .setMd5(ByteString.copyFrom(md5.getDigest())) - .setCrcType(PBHelper.convert(checksum.getChecksumType())) - ) + .setCrcType(PBHelper.convert(checksum.getChecksumType()))) .build() .writeDelimitedTo(out); out.flush(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java index fb144d39b82..83de6ebe41b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java @@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam; import org.apache.hadoop.hdfs.web.resources.ReplicationParam; import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -452,7 +451,7 @@ public class DatanodeWebHdfsMethods { MD5MD5CRC32FileChecksum checksum = null; DFSClient dfsclient = newDfsClient(nnId, conf); try { - checksum = dfsclient.getFileChecksum(fullpath); + checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE); dfsclient.close(); dfsclient = null; } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java new file mode 100644 index 00000000000..0e56ba7f05e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetFileChecksum.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestGetFileChecksum { + private static final int BLOCKSIZE = 1024; + private static final short REPLICATION = 3; + + private Configuration conf; + private MiniDFSCluster cluster; + private DistributedFileSystem dfs; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION) + .build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + } + + @After + public void tearDown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + public void testGetFileChecksum(final Path foo, final int appendLength) + throws Exception { + final int appendRounds = 16; + FileChecksum[] fc = new FileChecksum[appendRounds + 1]; + DFSTestUtil.createFile(dfs, foo, appendLength, REPLICATION, 0L); + fc[0] = dfs.getFileChecksum(foo); + for (int i = 0; i < appendRounds; i++) { + DFSTestUtil.appendFile(dfs, foo, appendLength); + fc[i + 1] = dfs.getFileChecksum(foo); + } + + for (int i = 0; i < appendRounds + 1; i++) { + FileChecksum checksum = dfs.getFileChecksum(foo, appendLength * (i+1)); + Assert.assertTrue(checksum.equals(fc[i])); + } + } + + @Test + public void testGetFileChecksum() throws Exception { + testGetFileChecksum(new Path("/foo"), BLOCKSIZE / 4); + testGetFileChecksum(new Path("/bar"), BLOCKSIZE / 4 - 1); + } +} diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ce257506ff9..82e90984420 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -202,6 +202,8 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5809. Enhance distcp to support preserving HDFS ACLs. (cnauroth) + MAPREDUCE-5899. Support incremental data copy in DistCp. (jing9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 695d8bde394..5fa26da6135 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -50,6 +50,7 @@ public class DistCpConstants { public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy"; public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc"; public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite"; + public static final String CONF_LABEL_APPEND = "distcp.copy.append"; public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb"; public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index 1639c1d8373..bfaba966be7 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -138,6 +138,10 @@ public enum DistCpOptionSwitch { new Option("overwrite", false, "Choose to overwrite target files " + "unconditionally, even if they exist.")), + APPEND(DistCpConstants.CONF_LABEL_APPEND, + new Option("append", false, + "Reuse existing data in target files and append new data to them if possible")), + /** * Should DisctpExecution be blocking */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 5906266fb78..2d19c6afc18 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -39,6 +39,7 @@ public class DistCpOptions { private boolean deleteMissing = false; private boolean ignoreFailures = false; private boolean overwrite = false; + private boolean append = false; private boolean skipCRC = false; private boolean blocking = true; @@ -244,6 +245,22 @@ public class DistCpOptions { this.overwrite = overwrite; } + /** + * @return whether we can append new data to target files + */ + public boolean shouldAppend() { + return append; + } + + /** + * Set if we want to append new data to target files. This is valid only with + * update option and CRC is not skipped. + */ + public void setAppend(boolean append) { + validate(DistCpOptionSwitch.APPEND, append); + this.append = append; + } + /** * Should CRC/checksum check be skipped while checking files are identical * @@ -472,6 +489,7 @@ public class DistCpOptions { value : this.atomicCommit); boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ? value : this.skipCRC); + boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append); if (syncFolder && atomicCommit) { throw new IllegalArgumentException("Atomic commit can't be used with " + @@ -492,6 +510,14 @@ public class DistCpOptions { throw new IllegalArgumentException("Skip CRC is valid only with update options"); } + if (!syncFolder && append) { + throw new IllegalArgumentException( + "Append is valid only with update options"); + } + if (skipCRC && append) { + throw new IllegalArgumentException( + "Append is disallowed when skipping CRC"); + } } /** @@ -510,6 +536,8 @@ public class DistCpOptions { String.valueOf(deleteMissing)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE, String.valueOf(overwrite)); + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND, + String.valueOf(append)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index d36311d17b8..09e85505227 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -140,6 +140,10 @@ public class OptionsParser { option.setOverwrite(true); } + if (command.hasOption(DistCpOptionSwitch.APPEND.getSwitch())) { + option.setAppend(true); + } + if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) { option.setDeleteMissing(true); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index caf4057c6c2..02337f78bbc 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -18,13 +18,20 @@ package org.apache.hadoop.tools.mapred; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.EnumSet; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; @@ -36,11 +43,6 @@ import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.util.StringUtils; -import java.io.*; -import java.util.EnumSet; -import java.util.Arrays; -import java.util.List; - /** * Mapper class that executes the DistCp copy operation. * Implements the o.a.h.mapreduce.Mapper<> interface. @@ -62,6 +64,15 @@ public class CopyMapper extends Mapper BYTESSKIPPED, // Number of bytes that were skipped from copy. } + /** + * Indicate the action for each file + */ + static enum FileAction { + SKIP, // Skip copying the file since it's already in the target FS + APPEND, // Only need to append new data to the file in the target FS + OVERWRITE, // Overwrite the whole file + } + private static Log LOG = LogFactory.getLog(CopyMapper.class); private Configuration conf; @@ -70,6 +81,7 @@ public class CopyMapper extends Mapper private boolean ignoreFailures = false; private boolean skipCrc = false; private boolean overWrite = false; + private boolean append = false; private EnumSet preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; @@ -90,6 +102,7 @@ public class CopyMapper extends Mapper ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false); skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false); overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false); + append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false); preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch. PRESERVE_STATUS.getConfigLabel())); @@ -224,20 +237,19 @@ public class CopyMapper extends Mapper return; } - if (skipFile(sourceFS, sourceCurrStatus, target)) { + FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target); + if (action == FileAction.SKIP) { LOG.info("Skipping copy of " + sourceCurrStatus.getPath() + " to " + target); updateSkipCounters(context, sourceCurrStatus); context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath())); - } - else { + } else { copyFileWithRetry(description, sourceCurrStatus, target, context, - fileAttributes); + action, fileAttributes); } DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus, fileAttributes); - } catch (IOException exception) { handleFailures(exception, sourceFileStatus, target, context); } @@ -254,14 +266,14 @@ public class CopyMapper extends Mapper return DistCpUtils.unpackAttributes(attributeString); } - private void copyFileWithRetry(String description, FileStatus sourceFileStatus, - Path target, Context context, - EnumSet fileAttributes) throws IOException { - + private void copyFileWithRetry(String description, + FileStatus sourceFileStatus, Path target, Context context, + FileAction action, EnumSet fileAttributes) + throws IOException { long bytesCopied; try { - bytesCopied = (Long)new RetriableFileCopyCommand(skipCrc, description) - .execute(sourceFileStatus, target, context, fileAttributes); + bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description, + action).execute(sourceFileStatus, target, context, fileAttributes); } catch (Exception e) { context.setStatus("Copy Failure: " + sourceFileStatus.getPath()); throw new IOException("File copy failed: " + sourceFileStatus.getPath() + @@ -311,25 +323,48 @@ public class CopyMapper extends Mapper context.getCounter(counter).increment(value); } - private boolean skipFile(FileSystem sourceFS, FileStatus source, Path target) - throws IOException { - return targetFS.exists(target) - && !overWrite - && !mustUpdate(sourceFS, source, target); + private FileAction checkUpdate(FileSystem sourceFS, FileStatus source, + Path target) throws IOException { + final FileStatus targetFileStatus; + try { + targetFileStatus = targetFS.getFileStatus(target); + } catch (FileNotFoundException e) { + return FileAction.OVERWRITE; + } + if (targetFileStatus != null && !overWrite) { + if (canSkip(sourceFS, source, targetFileStatus)) { + return FileAction.SKIP; + } else if (append) { + long targetLen = targetFileStatus.getLen(); + if (targetLen < source.getLen()) { + FileChecksum sourceChecksum = sourceFS.getFileChecksum( + source.getPath(), targetLen); + if (sourceChecksum != null + && sourceChecksum.equals(targetFS.getFileChecksum(target))) { + // We require that the checksum is not null. Thus currently only + // DistributedFileSystem is supported + return FileAction.APPEND; + } + } + } + } + return FileAction.OVERWRITE; } - private boolean mustUpdate(FileSystem sourceFS, FileStatus source, Path target) - throws IOException { - final FileStatus targetFileStatus = targetFS.getFileStatus(target); - - return syncFolders - && ( - targetFileStatus.getLen() != source.getLen() - || (!skipCrc && - !DistCpUtils.checksumsAreEqual(sourceFS, - source.getPath(), null, targetFS, target)) - || (source.getBlockSize() != targetFileStatus.getBlockSize() && - preserve.contains(FileAttribute.BLOCKSIZE)) - ); + private boolean canSkip(FileSystem sourceFS, FileStatus source, + FileStatus target) throws IOException { + if (!syncFolders) { + return true; + } + boolean sameLength = target.getLen() == source.getLen(); + boolean sameBlockSize = source.getBlockSize() == target.getBlockSize() + || !preserve.contains(FileAttribute.BLOCKSIZE); + if (sameLength && sameBlockSize) { + return skipCrc || + DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null, + targetFS, target.getPath()); + } else { + return false; + } } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 1b735262581..1d6115685e5 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -18,10 +18,8 @@ package org.apache.hadoop.tools.mapred; -import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.EnumSet; @@ -29,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -39,6 +39,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; +import org.apache.hadoop.tools.mapred.CopyMapper.FileAction; import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.tools.util.RetriableCommand; import org.apache.hadoop.tools.util.ThrottledInputStream; @@ -54,13 +55,15 @@ public class RetriableFileCopyCommand extends RetriableCommand { private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class); private static int BUFFER_SIZE = 8 * 1024; private boolean skipCrc = false; + private FileAction action; /** * Constructor, taking a description of the action. * @param description Verbose description of the copy operation. */ - public RetriableFileCopyCommand(String description) { + public RetriableFileCopyCommand(String description, FileAction action) { super(description); + this.action = action; } /** @@ -68,9 +71,11 @@ public class RetriableFileCopyCommand extends RetriableCommand { * * @param skipCrc Whether to skip the crc check. * @param description A verbose description of the copy operation. + * @param action We should overwrite the target file or append new data to it. */ - public RetriableFileCopyCommand(boolean skipCrc, String description) { - this(description); + public RetriableFileCopyCommand(boolean skipCrc, String description, + FileAction action) { + this(description, action); this.skipCrc = skipCrc; } @@ -96,18 +101,17 @@ public class RetriableFileCopyCommand extends RetriableCommand { } private long doCopy(FileStatus sourceFileStatus, Path target, - Mapper.Context context, - EnumSet fileAttributes) - throws IOException { - - Path tmpTargetPath = getTmpFile(target, context); + Mapper.Context context, EnumSet fileAttributes) + throws IOException { + final boolean toAppend = action == FileAction.APPEND; + Path targetPath = toAppend ? target : getTmpFile(target, context); final Configuration configuration = context.getConfiguration(); FileSystem targetFS = target.getFileSystem(configuration); try { if (LOG.isDebugEnabled()) { LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target); - LOG.debug("Tmp-file path: " + tmpTargetPath); + LOG.debug("Target file path: " + targetPath); } final Path sourcePath = sourceFileStatus.getPath(); final FileSystem sourceFS = sourcePath.getFileSystem(configuration); @@ -115,22 +119,31 @@ public class RetriableFileCopyCommand extends RetriableCommand { .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS .getFileChecksum(sourcePath) : null; - long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus, - context, fileAttributes, sourceChecksum); + final long offset = action == FileAction.APPEND ? targetFS.getFileStatus( + target).getLen() : 0; + long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus, + offset, context, fileAttributes, sourceChecksum); - compareFileLengths(sourceFileStatus, tmpTargetPath, configuration, - bytesRead); + compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead + + offset); //At this point, src&dest lengths are same. if length==0, we skip checksum if ((bytesRead != 0) && (!skipCrc)) { compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum, - targetFS, tmpTargetPath); + targetFS, targetPath); + } + // it's not append case, thus we first write to a temporary file, rename + // it to the target path. + if (!toAppend) { + promoteTmpToTarget(targetPath, target, targetFS); } - promoteTmpToTarget(tmpTargetPath, target, targetFS); return bytesRead; - } finally { - if (targetFS.exists(tmpTargetPath)) - targetFS.delete(tmpTargetPath, false); + // note that for append case, it is possible that we append partial data + // and then fail. In that case, for the next retry, we either reuse the + // partial appended data if it is good or we overwrite the whole file + if (!toAppend && targetFS.exists(targetPath)) { + targetFS.delete(targetPath, false); + } } } @@ -147,29 +160,37 @@ public class RetriableFileCopyCommand extends RetriableCommand { return null; } - private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS, - FileStatus sourceFileStatus, Mapper.Context context, + private long copyToFile(Path targetPath, FileSystem targetFS, + FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context, EnumSet fileAttributes, final FileChecksum sourceChecksum) throws IOException { FsPermission permission = FsPermission.getFileDefault().applyUMask( FsPermission.getUMask(targetFS.getConf())); - OutputStream outStream = new BufferedOutputStream( - targetFS.create(tmpTargetPath, permission, - EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE, - getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, - tmpTargetPath), - getBlockSize(fileAttributes, sourceFileStatus, targetFS, - tmpTargetPath), - context, getChecksumOpt(fileAttributes, sourceChecksum))); - return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context); + final OutputStream outStream; + if (action == FileAction.OVERWRITE) { + final short repl = getReplicationFactor(fileAttributes, sourceFileStatus, + targetFS, targetPath); + final long blockSize = getBlockSize(fileAttributes, sourceFileStatus, + targetFS, targetPath); + FSDataOutputStream out = targetFS.create(targetPath, permission, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + BUFFER_SIZE, repl, blockSize, context, + getChecksumOpt(fileAttributes, sourceChecksum)); + outStream = new BufferedOutputStream(out); + } else { + outStream = new BufferedOutputStream(targetFS.append(targetPath, + BUFFER_SIZE)); + } + return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, + context); } private void compareFileLengths(FileStatus sourceFileStatus, Path target, - Configuration configuration, long bytesRead) + Configuration configuration, long targetLen) throws IOException { final Path sourcePath = sourceFileStatus.getPath(); FileSystem fs = sourcePath.getFileSystem(configuration); - if (fs.getFileStatus(sourcePath).getLen() != bytesRead) + if (fs.getFileStatus(sourcePath).getLen() != targetLen) throw new IOException("Mismatch in length of source:" + sourcePath + " and target:" + target); } @@ -215,8 +236,8 @@ public class RetriableFileCopyCommand extends RetriableCommand { } @VisibleForTesting - long copyBytes(FileStatus sourceFileStatus, OutputStream outStream, - int bufferSize, Mapper.Context context) + long copyBytes(FileStatus sourceFileStatus, long sourceOffset, + OutputStream outStream, int bufferSize, Mapper.Context context) throws IOException { Path source = sourceFileStatus.getPath(); byte buf[] = new byte[bufferSize]; @@ -225,19 +246,21 @@ public class RetriableFileCopyCommand extends RetriableCommand { try { inStream = getInputStream(source, context.getConfiguration()); - int bytesRead = readBytes(inStream, buf); + int bytesRead = readBytes(inStream, buf, sourceOffset); while (bytesRead >= 0) { totalBytesRead += bytesRead; + if (action == FileAction.APPEND) { + sourceOffset += bytesRead; + } outStream.write(buf, 0, bytesRead); updateContextStatus(totalBytesRead, context, sourceFileStatus); - bytesRead = inStream.read(buf); + bytesRead = readBytes(inStream, buf, sourceOffset); } outStream.close(); outStream = null; } finally { IOUtils.cleanup(LOG, outStream, inStream); } - return totalBytesRead; } @@ -254,24 +277,27 @@ public class RetriableFileCopyCommand extends RetriableCommand { context.setStatus(message.toString()); } - private static int readBytes(InputStream inStream, byte buf[]) - throws IOException { + private static int readBytes(ThrottledInputStream inStream, byte buf[], + long position) throws IOException { try { - return inStream.read(buf); - } - catch (IOException e) { + if (position == 0) { + return inStream.read(buf); + } else { + return inStream.read(position, buf, 0, buf.length); + } + } catch (IOException e) { throw new CopyReadException(e); } } - private static ThrottledInputStream getInputStream(Path path, Configuration conf) - throws IOException { + private static ThrottledInputStream getInputStream(Path path, + Configuration conf) throws IOException { try { FileSystem fs = path.getFileSystem(conf); long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, DistCpConstants.DEFAULT_BANDWIDTH_MB); - return new ThrottledInputStream(new BufferedInputStream(fs.open(path)), - bandwidthMB * 1024 * 1024); + FSDataInputStream in = fs.open(path); + return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024); } catch (IOException e) { throw new CopyReadException(e); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java index 75ae86ad054..f6fe11847a2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java @@ -21,6 +21,11 @@ package org.apache.hadoop.tools.util; import java.io.IOException; import java.io.InputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.PositionedReadable; + +import com.google.common.base.Preconditions; + /** * The ThrottleInputStream provides bandwidth throttling on a specified * InputStream. It is implemented as a wrapper on top of another InputStream @@ -90,6 +95,25 @@ public class ThrottledInputStream extends InputStream { return readLen; } + /** + * Read bytes starting from the specified position. This requires rawStream is + * an instance of {@link PositionedReadable}. + */ + public int read(long position, byte[] buffer, int offset, int length) + throws IOException { + if (!(rawStream instanceof PositionedReadable)) { + throw new UnsupportedOperationException( + "positioned read is not supported by the internal stream"); + } + throttle(); + int readLen = ((PositionedReadable) rawStream).read(position, buffer, + offset, length); + if (readLen != -1) { + bytesRead += readLen; + } + return readLen; + } + private void throttle() throws IOException { if (getBytesPerSec() > maxBytesPerSec) { try { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 296e994f930..8486aa1fcea 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -18,9 +18,12 @@ package org.apache.hadoop.tools; +import static org.junit.Assert.fail; + import org.junit.Assert; import org.junit.Test; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.tools.DistCpOptions.*; import org.apache.hadoop.conf.Configuration; @@ -554,4 +557,45 @@ public class TestOptionsParser { Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U"); Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11); } + + @Test + public void testAppendOption() { + Configuration conf = new Configuration(); + Assert.assertFalse(conf.getBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), false)); + Assert.assertFalse(conf.getBoolean( + DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); + + DistCpOptions options = OptionsParser.parse(new String[] { "-update", + "-append", "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + options.appendToConf(conf); + Assert.assertTrue(conf.getBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), false)); + Assert.assertTrue(conf.getBoolean( + DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); + + // make sure -append is only valid when -update is specified + try { + options = OptionsParser.parse(new String[] { "-append", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + fail("Append should fail if update option is not specified"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "Append is valid only with update options", e); + } + + // make sure -append is invalid when skipCrc is specified + try { + options = OptionsParser.parse(new String[] { + "-append", "-update", "-skipcrccheck", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/" }); + fail("Append should fail if skipCrc option is specified"); + } catch (IllegalArgumentException e) { + GenericTestUtils.assertExceptionContains( + "Append is disallowed when skipping CRC", e); + } + } } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index 7eb1b6801e0..2f16682fcd0 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -25,11 +25,13 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Options.ChecksumOpt; @@ -118,6 +120,16 @@ public class TestCopyMapper { touchFile(SOURCE_PATH + "/7/8/9"); } + private static void appendSourceData() throws Exception { + FileSystem fs = cluster.getFileSystem(); + for (Path source : pathList) { + if (fs.getFileStatus(source).isFile()) { + // append 2048 bytes per file + appendFile(source, DEFAULT_FILE_SIZE * 2); + } + } + } + private static void createSourceDataWithDifferentBlockSize() throws Exception { mkdirs(SOURCE_PATH + "/1"); mkdirs(SOURCE_PATH + "/2"); @@ -201,85 +213,132 @@ public class TestCopyMapper { } } + /** + * Append specified length of bytes to a given file + */ + private static void appendFile(Path p, int length) throws IOException { + byte[] toAppend = new byte[length]; + Random random = new Random(); + random.nextBytes(toAppend); + FSDataOutputStream out = cluster.getFileSystem().append(p); + try { + out.write(toAppend); + } finally { + IOUtils.closeStream(out); + } + } + @Test public void testCopyWithDifferentChecksumType() throws Exception { testCopy(true); } @Test(timeout=40000) - public void testRun() { + public void testRun() throws Exception { testCopy(false); } - private void testCopy(boolean preserveChecksum) { - try { - deleteState(); - if (preserveChecksum) { - createSourceDataWithDifferentChecksumType(); - } else { - createSourceData(); - } + @Test + public void testCopyWithAppend() throws Exception { + final FileSystem fs = cluster.getFileSystem(); + // do the first distcp + testCopy(false); + // start appending data to source + appendSourceData(); - FileSystem fs = cluster.getFileSystem(); - CopyMapper copyMapper = new CopyMapper(); - StubContext stubContext = new StubContext(getConfiguration(), null, 0); - Mapper.Context context - = stubContext.getContext(); - - Configuration configuration = context.getConfiguration(); - EnumSet fileAttributes - = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION); - if (preserveChecksum) { - fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE); - } - configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(), - DistCpUtils.packAttributes(fileAttributes)); - - copyMapper.setup(context); - - for (Path path: pathList) { - copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)), - new CopyListingFileStatus(fs.getFileStatus(path)), context); - } - - // Check that the maps worked. - for (Path path : pathList) { - final Path targetPath = new Path(path.toString() - .replaceAll(SOURCE_PATH, TARGET_PATH)); - Assert.assertTrue(fs.exists(targetPath)); - Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path)); - FileStatus sourceStatus = fs.getFileStatus(path); - FileStatus targetStatus = fs.getFileStatus(targetPath); - Assert.assertEquals(sourceStatus.getReplication(), - targetStatus.getReplication()); - if (preserveChecksum) { - Assert.assertEquals(sourceStatus.getBlockSize(), - targetStatus.getBlockSize()); - } - Assert.assertTrue(!fs.isFile(targetPath) - || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path))); - } - - Assert.assertEquals(pathList.size(), - stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue()); - if (!preserveChecksum) { - Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext - .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED) - .getValue()); - } else { - Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext - .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED) - .getValue()); - } - - testCopyingExistingFiles(fs, copyMapper, context); - for (Text value : stubContext.getWriter().values()) { - Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:")); - } + // do the distcp again with -update and -append option + CopyMapper copyMapper = new CopyMapper(); + StubContext stubContext = new StubContext(getConfiguration(), null, 0); + Mapper.Context context = + stubContext.getContext(); + // Enable append + context.getConfiguration().setBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), true); + copyMapper.setup(context); + for (Path path: pathList) { + copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)), + new CopyListingFileStatus(cluster.getFileSystem().getFileStatus( + path)), context); } - catch (Exception e) { - LOG.error("Unexpected exception: ", e); - Assert.assertTrue(false); + + verifyCopy(fs, false); + // verify that we only copied new appended data + Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext + .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED) + .getValue()); + Assert.assertEquals(pathList.size(), stubContext.getReporter(). + getCounter(CopyMapper.Counter.COPY).getValue()); + } + + private void testCopy(boolean preserveChecksum) throws Exception { + deleteState(); + if (preserveChecksum) { + createSourceDataWithDifferentChecksumType(); + } else { + createSourceData(); + } + + FileSystem fs = cluster.getFileSystem(); + CopyMapper copyMapper = new CopyMapper(); + StubContext stubContext = new StubContext(getConfiguration(), null, 0); + Mapper.Context context + = stubContext.getContext(); + + Configuration configuration = context.getConfiguration(); + EnumSet fileAttributes + = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION); + if (preserveChecksum) { + fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE); + } + configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(), + DistCpUtils.packAttributes(fileAttributes)); + + copyMapper.setup(context); + + for (Path path: pathList) { + copyMapper.map( + new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)), + new CopyListingFileStatus(fs.getFileStatus(path)), context); + } + + // Check that the maps worked. + verifyCopy(fs, preserveChecksum); + Assert.assertEquals(pathList.size(), stubContext.getReporter() + .getCounter(CopyMapper.Counter.COPY).getValue()); + if (!preserveChecksum) { + Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext + .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED) + .getValue()); + } else { + Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext + .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED) + .getValue()); + } + + testCopyingExistingFiles(fs, copyMapper, context); + for (Text value : stubContext.getWriter().values()) { + Assert.assertTrue(value.toString() + " is not skipped", value + .toString().startsWith("SKIP:")); + } + } + + private void verifyCopy(FileSystem fs, boolean preserveChecksum) + throws Exception { + for (Path path : pathList) { + final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH, + TARGET_PATH)); + Assert.assertTrue(fs.exists(targetPath)); + Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path)); + FileStatus sourceStatus = fs.getFileStatus(path); + FileStatus targetStatus = fs.getFileStatus(targetPath); + Assert.assertEquals(sourceStatus.getReplication(), + targetStatus.getReplication()); + if (preserveChecksum) { + Assert.assertEquals(sourceStatus.getBlockSize(), + targetStatus.getBlockSize()); + } + Assert.assertTrue(!fs.isFile(targetPath) + || fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path))); } } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java index c5ec513bec5..f1b8532b74c 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.tools.mapred.CopyMapper.FileAction; import org.junit.Test; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -48,8 +49,8 @@ public class TestRetriableFileCopyCommand { Exception actualEx = null; try { - new RetriableFileCopyCommand("testFailOnCloseError") - .copyBytes(stat, out, 512, context); + new RetriableFileCopyCommand("testFailOnCloseError", FileAction.OVERWRITE) + .copyBytes(stat, 0, out, 512, context); } catch (Exception e) { actualEx = e; }