HADOOP-13626. Remove distcp dependency on FileStatus serialization
(cherry picked from commit a1a0281e12
)
This commit is contained in:
parent
9f20532ef5
commit
72aa0a7ab3
|
@ -28,11 +28,15 @@ import java.util.Map.Entry;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.AclEntry;
|
import org.apache.hadoop.fs.permission.AclEntry;
|
||||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
|
||||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||||
|
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||||
import org.apache.hadoop.fs.permission.AclUtil;
|
import org.apache.hadoop.fs.permission.AclUtil;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
|
||||||
import com.google.common.base.Objects;
|
import com.google.common.base.Objects;
|
||||||
|
@ -40,17 +44,27 @@ import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* CopyListingFileStatus is a specialized subclass of {@link FileStatus} for
|
* CopyListingFileStatus is a view of {@link FileStatus}, recording additional
|
||||||
* attaching additional data members useful to distcp. This class does not
|
* data members useful to distcp.
|
||||||
* override {@link FileStatus#compareTo}, because the additional data members
|
|
||||||
* are not relevant to sort order.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class CopyListingFileStatus extends FileStatus {
|
public final class CopyListingFileStatus implements Writable {
|
||||||
|
|
||||||
private static final byte NO_ACL_ENTRIES = -1;
|
private static final byte NO_ACL_ENTRIES = -1;
|
||||||
private static final int NO_XATTRS = -1;
|
private static final int NO_XATTRS = -1;
|
||||||
|
|
||||||
|
// FileStatus fields
|
||||||
|
private Path path;
|
||||||
|
private long length;
|
||||||
|
private boolean isdir;
|
||||||
|
private short blockReplication;
|
||||||
|
private long blocksize;
|
||||||
|
private long modificationTime;
|
||||||
|
private long accessTime;
|
||||||
|
private FsPermission permission;
|
||||||
|
private String owner;
|
||||||
|
private String group;
|
||||||
|
|
||||||
// Retain static arrays of enum values to prevent repeated allocation of new
|
// Retain static arrays of enum values to prevent repeated allocation of new
|
||||||
// arrays during deserialization.
|
// arrays during deserialization.
|
||||||
private static final AclEntryType[] ACL_ENTRY_TYPES = AclEntryType.values();
|
private static final AclEntryType[] ACL_ENTRY_TYPES = AclEntryType.values();
|
||||||
|
@ -64,6 +78,7 @@ public final class CopyListingFileStatus extends FileStatus {
|
||||||
* Default constructor.
|
* Default constructor.
|
||||||
*/
|
*/
|
||||||
public CopyListingFileStatus() {
|
public CopyListingFileStatus() {
|
||||||
|
this(0, false, 0, 0, 0, 0, null, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -72,8 +87,76 @@ public final class CopyListingFileStatus extends FileStatus {
|
||||||
*
|
*
|
||||||
* @param fileStatus FileStatus to copy
|
* @param fileStatus FileStatus to copy
|
||||||
*/
|
*/
|
||||||
public CopyListingFileStatus(FileStatus fileStatus) throws IOException {
|
public CopyListingFileStatus(FileStatus fileStatus) {
|
||||||
super(fileStatus);
|
this(fileStatus.getLen(), fileStatus.isDirectory(),
|
||||||
|
fileStatus.getReplication(), fileStatus.getBlockSize(),
|
||||||
|
fileStatus.getModificationTime(), fileStatus.getAccessTime(),
|
||||||
|
fileStatus.getPermission(), fileStatus.getOwner(),
|
||||||
|
fileStatus.getGroup(),
|
||||||
|
fileStatus.getPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("checkstyle:parameternumber")
|
||||||
|
public CopyListingFileStatus(long length, boolean isdir,
|
||||||
|
int blockReplication, long blocksize, long modificationTime,
|
||||||
|
long accessTime, FsPermission permission, String owner, String group,
|
||||||
|
Path path) {
|
||||||
|
this.length = length;
|
||||||
|
this.isdir = isdir;
|
||||||
|
this.blockReplication = (short)blockReplication;
|
||||||
|
this.blocksize = blocksize;
|
||||||
|
this.modificationTime = modificationTime;
|
||||||
|
this.accessTime = accessTime;
|
||||||
|
if (permission != null) {
|
||||||
|
this.permission = permission;
|
||||||
|
} else {
|
||||||
|
this.permission = isdir
|
||||||
|
? FsPermission.getDirDefault()
|
||||||
|
: FsPermission.getFileDefault();
|
||||||
|
}
|
||||||
|
this.owner = (owner == null) ? "" : owner;
|
||||||
|
this.group = (group == null) ? "" : group;
|
||||||
|
this.path = path;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Path getPath() {
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLen() {
|
||||||
|
return length;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getBlockSize() {
|
||||||
|
return blocksize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDirectory() {
|
||||||
|
return isdir;
|
||||||
|
}
|
||||||
|
|
||||||
|
public short getReplication() {
|
||||||
|
return blockReplication;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getModificationTime() {
|
||||||
|
return modificationTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOwner() {
|
||||||
|
return owner;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getGroup() {
|
||||||
|
return group;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getAccessTime() {
|
||||||
|
return accessTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FsPermission getPermission() {
|
||||||
|
return permission;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -115,7 +198,16 @@ public final class CopyListingFileStatus extends FileStatus {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(DataOutput out) throws IOException {
|
public void write(DataOutput out) throws IOException {
|
||||||
super.write(out);
|
Text.writeString(out, getPath().toString(), Text.DEFAULT_MAX_LEN);
|
||||||
|
out.writeLong(getLen());
|
||||||
|
out.writeBoolean(isDirectory());
|
||||||
|
out.writeShort(getReplication());
|
||||||
|
out.writeLong(getBlockSize());
|
||||||
|
out.writeLong(getModificationTime());
|
||||||
|
out.writeLong(getAccessTime());
|
||||||
|
getPermission().write(out);
|
||||||
|
Text.writeString(out, getOwner(), Text.DEFAULT_MAX_LEN);
|
||||||
|
Text.writeString(out, getGroup(), Text.DEFAULT_MAX_LEN);
|
||||||
if (aclEntries != null) {
|
if (aclEntries != null) {
|
||||||
// byte is sufficient, because 32 ACL entries is the max enforced by HDFS.
|
// byte is sufficient, because 32 ACL entries is the max enforced by HDFS.
|
||||||
out.writeByte(aclEntries.size());
|
out.writeByte(aclEntries.size());
|
||||||
|
@ -152,7 +244,17 @@ public final class CopyListingFileStatus extends FileStatus {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readFields(DataInput in) throws IOException {
|
public void readFields(DataInput in) throws IOException {
|
||||||
super.readFields(in);
|
String strPath = Text.readString(in, Text.DEFAULT_MAX_LEN);
|
||||||
|
this.path = new Path(strPath);
|
||||||
|
this.length = in.readLong();
|
||||||
|
this.isdir = in.readBoolean();
|
||||||
|
this.blockReplication = in.readShort();
|
||||||
|
blocksize = in.readLong();
|
||||||
|
modificationTime = in.readLong();
|
||||||
|
accessTime = in.readLong();
|
||||||
|
permission.readFields(in);
|
||||||
|
owner = Text.readString(in, Text.DEFAULT_MAX_LEN);
|
||||||
|
group = Text.readString(in, Text.DEFAULT_MAX_LEN);
|
||||||
byte aclEntriesSize = in.readByte();
|
byte aclEntriesSize = in.readByte();
|
||||||
if (aclEntriesSize != NO_ACL_ENTRIES) {
|
if (aclEntriesSize != NO_ACL_ENTRIES) {
|
||||||
aclEntries = Lists.newArrayListWithCapacity(aclEntriesSize);
|
aclEntries = Lists.newArrayListWithCapacity(aclEntriesSize);
|
||||||
|
@ -190,15 +292,16 @@ public final class CopyListingFileStatus extends FileStatus {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (!super.equals(o)) {
|
if (null == o) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (getClass() != o.getClass()) {
|
if (getClass() != o.getClass()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
CopyListingFileStatus other = (CopyListingFileStatus)o;
|
CopyListingFileStatus other = (CopyListingFileStatus)o;
|
||||||
return Objects.equal(aclEntries, other.aclEntries) &&
|
return getPath().equals(other.getPath())
|
||||||
Objects.equal(xAttrs, other.xAttrs);
|
&& Objects.equal(aclEntries, other.aclEntries)
|
||||||
|
&& Objects.equal(xAttrs, other.xAttrs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -264,8 +264,18 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getFileType(CopyListingFileStatus fileStatus) {
|
||||||
|
if (null == fileStatus) {
|
||||||
|
return "N/A";
|
||||||
|
}
|
||||||
|
return fileStatus.isDirectory() ? "dir" : "file";
|
||||||
|
}
|
||||||
|
|
||||||
private String getFileType(FileStatus fileStatus) {
|
private String getFileType(FileStatus fileStatus) {
|
||||||
return fileStatus == null ? "N/A" : (fileStatus.isDirectory() ? "dir" : "file");
|
if (null == fileStatus) {
|
||||||
|
return "N/A";
|
||||||
|
}
|
||||||
|
return fileStatus.isDirectory() ? "dir" : "file";
|
||||||
}
|
}
|
||||||
|
|
||||||
private static EnumSet<DistCpOptions.FileAttribute>
|
private static EnumSet<DistCpOptions.FileAttribute>
|
||||||
|
@ -276,7 +286,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
||||||
}
|
}
|
||||||
|
|
||||||
private void copyFileWithRetry(String description,
|
private void copyFileWithRetry(String description,
|
||||||
FileStatus sourceFileStatus, Path target, Context context,
|
CopyListingFileStatus sourceFileStatus, Path target, Context context,
|
||||||
FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
|
FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long bytesCopied;
|
long bytesCopied;
|
||||||
|
@ -304,15 +314,15 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void updateSkipCounters(Context context,
|
private static void updateSkipCounters(Context context,
|
||||||
FileStatus sourceFile) {
|
CopyListingFileStatus sourceFile) {
|
||||||
incrementCounter(context, Counter.SKIP, 1);
|
incrementCounter(context, Counter.SKIP, 1);
|
||||||
incrementCounter(context, Counter.BYTESSKIPPED, sourceFile.getLen());
|
incrementCounter(context, Counter.BYTESSKIPPED, sourceFile.getLen());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleFailures(IOException exception,
|
private void handleFailures(IOException exception,
|
||||||
FileStatus sourceFileStatus, Path target,
|
CopyListingFileStatus sourceFileStatus, Path target, Context context)
|
||||||
Context context) throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
|
LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " +
|
||||||
target, exception);
|
target, exception);
|
||||||
|
|
||||||
|
@ -332,8 +342,9 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
||||||
context.getCounter(counter).increment(value);
|
context.getCounter(counter).increment(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileAction checkUpdate(FileSystem sourceFS, FileStatus source,
|
private FileAction checkUpdate(FileSystem sourceFS,
|
||||||
Path target, FileStatus targetFileStatus) throws IOException {
|
CopyListingFileStatus source, Path target, FileStatus targetFileStatus)
|
||||||
|
throws IOException {
|
||||||
if (targetFileStatus != null && !overWrite) {
|
if (targetFileStatus != null && !overWrite) {
|
||||||
if (canSkip(sourceFS, source, targetFileStatus)) {
|
if (canSkip(sourceFS, source, targetFileStatus)) {
|
||||||
return FileAction.SKIP;
|
return FileAction.SKIP;
|
||||||
|
@ -354,7 +365,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
|
||||||
return FileAction.OVERWRITE;
|
return FileAction.OVERWRITE;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean canSkip(FileSystem sourceFS, FileStatus source,
|
private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source,
|
||||||
FileStatus target) throws IOException {
|
FileStatus target) throws IOException {
|
||||||
if (!syncFolders) {
|
if (!syncFolders) {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -30,13 +30,13 @@ import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.mapreduce.Mapper;
|
import org.apache.hadoop.mapreduce.Mapper;
|
||||||
|
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||||
import org.apache.hadoop.tools.DistCpConstants;
|
import org.apache.hadoop.tools.DistCpConstants;
|
||||||
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
|
||||||
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
||||||
|
@ -90,7 +90,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
@Override
|
@Override
|
||||||
protected Object doExecute(Object... arguments) throws Exception {
|
protected Object doExecute(Object... arguments) throws Exception {
|
||||||
assert arguments.length == 4 : "Unexpected argument list.";
|
assert arguments.length == 4 : "Unexpected argument list.";
|
||||||
FileStatus source = (FileStatus)arguments[0];
|
CopyListingFileStatus source = (CopyListingFileStatus)arguments[0];
|
||||||
assert !source.isDirectory() : "Unexpected file-status. Expected file.";
|
assert !source.isDirectory() : "Unexpected file-status. Expected file.";
|
||||||
Path target = (Path)arguments[1];
|
Path target = (Path)arguments[1];
|
||||||
Mapper.Context context = (Mapper.Context)arguments[2];
|
Mapper.Context context = (Mapper.Context)arguments[2];
|
||||||
|
@ -99,7 +99,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
return doCopy(source, target, context, fileAttributes);
|
return doCopy(source, target, context, fileAttributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long doCopy(FileStatus sourceFileStatus, Path target,
|
private long doCopy(CopyListingFileStatus source, Path target,
|
||||||
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
|
Mapper.Context context, EnumSet<FileAttribute> fileAttributes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final boolean toAppend = action == FileAction.APPEND;
|
final boolean toAppend = action == FileAction.APPEND;
|
||||||
|
@ -109,10 +109,10 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
|
LOG.debug("Copying " + source.getPath() + " to " + target);
|
||||||
LOG.debug("Target file path: " + targetPath);
|
LOG.debug("Target file path: " + targetPath);
|
||||||
}
|
}
|
||||||
final Path sourcePath = sourceFileStatus.getPath();
|
final Path sourcePath = source.getPath();
|
||||||
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
|
||||||
final FileChecksum sourceChecksum = fileAttributes
|
final FileChecksum sourceChecksum = fileAttributes
|
||||||
.contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
|
.contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
|
||||||
|
@ -120,14 +120,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
|
|
||||||
final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
|
final long offset = action == FileAction.APPEND ? targetFS.getFileStatus(
|
||||||
target).getLen() : 0;
|
target).getLen() : 0;
|
||||||
long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus,
|
long bytesRead = copyToFile(targetPath, targetFS, source,
|
||||||
offset, context, fileAttributes, sourceChecksum);
|
offset, context, fileAttributes, sourceChecksum);
|
||||||
|
|
||||||
compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead
|
compareFileLengths(source, targetPath, configuration, bytesRead
|
||||||
+ offset);
|
+ offset);
|
||||||
//At this point, src&dest lengths are same. if length==0, we skip checksum
|
//At this point, src&dest lengths are same. if length==0, we skip checksum
|
||||||
if ((bytesRead != 0) && (!skipCrc)) {
|
if ((bytesRead != 0) && (!skipCrc)) {
|
||||||
compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
|
compareCheckSums(sourceFS, source.getPath(), sourceChecksum,
|
||||||
targetFS, targetPath);
|
targetFS, targetPath);
|
||||||
}
|
}
|
||||||
// it's not append case, thus we first write to a temporary file, rename
|
// it's not append case, thus we first write to a temporary file, rename
|
||||||
|
@ -160,16 +160,16 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
private long copyToFile(Path targetPath, FileSystem targetFS,
|
private long copyToFile(Path targetPath, FileSystem targetFS,
|
||||||
FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context,
|
CopyListingFileStatus source, long sourceOffset, Mapper.Context context,
|
||||||
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
|
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
FsPermission permission = FsPermission.getFileDefault().applyUMask(
|
FsPermission permission = FsPermission.getFileDefault().applyUMask(
|
||||||
FsPermission.getUMask(targetFS.getConf()));
|
FsPermission.getUMask(targetFS.getConf()));
|
||||||
final OutputStream outStream;
|
final OutputStream outStream;
|
||||||
if (action == FileAction.OVERWRITE) {
|
if (action == FileAction.OVERWRITE) {
|
||||||
final short repl = getReplicationFactor(fileAttributes, sourceFileStatus,
|
final short repl = getReplicationFactor(fileAttributes, source,
|
||||||
targetFS, targetPath);
|
targetFS, targetPath);
|
||||||
final long blockSize = getBlockSize(fileAttributes, sourceFileStatus,
|
final long blockSize = getBlockSize(fileAttributes, source,
|
||||||
targetFS, targetPath);
|
targetFS, targetPath);
|
||||||
FSDataOutputStream out = targetFS.create(targetPath, permission,
|
FSDataOutputStream out = targetFS.create(targetPath, permission,
|
||||||
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
|
||||||
|
@ -180,14 +180,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
outStream = new BufferedOutputStream(targetFS.append(targetPath,
|
outStream = new BufferedOutputStream(targetFS.append(targetPath,
|
||||||
BUFFER_SIZE));
|
BUFFER_SIZE));
|
||||||
}
|
}
|
||||||
return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE,
|
return copyBytes(source, sourceOffset, outStream, BUFFER_SIZE,
|
||||||
context);
|
context);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void compareFileLengths(FileStatus sourceFileStatus, Path target,
|
private void compareFileLengths(CopyListingFileStatus source, Path target,
|
||||||
Configuration configuration, long targetLen)
|
Configuration configuration, long targetLen)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final Path sourcePath = sourceFileStatus.getPath();
|
final Path sourcePath = source.getPath();
|
||||||
FileSystem fs = sourcePath.getFileSystem(configuration);
|
FileSystem fs = sourcePath.getFileSystem(configuration);
|
||||||
long srcLen = fs.getFileStatus(sourcePath).getLen();
|
long srcLen = fs.getFileStatus(sourcePath).getLen();
|
||||||
if (srcLen != targetLen)
|
if (srcLen != targetLen)
|
||||||
|
@ -238,10 +238,10 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
long copyBytes(FileStatus sourceFileStatus, long sourceOffset,
|
long copyBytes(CopyListingFileStatus source2, long sourceOffset,
|
||||||
OutputStream outStream, int bufferSize, Mapper.Context context)
|
OutputStream outStream, int bufferSize, Mapper.Context context)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Path source = sourceFileStatus.getPath();
|
Path source = source2.getPath();
|
||||||
byte buf[] = new byte[bufferSize];
|
byte buf[] = new byte[bufferSize];
|
||||||
ThrottledInputStream inStream = null;
|
ThrottledInputStream inStream = null;
|
||||||
long totalBytesRead = 0;
|
long totalBytesRead = 0;
|
||||||
|
@ -255,7 +255,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
sourceOffset += bytesRead;
|
sourceOffset += bytesRead;
|
||||||
}
|
}
|
||||||
outStream.write(buf, 0, bytesRead);
|
outStream.write(buf, 0, bytesRead);
|
||||||
updateContextStatus(totalBytesRead, context, sourceFileStatus);
|
updateContextStatus(totalBytesRead, context, source2);
|
||||||
bytesRead = readBytes(inStream, buf, sourceOffset);
|
bytesRead = readBytes(inStream, buf, sourceOffset);
|
||||||
}
|
}
|
||||||
outStream.close();
|
outStream.close();
|
||||||
|
@ -267,14 +267,14 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateContextStatus(long totalBytesRead, Mapper.Context context,
|
private void updateContextStatus(long totalBytesRead, Mapper.Context context,
|
||||||
FileStatus sourceFileStatus) {
|
CopyListingFileStatus source2) {
|
||||||
StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
|
StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
|
||||||
.format(totalBytesRead * 100.0f / sourceFileStatus.getLen()));
|
.format(totalBytesRead * 100.0f / source2.getLen()));
|
||||||
message.append("% ")
|
message.append("% ")
|
||||||
.append(description).append(" [")
|
.append(description).append(" [")
|
||||||
.append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
|
.append(DistCpUtils.getStringDescriptionFor(totalBytesRead))
|
||||||
.append('/')
|
.append('/')
|
||||||
.append(DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen()))
|
.append(DistCpUtils.getStringDescriptionFor(source2.getLen()))
|
||||||
.append(']');
|
.append(']');
|
||||||
context.setStatus(message.toString());
|
context.setStatus(message.toString());
|
||||||
}
|
}
|
||||||
|
@ -307,10 +307,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static short getReplicationFactor(
|
private static short getReplicationFactor(
|
||||||
EnumSet<FileAttribute> fileAttributes,
|
EnumSet<FileAttribute> fileAttributes, CopyListingFileStatus source,
|
||||||
FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
|
FileSystem targetFS, Path tmpTargetPath) {
|
||||||
return fileAttributes.contains(FileAttribute.REPLICATION)?
|
return fileAttributes.contains(FileAttribute.REPLICATION)
|
||||||
sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath);
|
? source.getReplication()
|
||||||
|
: targetFS.getDefaultReplication(tmpTargetPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -319,11 +320,11 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
* size of the target FS.
|
* size of the target FS.
|
||||||
*/
|
*/
|
||||||
private static long getBlockSize(
|
private static long getBlockSize(
|
||||||
EnumSet<FileAttribute> fileAttributes,
|
EnumSet<FileAttribute> fileAttributes, CopyListingFileStatus source,
|
||||||
FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
|
FileSystem targetFS, Path tmpTargetPath) {
|
||||||
boolean preserve = fileAttributes.contains(FileAttribute.BLOCKSIZE)
|
boolean preserve = fileAttributes.contains(FileAttribute.BLOCKSIZE)
|
||||||
|| fileAttributes.contains(FileAttribute.CHECKSUMTYPE);
|
|| fileAttributes.contains(FileAttribute.CHECKSUMTYPE);
|
||||||
return preserve ? sourceFile.getBlockSize() : targetFS
|
return preserve ? source.getBlockSize() : targetFS
|
||||||
.getDefaultBlockSize(tmpTargetPath);
|
.getDefaultBlockSize(tmpTargetPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,6 +335,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
|
||||||
* Such failures may be skipped if the DistCpOptions indicate so.
|
* Such failures may be skipped if the DistCpOptions indicate so.
|
||||||
* Write failures are intolerable, and amount to CopyMapper failure.
|
* Write failures are intolerable, and amount to CopyMapper failure.
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("serial")
|
||||||
public static class CopyReadException extends IOException {
|
public static class CopyReadException extends IOException {
|
||||||
public CopyReadException(Throwable rootCause) {
|
public CopyReadException(Throwable rootCause) {
|
||||||
super(rootCause);
|
super(rootCause);
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.tools;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify CopyListingFileStatus serialization and requirements for distcp.
|
||||||
|
*/
|
||||||
|
public class TestCopyListingFileStatus {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCopyListingFileStatusSerialization() throws Exception {
|
||||||
|
CopyListingFileStatus src = new CopyListingFileStatus(
|
||||||
|
4344L, false, 2, 512 << 20, 1234L, 5678L, new FsPermission((short)0512),
|
||||||
|
"dingo", "yaks", new Path("hdfs://localhost:4344"));
|
||||||
|
DataOutputBuffer dob = new DataOutputBuffer();
|
||||||
|
src.write(dob);
|
||||||
|
|
||||||
|
DataInputBuffer dib = new DataInputBuffer();
|
||||||
|
dib.reset(dob.getData(), 0, dob.getLength());
|
||||||
|
CopyListingFileStatus dst = new CopyListingFileStatus();
|
||||||
|
dst.readFields(dib);
|
||||||
|
assertEquals(src, dst);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFileStatusEquality() throws Exception {
|
||||||
|
FileStatus stat = new FileStatus(
|
||||||
|
4344L, false, 2, 512 << 20, 1234L, 5678L, new FsPermission((short)0512),
|
||||||
|
"dingo", "yaks", new Path("hdfs://localhost:4344/foo/bar/baz"));
|
||||||
|
CopyListingFileStatus clfs = new CopyListingFileStatus(stat);
|
||||||
|
assertEquals(stat.getLen(), clfs.getLen());
|
||||||
|
assertEquals(stat.isDirectory(), clfs.isDirectory());
|
||||||
|
assertEquals(stat.getReplication(), clfs.getReplication());
|
||||||
|
assertEquals(stat.getBlockSize(), clfs.getBlockSize());
|
||||||
|
assertEquals(stat.getAccessTime(), clfs.getAccessTime());
|
||||||
|
assertEquals(stat.getModificationTime(), clfs.getModificationTime());
|
||||||
|
assertEquals(stat.getPermission(), clfs.getPermission());
|
||||||
|
assertEquals(stat.getOwner(), clfs.getOwner());
|
||||||
|
assertEquals(stat.getGroup(), clfs.getGroup());
|
||||||
|
assertEquals(stat.getPath(), clfs.getPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapreduce.*;
|
import org.apache.hadoop.mapreduce.*;
|
||||||
|
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||||
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
@ -44,8 +45,8 @@ public class TestRetriableFileCopyCommand {
|
||||||
|
|
||||||
File f = File.createTempFile(this.getClass().getSimpleName(), null);
|
File f = File.createTempFile(this.getClass().getSimpleName(), null);
|
||||||
f.deleteOnExit();
|
f.deleteOnExit();
|
||||||
FileStatus stat =
|
CopyListingFileStatus stat = new CopyListingFileStatus(
|
||||||
new FileStatus(1L, false, 1, 1024, 0, new Path(f.toURI()));
|
new FileStatus(1L, false, 1, 1024, 0, new Path(f.toURI())));
|
||||||
|
|
||||||
Exception actualEx = null;
|
Exception actualEx = null;
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue