diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java index 8af799a48fe..2b1e7e4ce47 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java @@ -28,11 +28,15 @@ import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclEntryType; import org.apache.hadoop.fs.permission.AclEntryScope; +import org.apache.hadoop.fs.permission.AclEntryType; import org.apache.hadoop.fs.permission.AclUtil; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import com.google.common.base.Objects; @@ -40,17 +44,27 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** - * CopyListingFileStatus is a specialized subclass of {@link FileStatus} for - * attaching additional data members useful to distcp. This class does not - * override {@link FileStatus#compareTo}, because the additional data members - * are not relevant to sort order. + * CopyListingFileStatus is a view of {@link FileStatus}, recording additional + * data members useful to distcp. */ @InterfaceAudience.Private -public final class CopyListingFileStatus extends FileStatus { +public final class CopyListingFileStatus implements Writable { private static final byte NO_ACL_ENTRIES = -1; private static final int NO_XATTRS = -1; + // FileStatus fields + private Path path; + private long length; + private boolean isdir; + private short blockReplication; + private long blocksize; + private long modificationTime; + private long accessTime; + private FsPermission permission; + private String owner; + private String group; + // Retain static arrays of enum values to prevent repeated allocation of new // arrays during deserialization. private static final AclEntryType[] ACL_ENTRY_TYPES = AclEntryType.values(); @@ -64,6 +78,7 @@ public final class CopyListingFileStatus extends FileStatus { * Default constructor. */ public CopyListingFileStatus() { + this(0, false, 0, 0, 0, 0, null, null, null, null); } /** @@ -72,8 +87,76 @@ public final class CopyListingFileStatus extends FileStatus { * * @param fileStatus FileStatus to copy */ - public CopyListingFileStatus(FileStatus fileStatus) throws IOException { - super(fileStatus); + public CopyListingFileStatus(FileStatus fileStatus) { + this(fileStatus.getLen(), fileStatus.isDirectory(), + fileStatus.getReplication(), fileStatus.getBlockSize(), + fileStatus.getModificationTime(), fileStatus.getAccessTime(), + fileStatus.getPermission(), fileStatus.getOwner(), + fileStatus.getGroup(), + fileStatus.getPath()); + } + + @SuppressWarnings("checkstyle:parameternumber") + public CopyListingFileStatus(long length, boolean isdir, + int blockReplication, long blocksize, long modificationTime, + long accessTime, FsPermission permission, String owner, String group, + Path path) { + this.length = length; + this.isdir = isdir; + this.blockReplication = (short)blockReplication; + this.blocksize = blocksize; + this.modificationTime = modificationTime; + this.accessTime = accessTime; + if (permission != null) { + this.permission = permission; + } else { + this.permission = isdir + ? FsPermission.getDirDefault() + : FsPermission.getFileDefault(); + } + this.owner = (owner == null) ? "" : owner; + this.group = (group == null) ? "" : group; + this.path = path; + } + + public Path getPath() { + return path; + } + + public long getLen() { + return length; + } + + public long getBlockSize() { + return blocksize; + } + + public boolean isDirectory() { + return isdir; + } + + public short getReplication() { + return blockReplication; + } + + public long getModificationTime() { + return modificationTime; + } + + public String getOwner() { + return owner; + } + + public String getGroup() { + return group; + } + + public long getAccessTime() { + return accessTime; + } + + public FsPermission getPermission() { + return permission; } /** @@ -115,7 +198,16 @@ public final class CopyListingFileStatus extends FileStatus { @Override public void write(DataOutput out) throws IOException { - super.write(out); + Text.writeString(out, getPath().toString(), Text.DEFAULT_MAX_LEN); + out.writeLong(getLen()); + out.writeBoolean(isDirectory()); + out.writeShort(getReplication()); + out.writeLong(getBlockSize()); + out.writeLong(getModificationTime()); + out.writeLong(getAccessTime()); + getPermission().write(out); + Text.writeString(out, getOwner(), Text.DEFAULT_MAX_LEN); + Text.writeString(out, getGroup(), Text.DEFAULT_MAX_LEN); if (aclEntries != null) { // byte is sufficient, because 32 ACL entries is the max enforced by HDFS. out.writeByte(aclEntries.size()); @@ -152,7 +244,17 @@ public final class CopyListingFileStatus extends FileStatus { @Override public void readFields(DataInput in) throws IOException { - super.readFields(in); + String strPath = Text.readString(in, Text.DEFAULT_MAX_LEN); + this.path = new Path(strPath); + this.length = in.readLong(); + this.isdir = in.readBoolean(); + this.blockReplication = in.readShort(); + blocksize = in.readLong(); + modificationTime = in.readLong(); + accessTime = in.readLong(); + permission.readFields(in); + owner = Text.readString(in, Text.DEFAULT_MAX_LEN); + group = Text.readString(in, Text.DEFAULT_MAX_LEN); byte aclEntriesSize = in.readByte(); if (aclEntriesSize != NO_ACL_ENTRIES) { aclEntries = Lists.newArrayListWithCapacity(aclEntriesSize); @@ -190,15 +292,16 @@ public final class CopyListingFileStatus extends FileStatus { @Override public boolean equals(Object o) { - if (!super.equals(o)) { + if (null == o) { return false; } if (getClass() != o.getClass()) { return false; } CopyListingFileStatus other = (CopyListingFileStatus)o; - return Objects.equal(aclEntries, other.aclEntries) && - Objects.equal(xAttrs, other.xAttrs); + return getPath().equals(other.getPath()) + && Objects.equal(aclEntries, other.aclEntries) + && Objects.equal(xAttrs, other.xAttrs); } @Override diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index 6d93d81b8c8..41c5d78cb50 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -264,8 +264,18 @@ public class CopyMapper extends Mapper } } + private String getFileType(CopyListingFileStatus fileStatus) { + if (null == fileStatus) { + return "N/A"; + } + return fileStatus.isDirectory() ? "dir" : "file"; + } + private String getFileType(FileStatus fileStatus) { - return fileStatus == null ? "N/A" : (fileStatus.isDirectory() ? "dir" : "file"); + if (null == fileStatus) { + return "N/A"; + } + return fileStatus.isDirectory() ? "dir" : "file"; } private static EnumSet @@ -276,7 +286,7 @@ public class CopyMapper extends Mapper } private void copyFileWithRetry(String description, - FileStatus sourceFileStatus, Path target, Context context, + CopyListingFileStatus sourceFileStatus, Path target, Context context, FileAction action, EnumSet fileAttributes) throws IOException { long bytesCopied; @@ -304,15 +314,15 @@ public class CopyMapper extends Mapper } private static void updateSkipCounters(Context context, - FileStatus sourceFile) { + CopyListingFileStatus sourceFile) { incrementCounter(context, Counter.SKIP, 1); incrementCounter(context, Counter.BYTESSKIPPED, sourceFile.getLen()); } private void handleFailures(IOException exception, - FileStatus sourceFileStatus, Path target, - Context context) throws IOException, InterruptedException { + CopyListingFileStatus sourceFileStatus, Path target, Context context) + throws IOException, InterruptedException { LOG.error("Failure in copying " + sourceFileStatus.getPath() + " to " + target, exception); @@ -332,8 +342,9 @@ public class CopyMapper extends Mapper context.getCounter(counter).increment(value); } - private FileAction checkUpdate(FileSystem sourceFS, FileStatus source, - Path target, FileStatus targetFileStatus) throws IOException { + private FileAction checkUpdate(FileSystem sourceFS, + CopyListingFileStatus source, Path target, FileStatus targetFileStatus) + throws IOException { if (targetFileStatus != null && !overWrite) { if (canSkip(sourceFS, source, targetFileStatus)) { return FileAction.SKIP; @@ -354,7 +365,7 @@ public class CopyMapper extends Mapper return FileAction.OVERWRITE; } - private boolean canSkip(FileSystem sourceFS, FileStatus source, + private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source, FileStatus target) throws IOException { if (!syncFolders) { return true; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index bed9162727a..17773642b67 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -30,13 +30,13 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileChecksum; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.mapred.CopyMapper.FileAction; @@ -90,7 +90,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { @Override protected Object doExecute(Object... arguments) throws Exception { assert arguments.length == 4 : "Unexpected argument list."; - FileStatus source = (FileStatus)arguments[0]; + CopyListingFileStatus source = (CopyListingFileStatus)arguments[0]; assert !source.isDirectory() : "Unexpected file-status. Expected file."; Path target = (Path)arguments[1]; Mapper.Context context = (Mapper.Context)arguments[2]; @@ -99,7 +99,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { return doCopy(source, target, context, fileAttributes); } - private long doCopy(FileStatus sourceFileStatus, Path target, + private long doCopy(CopyListingFileStatus source, Path target, Mapper.Context context, EnumSet fileAttributes) throws IOException { final boolean toAppend = action == FileAction.APPEND; @@ -109,10 +109,10 @@ public class RetriableFileCopyCommand extends RetriableCommand { try { if (LOG.isDebugEnabled()) { - LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target); + LOG.debug("Copying " + source.getPath() + " to " + target); LOG.debug("Target file path: " + targetPath); } - final Path sourcePath = sourceFileStatus.getPath(); + final Path sourcePath = source.getPath(); final FileSystem sourceFS = sourcePath.getFileSystem(configuration); final FileChecksum sourceChecksum = fileAttributes .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS @@ -120,14 +120,14 @@ public class RetriableFileCopyCommand extends RetriableCommand { final long offset = action == FileAction.APPEND ? targetFS.getFileStatus( target).getLen() : 0; - long bytesRead = copyToFile(targetPath, targetFS, sourceFileStatus, + long bytesRead = copyToFile(targetPath, targetFS, source, offset, context, fileAttributes, sourceChecksum); - compareFileLengths(sourceFileStatus, targetPath, configuration, bytesRead + compareFileLengths(source, targetPath, configuration, bytesRead + offset); //At this point, src&dest lengths are same. if length==0, we skip checksum if ((bytesRead != 0) && (!skipCrc)) { - compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum, + compareCheckSums(sourceFS, source.getPath(), sourceChecksum, targetFS, targetPath); } // it's not append case, thus we first write to a temporary file, rename @@ -160,16 +160,16 @@ public class RetriableFileCopyCommand extends RetriableCommand { } private long copyToFile(Path targetPath, FileSystem targetFS, - FileStatus sourceFileStatus, long sourceOffset, Mapper.Context context, + CopyListingFileStatus source, long sourceOffset, Mapper.Context context, EnumSet fileAttributes, final FileChecksum sourceChecksum) throws IOException { FsPermission permission = FsPermission.getFileDefault().applyUMask( FsPermission.getUMask(targetFS.getConf())); final OutputStream outStream; if (action == FileAction.OVERWRITE) { - final short repl = getReplicationFactor(fileAttributes, sourceFileStatus, + final short repl = getReplicationFactor(fileAttributes, source, targetFS, targetPath); - final long blockSize = getBlockSize(fileAttributes, sourceFileStatus, + final long blockSize = getBlockSize(fileAttributes, source, targetFS, targetPath); FSDataOutputStream out = targetFS.create(targetPath, permission, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), @@ -180,14 +180,14 @@ public class RetriableFileCopyCommand extends RetriableCommand { outStream = new BufferedOutputStream(targetFS.append(targetPath, BUFFER_SIZE)); } - return copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, + return copyBytes(source, sourceOffset, outStream, BUFFER_SIZE, context); } - private void compareFileLengths(FileStatus sourceFileStatus, Path target, + private void compareFileLengths(CopyListingFileStatus source, Path target, Configuration configuration, long targetLen) throws IOException { - final Path sourcePath = sourceFileStatus.getPath(); + final Path sourcePath = source.getPath(); FileSystem fs = sourcePath.getFileSystem(configuration); long srcLen = fs.getFileStatus(sourcePath).getLen(); if (srcLen != targetLen) @@ -238,10 +238,10 @@ public class RetriableFileCopyCommand extends RetriableCommand { } @VisibleForTesting - long copyBytes(FileStatus sourceFileStatus, long sourceOffset, + long copyBytes(CopyListingFileStatus source2, long sourceOffset, OutputStream outStream, int bufferSize, Mapper.Context context) throws IOException { - Path source = sourceFileStatus.getPath(); + Path source = source2.getPath(); byte buf[] = new byte[bufferSize]; ThrottledInputStream inStream = null; long totalBytesRead = 0; @@ -255,7 +255,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { sourceOffset += bytesRead; } outStream.write(buf, 0, bytesRead); - updateContextStatus(totalBytesRead, context, sourceFileStatus); + updateContextStatus(totalBytesRead, context, source2); bytesRead = readBytes(inStream, buf, sourceOffset); } outStream.close(); @@ -267,14 +267,14 @@ public class RetriableFileCopyCommand extends RetriableCommand { } private void updateContextStatus(long totalBytesRead, Mapper.Context context, - FileStatus sourceFileStatus) { + CopyListingFileStatus source2) { StringBuilder message = new StringBuilder(DistCpUtils.getFormatter() - .format(totalBytesRead * 100.0f / sourceFileStatus.getLen())); + .format(totalBytesRead * 100.0f / source2.getLen())); message.append("% ") .append(description).append(" [") .append(DistCpUtils.getStringDescriptionFor(totalBytesRead)) .append('/') - .append(DistCpUtils.getStringDescriptionFor(sourceFileStatus.getLen())) + .append(DistCpUtils.getStringDescriptionFor(source2.getLen())) .append(']'); context.setStatus(message.toString()); } @@ -307,10 +307,11 @@ public class RetriableFileCopyCommand extends RetriableCommand { } private static short getReplicationFactor( - EnumSet fileAttributes, - FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) { - return fileAttributes.contains(FileAttribute.REPLICATION)? - sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath); + EnumSet fileAttributes, CopyListingFileStatus source, + FileSystem targetFS, Path tmpTargetPath) { + return fileAttributes.contains(FileAttribute.REPLICATION) + ? source.getReplication() + : targetFS.getDefaultReplication(tmpTargetPath); } /** @@ -319,11 +320,11 @@ public class RetriableFileCopyCommand extends RetriableCommand { * size of the target FS. */ private static long getBlockSize( - EnumSet fileAttributes, - FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) { + EnumSet fileAttributes, CopyListingFileStatus source, + FileSystem targetFS, Path tmpTargetPath) { boolean preserve = fileAttributes.contains(FileAttribute.BLOCKSIZE) || fileAttributes.contains(FileAttribute.CHECKSUMTYPE); - return preserve ? sourceFile.getBlockSize() : targetFS + return preserve ? source.getBlockSize() : targetFS .getDefaultBlockSize(tmpTargetPath); } @@ -334,6 +335,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { * Such failures may be skipped if the DistCpOptions indicate so. * Write failures are intolerable, and amount to CopyMapper failure. */ + @SuppressWarnings("serial") public static class CopyReadException extends IOException { public CopyReadException(Throwable rootCause) { super(rootCause); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListingFileStatus.java new file mode 100644 index 00000000000..f512ef6d9d8 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListingFileStatus.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; + +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Verify CopyListingFileStatus serialization and requirements for distcp. + */ +public class TestCopyListingFileStatus { + + @Test + public void testCopyListingFileStatusSerialization() throws Exception { + CopyListingFileStatus src = new CopyListingFileStatus( + 4344L, false, 2, 512 << 20, 1234L, 5678L, new FsPermission((short)0512), + "dingo", "yaks", new Path("hdfs://localhost:4344")); + DataOutputBuffer dob = new DataOutputBuffer(); + src.write(dob); + + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(dob.getData(), 0, dob.getLength()); + CopyListingFileStatus dst = new CopyListingFileStatus(); + dst.readFields(dib); + assertEquals(src, dst); + } + + @Test + public void testFileStatusEquality() throws Exception { + FileStatus stat = new FileStatus( + 4344L, false, 2, 512 << 20, 1234L, 5678L, new FsPermission((short)0512), + "dingo", "yaks", new Path("hdfs://localhost:4344/foo/bar/baz")); + CopyListingFileStatus clfs = new CopyListingFileStatus(stat); + assertEquals(stat.getLen(), clfs.getLen()); + assertEquals(stat.isDirectory(), clfs.isDirectory()); + assertEquals(stat.getReplication(), clfs.getReplication()); + assertEquals(stat.getBlockSize(), clfs.getBlockSize()); + assertEquals(stat.getAccessTime(), clfs.getAccessTime()); + assertEquals(stat.getModificationTime(), clfs.getModificationTime()); + assertEquals(stat.getPermission(), clfs.getPermission()); + assertEquals(stat.getOwner(), clfs.getOwner()); + assertEquals(stat.getGroup(), clfs.getGroup()); + assertEquals(stat.getPath(), clfs.getPath()); + } + +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java index f1b8532b74c..1f8a915f5f3 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.mapred.CopyMapper.FileAction; import org.junit.Test; import static org.junit.Assert.*; @@ -44,8 +45,8 @@ public class TestRetriableFileCopyCommand { File f = File.createTempFile(this.getClass().getSimpleName(), null); f.deleteOnExit(); - FileStatus stat = - new FileStatus(1L, false, 1, 1024, 0, new Path(f.toURI())); + CopyListingFileStatus stat = new CopyListingFileStatus( + new FileStatus(1L, false, 1, 1024, 0, new Path(f.toURI()))); Exception actualEx = null; try {