Merge from trunk to branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1603993 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 2014-06-19 18:32:13 +00:00
commit 97583dbb0a
40 changed files with 1378 additions and 450 deletions

View File

@ -573,6 +573,9 @@ Release 2.5.0 - UNRELEASED
HADOOP-10660. GraphiteSink should implement Closeable (Chen He and Ted Yu via raviprak)
HADOOP-10716. Cannot use more than 1 har filesystem.
(Rushabh Shah via cnauroth)
BREAKDOWN OF HADOOP-10514 SUBTASKS AND RELATED JIRAS
HADOOP-10520. Extended attributes definition and FileSystem APIs for

View File

@ -1391,4 +1391,11 @@
The secure random algorithm.
</description>
</property>
<property>
<name>fs.har.impl.disable.cache</name>
<value>true</value>
<description>Don't cache 'har' filesystem instances.</description>
</property>
</configuration>
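
The new default exists because FileSystem instances are cached by scheme and authority, so a second har archive in the same process would silently reuse the first one's metadata (HADOOP-10716). A minimal sketch of the observable effect, assuming a hypothetical archive path on a running cluster:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HarCacheDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical archives; replace with real .har paths on your cluster.
    FileSystem a = FileSystem.get(
        new URI("har://hdfs-nn.example.com:8020/data/one.har"), conf);
    FileSystem b = FileSystem.get(
        new URI("har://hdfs-nn.example.com:8020/data/two.har"), conf);
    // With fs.har.impl.disable.cache=true each call creates a fresh
    // HarFileSystem, so the two archives no longer collide in the cache.
    System.out.println(a == b); // expected: false
  }
}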

View File

@ -36,6 +36,8 @@ public class NfsConfiguration extends HdfsConfiguration {
NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY),
new DeprecationDelta("nfs3.mountd.port",
NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY),
new DeprecationDelta("dfs.nfs.exports.cache.size",
Nfs3Constant.NFS_EXPORTS_CACHE_SIZE_KEY),
new DeprecationDelta("dfs.nfs.exports.cache.expirytime.millis",
Nfs3Constant.NFS_EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY),
new DeprecationDelta("hadoop.nfs.userupdate.milly",
@ -51,6 +53,16 @@ public class NfsConfiguration extends HdfsConfiguration {
new DeprecationDelta("dfs.nfs3.export.point",
NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY),
new DeprecationDelta("nfs.allow.insecure.ports",
NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY) });
NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY),
new DeprecationDelta("dfs.nfs.keytab.file",
NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY),
new DeprecationDelta("dfs.nfs.kerberos.principal",
NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY),
new DeprecationDelta("dfs.nfs.rtmax",
NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY),
new DeprecationDelta("dfs.nfs.wtmax",
NfsConfigKeys.DFS_NFS_MAX_WRITE_TRANSFER_SIZE_KEY),
new DeprecationDelta("dfs.nfs.dtmax",
NfsConfigKeys.DFS_NFS_MAX_READDIR_TRANSFER_SIZE_KEY) });
}
}
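
Each DeprecationDelta above forwards a legacy key to its replacement inside Hadoop's Configuration, so values set under the old dfs.nfs.* names remain visible through the new NfsConfigKeys constants. A quick sketch of that behavior (illustrative only; it assumes the hadoop-hdfs-nfs classes above are on the classpath):

import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;

public class NfsDeprecationDemo {
  public static void main(String[] args) {
    // Constructing NfsConfiguration registers the DeprecationDeltas above.
    NfsConfiguration conf = new NfsConfiguration();
    conf.set("dfs.nfs.rtmax", "1048576");      // legacy key
    // The lookup is forwarded to the new key by the deprecation mapping.
    System.out.println(
        conf.get(NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY));
  }
}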

View File

@ -254,9 +254,6 @@ Trunk (Unreleased)
HDFS-5794. Fix the inconsistency of layout version number of
ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9)
HDFS-6375. Listing extended attributes with the search permission.
(Charles Lamb via wang)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -454,6 +451,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6530. Fix Balancer documentation. (szetszwo)
HDFS-6480. Move waitForReady() from FSDirectory to FSNamesystem. (wheat9)
OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
@ -665,6 +664,12 @@ Release 2.5.0 - UNRELEASED
HDFS-6559. Fix wrong option "dfsadmin -rollingUpgrade start" in the
document. (Akira Ajisaka via Arpit Agarwal)
HDFS-6553. Add missing DeprecationDeltas for NFS Kerberos configurations
(Stephen Chu via brandonli)
HDFS-6563. NameNode cannot save fsimage in certain circumstances when
snapshots are in use. (atm)
BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
@ -728,6 +733,12 @@ Release 2.5.0 - UNRELEASED
HDFS-6374. setXAttr should require the user to be the owner of the file
or directory (Charles Lamb via wang)
HDFS-6375. Listing extended attributes with the search permission.
(Charles Lamb via wang)
HDFS-6492. Support create-time xattrs and atomically setting multiple
xattrs. (wang)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.fs;
import java.util.Arrays;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
/**
@ -105,42 +107,47 @@ public class XAttr {
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + ((ns == null) ? 0 : ns.hashCode());
result = prime * result + Arrays.hashCode(value);
return result;
return new HashCodeBuilder(811, 67)
.append(name)
.append(ns)
.append(value)
.toHashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
if (obj == null) { return false; }
if (obj == this) { return true; }
if (obj.getClass() != getClass()) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
XAttr other = (XAttr) obj;
if (name == null) {
if (other.name != null) {
return false;
}
} else if (!name.equals(other.name)) {
return false;
}
if (ns != other.ns) {
return false;
}
if (!Arrays.equals(value, other.value)) {
return false;
}
return true;
XAttr rhs = (XAttr) obj;
return new EqualsBuilder()
.append(ns, rhs.ns)
.append(name, rhs.name)
.append(value, rhs.value)
.isEquals();
}
/**
* Similar to {@link #equals(Object)}, except ignores the XAttr value.
*
* @param obj the object to compare against
* @return true if the XAttrs are equal, ignoring the XAttr value
*/
public boolean equalsIgnoreValue(Object obj) {
if (obj == null) { return false; }
if (obj == this) { return true; }
if (obj.getClass() != getClass()) {
return false;
}
XAttr rhs = (XAttr) obj;
return new EqualsBuilder()
.append(ns, rhs.ns)
.append(name, rhs.name)
.isEquals();
}
@Override
public String toString() {
return "XAttr [ns=" + ns + ", name=" + name + ", value="

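The rewritten methods delegate to commons-lang's EqualsBuilder/HashCodeBuilder, and the new equalsIgnoreValue supplies the namespace-plus-name comparison that the multi-XAttr code later in this commit relies on. A small sketch of the distinction (names are illustrative):

import org.apache.hadoop.fs.XAttr;

public class XAttrEqualityDemo {
  public static void main(String[] args) {
    XAttr a = new XAttr.Builder().setNameSpace(XAttr.NameSpace.USER)
        .setName("a1").setValue(new byte[] {0x31}).build();
    XAttr b = new XAttr.Builder().setNameSpace(XAttr.NameSpace.USER)
        .setName("a1").setValue(new byte[] {0x32}).build();
    System.out.println(a.equals(b));            // false: values differ
    System.out.println(a.equalsIgnoreValue(b)); // true: same namespace+name
  }
}
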
View File

@ -2112,6 +2112,9 @@ public class PBHelper {
public static List<XAttrProto> convertXAttrProto(
List<XAttr> xAttrSpec) {
if (xAttrSpec == null) {
return Lists.newArrayListWithCapacity(0);
}
ArrayList<XAttrProto> xAttrs = Lists.newArrayListWithCapacity(
xAttrSpec.size());
for (XAttr a : xAttrSpec) {

View File

@ -252,7 +252,7 @@ class Checkpointer extends Daemon {
backupNode.namesystem.writeLock();
try {
backupNode.namesystem.dir.setReady();
backupNode.namesystem.setImageLoaded();
if(backupNode.namesystem.getBlocksTotal() > 0) {
backupNode.namesystem.setBlockTotal();
}

View File

@ -26,11 +26,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.ListIterator;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.fs.ContentSummary;
@ -89,15 +89,14 @@ import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_KEY_ID;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_IV;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.CRYPTO_XATTR_KEY_VERSION_ID;
/*************************************************
* FSDirectory stores the filesystem directory state.
* It handles writing/loading values to disk, and logging
* changes as we go.
*
* It keeps the filename->blockset mapping always-current
* and logged to disk.
*
*************************************************/
/**
* Both FSDirectory and FSNamesystem manage the state of the namespace.
* FSDirectory is a pure in-memory data structure, all of whose operations
* happen entirely in memory. In contrast, FSNamesystem persists the operations
* to the disk.
* @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem
**/
@InterfaceAudience.Private
public class FSDirectory implements Closeable {
private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
final INodeDirectory r = new INodeDirectory(
@ -126,7 +125,6 @@ public class FSDirectory implements Closeable {
INodeDirectory rootDir;
FSImage fsImage;
private final FSNamesystem namesystem;
private volatile boolean ready = false;
private volatile boolean skipQuotaCheck = false; //skip while consuming edits
private final int maxComponentLength;
private final int maxDirItems;
@ -139,7 +137,6 @@ public class FSDirectory implements Closeable {
// lock to protect the directory and BlockMap
private final ReentrantReadWriteLock dirLock;
private final Condition cond;
// utility methods to acquire and release read lock and write lock
void readLock() {
@ -182,7 +179,6 @@ public class FSDirectory implements Closeable {
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
this.dirLock = new ReentrantReadWriteLock(true); // fair
this.cond = dirLock.writeLock().newCondition();
rootDir = createRoot(ns);
inodeMap = INodeMap.newInstance(rootDir);
this.fsImage = fsImage;
@ -239,38 +235,6 @@ public class FSDirectory implements Closeable {
return rootDir;
}
/**
* Notify that loading of this FSDirectory is complete, and
* it is ready for use
*/
void imageLoadComplete() {
Preconditions.checkState(!ready, "FSDirectory already loaded");
setReady();
}
void setReady() {
if(ready) return;
writeLock();
try {
setReady(true);
this.nameCache.initialized();
cond.signalAll();
} finally {
writeUnlock();
}
}
//This is for testing purposes only
@VisibleForTesting
boolean isReady() {
return ready;
}
// exposed for unit tests
protected void setReady(boolean flag) {
ready = flag;
}
/**
* Shutdown the filestore
*/
@ -279,22 +243,12 @@ public class FSDirectory implements Closeable {
fsImage.close();
}
/**
* Block until the object is ready to be used.
*/
void waitForReady() {
if (!ready) {
writeLock();
try {
while (!ready) {
try {
cond.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
}
} finally {
writeUnlock();
}
void markNameCacheInitialized() {
writeLock();
try {
nameCache.initialized();
} finally {
writeUnlock();
}
}
@ -320,7 +274,6 @@ public class FSDirectory implements Closeable {
String clientMachine, DatanodeDescriptor clientNode)
throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException, AclException {
waitForReady();
long modTime = now();
INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
@ -350,6 +303,7 @@ public class FSDirectory implements Closeable {
String path,
PermissionStatus permissions,
List<AclEntry> aclEntries,
List<XAttr> xAttrs,
short replication,
long modificationTime,
long atime,
@ -376,6 +330,10 @@ public class FSDirectory implements Closeable {
AclStorage.updateINodeAcl(newNode, aclEntries,
Snapshot.CURRENT_STATE_ID);
}
if (xAttrs != null) {
XAttrStorage.updateINodeXAttrs(newNode, xAttrs,
Snapshot.CURRENT_STATE_ID);
}
return newNode;
}
} catch (IOException e) {
@ -393,8 +351,6 @@ public class FSDirectory implements Closeable {
*/
BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
DatanodeStorageInfo[] targets) throws IOException {
waitForReady();
writeLock();
try {
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
@ -432,8 +388,6 @@ public class FSDirectory implements Closeable {
boolean removeBlock(String path, INodeFile fileNode, Block block)
throws IOException {
Preconditions.checkArgument(fileNode.isUnderConstruction());
waitForReady();
writeLock();
try {
return unprotectedRemoveBlock(path, fileNode, block);
@ -477,7 +431,6 @@ public class FSDirectory implements Closeable {
NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
+src+" to "+dst);
}
waitForReady();
writeLock();
try {
if (!unprotectedRenameTo(src, dst, mtime))
@ -500,7 +453,6 @@ public class FSDirectory implements Closeable {
NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src
+ " to " + dst);
}
waitForReady();
writeLock();
try {
if (unprotectedRenameTo(src, dst, mtime, options)) {
@ -1032,7 +984,6 @@ public class FSDirectory implements Closeable {
Block[] setReplication(String src, short replication, short[] blockRepls)
throws QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
waitForReady();
writeLock();
try {
return unprotectedSetReplication(src, replication, blockRepls);
@ -1155,7 +1106,6 @@ public class FSDirectory implements Closeable {
writeLock();
try {
// actual move
waitForReady();
unprotectedConcat(target, srcs, timestamp);
} finally {
writeUnlock();
@ -1238,7 +1188,6 @@ public class FSDirectory implements Closeable {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
}
waitForReady();
final long filesRemoved;
writeLock();
try {
@ -1711,7 +1660,7 @@ public class FSDirectory implements Closeable {
long nsDelta, long dsDelta, boolean checkQuota)
throws QuotaExceededException {
assert hasWriteLock();
if (!ready) {
if (!namesystem.isImageLoaded()) {
//still initializing. do not check or update quotas.
return;
}
@ -1904,7 +1853,7 @@ public class FSDirectory implements Closeable {
*/
private void verifyQuotaForRename(INode[] src, INode[] dst)
throws QuotaExceededException {
if (!ready || skipQuotaCheck) {
if (!namesystem.isImageLoaded() || skipQuotaCheck) {
// Do not check quota if edits log is still being processed
return;
}
@ -1960,7 +1909,7 @@ public class FSDirectory implements Closeable {
void verifyINodeName(byte[] childName) throws HadoopIllegalArgumentException {
if (Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, childName)) {
String s = "\"" + HdfsConstants.DOT_SNAPSHOT_DIR + "\" is a reserved name.";
if (!ready) {
if (!namesystem.isImageLoaded()) {
s += " Please rename it before upgrade.";
}
throw new HadoopIllegalArgumentException(s);
@ -1987,7 +1936,7 @@ public class FSDirectory implements Closeable {
getFullPathName((INode[])parentPath, pos - 1): (String)parentPath;
final PathComponentTooLongException e = new PathComponentTooLongException(
maxComponentLength, length, p, DFSUtil.bytes2String(childName));
if (ready) {
if (namesystem.isImageLoaded()) {
throw e;
} else {
// Do not throw if edits log is still being processed
@ -2011,7 +1960,7 @@ public class FSDirectory implements Closeable {
if (count >= maxDirItems) {
final MaxDirectoryItemsExceededException e
= new MaxDirectoryItemsExceededException(maxDirItems, count);
if (ready) {
if (namesystem.isImageLoaded()) {
e.setPathName(getFullPathName(pathComponents, pos - 1));
throw e;
} else {
@ -2347,7 +2296,6 @@ public class FSDirectory implements Closeable {
void reset() {
writeLock();
try {
setReady(false);
rootDir = createRoot(getFSNamesystem());
inodeMap.clear();
addToInodeMap(rootDir);
@ -2631,45 +2579,81 @@ public class FSDirectory implements Closeable {
}
}
XAttr removeXAttr(String src, XAttr xAttr) throws IOException {
/**
* Removes a list of XAttrs from an inode at a path.
*
* @param src path of inode
* @param toRemove XAttrs to be removed
* @return List of XAttrs that were removed
* @throws IOException if the inode does not exist or the quota is exceeded
*/
List<XAttr> removeXAttrs(final String src, final List<XAttr> toRemove)
throws IOException {
writeLock();
try {
return unprotectedRemoveXAttr(src, xAttr);
return unprotectedRemoveXAttrs(src, toRemove);
} finally {
writeUnlock();
}
}
XAttr unprotectedRemoveXAttr(String src,
XAttr xAttr) throws IOException {
List<XAttr> unprotectedRemoveXAttrs(final String src,
final List<XAttr> toRemove) throws IOException {
assert hasWriteLock();
INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
List<XAttr> newXAttrs = filterINodeXAttr(existingXAttrs, xAttr);
List<XAttr> removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size());
List<XAttr> newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove,
removedXAttrs);
if (existingXAttrs.size() != newXAttrs.size()) {
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
return xAttr;
return removedXAttrs;
}
return null;
}
List<XAttr> filterINodeXAttr(List<XAttr> existingXAttrs,
XAttr xAttr) throws QuotaExceededException {
if (existingXAttrs == null || existingXAttrs.isEmpty()) {
/**
* Filter XAttrs from a list of existing XAttrs. Removes matched XAttrs from
* toFilter and puts them into filtered. Upon completion,
* toFilter contains only the XAttrs that were not found, while
* filtered contains the XAttrs that were found.
*
* @param existingXAttrs Existing XAttrs to be filtered
* @param toFilter XAttrs to filter from the existing XAttrs
* @param filtered Return parameter, XAttrs that were filtered
* @return List of the existing XAttrs with the filtered XAttrs removed
*/
@VisibleForTesting
List<XAttr> filterINodeXAttrs(final List<XAttr> existingXAttrs,
final List<XAttr> toFilter, final List<XAttr> filtered) {
if (existingXAttrs == null || existingXAttrs.isEmpty() ||
toFilter == null || toFilter.isEmpty()) {
return existingXAttrs;
}
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(existingXAttrs.size());
// Populate a new list with XAttrs that pass the filter
List<XAttr> newXAttrs =
Lists.newArrayListWithCapacity(existingXAttrs.size());
for (XAttr a : existingXAttrs) {
if (!(a.getNameSpace() == xAttr.getNameSpace()
&& a.getName().equals(xAttr.getName()))) {
xAttrs.add(a);
boolean add = true;
for (ListIterator<XAttr> it = toFilter.listIterator(); it.hasNext();) {
XAttr filter = it.next();
if (a.equalsIgnoreValue(filter)) {
add = false;
it.remove();
filtered.add(filter);
break;
}
}
if (add) {
newXAttrs.add(a);
}
}
return xAttrs;
return newXAttrs;
}
XAttr createEncryptionZone(String src, String keyId)
@ -2682,14 +2666,16 @@ public class FSDirectory implements Closeable {
}
final XAttr keyIdXAttr =
XAttrHelper.buildXAttr(CRYPTO_XATTR_KEY_ID, keyId.getBytes());
unprotectedSetXAttr(src, keyIdXAttr, EnumSet.of(XAttrSetFlag.CREATE));
List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
xattrs.add(keyIdXAttr);
unprotectedSetXAttrs(src, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
return keyIdXAttr;
} finally {
writeUnlock();
}
}
XAttr deleteEncryptionZone(String src)
List<XAttr> deleteEncryptionZone(String src)
throws IOException {
writeLock();
try {
@ -2699,71 +2685,107 @@ public class FSDirectory implements Closeable {
}
final XAttr keyIdXAttr =
XAttrHelper.buildXAttr(CRYPTO_XATTR_KEY_ID, null);
final XAttr removedXAttr = unprotectedRemoveXAttr(src, keyIdXAttr);
if (removedXAttr == null) {
List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
xattrs.add(keyIdXAttr);
final List<XAttr> removedXAttrs = unprotectedRemoveXAttrs(src, xattrs);
if (removedXAttrs == null || removedXAttrs.isEmpty()) {
throw new IOException(
src + " does not appear to be the root of an encryption zone");
}
return removedXAttr;
return removedXAttrs;
} finally {
writeUnlock();
}
}
void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException {
void setXAttrs(final String src, final List<XAttr> xAttrs,
final EnumSet<XAttrSetFlag> flag) throws IOException {
writeLock();
try {
unprotectedSetXAttr(src, xAttr, flag);
unprotectedSetXAttrs(src, xAttrs, flag);
} finally {
writeUnlock();
}
}
void unprotectedSetXAttr(String src, XAttr xAttr,
EnumSet<XAttrSetFlag> flag) throws IOException {
void unprotectedSetXAttrs(final String src, final List<XAttr> xAttrs,
final EnumSet<XAttrSetFlag> flag)
throws QuotaExceededException, IOException {
assert hasWriteLock();
INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
List<XAttr> newXAttrs = setINodeXAttr(existingXAttrs, xAttr, flag);
List<XAttr> newXAttrs = setINodeXAttrs(existingXAttrs, xAttrs, flag);
XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId);
}
List<XAttr> setINodeXAttr(List<XAttr> existingXAttrs, XAttr xAttr,
EnumSet<XAttrSetFlag> flag) throws QuotaExceededException, IOException {
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(
existingXAttrs != null ? existingXAttrs.size() + 1 : 1);
List<XAttr> setINodeXAttrs(final List<XAttr> existingXAttrs,
final List<XAttr> toSet, final EnumSet<XAttrSetFlag> flag)
throws IOException {
// Check for duplicate XAttrs in toSet
// We need to use a custom comparator, so using a HashSet is not suitable
for (int i = 0; i < toSet.size(); i++) {
for (int j = i + 1; j < toSet.size(); j++) {
if (toSet.get(i).equalsIgnoreValue(toSet.get(j))) {
throw new IOException("Cannot specify the same XAttr to be set " +
"more than once");
}
}
}
// Count the current number of user-visible XAttrs for limit checking
int userVisibleXAttrsNum = 0; // Number of user visible xAttrs
boolean exist = false;
// The XAttr list is copied to an exactly-sized array when it's stored,
// so there's no need to size it precisely here.
int newSize = (existingXAttrs != null) ? existingXAttrs.size() : 0;
newSize += toSet.size();
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(newSize);
// Check if the XAttr already exists to validate with the provided flag
for (XAttr xAttr: toSet) {
boolean exist = false;
if (existingXAttrs != null) {
for (XAttr a : existingXAttrs) {
if (a.equalsIgnoreValue(xAttr)) {
exist = true;
break;
}
}
}
XAttrSetFlag.validate(xAttr.getName(), exist, flag);
// add the new XAttr since it passed validation
xAttrs.add(xAttr);
if (isUserVisible(xAttr)) {
userVisibleXAttrsNum++;
}
}
// Add the existing xattrs back in, if they weren't already set
if (existingXAttrs != null) {
for (XAttr a: existingXAttrs) {
if ((a.getNameSpace() == xAttr.getNameSpace()
&& a.getName().equals(xAttr.getName()))) {
exist = true;
} else {
xAttrs.add(a);
if (isUserVisible(a)) {
for (XAttr existing : existingXAttrs) {
boolean alreadySet = false;
for (XAttr set : toSet) {
if (set.equalsIgnoreValue(existing)) {
alreadySet = true;
break;
}
}
if (!alreadySet) {
xAttrs.add(existing);
if (isUserVisible(existing)) {
userVisibleXAttrsNum++;
}
}
}
}
XAttrSetFlag.validate(xAttr.getName(), exist, flag);
xAttrs.add(xAttr);
if (isUserVisible(xAttr)) {
userVisibleXAttrsNum++;
}
if (userVisibleXAttrsNum > inodeXAttrsLimit) {
throw new IOException("Cannot add additional XAttr to inode, "
+ "would exceed limit of " + inodeXAttrsLimit);
}
return xAttrs;
}
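
Worth spelling out the contract of filterINodeXAttrs: it consumes toFilter (found entries are moved into filtered) and returns the surviving XAttrs. A sketch of a caller, assuming it sits in the same package as FSDirectory since the method is package-private, with fsdir an initialized FSDirectory (names are illustrative):

import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.XAttr;

class FilterContractDemo {
  // fsdir would come from a running namesystem, e.g. a MiniDFSCluster test.
  static void demo(FSDirectory fsdir) {
    XAttr present = new XAttr.Builder()
        .setNameSpace(XAttr.NameSpace.USER).setName("a1").build();
    XAttr absent = new XAttr.Builder()
        .setNameSpace(XAttr.NameSpace.USER).setName("missing").build();

    List<XAttr> existing = Lists.newArrayList(present);
    List<XAttr> toFilter = Lists.newArrayList(present, absent);
    List<XAttr> filtered = Lists.newArrayList();

    List<XAttr> remaining =
        fsdir.filterINodeXAttrs(existing, toFilter, filtered);
    // remaining is empty, filtered holds user.a1 (found),
    // and toFilter now holds only user.missing (not found).
  }
}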

View File

@ -700,12 +700,19 @@ public class FSEditLog implements LogsPurgeable {
.setBlocks(newNode.getBlocks())
.setPermissionStatus(permissions)
.setClientName(newNode.getFileUnderConstructionFeature().getClientName())
.setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine());
.setClientMachine(
newNode.getFileUnderConstructionFeature().getClientMachine());
AclFeature f = newNode.getAclFeature();
if (f != null) {
op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
}
XAttrFeature x = newNode.getXAttrFeature();
if (x != null) {
op.setXAttrs(x.getXAttrs());
}
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@ -761,6 +768,11 @@ public class FSEditLog implements LogsPurgeable {
if (f != null) {
op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
}
XAttrFeature x = newNode.getXAttrFeature();
if (x != null) {
op.setXAttrs(x.getXAttrs());
}
logEdit(op);
}
@ -1054,18 +1066,18 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
void logSetXAttr(String src, XAttr xAttr, boolean toLogRpcIds) {
void logSetXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
final SetXAttrOp op = SetXAttrOp.getInstance();
op.src = src;
op.xAttr = xAttr;
op.xAttrs = xAttrs;
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRemoveXAttr(String src, XAttr xAttr) {
void logRemoveXAttrs(String src, List<XAttr> xAttrs) {
final RemoveXAttrOp op = RemoveXAttrOp.getInstance();
op.src = src;
op.xAttr = xAttr;
op.xAttrs = xAttrs;
logEdit(op);
}

View File

@ -355,6 +355,7 @@ public class FSEditLogLoader {
lastInodeId);
newFile = fsDir.unprotectedAddFile(inodeId,
path, addCloseOp.permissions, addCloseOp.aclEntries,
addCloseOp.xAttrs,
replication, addCloseOp.mtime, addCloseOp.atime,
addCloseOp.blockSize, true, addCloseOp.clientName,
addCloseOp.clientMachine);
@ -804,7 +805,7 @@ public class FSEditLogLoader {
}
case OP_SET_XATTR: {
SetXAttrOp setXAttrOp = (SetXAttrOp) op;
fsDir.unprotectedSetXAttr(setXAttrOp.src, setXAttrOp.xAttr,
fsDir.unprotectedSetXAttrs(setXAttrOp.src, setXAttrOp.xAttrs,
EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
if (toAddRetryCache) {
fsNamesys.addCacheEntry(setXAttrOp.rpcClientId, setXAttrOp.rpcCallId);
@ -813,7 +814,8 @@ public class FSEditLogLoader {
}
case OP_REMOVE_XATTR: {
RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
fsDir.unprotectedRemoveXAttr(removeXAttrOp.src, removeXAttrOp.xAttr);
fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src,
removeXAttrOp.xAttrs);
break;
}
default:

View File

@ -382,6 +382,16 @@ public abstract class FSEditLogOp {
}
}
private static List<XAttr> readXAttrsFromEditLog(DataInputStream in,
int logVersion) throws IOException {
if (!NameNodeLayoutVersion.supports(NameNodeLayoutVersion.Feature.XATTRS,
logVersion)) {
return null;
}
XAttrEditLogProto proto = XAttrEditLogProto.parseDelimitedFrom(in);
return PBHelper.convertXAttrs(proto.getXAttrsList());
}
@SuppressWarnings("unchecked")
static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
int length;
@ -394,6 +404,7 @@ public abstract class FSEditLogOp {
Block[] blocks;
PermissionStatus permissions;
List<AclEntry> aclEntries;
List<XAttr> xAttrs;
String clientName;
String clientMachine;
@ -461,6 +472,11 @@ public abstract class FSEditLogOp {
return (T)this;
}
<T extends AddCloseOp> T setXAttrs(List<XAttr> xAttrs) {
this.xAttrs = xAttrs;
return (T)this;
}
<T extends AddCloseOp> T setClientName(String clientName) {
this.clientName = clientName;
return (T)this;
@ -484,6 +500,9 @@ public abstract class FSEditLogOp {
if (this.opCode == OP_ADD) {
AclEditLogUtil.write(aclEntries, out);
XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
b.build().writeDelimitedTo(out);
FSImageSerialization.writeString(clientName,out);
FSImageSerialization.writeString(clientMachine,out);
// write clientId and callId
@ -546,9 +565,9 @@ public abstract class FSEditLogOp {
this.blocks = readBlocks(in, logVersion);
this.permissions = PermissionStatus.read(in);
// clientname, clientMachine and block locations of last block.
if (this.opCode == OP_ADD) {
aclEntries = AclEditLogUtil.read(in, logVersion);
this.xAttrs = readXAttrsFromEditLog(in, logVersion);
this.clientName = FSImageSerialization.readString(in);
this.clientMachine = FSImageSerialization.readString(in);
// read clientId and callId
@ -1343,6 +1362,7 @@ public abstract class FSEditLogOp {
long timestamp;
PermissionStatus permissions;
List<AclEntry> aclEntries;
List<XAttr> xAttrs;
private MkdirOp() {
super(OP_MKDIR);
@ -1377,6 +1397,11 @@ public abstract class FSEditLogOp {
return this;
}
MkdirOp setXAttrs(List<XAttr> xAttrs) {
this.xAttrs = xAttrs;
return this;
}
@Override
public
void writeFields(DataOutputStream out) throws IOException {
@ -1386,6 +1411,9 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
permissions.write(out);
AclEditLogUtil.write(aclEntries, out);
XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
b.build().writeDelimitedTo(out);
}
@Override
@ -1430,6 +1458,8 @@ public abstract class FSEditLogOp {
this.permissions = PermissionStatus.read(in);
aclEntries = AclEditLogUtil.read(in, logVersion);
xAttrs = readXAttrsFromEditLog(in, logVersion);
}
@Override
@ -1451,6 +1481,8 @@ public abstract class FSEditLogOp {
builder.append(opCode);
builder.append(", txid=");
builder.append(txid);
builder.append(", xAttrs=");
builder.append(xAttrs);
builder.append("]");
return builder.toString();
}
@ -1468,6 +1500,9 @@ public abstract class FSEditLogOp {
if (aclEntries != null) {
appendAclEntriesToXml(contentHandler, aclEntries);
}
if (xAttrs != null) {
appendXAttrsToXml(contentHandler, xAttrs);
}
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@ -1477,6 +1512,7 @@ public abstract class FSEditLogOp {
this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
this.permissions = permissionStatusFromXml(st);
aclEntries = readAclEntriesFromXml(st);
xAttrs = readXAttrsFromXml(st);
}
}
@ -3499,7 +3535,7 @@ public abstract class FSEditLogOp {
}
static class RemoveXAttrOp extends FSEditLogOp {
XAttr xAttr;
List<XAttr> xAttrs;
String src;
private RemoveXAttrOp() {
@ -3514,7 +3550,7 @@ public abstract class FSEditLogOp {
void readFields(DataInputStream in, int logVersion) throws IOException {
XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
src = p.getSrc();
xAttr = PBHelper.convertXAttr(p.getXAttr());
xAttrs = PBHelper.convertXAttrs(p.getXAttrsList());
}
@Override
@ -3523,25 +3559,25 @@ public abstract class FSEditLogOp {
if (src != null) {
b.setSrc(src);
}
b.setXAttr(PBHelper.convertXAttrProto(xAttr));
b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
b.build().writeDelimitedTo(out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src);
appendXAttrToXml(contentHandler, xAttr);
appendXAttrsToXml(contentHandler, xAttrs);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
src = st.getValue("SRC");
xAttr = readXAttrFromXml(st);
xAttrs = readXAttrsFromXml(st);
}
}
static class SetXAttrOp extends FSEditLogOp {
XAttr xAttr;
List<XAttr> xAttrs;
String src;
private SetXAttrOp() {
@ -3556,7 +3592,7 @@ public abstract class FSEditLogOp {
void readFields(DataInputStream in, int logVersion) throws IOException {
XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
src = p.getSrc();
xAttr = PBHelper.convertXAttr(p.getXAttr());
xAttrs = PBHelper.convertXAttrs(p.getXAttrsList());
readRpcIds(in, logVersion);
}
@ -3566,7 +3602,7 @@ public abstract class FSEditLogOp {
if (src != null) {
b.setSrc(src);
}
b.setXAttr(PBHelper.convertXAttrProto(xAttr));
b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
b.build().writeDelimitedTo(out);
// clientId and callId
writeRpcIds(rpcClientId, rpcCallId, out);
@ -3575,14 +3611,14 @@ public abstract class FSEditLogOp {
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src);
appendXAttrToXml(contentHandler, xAttr);
appendXAttrsToXml(contentHandler, xAttrs);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
src = st.getValue("SRC");
xAttr = readXAttrFromXml(st);
xAttrs = readXAttrsFromXml(st);
readRpcIdsFromXml(st);
}
}
@ -4202,42 +4238,48 @@ public abstract class FSEditLogOp {
}
return aclEntries;
}
private static void appendXAttrToXml(ContentHandler contentHandler,
XAttr xAttr) throws SAXException {
contentHandler.startElement("", "", "XATTR", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "NAMESPACE",
xAttr.getNameSpace().toString());
XMLUtils.addSaxString(contentHandler, "NAME", xAttr.getName());
if (xAttr.getValue() != null) {
try {
XMLUtils.addSaxString(contentHandler, "VALUE",
XAttrCodec.encodeValue(xAttr.getValue(), XAttrCodec.HEX));
} catch (IOException e) {
throw new SAXException(e);
private static void appendXAttrsToXml(ContentHandler contentHandler,
List<XAttr> xAttrs) throws SAXException {
for (XAttr xAttr: xAttrs) {
contentHandler.startElement("", "", "XATTR", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "NAMESPACE",
xAttr.getNameSpace().toString());
XMLUtils.addSaxString(contentHandler, "NAME", xAttr.getName());
if (xAttr.getValue() != null) {
try {
XMLUtils.addSaxString(contentHandler, "VALUE",
XAttrCodec.encodeValue(xAttr.getValue(), XAttrCodec.HEX));
} catch (IOException e) {
throw new SAXException(e);
}
}
contentHandler.endElement("", "", "XATTR");
}
contentHandler.endElement("", "", "XATTR");
}
private static XAttr readXAttrFromXml(Stanza st)
private static List<XAttr> readXAttrsFromXml(Stanza st)
throws InvalidXmlException {
if (!st.hasChildren("XATTR")) {
return null;
}
Stanza a = st.getChildren("XATTR").get(0);
XAttr.Builder builder = new XAttr.Builder();
builder.setNameSpace(XAttr.NameSpace.valueOf(a.getValue("NAMESPACE"))).
setName(a.getValue("NAME"));
String v = a.getValueOrNull("VALUE");
if (v != null) {
try {
builder.setValue(XAttrCodec.decodeValue(v));
} catch (IOException e) {
throw new InvalidXmlException(e.toString());
List<Stanza> stanzas = st.getChildren("XATTR");
List<XAttr> xattrs = Lists.newArrayListWithCapacity(stanzas.size());
for (Stanza a: stanzas) {
XAttr.Builder builder = new XAttr.Builder();
builder.setNameSpace(XAttr.NameSpace.valueOf(a.getValue("NAMESPACE"))).
setName(a.getValue("NAME"));
String v = a.getValueOrNull("VALUE");
if (v != null) {
try {
builder.setValue(XAttrCodec.decodeValue(v));
} catch (IOException e) {
throw new InvalidXmlException(e.toString());
}
}
xattrs.add(builder.build());
}
return builder.build();
return xattrs;
}
}

View File

@ -533,8 +533,10 @@ public final class FSImageFormatPBINode {
INodeSection.INodeFile.Builder b = buildINodeFile(n,
parent.getSaverContext());
for (Block block : n.getBlocks()) {
b.addBlocks(PBHelper.convert(block));
if (n.getBlocks() != null) {
for (Block block : n.getBlocks()) {
b.addBlocks(PBHelper.convert(block));
}
}
FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();

View File

@ -108,6 +108,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -530,6 +531,59 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private final Map<String, EncryptionZone> encryptionZones;
private volatile boolean imageLoaded = false;
private final Condition cond;
/**
* Notify that loading of the FSImage is complete, and
* the namesystem is ready for use
*/
void imageLoadComplete() {
Preconditions.checkState(!imageLoaded, "FSDirectory already loaded");
setImageLoaded();
}
void setImageLoaded() {
if(imageLoaded) return;
writeLock();
try {
setImageLoaded(true);
dir.markNameCacheInitialized();
cond.signalAll();
} finally {
writeUnlock();
}
}
//This is for testing purposes only
@VisibleForTesting
boolean isImageLoaded() {
return imageLoaded;
}
// exposed for unit tests
protected void setImageLoaded(boolean flag) {
imageLoaded = flag;
}
/**
* Block until the FSImage has been loaded.
*/
void waitForLoadingFSImage() {
if (!imageLoaded) {
writeLock();
try {
while (!imageLoaded) {
try {
cond.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
}
} finally {
writeUnlock();
}
}
}
/**
* Set the last allocated inode id when fsimage or editlog is loaded.
*/
@ -571,6 +625,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
snapshotManager.clearSnapshottableDirs();
cacheManager.clear();
setImageLoaded(false);
}
@VisibleForTesting
@ -700,6 +755,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
LOG.info("fsLock is fair:" + fair);
fsLock = new FSNamesystemLock(fair);
cond = fsLock.writeLock().newCondition();
try {
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@ -976,7 +1032,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
writeUnlock();
}
dir.imageLoadComplete();
imageLoadComplete();
}
private void startSecretManager() {
@ -1895,6 +1951,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2170,6 +2227,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2297,6 +2355,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2785,6 +2845,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Block newBlock = null;
long offset;
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3007,6 +3068,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3105,6 +3167,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean success = false;
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3304,6 +3367,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename " + src);
waitForLoadingFSImage();
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
checkOperation(OperationCategory.WRITE);
@ -3411,6 +3475,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
false);
}
waitForLoadingFSImage();
long mtime = now();
dir.renameTo(src, dst, mtime, options);
getEditLog().logRename(src, dst, mtime, logRetryCache, options);
@ -3484,6 +3549,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean ret = false;
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3957,6 +4024,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -4158,6 +4227,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
INodeFile pendingFile, int latestSnapshot) throws IOException,
UnresolvedLinkException {
assert hasWriteLock();
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null);
leaseManager.removeLease(uc.getClientName(), src);
@ -4169,6 +4239,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// since we just remove the uc feature from pendingFile
final INodeFile newFile = pendingFile.toCompleteFile(now());
waitForLoadingFSImage();
// close file and persist block allocations for this file
closeFile(src, newFile);
@ -4227,6 +4298,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ ")");
checkOperation(OperationCategory.WRITE);
String src = "";
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -4572,7 +4644,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
private void closeFile(String path, INodeFile file) {
assert hasWriteLock();
dir.waitForReady();
waitForLoadingFSImage();
// file is closed
getEditLog().logCloseFile(path, file);
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -4596,7 +4668,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean createParent, boolean logRetryCache)
throws UnresolvedLinkException, FileAlreadyExistsException,
QuotaExceededException, SnapshotAccessControlException, AclException {
dir.waitForReady();
waitForLoadingFSImage();
final long modTime = now();
if (createParent) {
@ -5859,7 +5931,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean ignoreEmptyDir, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException {
if (!pc.isSuperUser()) {
dir.waitForReady();
waitForLoadingFSImage();
readLock();
try {
pc.checkPermission(path, dir, doCheckOwner, ancestorAccess,
@ -6326,6 +6398,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ ", newNodes=" + Arrays.asList(newNodes)
+ ", clientName=" + clientName
+ ")");
waitForLoadingFSImage();
writeLock();
boolean success = false;
try {
@ -8206,7 +8279,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
final XAttr keyIdXAttr = dir.createEncryptionZone(src, keyId);
getEditLog().logSetXAttr(src, keyIdXAttr, logRetryCache);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(keyIdXAttr);
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
encryptionZones.put(src, new EncryptionZone(src, keyId));
resultingStat = getAuditFileInfo(src, false);
} finally {
@ -8283,9 +8358,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new IOException("Directory " + src +
" is not the root of an encryption zone.");
}
final XAttr removedXAttr = dir.deleteEncryptionZone(src);
if (removedXAttr != null) {
getEditLog().logRemoveXAttr(src, removedXAttr);
final List<XAttr> removedXAttrs = dir.deleteEncryptionZone(src);
if (removedXAttrs != null && !removedXAttrs.isEmpty()) {
getEditLog().logRemoveXAttrs(src, removedXAttrs);
}
encryptionZones.remove(src);
resultingStat = getAuditFileInfo(src, false);
@ -8379,8 +8454,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOwner(pc, src);
checkPathAccess(pc, src, FsAction.WRITE);
}
dir.setXAttr(src, xAttr, flag);
getEditLog().logSetXAttr(src, xAttr, logRetryCache);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(xAttr);
dir.setXAttrs(src, xAttrs, flag);
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
resultingStat = getAuditFileInfo(src, false);
} finally {
writeUnlock();
@ -8500,10 +8577,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOwner(pc, src);
checkPathAccess(pc, src, FsAction.WRITE);
}
XAttr removedXAttr = dir.removeXAttr(src, xAttr);
if (removedXAttr != null) {
getEditLog().logRemoveXAttr(src, removedXAttr);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(xAttr);
List<XAttr> removedXAttrs = dir.removeXAttrs(src, xAttrs);
if (removedXAttrs != null && !removedXAttrs.isEmpty()) {
getEditLog().logRemoveXAttrs(src, removedXAttrs);
}
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) {

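The ready flag and its condition variable move wholesale from FSDirectory into FSNamesystem here, and each mutating RPC path now calls waitForLoadingFSImage() before taking the write lock. Distilled to its essentials, the gate is the classic guarded-wait idiom; a standalone sketch (not the actual class):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Mutators call awaitLoaded() (cf. waitForLoadingFSImage) and image
// loading calls setLoaded() (cf. setImageLoaded).
public class LoadGate {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
  private final Condition loaded = lock.writeLock().newCondition();
  private volatile boolean imageLoaded = false;

  void setLoaded() {
    lock.writeLock().lock();
    try {
      imageLoaded = true;
      loaded.signalAll();
    } finally {
      lock.writeLock().unlock();
    }
  }

  void awaitLoaded() {
    if (imageLoaded) {
      return; // fast path: no locking once the image is up
    }
    lock.writeLock().lock();
    try {
      while (!imageLoaded) {
        try {
          // Timed wait so a missed signal cannot hang the caller forever.
          loaded.await(5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ignored) {
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
  }
}
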
View File

@ -1064,7 +1064,7 @@ public class SecondaryNameNode implements Runnable,
} finally {
dstNamesystem.writeUnlock();
}
dstNamesystem.dir.imageLoadComplete();
dstNamesystem.imageLoadComplete();
}
// error simulation code for junit test
CheckpointFaultInjector.getInstance().duringMerge();

View File

@ -159,7 +159,7 @@ public class FileWithSnapshotFeature implements INode.Feature {
// resize the array.
final BlockInfo[] newBlocks;
if (n == 0) {
newBlocks = null;
newBlocks = BlockInfo.EMPTY_ARRAY;
} else {
newBlocks = new BlockInfo[n];
System.arraycopy(oldBlocks, 0, newBlocks, 0, n);

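This pairs with the null guard added to FSImageFormatPBINode earlier in this commit; both are part of HDFS-6563, where a snapshot delete could leave a file's block array null and crash fsimage saving. An empty array is the safer sentinel because for-each tolerates it, as this trivial standalone illustration shows:

public class EmptyVsNullDemo {
  private static final String[] EMPTY_ARRAY = {};

  public static void main(String[] args) {
    String[] blocks = EMPTY_ARRAY;  // what the fix stores instead of null
    for (String b : blocks) {       // zero iterations, no NPE
      System.out.println(b);
    }
    // A null array here would throw NullPointerException in the loop header.
  }
}
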
View File

@ -35,8 +35,8 @@ message XAttrProto {
}
message XAttrEditLogProto {
required string src = 1;
optional XAttrProto xAttr = 2;
optional string src = 1;
repeated XAttrProto xAttrs = 2;
}
enum XAttrSetFlagProto {

View File

@ -32,7 +32,6 @@ import java.io.IOException;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.*;
/**
@ -50,6 +49,7 @@ public class TestCommitBlockSynchronization {
final DatanodeStorageInfo[] targets = {};
FSNamesystem namesystem = new FSNamesystem(conf, image);
namesystem.setImageLoaded(true);
FSNamesystem namesystemSpy = spy(namesystem);
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);

View File

@ -24,7 +24,9 @@ import java.io.IOException;
import java.io.StringReader;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -36,7 +38,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
@ -45,6 +46,11 @@ import org.junit.Test;
import com.google.common.collect.Lists;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test {@link FSDirectory}, the in-memory namespace tree.
*/
@ -74,6 +80,10 @@ public class TestFSDirectory {
private DistributedFileSystem hdfs;
private static final int numGeneratedXAttrs = 256;
private static final ImmutableList<XAttr> generatedXAttrs =
ImmutableList.copyOf(generateXAttrs(numGeneratedXAttrs));
@Before
public void setUp() throws Exception {
conf = new Configuration();
@ -119,24 +129,15 @@ public class TestFSDirectory {
for(; (line = in.readLine()) != null; ) {
line = line.trim();
if (!line.isEmpty() && !line.contains("snapshot")) {
Assert.assertTrue("line=" + line,
assertTrue("line=" + line,
line.startsWith(INodeDirectory.DUMPTREE_LAST_ITEM)
|| line.startsWith(INodeDirectory.DUMPTREE_EXCEPT_LAST_ITEM));
|| line.startsWith(INodeDirectory.DUMPTREE_EXCEPT_LAST_ITEM)
);
checkClassName(line);
}
}
}
@Test
public void testReset() throws Exception {
fsdir.reset();
Assert.assertFalse(fsdir.isReady());
final INodeDirectory root = (INodeDirectory) fsdir.getINode("/");
Assert.assertTrue(root.getChildrenList(Snapshot.CURRENT_STATE_ID).isEmpty());
fsdir.imageLoadComplete();
Assert.assertTrue(fsdir.isReady());
}
@Test
public void testSkipQuotaCheck() throws Exception {
try {
@ -176,7 +177,7 @@ public class TestFSDirectory {
int i = line.lastIndexOf('(');
int j = line.lastIndexOf('@');
final String classname = line.substring(i+1, j);
Assert.assertTrue(classname.startsWith(INodeFile.class.getSimpleName())
assertTrue(classname.startsWith(INodeFile.class.getSimpleName())
|| classname.startsWith(INodeDirectory.class.getSimpleName()));
}
@ -193,22 +194,185 @@ public class TestFSDirectory {
// Adding a system namespace xAttr, isn't affected by inode xAttrs limit.
XAttr newXAttr = (new XAttr.Builder()).setNameSpace(XAttr.NameSpace.SYSTEM).
setName("a3").setValue(new byte[]{0x33, 0x33, 0x33}).build();
List<XAttr> xAttrs = fsdir.setINodeXAttr(existingXAttrs, newXAttr,
List<XAttr> newXAttrs = Lists.newArrayListWithCapacity(1);
newXAttrs.add(newXAttr);
List<XAttr> xAttrs = fsdir.setINodeXAttrs(existingXAttrs, newXAttrs,
EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
Assert.assertEquals(xAttrs.size(), 3);
assertEquals(3, xAttrs.size());
// Adding a trusted namespace xAttr, is affected by inode xAttrs limit.
XAttr newXAttr1 = (new XAttr.Builder()).setNameSpace(
XAttr.NameSpace.TRUSTED).setName("a4").
setValue(new byte[]{0x34, 0x34, 0x34}).build();
newXAttrs.set(0, newXAttr1);
try {
fsdir.setINodeXAttr(existingXAttrs, newXAttr1,
fsdir.setINodeXAttrs(existingXAttrs, newXAttrs,
EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
Assert.fail("Setting user visable xattr on inode should fail if " +
fail("Setting user visible xattr on inode should fail if " +
"reaching limit.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Cannot add additional XAttr " +
"to inode, would exceed limit", e);
}
}
/**
* Verify that the first <i>num</i> generatedXAttrs are present in
* newXAttrs.
*/
private static void verifyXAttrsPresent(List<XAttr> newXAttrs,
final int num) {
assertEquals("Unexpected number of XAttrs after multiset", num,
newXAttrs.size());
for (int i=0; i<num; i++) {
XAttr search = generatedXAttrs.get(i);
assertTrue("Did not find set XAttr " + search + " after multiset",
newXAttrs.contains(search));
}
}
private static List<XAttr> generateXAttrs(final int numXAttrs) {
List<XAttr> generatedXAttrs = Lists.newArrayListWithCapacity(numXAttrs);
for (int i=0; i<numXAttrs; i++) {
XAttr xAttr = (new XAttr.Builder())
.setNameSpace(XAttr.NameSpace.SYSTEM)
.setName("a" + i)
.setValue(new byte[] { (byte) i, (byte) (i + 1), (byte) (i + 2) })
.build();
generatedXAttrs.add(xAttr);
}
return generatedXAttrs;
}
/**
* Test setting and removing multiple xattrs via single operations
*/
@Test(timeout=300000)
public void testXAttrMultiSetRemove() throws Exception {
List<XAttr> existingXAttrs = Lists.newArrayListWithCapacity(0);
// Keep adding a random number of xattrs and verifying until exhausted
final Random rand = new Random(0xFEEDA);
int numExpectedXAttrs = 0;
while (numExpectedXAttrs < numGeneratedXAttrs) {
LOG.info("Currently have " + numExpectedXAttrs + " xattrs");
final int numToAdd = rand.nextInt(5)+1;
List<XAttr> toAdd = Lists.newArrayListWithCapacity(numToAdd);
for (int i = 0; i < numToAdd; i++) {
if (numExpectedXAttrs >= numGeneratedXAttrs) {
break;
}
toAdd.add(generatedXAttrs.get(numExpectedXAttrs));
numExpectedXAttrs++;
}
LOG.info("Attempting to add " + toAdd.size() + " XAttrs");
for (int i = 0; i < toAdd.size(); i++) {
LOG.info("Will add XAttr " + toAdd.get(i));
}
List<XAttr> newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
EnumSet.of(XAttrSetFlag.CREATE));
verifyXAttrsPresent(newXAttrs, numExpectedXAttrs);
existingXAttrs = newXAttrs;
}
// Keep removing a random number of xattrs and verifying until all gone
while (numExpectedXAttrs > 0) {
LOG.info("Currently have " + numExpectedXAttrs + " xattrs");
final int numToRemove = rand.nextInt(5)+1;
List<XAttr> toRemove = Lists.newArrayListWithCapacity(numToRemove);
for (int i = 0; i < numToRemove; i++) {
if (numExpectedXAttrs == 0) {
break;
}
toRemove.add(generatedXAttrs.get(numExpectedXAttrs-1));
numExpectedXAttrs--;
}
final int expectedNumToRemove = toRemove.size();
LOG.info("Attempting to remove " + expectedNumToRemove + " XAttrs");
List<XAttr> removedXAttrs = Lists.newArrayList();
List<XAttr> newXAttrs = fsdir.filterINodeXAttrs(existingXAttrs,
toRemove, removedXAttrs);
assertEquals("Unexpected number of removed XAttrs",
expectedNumToRemove, removedXAttrs.size());
verifyXAttrsPresent(newXAttrs, numExpectedXAttrs);
existingXAttrs = newXAttrs;
}
}
@Test(timeout=300000)
public void testXAttrMultiAddRemoveErrors() throws Exception {
// Test that the same XAttr can not be multiset twice
List<XAttr> existingXAttrs = Lists.newArrayList();
List<XAttr> toAdd = Lists.newArrayList();
toAdd.add(generatedXAttrs.get(0));
toAdd.add(generatedXAttrs.get(1));
toAdd.add(generatedXAttrs.get(2));
toAdd.add(generatedXAttrs.get(0));
try {
fsdir.setINodeXAttrs(existingXAttrs, toAdd,
    EnumSet.of(XAttrSetFlag.CREATE));
fail("Specified the same xattr to be set twice");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("Cannot specify the same " +
"XAttr to be set", e);
}
// Test that CREATE and REPLACE flags are obeyed
toAdd.remove(generatedXAttrs.get(0));
existingXAttrs.add(generatedXAttrs.get(0));
try {
fsdir.setINodeXAttrs(existingXAttrs, toAdd,
    EnumSet.of(XAttrSetFlag.CREATE));
fail("Set XAttr that is already set without REPLACE flag");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("already exists", e);
}
try {
fsdir.setINodeXAttrs(existingXAttrs, toAdd,
    EnumSet.of(XAttrSetFlag.REPLACE));
fail("Set XAttr that does not exist without the CREATE flag");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("does not exist", e);
}
// Sanity test for CREATE
toAdd.remove(generatedXAttrs.get(0));
List<XAttr> newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
EnumSet.of(XAttrSetFlag.CREATE));
assertEquals("Unexpected toAdd size", 2, toAdd.size());
for (XAttr x : toAdd) {
assertTrue("Did not find added XAttr " + x, newXAttrs.contains(x));
}
existingXAttrs = newXAttrs;
// Sanity test for REPLACE
toAdd = Lists.newArrayList();
for (int i=0; i<3; i++) {
XAttr xAttr = (new XAttr.Builder())
.setNameSpace(XAttr.NameSpace.SYSTEM)
.setName("a" + i)
.setValue(new byte[] { (byte) (i*2) })
.build();
toAdd.add(xAttr);
}
newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
EnumSet.of(XAttrSetFlag.REPLACE));
assertEquals("Unexpected number of new XAttrs", 3, newXAttrs.size());
for (int i=0; i<3; i++) {
assertArrayEquals("Unexpected XAttr value",
new byte[] {(byte)(i*2)}, newXAttrs.get(i).getValue());
}
existingXAttrs = newXAttrs;
// Sanity test for CREATE+REPLACE
toAdd = Lists.newArrayList();
for (int i=0; i<4; i++) {
toAdd.add(generatedXAttrs.get(i));
}
newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
verifyXAttrsPresent(newXAttrs, 4);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.junit.After;
import org.junit.Test;
import org.mockito.Mockito;
@ -194,4 +195,22 @@ public class TestFSNamesystem {
assertFalse(rwLock.isWriteLockedByCurrentThread());
assertEquals(0, rwLock.getWriteHoldCount());
}
@Test
public void testReset() throws Exception {
Configuration conf = new Configuration();
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
FSImage fsImage = Mockito.mock(FSImage.class);
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
fsn.imageLoadComplete();
assertTrue(fsn.isImageLoaded());
fsn.clear();
assertFalse(fsn.isImageLoaded());
final INodeDirectory root = (INodeDirectory) fsn.getFSDirectory()
.getINode("/");
assertTrue(root.getChildrenList(Snapshot.CURRENT_STATE_ID).isEmpty());
fsn.imageLoadComplete();
assertTrue(fsn.isImageLoaded());
}
}

View File

@ -19,12 +19,9 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.apache.hadoop.util.Time.now;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
@ -57,7 +54,7 @@ public class TestFsLimits {
FSEditLog editLog = mock(FSEditLog.class);
doReturn(editLog).when(fsImage).getEditLog();
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
fsn.getFSDirectory().setReady(fsIsReady);
fsn.setImageLoaded(fsIsReady);
return fsn;
}

View File

@ -28,12 +28,14 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@ -396,4 +398,39 @@ public class TestSnapshotBlocksMap {
assertEquals(1, blks.length);
assertEquals(BLOCKSIZE, blks[0].getNumBytes());
}
/**
* Make sure that deleting a file which is zero-length in a snapshot, but was
* later extended, works and that the NameNode can still save the fsimage
* afterwards (HDFS-6563).
*/
@Test
public void testDeletionOfLaterBlocksWithZeroSizeFirstBlock() throws Exception {
final Path foo = new Path("/foo");
final Path bar = new Path(foo, "bar");
final byte[] testData = "foo bar baz".getBytes();
// Create a zero-length file.
DFSTestUtil.createFile(hdfs, bar, 0, REPLICATION, 0L);
assertEquals(0, fsdir.getINode4Write(bar.toString()).asFile().getBlocks().length);
// Create a snapshot that includes that file.
SnapshotTestHelper.createSnapshot(hdfs, foo, "s0");
// Extend that file.
FSDataOutputStream out = hdfs.append(bar);
out.write(testData);
out.close();
INodeFile barNode = fsdir.getINode4Write(bar.toString()).asFile();
BlockInfo[] blks = barNode.getBlocks();
assertEquals(1, blks.length);
assertEquals(testData.length, blks[0].getNumBytes());
// Delete the file.
hdfs.delete(bar, true);
// Now make sure that the NN can still save an fsimage successfully.
cluster.getNameNode().getRpcServer().setSafeMode(
SafeModeAction.SAFEMODE_ENTER, false);
cluster.getNameNode().getRpcServer().saveNamespace();
}
}

View File

@ -249,10 +249,10 @@ public class TestXAttrWithSnapshot {
private static void doSnapshotRootRemovalAssertions(Path path,
Path snapshotPath) throws Exception {
Map<String, byte[]> xattrs = hdfs.getXAttrs(path);
Assert.assertEquals(xattrs.size(), 0);
Assert.assertEquals(0, xattrs.size());
xattrs = hdfs.getXAttrs(snapshotPath);
Assert.assertEquals(xattrs.size(), 2);
Assert.assertEquals(2, xattrs.size());
Assert.assertArrayEquals(value1, xattrs.get(name1));
Assert.assertArrayEquals(value2, xattrs.get(name2));
}
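For reference, org.junit.Assert.assertEquals takes the expected value first; with the arguments reversed, a failing check reports the expected and actual values swapped, which is why the assertions above were reordered. A minimal sketch (hypothetical test class, not part of this patch):

import org.junit.Assert;
import org.junit.Test;

public class AssertOrderSketch {
  @Test
  public void expectedValueComesFirst() {
    int actualSize = 2;
    // Reads as: expect 2, got actualSize. Reversing the arguments would
    // produce a misleading "expected:<actual> but was:<expected>" message.
    Assert.assertEquals(2, actualSize);
  }
}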

View File

@ -13,8 +13,8 @@
<TXID>2</TXID>
<DELEGATION_KEY>
<KEY_ID>1</KEY_ID>
<EXPIRY_DATE>1394849922137</EXPIRY_DATE>
<KEY>37e1a64049bbef35</KEY>
<EXPIRY_DATE>1403590428625</EXPIRY_DATE>
<KEY>16f34bfba67b2552</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -24,8 +24,8 @@
<TXID>3</TXID>
<DELEGATION_KEY>
<KEY_ID>2</KEY_ID>
<EXPIRY_DATE>1394849922140</EXPIRY_DATE>
<KEY>7c0bf5039242fc54</KEY>
<EXPIRY_DATE>1403590428631</EXPIRY_DATE>
<KEY>dbe6282854469833</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -37,18 +37,18 @@
<INODEID>16386</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158722811</MTIME>
<ATIME>1394158722811</ATIME>
<MTIME>1402899229669</MTIME>
<ATIME>1402899229669</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_1233039831_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>6</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>8</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -59,13 +59,13 @@
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158722832</MTIME>
<ATIME>1394158722811</ATIME>
<MTIME>1402899229711</MTIME>
<ATIME>1402899229669</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -78,9 +78,9 @@
<LENGTH>0</LENGTH>
<SRC>/file_create</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1394158722836</TIMESTAMP>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>8</RPC_CALLID>
<TIMESTAMP>1402899229718</TIMESTAMP>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>10</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -89,9 +89,9 @@
<TXID>7</TXID>
<LENGTH>0</LENGTH>
<PATH>/file_moved</PATH>
<TIMESTAMP>1394158722842</TIMESTAMP>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>9</RPC_CALLID>
<TIMESTAMP>1402899229730</TIMESTAMP>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>11</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -101,9 +101,9 @@
<LENGTH>0</LENGTH>
<INODEID>16387</INODEID>
<PATH>/directory_mkdir</PATH>
<TIMESTAMP>1394158722848</TIMESTAMP>
<TIMESTAMP>1402899229748</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
@ -136,8 +136,8 @@
<TXID>12</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot1</SNAPSHOTNAME>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>14</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -147,8 +147,8 @@
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME>
<SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>15</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -157,8 +157,8 @@
<TXID>14</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot2</SNAPSHOTNAME>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>18</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -169,18 +169,18 @@
<INODEID>16388</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158722872</MTIME>
<ATIME>1394158722872</ATIME>
<MTIME>1402899229871</MTIME>
<ATIME>1402899229871</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_1233039831_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>19</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -191,13 +191,13 @@
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158722874</MTIME>
<ATIME>1394158722872</ATIME>
<MTIME>1402899229881</MTIME>
<ATIME>1402899229871</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -253,10 +253,10 @@
<LENGTH>0</LENGTH>
<SRC>/file_create</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1394158722890</TIMESTAMP>
<TIMESTAMP>1402899229963</TIMESTAMP>
<OPTIONS>NONE</OPTIONS>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>24</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>26</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -267,18 +267,18 @@
<INODEID>16389</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158722895</MTIME>
<ATIME>1394158722895</ATIME>
<MTIME>1402899229981</MTIME>
<ATIME>1402899229981</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_1233039831_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>26</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>28</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -383,8 +383,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158722986</MTIME>
<ATIME>1394158722895</ATIME>
<MTIME>1402899230219</MTIME>
<ATIME>1402899229981</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -404,7 +404,7 @@
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -418,18 +418,18 @@
<INODEID>16390</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158722989</MTIME>
<ATIME>1394158722989</ATIME>
<MTIME>1402899230235</MTIME>
<ATIME>1402899230235</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_1233039831_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>39</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>41</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -534,8 +534,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158723010</MTIME>
<ATIME>1394158722989</ATIME>
<MTIME>1402899230307</MTIME>
<ATIME>1402899230235</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -555,7 +555,7 @@
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -569,18 +569,18 @@
<INODEID>16391</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158723012</MTIME>
<ATIME>1394158723012</ATIME>
<MTIME>1402899230320</MTIME>
<ATIME>1402899230320</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_1233039831_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>51</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>53</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -685,8 +685,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158723035</MTIME>
<ATIME>1394158723012</ATIME>
<MTIME>1402899230383</MTIME>
<ATIME>1402899230320</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -706,7 +706,7 @@
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -718,13 +718,13 @@
<TXID>56</TXID>
<LENGTH>0</LENGTH>
<TRG>/file_concat_target</TRG>
<TIMESTAMP>1394158723039</TIMESTAMP>
<TIMESTAMP>1402899230394</TIMESTAMP>
<SOURCES>
<SOURCE1>/file_concat_0</SOURCE1>
<SOURCE2>/file_concat_1</SOURCE2>
</SOURCES>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>62</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -735,15 +735,15 @@
<INODEID>16392</INODEID>
<PATH>/file_symlink</PATH>
<VALUE>/file_concat_target</VALUE>
<MTIME>1394158723044</MTIME>
<ATIME>1394158723044</ATIME>
<MTIME>1402899230406</MTIME>
<ATIME>1402899230406</ATIME>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>511</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>63</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>65</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -754,18 +754,18 @@
<INODEID>16393</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158723047</MTIME>
<ATIME>1394158723047</ATIME>
<MTIME>1402899230413</MTIME>
<ATIME>1402899230413</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_221786725_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_1233039831_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>66</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -821,7 +821,7 @@
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
<TXID>64</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_221786725_1</LEASEHOLDER>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_1233039831_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
</DATA>
@ -834,8 +834,8 @@
<INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1394158725708</MTIME>
<ATIME>1394158723047</ATIME>
<MTIME>1402899232526</MTIME>
<ATIME>1402899230413</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -845,7 +845,7 @@
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>jing</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -856,13 +856,13 @@
<DATA>
<TXID>66</TXID>
<POOLNAME>pool1</POOLNAME>
<OWNERNAME>jing</OWNERNAME>
<GROUPNAME>staff</GROUPNAME>
<OWNERNAME>andrew</OWNERNAME>
<GROUPNAME>andrew</GROUPNAME>
<MODE>493</MODE>
<LIMIT>9223372036854775807</LIMIT>
<MAXRELATIVEEXPIRY>2305843009213693951</MAXRELATIVEEXPIRY>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>71</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>73</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -871,8 +871,8 @@
<TXID>67</TXID>
<POOLNAME>pool1</POOLNAME>
<LIMIT>99</LIMIT>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>72</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>74</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -883,9 +883,9 @@
<PATH>/path</PATH>
<REPLICATION>1</REPLICATION>
<POOL>pool1</POOL>
<EXPIRATION>2305844403372420029</EXPIRATION>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>73</RPC_CALLID>
<EXPIRATION>2305844412112927450</EXPIRATION>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>75</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -894,8 +894,8 @@
<TXID>69</TXID>
<ID>1</ID>
<REPLICATION>2</REPLICATION>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>74</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>76</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -903,8 +903,8 @@
<DATA>
<TXID>70</TXID>
<ID>1</ID>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>75</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>77</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -912,8 +912,8 @@
<DATA>
<TXID>71</TXID>
<POOLNAME>pool1</POOLNAME>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CALLID>76</RPC_CALLID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>78</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@ -921,51 +921,91 @@
<DATA>
<TXID>72</TXID>
<SRC>/file_concat_target</SRC>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_START</OPCODE>
<DATA>
<TXID>73</TXID>
<STARTTIME>1394158726098</STARTTIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_FINALIZE</OPCODE>
<DATA>
<TXID>74</TXID>
<FINALIZETIME>1394158726098</FINALIZETIME>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
<TYPE>USER</TYPE>
<PERM>rw-</PERM>
</ENTRY>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
<TYPE>USER</TYPE>
<NAME>user</NAME>
<PERM>rw-</PERM>
</ENTRY>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
<TYPE>GROUP</TYPE>
<PERM>-w-</PERM>
</ENTRY>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
<TYPE>MASK</TYPE>
<PERM>rw-</PERM>
</ENTRY>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
<TYPE>OTHER</TYPE>
<PERM>---</PERM>
</ENTRY>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_XATTR</OPCODE>
<DATA>
<TXID>75</TXID>
<TXID>73</TXID>
<SRC>/file_concat_target</SRC>
<XATTR>
<NAMESPACE>USER</NAMESPACE>
<NAME>a1</NAME>
<VALUE>0x313233</VALUE>
</XATTR>
<RPC_CLIENTID>9b85a845-bbfa-42f6-8a16-c433614b8eb9</RPC_CLIENTID>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>80</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_XATTR</OPCODE>
<DATA>
<TXID>74</TXID>
<SRC>/file_concat_target</SRC>
<XATTR>
<NAMESPACE>USER</NAMESPACE>
<NAME>a2</NAME>
<VALUE>0x373839</VALUE>
</XATTR>
<RPC_CLIENTID>e03f4a52-3d85-4e05-8942-286185e639bd</RPC_CLIENTID>
<RPC_CALLID>81</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REMOVE_XATTR</OPCODE>
<DATA>
<TXID>76</TXID>
<TXID>75</TXID>
<SRC>/file_concat_target</SRC>
<XATTR>
<NAMESPACE>USER</NAMESPACE>
<NAME>a1</NAME>
<NAME>a2</NAME>
</XATTR>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_START</OPCODE>
<DATA>
<TXID>76</TXID>
<STARTTIME>1402899233646</STARTTIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_FINALIZE</OPCODE>
<DATA>
<TXID>77</TXID>
<FINALIZETIME>1402899233647</FINALIZETIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_END_LOG_SEGMENT</OPCODE>
<DATA>
<TXID>77</TXID>
<TXID>78</TXID>
</DATA>
</RECORD>
</EDITS>

View File

@ -213,6 +213,12 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid
occasional failures. (Mit Desai via vinodkv)
MAPREDUCE-5896. InputSplits should indicate which locations have the block
cached in memory. (Sandy Ryza via kasha)
MAPREDUCE-5844. Add a configurable delay to reducer-preemption.
(Maysam Yabandeh via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -475,8 +475,8 @@
<Match>
<Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" />
<Or>
<Field name="mapResourceReqt" />
<Field name="reduceResourceReqt" />
<Field name="mapResourceRequest" />
<Field name="reduceResourceRequest" />
<Field name="maxReduceRampupLimit" />
<Field name="reduceSlowStart" />
</Or>

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@ -143,15 +144,21 @@ public class RMContainerAllocator extends RMContainerRequestor
private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false;
private int mapResourceReqt;//memory
private int reduceResourceReqt;//memory
private int mapResourceRequest;//memory
private int reduceResourceRequest;//memory
private boolean reduceStarted = false;
private float maxReduceRampupLimit = 0;
private float maxReducePreemptionLimit = 0;
/**
* After this threshold, if the container request is not allocated, it is
* considered delayed.
*/
private long allocationDelayThresholdMs = 0;
private float reduceSlowStart = 0;
private long retryInterval;
private long retrystartTime;
private Clock clock;
private final AMPreemptionPolicy preemptionPolicy;
@ -166,6 +173,7 @@ public class RMContainerAllocator extends RMContainerRequestor
super(clientService, context);
this.preemptionPolicy = preemptionPolicy;
this.stopped = new AtomicBoolean(false);
this.clock = context.getClock();
}
@Override
@ -180,6 +188,9 @@ public class RMContainerAllocator extends RMContainerRequestor
maxReducePreemptionLimit = conf.getFloat(
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
allocationDelayThresholdMs = conf.getInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
RackResolver.init(conf);
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@ -246,7 +257,7 @@ public class RMContainerAllocator extends RMContainerRequestor
getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceReqt, reduceResourceReqt,
mapResourceRequest, reduceResourceRequest,
pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
@ -268,6 +279,18 @@ public class RMContainerAllocator extends RMContainerRequestor
scheduleStats.log("Final Stats: ");
}
@Private
@VisibleForTesting
AssignedRequests getAssignedRequests() {
return assignedRequests;
}
@Private
@VisibleForTesting
ScheduledRequests getScheduledRequests() {
return scheduledRequests;
}
public boolean getIsReduceStarted() {
return reduceStarted;
}
@ -303,16 +326,16 @@ public class RMContainerAllocator extends RMContainerRequestor
int supportedMaxContainerCapability =
getMaxContainerCapability().getMemory();
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
if (mapResourceReqt == 0) {
mapResourceReqt = reqEvent.getCapability().getMemory();
if (mapResourceRequest == 0) {
mapResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
mapResourceReqt)));
LOG.info("mapResourceReqt:"+mapResourceReqt);
if (mapResourceReqt > supportedMaxContainerCapability) {
mapResourceRequest)));
LOG.info("mapResourceRequest:"+ mapResourceRequest);
if (mapResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "MAP capability required is more than the supported " +
"max container capability in the cluster. Killing the Job. mapResourceReqt: " +
mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
"max container capability in the cluster. Killing the Job. mapResourceRequest: " +
mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
jobId, diagMsg));
@ -320,20 +343,20 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
//set the rounded off memory
reqEvent.getCapability().setMemory(mapResourceReqt);
reqEvent.getCapability().setMemory(mapResourceRequest);
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
} else {
if (reduceResourceReqt == 0) {
reduceResourceReqt = reqEvent.getCapability().getMemory();
if (reduceResourceRequest == 0) {
reduceResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceReqt)));
LOG.info("reduceResourceReqt:"+reduceResourceReqt);
if (reduceResourceReqt > supportedMaxContainerCapability) {
reduceResourceRequest)));
LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
if (reduceResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "REDUCE capability required is more than the " +
"supported max container capability in the cluster. Killing the " +
"Job. reduceResourceReqt: " + reduceResourceReqt +
"Job. reduceResourceRequest: " + reduceResourceRequest +
" maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
@ -342,7 +365,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
//set the rounded off memory
reqEvent.getCapability().setMemory(reduceResourceReqt);
reqEvent.getCapability().setMemory(reduceResourceRequest);
if (reqEvent.getEarlierAttemptFailed()) {
//add to the front of queue for fail fast
pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@ -394,8 +417,22 @@ public class RMContainerAllocator extends RMContainerRequestor
return host;
}
private void preemptReducesIfNeeded() {
if (reduceResourceReqt == 0) {
@Private
@VisibleForTesting
synchronized void setReduceResourceRequest(int mem) {
this.reduceResourceRequest = mem;
}
@Private
@VisibleForTesting
synchronized void setMapResourceRequest(int mem) {
this.mapResourceRequest = mem;
}
@Private
@VisibleForTesting
void preemptReducesIfNeeded() {
if (reduceResourceRequest == 0) {
return; //no reduces
}
//check if reduces have taken over the whole cluster and there are
@ -403,9 +440,9 @@ public class RMContainerAllocator extends RMContainerRequestor
if (scheduledRequests.maps.size() > 0) {
int memLimit = getMemLimit();
int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
//availableMemForMap must be sufficient to run at least 1 map
if (availableMemForMap < mapResourceReqt) {
if (availableMemForMap < mapResourceRequest) {
//to make sure new containers are given to maps and not reduces
//ramp down all scheduled reduces if any
//(since reduces are scheduled at higher priority than maps)
@ -414,22 +451,40 @@ public class RMContainerAllocator extends RMContainerRequestor
pendingReduces.add(req);
}
scheduledRequests.reduces.clear();
//preempt for making space for at least one map
int premeptionLimit = Math.max(mapResourceReqt,
(int) (maxReducePreemptionLimit * memLimit));
int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt,
premeptionLimit);
int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
assignedRequests.preemptReduce(toPreempt);
//do further checking to find the number of map requests that were
//hanging around for a while
int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
if (hangingMapRequests > 0) {
//preempt for making space for at least one map
int preemptionLimit = Math.max(mapResourceRequest,
(int) (maxReducePreemptionLimit * memLimit));
int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
preemptionLimit);
int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
assignedRequests.preemptReduce(toPreempt);
}
}
}
}
private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
if (allocationDelayThresholdMs <= 0)
return requestMap.size();
int hangingRequests = 0;
long currTime = clock.getTime();
for (ContainerRequest request: requestMap.values()) {
long delay = currTime - request.requestTimeMs;
if (delay > allocationDelayThresholdMs)
hangingRequests++;
}
return hangingRequests;
}
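For intuition, a self-contained sketch of the same gating arithmetic (hypothetical class, not part of this patch): a request only counts as hanging once it has waited longer than the configured threshold.

public class HangingRequestSketch {
  // A request is "hanging" once (now - requestTimeMs) exceeds the threshold,
  // mirroring getNumOfHangingRequests above.
  static int countHanging(long[] requestTimesMs, long nowMs, long thresholdMs) {
    int hanging = 0;
    for (long requestTimeMs : requestTimesMs) {
      if (nowMs - requestTimeMs > thresholdMs) {
        hanging++;
      }
    }
    return hanging;
  }

  public static void main(String[] args) {
    long[] requestTimes = { 1L, 1L }; // both requests placed at t=1 ms
    // With a 2000 ms threshold: nothing hangs at t=1500, both hang at t=3100.
    System.out.println(countHanging(requestTimes, 1500L, 2000L)); // prints 0
    System.out.println(countHanging(requestTimes, 3100L, 2000L)); // prints 2
  }
}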
@Private
public void scheduleReduces(
@ -715,11 +770,13 @@ public class RMContainerAllocator extends RMContainerRequestor
@Private
public int getMemLimit() {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
return headRoom + assignedRequests.maps.size() * mapResourceReqt +
assignedRequests.reduces.size() * reduceResourceReqt;
return headRoom + assignedRequests.maps.size() * mapResourceRequest +
assignedRequests.reduces.size() * reduceResourceRequest;
}
private class ScheduledRequests {
@Private
@VisibleForTesting
class ScheduledRequests {
private final LinkedList<TaskAttemptId> earlierFailedMaps =
new LinkedList<TaskAttemptId>();
@ -729,7 +786,8 @@ public class RMContainerAllocator extends RMContainerRequestor
new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<TaskAttemptId, ContainerRequest> maps =
@VisibleForTesting
final Map<TaskAttemptId, ContainerRequest> maps =
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
@ -825,22 +883,22 @@ public class RMContainerAllocator extends RMContainerRequestor
int allocatedMemory = allocated.getResource().getMemory();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
if (allocatedMemory < mapResourceReqt
if (allocatedMemory < mapResourceRequest
|| maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a map as either "
+ " container memory less than required " + mapResourceReqt
+ " container memory less than required " + mapResourceRequest
+ " or no pending map tasks - maps.isEmpty="
+ maps.isEmpty());
isAssignable = false;
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
if (allocatedMemory < reduceResourceReqt
if (allocatedMemory < reduceResourceRequest
|| reduces.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
+ " container memory less than required " + reduceResourceReqt
+ " container memory less than required " + reduceResourceRequest
+ " or no pending reduce tasks - reduces.isEmpty="
+ reduces.isEmpty());
isAssignable = false;
@ -1119,14 +1177,18 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
private class AssignedRequests {
@Private
@VisibleForTesting
class AssignedRequests {
private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
new HashMap<ContainerId, TaskAttemptId>();
private final LinkedHashMap<TaskAttemptId, Container> maps =
new LinkedHashMap<TaskAttemptId, Container>();
private final LinkedHashMap<TaskAttemptId, Container> reduces =
@VisibleForTesting
final LinkedHashMap<TaskAttemptId, Container> reduces =
new LinkedHashMap<TaskAttemptId, Container>();
private final Set<TaskAttemptId> preemptionWaitingReduces =
@VisibleForTesting
final Set<TaskAttemptId> preemptionWaitingReduces =
new HashSet<TaskAttemptId>();
void add(Container container, TaskAttemptId tId) {

View File

@ -29,8 +29,10 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@ -96,6 +98,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
super(clientService, context);
}
@Private
@VisibleForTesting
static class ContainerRequest {
final TaskAttemptId attemptID;
final Resource capability;
@ -103,20 +107,39 @@ public abstract class RMContainerRequestor extends RMCommunicator {
final String[] racks;
//final boolean earlierAttemptFailed;
final Priority priority;
/**
* The time when this request object was formed; can be used to avoid
* aggressive preemption for recently placed requests.
*/
final long requestTimeMs;
public ContainerRequest(ContainerRequestEvent event, Priority priority) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority);
}
public ContainerRequest(ContainerRequestEvent event, Priority priority,
long requestTimeMs) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority, requestTimeMs);
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority) {
Resource capability, String[] hosts, String[] racks,
Priority priority) {
this(attemptID, capability, hosts, racks, priority,
System.currentTimeMillis());
}
public ContainerRequest(TaskAttemptId attemptID,
Resource capability, String[] hosts, String[] racks,
Priority priority, long requestTimeMs) {
this.attemptID = attemptID;
this.capability = capability;
this.hosts = hosts;
this.racks = racks;
this.priority = priority;
this.requestTimeMs = requestTimeMs;
}
public String toString() {

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;
package org.apache.hadoop.mapreduce.v2.app.rm;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
@ -40,6 +40,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@ -65,10 +69,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -80,6 +80,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@ -422,6 +423,115 @@ public class TestRMContainerAllocator {
killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC));
}
@Test(timeout = 30000)
public void testPreemptReducers() throws Exception {
LOG.info("Running testPreemptReducers");
Configuration conf = new Configuration();
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock());
allocator.setMapResourceRequest(1024);
allocator.setReduceResourceRequest(1024);
RMContainerAllocator.AssignedRequests assignedRequests =
allocator.getAssignedRequests();
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is not preempted",
1, assignedRequests.preemptionWaitingReduces.size());
}
@Test(timeout = 30000)
public void testNonAggressivelyPreemptReducers() throws Exception {
LOG.info("Running testPreemptReducers");
final int preemptThreshold = 2; //sec
Configuration conf = new Configuration();
conf.setInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
preemptThreshold);
MyResourceManager rm = new MyResourceManager(conf);
rm.start();
DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
.getDispatcher();
// Submit the application
RMApp app = rm.submitApp(1024);
dispatcher.await();
MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
amNodeManager.nodeHeartbeat(true);
dispatcher.await();
ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
.getAppAttemptId();
rm.sendAMLaunched(appAttemptId);
dispatcher.await();
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
ControlledClock clock = new ControlledClock(null);
clock.setTime(1);
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, clock);
allocator.setMapResourceRequest(1024);
allocator.setReduceResourceRequest(1024);
RMContainerAllocator.AssignedRequests assignedRequests =
allocator.getAssignedRequests();
RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests();
ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class));
clock.setTime(clock.getTime() + 1);
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is aggressively preeempted", 0,
assignedRequests.preemptionWaitingReduces.size());
clock.setTime(clock.getTime() + (preemptThreshold) * 1000);
allocator.preemptReducesIfNeeded();
Assert.assertEquals("The reducer is not preeempted", 1,
assignedRequests.preemptionWaitingReduces.size());
}
@Test
public void testMapReduceScheduling() throws Exception {

View File

@ -295,6 +295,15 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types.
*/
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
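As an illustration of the new factory overload, a subclass could override it to observe or customize split creation; the class name and logging below are hypothetical, not part of this patch:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical subclass: logs cached-host info while delegating to the
// default split construction.
public class CacheAwareTextInputFormat extends TextInputFormat {
  @Override
  protected FileSplit makeSplit(Path file, long start, long length,
      String[] hosts, String[] inMemoryHosts) {
    System.out.println(file + ": " + inMemoryHosts.length
        + " host(s) have this block cached in memory");
    return super.makeSplit(file, start, length, hosts, inMemoryHosts);
  }
}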
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
@ -337,22 +346,22 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts));
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
String[] splitHosts = getSplitHosts(blkLocations, length
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts));
splitHosts[0], splitHosts[1]));
}
} else {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts));
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
@ -538,10 +547,30 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
* @param blkLocations The list of block locations
* @param offset
* @param splitSize
* @return array of hosts that contribute most to this split
* @return an array of hosts that contribute most to this split
* @throws IOException
*/
protected String[] getSplitHosts(BlockLocation[] blkLocations,
long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
clusterMap)[0];
}
/**
* This function identifies and returns the hosts that contribute
* most for a given split. For calculating the contribution, rack
* locality is treated on par with host locality, so hosts from racks
* that contribute the most are preferred over hosts on racks that
* contribute less
* @param blkLocations The list of block locations
* @param offset
* @param splitSize
* @return two arrays - one of hosts that contribute most to this split, and
* one of those hosts that additionally have the split's data cached in
* memory
* @throws IOException
*/
private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations,
long offset, long splitSize, NetworkTopology clusterMap)
throws IOException {
@ -552,7 +581,8 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
//If this is the only block, just return
if (bytesInThisBlock >= splitSize) {
return blkLocations[startIndex].getHosts();
return new String[][] { blkLocations[startIndex].getHosts(),
blkLocations[startIndex].getCachedHosts() };
}
long bytesInFirstBlock = bytesInThisBlock;
@ -639,7 +669,9 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
} // for all indices
return identifyHosts(allTopos.length, racksMap);
// We don't yet support cached hosts when bytesInThisBlock > splitSize
return new String[][] { identifyHosts(allTopos.length, racksMap),
new String[0]};
}
private String[] identifyHosts(int replicationFactor,

View File

@ -24,6 +24,7 @@ import java.io.DataOutput;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.Path;
/** A section of an input file. Returned by {@link
@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
implements InputSplit {
implements InputSplitWithLocationInfo {
org.apache.hadoop.mapreduce.lib.input.FileSplit fs;
protected FileSplit() {
fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
@ -62,6 +63,20 @@ public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
length, hosts);
}
/** Constructs a split with host information
*
* @param file the file name
* @param start the position of the first byte in the file to process
* @param length the number of bytes in the file to process
* @param hosts the list of hosts containing the block, possibly null
* @param inMemoryHosts the list of hosts containing the block in memory
*/
public FileSplit(Path file, long start, long length, String[] hosts,
String[] inMemoryHosts) {
fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
length, hosts, inMemoryHosts);
}
public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
this.fs = fs;
}
@ -92,4 +107,9 @@ public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
return fs.getLocations();
}
@Override
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return fs.getLocationInfo();
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Public
@Evolving
public interface InputSplitWithLocationInfo extends InputSplit {
/**
* Gets info about which nodes the input split is stored on and how it is
* stored at each location.
*
* @return list of <code>SplitLocationInfo</code>s describing how the split
* data is stored at each location. A null value indicates that all the
* locations have the data stored on disk.
* @throws IOException
*/
SplitLocationInfo[] getLocationInfo() throws IOException;
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
@Public
@Evolving
public class SplitLocationInfo {
private boolean inMemory;
private String location;
public SplitLocationInfo(String location, boolean inMemory) {
this.location = location;
this.inMemory = inMemory;
}
public boolean isOnDisk() {
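// Always true in this version: every reported location is assumed to
// also have the data on disk.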
return true;
}
public boolean isInMemory() {
return inMemory;
}
public String getLocation() {
return location;
}
}
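A minimal consumer sketch (hypothetical class, not part of this patch) that ranks a split's locations so in-memory replicas come first:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.mapred.SplitLocationInfo;

public class PreferCachedHosts {
  // Orders locations so hosts holding the block in memory come first.
  static List<String> rank(SplitLocationInfo[] infos) {
    List<String> ranked = new ArrayList<String>();
    for (SplitLocationInfo info : infos) {
      if (info.isInMemory()) {
        ranked.add(0, info.getLocation()); // cached replicas to the front
      } else {
        ranked.add(info.getLocation());
      }
    }
    return ranked;
  }

  public static void main(String[] args) {
    SplitLocationInfo[] infos = {
        new SplitLocationInfo("otherhost", false),
        new SplitLocationInfo("localhost", true) };
    System.out.println(rank(infos)); // prints [localhost, otherhost]
  }
}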

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
@ -51,10 +53,25 @@ public abstract class InputSplit {
/**
* Get the list of nodes by name where the data for the split would be local.
* The locations do not need to be serialized.
*
* @return a new array of node names.
* @throws IOException
* @throws InterruptedException
*/
public abstract
String[] getLocations() throws IOException, InterruptedException;
/**
* Gets info about which nodes the input split is stored on and how it is
* stored at each location.
*
* @return list of <code>SplitLocationInfo</code>s describing how the split
* data is stored at each location. A null value indicates that all the
* locations have the data stored on disk.
* @throws IOException
*/
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}
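Since getLocationInfo() returns null by default, callers must handle the null case; a hedged sketch (hypothetical helper class, not part of this patch):

import java.io.IOException;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputSplit;

public class LocationInfoHelper {
  // Null from getLocationInfo() means no cache information is available,
  // i.e. the data is assumed to be on disk at every location.
  static boolean anyLocationInMemory(InputSplit split) throws IOException {
    SplitLocationInfo[] infos = split.getLocationInfo();
    if (infos == null) {
      return false;
    }
    for (SplitLocationInfo info : infos) {
      if (info.isInMemory()) {
        return true;
      }
    }
    return false;
  }
}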

View File

@ -579,7 +579,17 @@ public interface MRJobConfig {
MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;
/**
* The threshold, in seconds, after which an unsatisfied mapper request
* triggers reducer preemption to free space. The default of 0 means that
* reducers are preempted immediately when there is no room for newly
* allocated mappers.
*/
public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
"mapreduce.job.reducer.preempt.delay.sec";
public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;
public static final String MR_AM_ENV =
MR_AM_PREFIX + "env";

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@ -359,6 +360,15 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types.
*/
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts, String[] inMemoryHosts) {
return new FileSplit(file, start, length, hosts, inMemoryHosts);
}
/**
* Generate the list of files and make them into FileSplits.
@ -392,17 +402,20 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts()));
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files

View File

@ -22,11 +22,13 @@ import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@ -41,6 +43,7 @@ public class FileSplit extends InputSplit implements Writable {
private long start;
private long length;
private String[] hosts;
private SplitLocationInfo[] hostInfos;
public FileSplit() {}
@ -57,6 +60,31 @@ public class FileSplit extends InputSplit implements Writable {
this.length = length;
this.hosts = hosts;
}
/** Constructs a split with host and cached-blocks information
*
* @param file the file name
* @param start the position of the first byte in the file to process
* @param length the number of bytes in the file to process
* @param hosts the list of hosts containing the block
* @param inMemoryHosts the list of hosts containing the block in memory
*/
public FileSplit(Path file, long start, long length, String[] hosts,
String[] inMemoryHosts) {
this(file, start, length, hosts);
hostInfos = new SplitLocationInfo[hosts.length];
for (int i = 0; i < hosts.length; i++) {
// because N will be tiny, scanning is probably faster than a HashSet
boolean inMemory = false;
for (String inMemoryHost : inMemoryHosts) {
if (inMemoryHost.equals(hosts[i])) {
inMemory = true;
break;
}
}
hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
}
}
/** The file containing this split's data. */
public Path getPath() { return file; }
@ -98,4 +126,10 @@ public class FileSplit extends InputSplit implements Writable {
return this.hosts;
}
}
@Override
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return hostInfos;
}
}

View File

@ -82,6 +82,16 @@
</description>
</property>
<property>
<name>mapreduce.job.reducer.preempt.delay.sec</name>
<value>0</value>
<description>The threshold in terms of seconds after which an unsatisfied mapper
request triggers reducer preemption to free space. Default 0 implies that the
reduces should be preempted immediately after allocation if there is currently no
room for newly allocated mappers.
</description>
</property>
<property>
<name>mapreduce.job.max.split.locations</name>
<value>10</value>

View File

@ -102,6 +102,29 @@ public class TestFileInputFormat {
FileSystem.closeAll();
}
@Test
public void testSplitLocationInfo() throws Exception {
Configuration conf = getConfiguration();
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
"test:///a1/a2");
JobConf job = new JobConf(conf);
TextInputFormat fileInputFormat = new TextInputFormat();
fileInputFormat.configure(job);
FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1);
String[] locations = splits[0].getLocations();
Assert.assertEquals(2, locations.length);
SplitLocationInfo[] locationInfo = splits[0].getLocationInfo();
Assert.assertEquals(2, locationInfo.length);
SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
locationInfo[0] : locationInfo[1];
SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
locationInfo[0] : locationInfo[1];
Assert.assertTrue(localhostInfo.isOnDisk());
Assert.assertTrue(localhostInfo.isInMemory());
Assert.assertTrue(otherhostInfo.isOnDisk());
Assert.assertFalse(otherhostInfo.isInMemory());
}
@Test
public void testListStatusSimple() throws IOException {
Configuration conf = new Configuration();
@ -223,8 +246,9 @@ public class TestFileInputFormat {
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
throws IOException {
return new BlockLocation[] {
new BlockLocation(new String[] { "localhost:50010" },
new String[] { "localhost" }, 0, len) };
new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
new String[0], 0, len, false) };
}
@Override

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.After;
@ -139,6 +140,28 @@ public class TestFileInputFormat {
1, mockFs.numListLocatedStatusCalls);
FileSystem.closeAll();
}
@Test
public void testSplitLocationInfo() throws Exception {
Configuration conf = getConfiguration();
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
"test:///a1/a2");
Job job = Job.getInstance(conf);
TextInputFormat fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
String[] locations = splits.get(0).getLocations();
Assert.assertEquals(2, locations.length);
SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo();
Assert.assertEquals(2, locationInfo.length);
SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
locationInfo[0] : locationInfo[1];
SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
locationInfo[0] : locationInfo[1];
Assert.assertTrue(localhostInfo.isOnDisk());
Assert.assertTrue(localhostInfo.isInMemory());
Assert.assertTrue(otherhostInfo.isOnDisk());
Assert.assertFalse(otherhostInfo.isInMemory());
}
@Test
public void testListStatusSimple() throws IOException {
@ -402,9 +425,9 @@ public class TestFileInputFormat {
public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
throws IOException {
return new BlockLocation[] {
new BlockLocation(new String[] { "localhost:50010" },
new String[] { "localhost" }, 0, len) };
}
new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
new String[0], 0, len, false) };
}
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,