MAPREDUCE-5899. Support incremental data copy in DistCp. Contributed by Jing Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1596931 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5b1e88b6d2
commit
3671a5e16f
|
@ -2140,9 +2140,21 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
* in the corresponding FileSystem.
|
||||
*/
|
||||
public FileChecksum getFileChecksum(Path f) throws IOException {
|
||||
return getFileChecksum(f, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the checksum of a file, from the beginning of the file till the
|
||||
* specific length.
|
||||
* @param f The file path
|
||||
* @param length The length of the file range for checksum calculation
|
||||
* @return The file checksum.
|
||||
*/
|
||||
public FileChecksum getFileChecksum(Path f, final long length)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the verify checksum flag. This is only applicable if the
|
||||
* corresponding FileSystem supports checksum. By default doesn't do anything.
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
@ -428,7 +427,12 @@ public class FilterFileSystem extends FileSystem {
|
|||
public FileChecksum getFileChecksum(Path f) throws IOException {
|
||||
return fs.getFileChecksum(f);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
|
||||
return fs.getFileChecksum(f, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVerifyChecksum(boolean verifyChecksum) {
|
||||
fs.setVerifyChecksum(verifyChecksum);
|
||||
|
|
|
@ -687,7 +687,7 @@ public class HarFileSystem extends FileSystem {
|
|||
* @return null since no checksum algorithm is implemented.
|
||||
*/
|
||||
@Override
|
||||
public FileChecksum getFileChecksum(Path f) {
|
||||
public FileChecksum getFileChecksum(Path f, long length) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -139,6 +139,7 @@ public class TestHarFileSystem {
|
|||
public int getDefaultPort();
|
||||
public String getCanonicalServiceName();
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException;
|
||||
public FileChecksum getFileChecksum(Path f) throws IOException;
|
||||
public boolean deleteOnExit(Path f) throws IOException;
|
||||
public boolean cancelDeleteOnExit(Path f) throws IOException;
|
||||
public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
|
||||
|
@ -223,10 +224,16 @@ public class TestHarFileSystem {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFileChecksum() {
|
||||
public void testFileChecksum() throws Exception {
|
||||
final Path p = new Path("har://file-localhost/foo.har/file1");
|
||||
final HarFileSystem harfs = new HarFileSystem();
|
||||
Assert.assertEquals(null, harfs.getFileChecksum(p));
|
||||
try {
|
||||
Assert.assertEquals(null, harfs.getFileChecksum(p));
|
||||
} finally {
|
||||
if (harfs != null) {
|
||||
harfs.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -116,7 +116,7 @@ public class Hdfs extends AbstractFileSystem {
|
|||
@Override
|
||||
public FileChecksum getFileChecksum(Path f)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
return dfs.getFileChecksum(getUriPath(f));
|
||||
return dfs.getFileChecksum(getUriPath(f), Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1801,15 +1801,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the checksum of a file.
|
||||
* Get the checksum of the whole file of a range of the file. Note that the
|
||||
* range always starts from the beginning of the file.
|
||||
* @param src The file path
|
||||
* @param length The length of the range
|
||||
* @return The checksum
|
||||
* @see DistributedFileSystem#getFileChecksum(Path)
|
||||
*/
|
||||
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
|
||||
public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
|
||||
throws IOException {
|
||||
checkOpen();
|
||||
return getFileChecksum(src, clientName, namenode, socketFactory,
|
||||
dfsClientConf.socketTimeout, getDataEncryptionKey(),
|
||||
Preconditions.checkArgument(length >= 0);
|
||||
return getFileChecksum(src, length, clientName, namenode,
|
||||
socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
|
||||
dfsClientConf.connectToDnViaHostname);
|
||||
}
|
||||
|
||||
|
@ -1850,8 +1854,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the checksum of a file.
|
||||
* Get the checksum of the whole file or a range of the file.
|
||||
* @param src The file path
|
||||
* @param length the length of the range, i.e., the range is [0, length]
|
||||
* @param clientName the name of the client requesting the checksum.
|
||||
* @param namenode the RPC proxy for the namenode
|
||||
* @param socketFactory to create sockets to connect to DNs
|
||||
|
@ -1861,12 +1866,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
* @return The checksum
|
||||
*/
|
||||
private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
|
||||
String clientName,
|
||||
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
|
||||
long length, String clientName, ClientProtocol namenode,
|
||||
SocketFactory socketFactory, int socketTimeout,
|
||||
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
|
||||
throws IOException {
|
||||
//get all block locations
|
||||
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
|
||||
//get block locations for the file range
|
||||
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
|
||||
length);
|
||||
if (null == blockLocations) {
|
||||
throw new FileNotFoundException("File does not exist: " + src);
|
||||
}
|
||||
|
@ -1878,10 +1884,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
boolean refetchBlocks = false;
|
||||
int lastRetriedIndex = -1;
|
||||
|
||||
//get block checksum for each block
|
||||
for(int i = 0; i < locatedblocks.size(); i++) {
|
||||
// get block checksum for each block
|
||||
long remaining = length;
|
||||
for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
|
||||
if (refetchBlocks) { // refetch to get fresh tokens
|
||||
blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
|
||||
blockLocations = callGetBlockLocations(namenode, src, 0, length);
|
||||
if (null == blockLocations) {
|
||||
throw new FileNotFoundException("File does not exist: " + src);
|
||||
}
|
||||
|
@ -1890,6 +1897,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
|
|||
}
|
||||
LocatedBlock lb = locatedblocks.get(i);
|
||||
final ExtendedBlock block = lb.getBlock();
|
||||
if (remaining < block.getNumBytes()) {
|
||||
block.setNumBytes(remaining);
|
||||
}
|
||||
remaining -= block.getNumBytes();
|
||||
final DatanodeInfo[] datanodes = lb.getLocations();
|
||||
|
||||
//try each datanode location of the block
|
||||
|
|
|
@ -68,14 +68,12 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
|||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
|
@ -85,7 +83,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
|
@ -1142,7 +1139,7 @@ public class DistributedFileSystem extends FileSystem {
|
|||
@Override
|
||||
public FileChecksum doCall(final Path p)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
return dfs.getFileChecksum(getPathName(p));
|
||||
return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1153,6 +1150,32 @@ public class DistributedFileSystem extends FileSystem {
|
|||
}.resolve(this, absF);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileChecksum getFileChecksum(Path f, final long length)
|
||||
throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<FileChecksum>() {
|
||||
@Override
|
||||
public FileChecksum doCall(final Path p)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
return dfs.getFileChecksum(getPathName(p), length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileChecksum next(final FileSystem fs, final Path p)
|
||||
throws IOException {
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
return ((DistributedFileSystem) fs).getFileChecksum(p, length);
|
||||
} else {
|
||||
throw new UnsupportedFileSystemException(
|
||||
"getFileChecksum(Path, long) is not supported by "
|
||||
+ fs.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
}.resolve(this, absF);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPermission(Path p, final FsPermission permission
|
||||
) throws IOException {
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.net.Socket;
|
|||
import java.net.SocketException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.net.InetAddresses;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
|
@ -802,7 +804,44 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
IOUtils.closeStream(out);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
|
||||
long requestLength, DataChecksum checksum, DataInputStream checksumIn)
|
||||
throws IOException {
|
||||
final int bytesPerCRC = checksum.getBytesPerChecksum();
|
||||
final int csize = checksum.getChecksumSize();
|
||||
final byte[] buffer = new byte[4*1024];
|
||||
MessageDigest digester = MD5Hash.getDigester();
|
||||
|
||||
long remaining = requestLength / bytesPerCRC * csize;
|
||||
for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
|
||||
toDigest = checksumIn.read(buffer, 0,
|
||||
(int) Math.min(remaining, buffer.length));
|
||||
if (toDigest < 0) {
|
||||
break;
|
||||
}
|
||||
digester.update(buffer, 0, toDigest);
|
||||
}
|
||||
|
||||
int partialLength = (int) (requestLength % bytesPerCRC);
|
||||
if (partialLength > 0) {
|
||||
byte[] buf = new byte[partialLength];
|
||||
final InputStream blockIn = datanode.data.getBlockInputStream(block,
|
||||
requestLength - partialLength);
|
||||
try {
|
||||
// Get the CRC of the partialLength.
|
||||
IOUtils.readFully(blockIn, buf, 0, partialLength);
|
||||
} finally {
|
||||
IOUtils.closeStream(blockIn);
|
||||
}
|
||||
checksum.update(buf, 0, partialLength);
|
||||
byte[] partialCrc = new byte[csize];
|
||||
checksum.writeValue(partialCrc, 0, true);
|
||||
digester.update(partialCrc);
|
||||
}
|
||||
return new MD5Hash(digester.digest());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void blockChecksum(final ExtendedBlock block,
|
||||
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
||||
|
@ -810,25 +849,32 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
getOutputStream());
|
||||
checkAccess(out, true, block, blockToken,
|
||||
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
|
||||
updateCurrentThreadName("Reading metadata for block " + block);
|
||||
final LengthInputStream metadataIn =
|
||||
datanode.data.getMetaDataInputStream(block);
|
||||
final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
|
||||
metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
// client side now can specify a range of the block for checksum
|
||||
long requestLength = block.getNumBytes();
|
||||
Preconditions.checkArgument(requestLength >= 0);
|
||||
long visibleLength = datanode.data.getReplicaVisibleLength(block);
|
||||
boolean partialBlk = requestLength < visibleLength;
|
||||
|
||||
updateCurrentThreadName("Reading metadata for block " + block);
|
||||
final LengthInputStream metadataIn = datanode.data
|
||||
.getMetaDataInputStream(block);
|
||||
|
||||
final DataInputStream checksumIn = new DataInputStream(
|
||||
new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
updateCurrentThreadName("Getting checksum for block " + block);
|
||||
try {
|
||||
//read metadata file
|
||||
final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||
final DataChecksum checksum = header.getChecksum();
|
||||
final BlockMetadataHeader header = BlockMetadataHeader
|
||||
.readHeader(checksumIn);
|
||||
final DataChecksum checksum = header.getChecksum();
|
||||
final int csize = checksum.getChecksumSize();
|
||||
final int bytesPerCRC = checksum.getBytesPerChecksum();
|
||||
final long crcPerBlock = checksum.getChecksumSize() > 0
|
||||
? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
|
||||
: 0;
|
||||
|
||||
//compute block checksum
|
||||
final MD5Hash md5 = MD5Hash.digest(checksumIn);
|
||||
final long crcPerBlock = csize <= 0 ? 0 :
|
||||
(metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
|
||||
|
||||
final MD5Hash md5 = partialBlk && crcPerBlock > 0 ?
|
||||
calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
|
||||
: MD5Hash.digest(checksumIn);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
|
||||
+ ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
|
||||
|
@ -841,8 +887,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
.setBytesPerCrc(bytesPerCRC)
|
||||
.setCrcPerBlock(crcPerBlock)
|
||||
.setMd5(ByteString.copyFrom(md5.getDigest()))
|
||||
.setCrcType(PBHelper.convert(checksum.getChecksumType()))
|
||||
)
|
||||
.setCrcType(PBHelper.convert(checksum.getChecksumType())))
|
||||
.build()
|
||||
.writeDelimitedTo(out);
|
||||
out.flush();
|
||||
|
|
|
@ -74,7 +74,6 @@ import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
|||
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
@ -452,7 +451,7 @@ public class DatanodeWebHdfsMethods {
|
|||
MD5MD5CRC32FileChecksum checksum = null;
|
||||
DFSClient dfsclient = newDfsClient(nnId, conf);
|
||||
try {
|
||||
checksum = dfsclient.getFileChecksum(fullpath);
|
||||
checksum = dfsclient.getFileChecksum(fullpath, Long.MAX_VALUE);
|
||||
dfsclient.close();
|
||||
dfsclient = null;
|
||||
} finally {
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestGetFileChecksum {
|
||||
private static final int BLOCKSIZE = 1024;
|
||||
private static final short REPLICATION = 3;
|
||||
|
||||
private Configuration conf;
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem dfs;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new Configuration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
dfs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void testGetFileChecksum(final Path foo, final int appendLength)
|
||||
throws Exception {
|
||||
final int appendRounds = 16;
|
||||
FileChecksum[] fc = new FileChecksum[appendRounds + 1];
|
||||
DFSTestUtil.createFile(dfs, foo, appendLength, REPLICATION, 0L);
|
||||
fc[0] = dfs.getFileChecksum(foo);
|
||||
for (int i = 0; i < appendRounds; i++) {
|
||||
DFSTestUtil.appendFile(dfs, foo, appendLength);
|
||||
fc[i + 1] = dfs.getFileChecksum(foo);
|
||||
}
|
||||
|
||||
for (int i = 0; i < appendRounds + 1; i++) {
|
||||
FileChecksum checksum = dfs.getFileChecksum(foo, appendLength * (i+1));
|
||||
Assert.assertTrue(checksum.equals(fc[i]));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetFileChecksum() throws Exception {
|
||||
testGetFileChecksum(new Path("/foo"), BLOCKSIZE / 4);
|
||||
testGetFileChecksum(new Path("/bar"), BLOCKSIZE / 4 - 1);
|
||||
}
|
||||
}
|
|
@ -202,6 +202,8 @@ Release 2.5.0 - UNRELEASED
|
|||
|
||||
MAPREDUCE-5809. Enhance distcp to support preserving HDFS ACLs. (cnauroth)
|
||||
|
||||
MAPREDUCE-5899. Support incremental data copy in DistCp. (jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -50,6 +50,7 @@ public class DistCpConstants {
|
|||
public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
|
||||
public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc";
|
||||
public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite";
|
||||
public static final String CONF_LABEL_APPEND = "distcp.copy.append";
|
||||
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
|
||||
|
||||
public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
|
||||
|
|
|
@ -138,6 +138,10 @@ public enum DistCpOptionSwitch {
|
|||
new Option("overwrite", false, "Choose to overwrite target files " +
|
||||
"unconditionally, even if they exist.")),
|
||||
|
||||
APPEND(DistCpConstants.CONF_LABEL_APPEND,
|
||||
new Option("append", false,
|
||||
"Reuse existing data in target files and append new data to them if possible")),
|
||||
|
||||
/**
|
||||
* Should DisctpExecution be blocking
|
||||
*/
|
||||
|
|
|
@ -39,6 +39,7 @@ public class DistCpOptions {
|
|||
private boolean deleteMissing = false;
|
||||
private boolean ignoreFailures = false;
|
||||
private boolean overwrite = false;
|
||||
private boolean append = false;
|
||||
private boolean skipCRC = false;
|
||||
private boolean blocking = true;
|
||||
|
||||
|
@ -244,6 +245,22 @@ public class DistCpOptions {
|
|||
this.overwrite = overwrite;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether we can append new data to target files
|
||||
*/
|
||||
public boolean shouldAppend() {
|
||||
return append;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set if we want to append new data to target files. This is valid only with
|
||||
* update option and CRC is not skipped.
|
||||
*/
|
||||
public void setAppend(boolean append) {
|
||||
validate(DistCpOptionSwitch.APPEND, append);
|
||||
this.append = append;
|
||||
}
|
||||
|
||||
/**
|
||||
* Should CRC/checksum check be skipped while checking files are identical
|
||||
*
|
||||
|
@ -472,6 +489,7 @@ public class DistCpOptions {
|
|||
value : this.atomicCommit);
|
||||
boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ?
|
||||
value : this.skipCRC);
|
||||
boolean append = (option == DistCpOptionSwitch.APPEND ? value : this.append);
|
||||
|
||||
if (syncFolder && atomicCommit) {
|
||||
throw new IllegalArgumentException("Atomic commit can't be used with " +
|
||||
|
@ -492,6 +510,14 @@ public class DistCpOptions {
|
|||
throw new IllegalArgumentException("Skip CRC is valid only with update options");
|
||||
}
|
||||
|
||||
if (!syncFolder && append) {
|
||||
throw new IllegalArgumentException(
|
||||
"Append is valid only with update options");
|
||||
}
|
||||
if (skipCRC && append) {
|
||||
throw new IllegalArgumentException(
|
||||
"Append is disallowed when skipping CRC");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -510,6 +536,8 @@ public class DistCpOptions {
|
|||
String.valueOf(deleteMissing));
|
||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE,
|
||||
String.valueOf(overwrite));
|
||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.APPEND,
|
||||
String.valueOf(append));
|
||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC,
|
||||
String.valueOf(skipCRC));
|
||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH,
|
||||
|
|
|
@ -140,6 +140,10 @@ public class OptionsParser {
|
|||
option.setOverwrite(true);
|
||||
}
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.APPEND.getSwitch())) {
|
||||
option.setAppend(true);
|
||||
}
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.DELETE_MISSING.getSwitch())) {
|
||||
option.setDeleteMissing(true);
|
||||
}
|
||||
|
|
|
@ -18,13 +18,20 @@
|
|||
|
||||
package org.apache.hadoop.tools.mapred;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
|
@ -36,11 +43,6 @@ import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
|||
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Mapper class that executes the DistCp copy operation.
|
||||
* Implements the o.a.h.mapreduce.Mapper<> interface.
|
||||
|
@ -62,6 +64,15 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
BYTESSKIPPED, // Number of bytes that were skipped from copy.
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicate the action for each file
|
||||
*/
|
||||
static enum FileAction {
|
||||
SKIP, // Skip copying the file since it's already in the target FS
|
||||
APPEND, // Only need to append new data to the file in the target FS
|
||||
OVERWRITE, // Overwrite the whole file
|
||||
}
|
||||
|
||||
private static Log LOG = LogFactory.getLog(CopyMapper.class);
|
||||
|
||||
private Configuration conf;
|
||||
|
@ -70,6 +81,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
private boolean ignoreFailures = false;
|
||||
private boolean skipCrc = false;
|
||||
private boolean overWrite = false;
|
||||
private boolean append = false;
|
||||
private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
|
||||
|
||||
private FileSystem targetFS = null;
|
||||
|
@ -90,6 +102,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
|
||||
skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
|
||||
overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
|
||||
append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
|
||||
preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
|
||||
PRESERVE_STATUS.getConfigLabel()));
|
||||
|
||||
|
@ -224,20 +237,19 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
return;
|
||||
}
|
||||
|
||||
if (skipFile(sourceFS, sourceCurrStatus, target)) {
|
||||
FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
|
||||
if (action == FileAction.SKIP) {
|
||||
LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
|
||||
+ " to " + target);
|
||||
updateSkipCounters(context, sourceCurrStatus);
|
||||
context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
copyFileWithRetry(description, sourceCurrStatus, target, context,
|
||||
fileAttributes);
|
||||
action, fileAttributes);
|
||||
}
|
||||
|
||||
DistCpUtils.preserve(target.getFileSystem(conf), target,
|
||||
sourceCurrStatus, fileAttributes);
|
||||
|
||||
} catch (IOException exception) {
|
||||
handleFailures(exception, sourceFileStatus, target, context);
|
||||
}
|
||||
|
@ -254,14 +266,14 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
return DistCpUtils.unpackAttributes(attributeString);
|
||||
}
|
||||
|
||||
private void copyFileWithRetry(String description, FileStatus sourceFileStatus,
|
||||
Path target, Context context,
|
||||
EnumSet<DistCpOptions.FileAttribute> fileAttributes) throws IOException {
|
||||
|
||||
private void copyFileWithRetry(String description,
|
||||
FileStatus sourceFileStatus, Path target, Context context,
|
||||
FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
|
||||
throws IOException {
|
||||
long bytesCopied;
|
||||
try {
|
||||
bytesCopied = (Long)new RetriableFileCopyCommand(skipCrc, description)
|
||||
.execute(sourceFileStatus, target, context, fileAttributes);
|
||||
bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
|
||||
action).execute(sourceFileStatus, target, context, fileAttributes);
|
||||
} catch (Exception e) {
|
||||
context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
|
||||
throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
|
||||
|
@ -311,25 +323,48 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
|||
context.getCounter(counter).increment(value);
|
||||
}
|
||||
|
||||
private boolean skipFile(FileSystem sourceFS, FileStatus source, Path target)
|
||||
throws IOException {
|
||||
return targetFS.exists(target)
|
||||
&& !overWrite
|
||||
&& !mustUpdate(sourceFS, source, target);
|
||||
private FileAction checkUpdate(FileSystem sourceFS, FileStatus source,
|
||||
Path target) throws IOException {
|
||||
final FileStatus targetFileStatus;
|
||||
try {
|
||||
targetFileStatus = targetFS.getFileStatus(target);
|
||||
} catch (FileNotFoundException e) {
|
||||
return FileAction.OVERWRITE;
|
||||
}
|
||||
if (targetFileStatus != null && !overWrite) {
|
||||
if (canSkip(sourceFS, source, targetFileStatus)) {
|
||||
return FileAction.SKIP;
|
||||
} else if (append) {
|
||||
long targetLen = targetFileStatus.getLen();
|
||||
if (targetLen < source.getLen()) {
|
||||
FileChecksum sourceChecksum = sourceFS.getFileChecksum(
|
||||
source.getPath(), targetLen);
|
||||
if (sourceChecksum != null
|
||||
&& sourceChecksum.equals(targetFS.getFileChecksum(target))) {
|
||||
// We require that the checksum is not null. Thus currently only
|
||||
// DistributedFileSystem is supported
|
||||
return FileAction.APPEND;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return FileAction.OVERWRITE;
|
||||
}
|
||||
|
||||
private boolean mustUpdate(FileSystem sourceFS, FileStatus source, Path target)
|
||||
throws IOException {
|
||||
final FileStatus targetFileStatus = targetFS.getFileStatus(target);
|
||||
|
||||
return syncFolders
|
||||
&& (
|
||||
targetFileStatus.getLen() != source.getLen()
|
||||
|| (!skipCrc &&
|
||||
!DistCpUtils.checksumsAreEqual(sourceFS,
|
||||
source.getPath(), null, targetFS, target))
|
||||
|| (source.getBlockSize() != targetFileStatus.getBlockSize() &&
|
||||
preserve.contains(FileAttribute.BLOCKSIZE))
|
||||
);
|
||||
private boolean canSkip(FileSystem sourceFS, FileStatus source,
|
||||
FileStatus target) throws IOException {
|
||||
if (!syncFolders) {
|
||||
return true;
|
||||
}
|
||||
boolean sameLength = target.getLen() == source.getLen();
|
||||
boolean sameBlockSize = source.getBlockSize() == target.getBlockSize()
|
||||
|| !preserve.contains(FileAttribute.BLOCKSIZE);
|
||||
if (sameLength && sameBlockSize) {
|
||||
return skipCrc ||
|
||||
DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
|
||||
targetFS, target.getPath());
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,10 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.tools.mapred;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.EnumSet;
|
||||
|
||||
|
@ -29,6 +27,8 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -39,6 +39,7 @@ import org.apache.hadoop.io.IOUtils;
|
|||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.tools.DistCpConstants;
|
||||
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
||||
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||
import org.apache.hadoop.tools.util.RetriableCommand;
|
||||
import org.apache.hadoop.tools.util.ThrottledInputStream;
|
||||
|
@ -54,13 +55,15 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|||
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
|
||||
private static int BUFFER_SIZE = 8 * 1024;
|
||||
private boolean skipCrc = false;
|
||||
private FileAction action;
|
||||
|
||||
/**
|
||||
* Constructor, taking a description of the action.
|
||||
* @param description Verbose description of the copy operation.
|
||||
*/
|
||||
public RetriableFileCopyCommand(String description) {
|
||||
public RetriableFileCopyCommand(String description, FileAction action) {
|
||||
super(description);
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -68,9 +71,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|||
*
|
||||
* @param skipCrc Whether to skip the crc check.
|
||||
* @param description A verbose description of the copy operation.
|
||||
* @param action We should overwrite the target file or append new data to it.
|
||||
*/
|
||||
public RetriableFileCopyCommand(boolean skipCrc, String description) {
|
||||
this(description);
|
||||
public RetriableFileCopyCommand(boolean skipCrc, String description,
|
||||
FileAction action) {
|
||||
this(description, action);
|
||||
this.skipCrc = skipCrc;
|
||||
}
|
||||
|
||||
|
@ -96,18 +101,17 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|||
}
|
||||
|
||||
private long doCopy(FileStatus sourceFileStatus, Path target,
|
||||
Mapper.Context context,
|
||||
EnumSet<FileAttribute> fileAttributes)
|
||||
throws IOException {
|
||||
|
||||
Path tmpTargetPath = getTmpFile(target, context);
|
||||
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
|
||||
throws IOException {
|
||||
final boolean toAppend = action == FileAction.APPEND;
|
||||
Path targetPath = toAppend ? target : getTmpFile(target, context);
|
||||
final Configuration configuration = context.getConfiguration();
|
||||
FileSystem targetFS = target.getFileSystem(configuration);
|
||||
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
|
||||
LOG.debug("Tmp-file path: " + tmpTargetPath);
|
||||
LOG.debug("Target file path: " + targetPath);
|
||||
}
|
||||
final Path sourcePath = sourceFileStatus.getPath();
|
||||
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
||||
|
@ -115,22 +119,31 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|||
.contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
|
||||
.getFileChecksum(sourcePath) : null;
|
||||
|
||||
long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
|
||||
context, fileAttributes, sourceChecksum);
|
||||
final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
|
||||
target).getLen() : 0;
|
||||
long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus,
|
||||
offset, context, fileAttributes, sourceChecksum);
|
||||
|
||||
compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
|
||||
bytesRead);
|
||||
compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead
|
||||
+ offset);
|
||||
//At this point, src&dest lengths are same. if length==0, we skip checksum
|
||||
if ((bytesRead != 0) && (!skipCrc)) {
|
||||
compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
|
||||
targetFS, tmpTargetPath);
|
||||
targetFS, targetPath);
|
||||
}
|
||||
// it's not append case, thus we first write to a temporary file, rename
|
||||
// it to the target path.
|
||||
if (!toAppend) {
|
||||
promoteTmpToTarget(targetPath, target, targetFS);
|
||||
}
|
||||
promoteTmpToTarget(tmpTargetPath, target, targetFS);
|
||||
return bytesRead;
|
||||
|
||||
} finally {
|
||||
if (targetFS.exists(tmpTargetPath))
|
||||
targetFS.delete(tmpTargetPath, false);
|
||||
// note that for append case, it is possible that we append partial data
|
||||
// and then fail. In that case, for the next retry, we either reuse the
|
||||
// partial appended data if it is good or we overwrite the whole file
|
||||
if (!toAppend && targetFS.exists(targetPath)) {
|
||||
targetFS.delete(targetPath, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,29 +160,37 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|||
return null;
|
||||
}
|
||||
|
||||
private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
|
||||
FileStatus sourceFileStatus, Mapper.Context context,
|
||||
private long copyToFile(Path targetPath, FileSystem targetFS,
|
||||
FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context,
|
||||
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
|
||||
throws IOException {
|
||||
FsPermission permission = FsPermission.getFileDefault().applyUMask(
|
||||
FsPermission.getUMask(targetFS.getConf()));
|
||||
OutputStream outStream = new BufferedOutputStream(
|
||||
targetFS.create(tmpTargetPath, permission,
|
||||
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
|
||||
getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
|
||||
tmpTargetPath),
|
||||
getBlockSize(fileAttributes, sourceFileStatus, targetFS,
|
||||
tmpTargetPath),
|
||||
context, getChecksumOpt(fileAttributes, sourceChecksum)));
|
||||
return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
|
||||
final OutputStream outStream;
|
||||
if (action == FileAction.OVERWRITE) {
|
||||
final short repl = getReplicationFactor(fileAttributes, sourceFileStatus,
|
||||
targetFS, targetPath);
|
||||
final long blockSize = getBlockSize(fileAttributes, sourceFileStatus,
|
||||
targetFS, targetPath);
|
||||
FSDataOutputStream out = targetFS.create(targetPath, permission,
|
||||
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
||||
BUFFER_SIZE, repl, blockSize, context,
|
||||
getChecksumOpt(fileAttributes, sourceChecksum));
|
||||
outStream = new BufferedOutputStream(out);
|
||||
} else {
|
||||
outStream = new BufferedOutputStream(targetFS.append(targetPath,
|
||||
BUFFER_SIZE));
|
||||
}
|
||||
return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE,
|
||||
context);
|
||||
}
|
||||
|
||||
private void compareFileLengths(FileStatus sourceFileStatus, Path target,
|
||||
Configuration configuration, long bytesRead)
|
||||
Configuration configuration, long targetLen)
|
||||
throws IOException {
|
||||
final Path sourcePath = sourceFileStatus.getPath();
|
||||
FileSystem fs = sourcePath.getFileSystem(configuration);
|
||||
if (fs.getFileStatus(sourcePath).getLen() != bytesRead)
|
||||
if (fs.getFileStatus(sourcePath).getLen() != targetLen)
|
||||
throw new IOException("Mismatch in length of source:" + sourcePath
|
||||
+ " and target:" + target);
|
||||
}
|
||||
|
@ -215,8 +236,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
|
||||
int bufferSize, Mapper.Context context)
|
||||
long copyBytes(FileStatus sourceFileStatus, long sourceOffset,
|
||||
OutputStream outStream, int bufferSize, Mapper.Context context)
|
||||
throws IOException {
|
||||
Path source = sourceFileStatus.getPath();
|
||||
byte buf[] = new byte[bufferSize];
|
||||
|
@ -225,19 +246,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|||
|
||||
try {
|
||||
inStream = getInputStream(source, context.getConfiguration());
|
||||
int bytesRead = readBytes(inStream, buf);
|
||||
int bytesRead = readBytes(inStream, buf, sourceOffset);
|
||||
while (bytesRead >= 0) {
|
||||
totalBytesRead += bytesRead;
|
||||
if (action == FileAction.APPEND) {
|
||||
sourceOffset += bytesRead;
|
||||
}
|
||||
outStream.write(buf, 0, bytesRead);
|
||||
updateContextStatus(totalBytesRead, context, sourceFileStatus);
|
||||
bytesRead = inStream.read(buf);
|
||||
bytesRead = readBytes(inStream, buf, sourceOffset);
|
||||
}
|
||||
outStream.close();
|
||||
outStream = null;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, outStream, inStream);
|
||||
}
|
||||
|
||||
return totalBytesRead;
|
||||
}
|
||||
|
||||
|
@ -254,24 +277,27 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
|||
context.setStatus(message.toString());
|
||||
}
|
||||
|
||||
private static int readBytes(InputStream inStream, byte buf[])
|
||||
throws IOException {
|
||||
private static int readBytes(ThrottledInputStream inStream, byte buf[],
|
||||
long position) throws IOException {
|
||||
try {
|
||||
return inStream.read(buf);
|
||||
}
|
||||
catch (IOException e) {
|
||||
if (position == 0) {
|
||||
return inStream.read(buf);
|
||||
} else {
|
||||
return inStream.read(position, buf, 0, buf.length);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new CopyReadException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static ThrottledInputStream getInputStream(Path path, Configuration conf)
|
||||
throws IOException {
|
||||
private static ThrottledInputStream getInputStream(Path path,
|
||||
Configuration conf) throws IOException {
|
||||
try {
|
||||
FileSystem fs = path.getFileSystem(conf);
|
||||
long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
|
||||
DistCpConstants.DEFAULT_BANDWIDTH_MB);
|
||||
return new ThrottledInputStream(new BufferedInputStream(fs.open(path)),
|
||||
bandwidthMB * 1024 * 1024);
|
||||
FSDataInputStream in = fs.open(path);
|
||||
return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new CopyReadException(e);
|
||||
|
|
|
@ -21,6 +21,11 @@ package org.apache.hadoop.tools.util;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.PositionedReadable;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* The ThrottleInputStream provides bandwidth throttling on a specified
|
||||
* InputStream. It is implemented as a wrapper on top of another InputStream
|
||||
|
@ -90,6 +95,25 @@ public class ThrottledInputStream extends InputStream {
|
|||
return readLen;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read bytes starting from the specified position. This requires rawStream is
|
||||
* an instance of {@link PositionedReadable}.
|
||||
*/
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
if (!(rawStream instanceof PositionedReadable)) {
|
||||
throw new UnsupportedOperationException(
|
||||
"positioned read is not supported by the internal stream");
|
||||
}
|
||||
throttle();
|
||||
int readLen = ((PositionedReadable) rawStream).read(position, buffer,
|
||||
offset, length);
|
||||
if (readLen != -1) {
|
||||
bytesRead += readLen;
|
||||
}
|
||||
return readLen;
|
||||
}
|
||||
|
||||
private void throttle() throws IOException {
|
||||
if (getBytesPerSec() > maxBytesPerSec) {
|
||||
try {
|
||||
|
|
|
@ -18,9 +18,12 @@
|
|||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.tools.DistCpOptions.*;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
|
@ -554,4 +557,45 @@ public class TestOptionsParser {
|
|||
Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
|
||||
Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendOption() {
|
||||
Configuration conf = new Configuration();
|
||||
Assert.assertFalse(conf.getBoolean(
|
||||
DistCpOptionSwitch.APPEND.getConfigLabel(), false));
|
||||
Assert.assertFalse(conf.getBoolean(
|
||||
DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
|
||||
|
||||
DistCpOptions options = OptionsParser.parse(new String[] { "-update",
|
||||
"-append", "hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
options.appendToConf(conf);
|
||||
Assert.assertTrue(conf.getBoolean(
|
||||
DistCpOptionSwitch.APPEND.getConfigLabel(), false));
|
||||
Assert.assertTrue(conf.getBoolean(
|
||||
DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
|
||||
|
||||
// make sure -append is only valid when -update is specified
|
||||
try {
|
||||
options = OptionsParser.parse(new String[] { "-append",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
fail("Append should fail if update option is not specified");
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Append is valid only with update options", e);
|
||||
}
|
||||
|
||||
// make sure -append is invalid when skipCrc is specified
|
||||
try {
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-append", "-update", "-skipcrccheck",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/" });
|
||||
fail("Append should fail if skipCrc option is specified");
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Append is disallowed when skipping CRC", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,11 +25,13 @@ import java.security.PrivilegedAction;
|
|||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
|
@ -118,6 +120,16 @@ public class TestCopyMapper {
|
|||
touchFile(SOURCE_PATH + "/7/8/9");
|
||||
}
|
||||
|
||||
private static void appendSourceData() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
for (Path source : pathList) {
|
||||
if (fs.getFileStatus(source).isFile()) {
|
||||
// append 2048 bytes per file
|
||||
appendFile(source, DEFAULT_FILE_SIZE * 2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void createSourceDataWithDifferentBlockSize() throws Exception {
|
||||
mkdirs(SOURCE_PATH + "/1");
|
||||
mkdirs(SOURCE_PATH + "/2");
|
||||
|
@ -201,85 +213,132 @@ public class TestCopyMapper {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Append specified length of bytes to a given file
|
||||
*/
|
||||
private static void appendFile(Path p, int length) throws IOException {
|
||||
byte[] toAppend = new byte[length];
|
||||
Random random = new Random();
|
||||
random.nextBytes(toAppend);
|
||||
FSDataOutputStream out = cluster.getFileSystem().append(p);
|
||||
try {
|
||||
out.write(toAppend);
|
||||
} finally {
|
||||
IOUtils.closeStream(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyWithDifferentChecksumType() throws Exception {
|
||||
testCopy(true);
|
||||
}
|
||||
|
||||
@Test(timeout=40000)
|
||||
public void testRun() {
|
||||
public void testRun() throws Exception {
|
||||
testCopy(false);
|
||||
}
|
||||
|
||||
private void testCopy(boolean preserveChecksum) {
|
||||
try {
|
||||
deleteState();
|
||||
if (preserveChecksum) {
|
||||
createSourceDataWithDifferentChecksumType();
|
||||
} else {
|
||||
createSourceData();
|
||||
}
|
||||
@Test
|
||||
public void testCopyWithAppend() throws Exception {
|
||||
final FileSystem fs = cluster.getFileSystem();
|
||||
// do the first distcp
|
||||
testCopy(false);
|
||||
// start appending data to source
|
||||
appendSourceData();
|
||||
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
EnumSet<DistCpOptions.FileAttribute> fileAttributes
|
||||
= EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
|
||||
if (preserveChecksum) {
|
||||
fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
|
||||
}
|
||||
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
|
||||
DistCpUtils.packAttributes(fileAttributes));
|
||||
|
||||
copyMapper.setup(context);
|
||||
|
||||
for (Path path: pathList) {
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
new CopyListingFileStatus(fs.getFileStatus(path)), context);
|
||||
}
|
||||
|
||||
// Check that the maps worked.
|
||||
for (Path path : pathList) {
|
||||
final Path targetPath = new Path(path.toString()
|
||||
.replaceAll(SOURCE_PATH, TARGET_PATH));
|
||||
Assert.assertTrue(fs.exists(targetPath));
|
||||
Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
|
||||
FileStatus sourceStatus = fs.getFileStatus(path);
|
||||
FileStatus targetStatus = fs.getFileStatus(targetPath);
|
||||
Assert.assertEquals(sourceStatus.getReplication(),
|
||||
targetStatus.getReplication());
|
||||
if (preserveChecksum) {
|
||||
Assert.assertEquals(sourceStatus.getBlockSize(),
|
||||
targetStatus.getBlockSize());
|
||||
}
|
||||
Assert.assertTrue(!fs.isFile(targetPath)
|
||||
|| fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
|
||||
}
|
||||
|
||||
Assert.assertEquals(pathList.size(),
|
||||
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
|
||||
if (!preserveChecksum) {
|
||||
Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
|
||||
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
|
||||
.getValue());
|
||||
} else {
|
||||
Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
|
||||
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
|
||||
.getValue());
|
||||
}
|
||||
|
||||
testCopyingExistingFiles(fs, copyMapper, context);
|
||||
for (Text value : stubContext.getWriter().values()) {
|
||||
Assert.assertTrue(value.toString() + " is not skipped", value.toString().startsWith("SKIP:"));
|
||||
}
|
||||
// do the distcp again with -update and -append option
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
|
||||
stubContext.getContext();
|
||||
// Enable append
|
||||
context.getConfiguration().setBoolean(
|
||||
DistCpOptionSwitch.APPEND.getConfigLabel(), true);
|
||||
copyMapper.setup(context);
|
||||
for (Path path: pathList) {
|
||||
copyMapper.map(new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
|
||||
path)), context);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error("Unexpected exception: ", e);
|
||||
Assert.assertTrue(false);
|
||||
|
||||
verifyCopy(fs, false);
|
||||
// verify that we only copied new appended data
|
||||
Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE * 2, stubContext
|
||||
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
|
||||
.getValue());
|
||||
Assert.assertEquals(pathList.size(), stubContext.getReporter().
|
||||
getCounter(CopyMapper.Counter.COPY).getValue());
|
||||
}
|
||||
|
||||
private void testCopy(boolean preserveChecksum) throws Exception {
|
||||
deleteState();
|
||||
if (preserveChecksum) {
|
||||
createSourceDataWithDifferentChecksumType();
|
||||
} else {
|
||||
createSourceData();
|
||||
}
|
||||
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
CopyMapper copyMapper = new CopyMapper();
|
||||
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
|
||||
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context
|
||||
= stubContext.getContext();
|
||||
|
||||
Configuration configuration = context.getConfiguration();
|
||||
EnumSet<DistCpOptions.FileAttribute> fileAttributes
|
||||
= EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
|
||||
if (preserveChecksum) {
|
||||
fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
|
||||
}
|
||||
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
|
||||
DistCpUtils.packAttributes(fileAttributes));
|
||||
|
||||
copyMapper.setup(context);
|
||||
|
||||
for (Path path: pathList) {
|
||||
copyMapper.map(
|
||||
new Text(DistCpUtils.getRelativePath(new Path(SOURCE_PATH), path)),
|
||||
new CopyListingFileStatus(fs.getFileStatus(path)), context);
|
||||
}
|
||||
|
||||
// Check that the maps worked.
|
||||
verifyCopy(fs, preserveChecksum);
|
||||
Assert.assertEquals(pathList.size(), stubContext.getReporter()
|
||||
.getCounter(CopyMapper.Counter.COPY).getValue());
|
||||
if (!preserveChecksum) {
|
||||
Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
|
||||
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
|
||||
.getValue());
|
||||
} else {
|
||||
Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
|
||||
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
|
||||
.getValue());
|
||||
}
|
||||
|
||||
testCopyingExistingFiles(fs, copyMapper, context);
|
||||
for (Text value : stubContext.getWriter().values()) {
|
||||
Assert.assertTrue(value.toString() + " is not skipped", value
|
||||
.toString().startsWith("SKIP:"));
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyCopy(FileSystem fs, boolean preserveChecksum)
|
||||
throws Exception {
|
||||
for (Path path : pathList) {
|
||||
final Path targetPath = new Path(path.toString().replaceAll(SOURCE_PATH,
|
||||
TARGET_PATH));
|
||||
Assert.assertTrue(fs.exists(targetPath));
|
||||
Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
|
||||
FileStatus sourceStatus = fs.getFileStatus(path);
|
||||
FileStatus targetStatus = fs.getFileStatus(targetPath);
|
||||
Assert.assertEquals(sourceStatus.getReplication(),
|
||||
targetStatus.getReplication());
|
||||
if (preserveChecksum) {
|
||||
Assert.assertEquals(sourceStatus.getBlockSize(),
|
||||
targetStatus.getBlockSize());
|
||||
}
|
||||
Assert.assertTrue(!fs.isFile(targetPath)
|
||||
|| fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapreduce.*;
|
||||
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
@ -48,8 +49,8 @@ public class TestRetriableFileCopyCommand {
|
|||
|
||||
Exception actualEx = null;
|
||||
try {
|
||||
new RetriableFileCopyCommand("testFailOnCloseError")
|
||||
.copyBytes(stat, out, 512, context);
|
||||
new RetriableFileCopyCommand("testFailOnCloseError", FileAction.OVERWRITE)
|
||||
.copyBytes(stat, 0, out, 512, context);
|
||||
} catch (Exception e) {
|
||||
actualEx = e;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue