HDFS-5431. Support cachepool-based limit management in path-based caching. (awang via cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1551651 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-12-17 18:47:04 +00:00
parent 926a86780d
commit 991c453ca3
29 changed files with 1367 additions and 938 deletions

View File

@ -253,6 +253,9 @@ Trunk (Unreleased)
INodeDirectoryWithSnapshot with DirectoryWithSnapshotFeature.
(jing9 via szetszwo)
HDFS-5431. Support cachepool-based limit management in path-based caching
(awang via cmccabe)
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@ -357,16 +357,9 @@
<Method name="insertInternal" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<!-- These two are used for shutting down and kicking the CRMon, do not need strong sync -->
<Match>
<Class name="org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor" />
<Field name="shutdown" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor" />
<Field name="rescanImmediately" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>
</FindBugsFilter>

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Specifies semantics for CacheDirective operations. Multiple flags can
* be combined in an EnumSet.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum CacheFlag {
/**
* Ignore cache pool resource limits when performing this operation.
*/
FORCE((short) 0x01);
private final short mode;
private CacheFlag(short mode) {
this.mode = mode;
}
short getMode() {
return mode;
}
}

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -2295,20 +2296,20 @@ public class DFSClient implements java.io.Closeable {
}
public long addCacheDirective(
CacheDirectiveInfo info) throws IOException {
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
checkOpen();
try {
return namenode.addCacheDirective(info);
return namenode.addCacheDirective(info, flags);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public void modifyCacheDirective(
CacheDirectiveInfo info) throws IOException {
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
checkOpen();
try {
namenode.modifyCacheDirective(info);
namenode.modifyCacheDirective(info, flags);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
@ -1585,40 +1586,56 @@ public class DistributedFileSystem extends FileSystem {
}.resolve(this, absF);
}
/**
* @see {@link #addCacheDirective(CacheDirectiveInfo, EnumSet)}
*/
public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
return addCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
}
/**
* Add a new CacheDirective.
*
* @param info Information about a directive to add.
* @param flags {@link CacheFlag}s to use for this operation.
* @return the ID of the directive that was created.
* @throws IOException if the directive could not be added
*/
public long addCacheDirective(
CacheDirectiveInfo info) throws IOException {
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
Preconditions.checkNotNull(info.getPath());
Path path = new Path(getPathName(fixRelativePart(info.getPath()))).
makeQualified(getUri(), getWorkingDirectory());
return dfs.addCacheDirective(
new CacheDirectiveInfo.Builder(info).
setPath(path).
build());
build(),
flags);
}
/**
* @see {@link #modifyCacheDirective(CacheDirectiveInfo, EnumSet)}
*/
public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
modifyCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
}
/**
* Modify a CacheDirective.
*
* @param info Information about the directive to modify.
* You must set the ID to indicate which CacheDirective you want
* to modify.
* @param info Information about the directive to modify. You must set the ID
* to indicate which CacheDirective you want to modify.
* @param flags {@link CacheFlag}s to use for this operation.
* @throws IOException if the directive could not be modified
*/
public void modifyCacheDirective(
CacheDirectiveInfo info) throws IOException {
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
if (info.getPath() != null) {
info = new CacheDirectiveInfo.Builder(info).
setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
makeQualified(getUri(), getWorkingDirectory())).build();
}
dfs.modifyCacheDirective(info);
dfs.modifyCacheDirective(info, flags);
}
/**

View File

@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.client;
import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@ -131,25 +133,26 @@ public class HdfsAdmin {
* Add a new CacheDirectiveInfo.
*
* @param info Information about a directive to add.
* @param flags {@link CacheFlag}s to use for this operation.
* @return the ID of the directive that was created.
* @throws IOException if the directive could not be added
*/
public long addCacheDirective(CacheDirectiveInfo info)
throws IOException {
return dfs.addCacheDirective(info);
public long addCacheDirective(CacheDirectiveInfo info,
EnumSet<CacheFlag> flags) throws IOException {
return dfs.addCacheDirective(info, flags);
}
/**
* Modify a CacheDirective.
*
* @param info Information about the directive to modify.
* You must set the ID to indicate which CacheDirective you want
* to modify.
* @param info Information about the directive to modify. You must set the ID
* to indicate which CacheDirective you want to modify.
* @param flags {@link CacheFlag}s to use for this operation.
* @throws IOException if the directive could not be modified
*/
public void modifyCacheDirective(CacheDirectiveInfo info)
throws IOException {
dfs.modifyCacheDirective(info);
public void modifyCacheDirective(CacheDirectiveInfo info,
EnumSet<CacheFlag> flags) throws IOException {
dfs.modifyCacheDirective(info, flags);
}
/**

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import javax.annotation.Nullable;
@ -32,14 +30,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.io.Text;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
/**
* CachePoolInfo describes a cache pool.
@ -64,7 +54,7 @@ public class CachePoolInfo {
FsPermission mode;
@Nullable
Integer weight;
Long limit;
public CachePoolInfo(String poolName) {
this.poolName = poolName;
@ -101,12 +91,12 @@ public class CachePoolInfo {
return this;
}
public Integer getWeight() {
return weight;
public Long getLimit() {
return limit;
}
public CachePoolInfo setWeight(Integer weight) {
this.weight = weight;
public CachePoolInfo setLimit(Long bytes) {
this.limit = bytes;
return this;
}
@ -117,7 +107,7 @@ public class CachePoolInfo {
append(", groupName:").append(groupName).
append(", mode:").append((mode == null) ? "null" :
String.format("0%03o", mode.toShort())).
append(", weight:").append(weight).
append(", limit:").append(limit).
append("}").toString();
}
@ -134,7 +124,7 @@ public class CachePoolInfo {
append(ownerName, other.ownerName).
append(groupName, other.groupName).
append(mode, other.mode).
append(weight, other.weight).
append(limit, other.limit).
isEquals();
}
@ -145,7 +135,7 @@ public class CachePoolInfo {
append(ownerName).
append(groupName).
append(mode).
append(weight).
append(limit).
hashCode();
}
@ -153,8 +143,8 @@ public class CachePoolInfo {
if (info == null) {
throw new InvalidRequestException("CachePoolInfo is null");
}
if ((info.getWeight() != null) && (info.getWeight() < 0)) {
throw new InvalidRequestException("CachePool weight is negative.");
if ((info.getLimit() != null) && (info.getLimit() < 0)) {
throw new InvalidRequestException("Limit is negative.");
}
validateName(info.poolName);
}
@ -167,66 +157,4 @@ public class CachePoolInfo {
throw new IOException("invalid empty cache pool name");
}
}
public static CachePoolInfo readFrom(DataInput in) throws IOException {
String poolName = Text.readString(in);
CachePoolInfo info = new CachePoolInfo(poolName);
if (in.readBoolean()) {
info.setOwnerName(Text.readString(in));
}
if (in.readBoolean()) {
info.setGroupName(Text.readString(in));
}
if (in.readBoolean()) {
info.setMode(FsPermission.read(in));
}
if (in.readBoolean()) {
info.setWeight(in.readInt());
}
return info;
}
public void writeTo(DataOutput out) throws IOException {
Text.writeString(out, poolName);
boolean hasOwner, hasGroup, hasMode, hasWeight;
hasOwner = ownerName != null;
hasGroup = groupName != null;
hasMode = mode != null;
hasWeight = weight != null;
out.writeBoolean(hasOwner);
if (hasOwner) {
Text.writeString(out, ownerName);
}
out.writeBoolean(hasGroup);
if (hasGroup) {
Text.writeString(out, groupName);
}
out.writeBoolean(hasMode);
if (hasMode) {
mode.write(out);
}
out.writeBoolean(hasWeight);
if (hasWeight) {
out.writeInt(weight);
}
}
public void writeXmlTo(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
PermissionStatus perm = new PermissionStatus(ownerName,
groupName, mode);
FSEditLogOp.permissionStatusToXml(contentHandler, perm);
XMLUtils.addSaxString(contentHandler, "WEIGHT", Integer.toString(weight));
}
public static CachePoolInfo readXmlFrom(Stanza st) throws InvalidXmlException {
String poolName = st.getValue("POOLNAME");
PermissionStatus perm = FSEditLogOp.permissionStatusFromXml(st);
int weight = Integer.parseInt(st.getValue("WEIGHT"));
return new CachePoolInfo(poolName).
setOwnerName(perm.getUserName()).
setGroupName(perm.getGroupName()).
setMode(perm.getPermission()).
setWeight(weight);
}
}

View File

@ -30,6 +30,7 @@ public class CachePoolStats {
public static class Builder {
private long bytesNeeded;
private long bytesCached;
private long bytesOverlimit;
private long filesNeeded;
private long filesCached;
@ -46,6 +47,11 @@ public class CachePoolStats {
return this;
}
public Builder setBytesOverlimit(long bytesOverlimit) {
this.bytesOverlimit = bytesOverlimit;
return this;
}
public Builder setFilesNeeded(long filesNeeded) {
this.filesNeeded = filesNeeded;
return this;
@ -57,20 +63,22 @@ public class CachePoolStats {
}
public CachePoolStats build() {
return new CachePoolStats(bytesNeeded, bytesCached, filesNeeded,
filesCached);
return new CachePoolStats(bytesNeeded, bytesCached, bytesOverlimit,
filesNeeded, filesCached);
}
};
private final long bytesNeeded;
private final long bytesCached;
private final long bytesOverlimit;
private final long filesNeeded;
private final long filesCached;
private CachePoolStats(long bytesNeeded, long bytesCached, long filesNeeded,
long filesCached) {
private CachePoolStats(long bytesNeeded, long bytesCached,
long bytesOverlimit, long filesNeeded, long filesCached) {
this.bytesNeeded = bytesNeeded;
this.bytesCached = bytesCached;
this.bytesOverlimit = bytesOverlimit;
this.filesNeeded = filesNeeded;
this.filesCached = filesCached;
}
@ -83,6 +91,10 @@ public class CachePoolStats {
return bytesCached;
}
public long getBytesOverlimit() {
return bytesOverlimit;
}
public long getFilesNeeded() {
return filesNeeded;
}
@ -95,6 +107,7 @@ public class CachePoolStats {
return new StringBuilder().append("{").
append("bytesNeeded:").append(bytesNeeded).
append(", bytesCached:").append(bytesCached).
append(", bytesOverlimit:").append(bytesOverlimit).
append(", filesNeeded:").append(filesNeeded).
append(", filesCached:").append(filesCached).
append("}").toString();

View File

@ -19,9 +19,11 @@ package org.apache.hadoop.hdfs.protocol;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -1100,23 +1102,24 @@ public interface ClientProtocol {
* Add a CacheDirective to the CacheManager.
*
* @param directive A CacheDirectiveInfo to be added
* @param flags {@link CacheFlag}s to use for this operation.
* @return A CacheDirectiveInfo associated with the added directive
* @throws IOException if the directive could not be added
*/
@AtMostOnce
public long addCacheDirective(
CacheDirectiveInfo directive) throws IOException;
public long addCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException;
/**
* Modify a CacheDirective in the CacheManager.
*
* @return directive The directive to modify. Must contain
* a directive ID.
* @return directive The directive to modify. Must contain a directive ID.
* @param flags {@link CacheFlag}s to use for this operation.
* @throws IOException if the directive could not be modified
*/
@AtMostOnce
public void modifyCacheDirective(
CacheDirectiveInfo directive) throws IOException;
public void modifyCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException;
/**
* Remove a CacheDirectiveInfo from the CacheManager.

View File

@ -320,7 +320,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
try {
HdfsFileStatus result = server.create(req.getSrc(),
PBHelper.convert(req.getMasked()), req.getClientName(),
PBHelper.convert(req.getCreateFlag()), req.getCreateParent(),
PBHelper.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
(short) req.getReplication(), req.getBlockSize());
if (result != null) {
@ -1034,9 +1034,11 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, AddCacheDirectiveRequestProto request)
throws ServiceException {
try {
long id = server.addCacheDirective(
PBHelper.convert(request.getInfo()),
PBHelper.convertCacheFlags(request.getCacheFlags()));
return AddCacheDirectiveResponseProto.newBuilder().
setId(server.addCacheDirective(
PBHelper.convert(request.getInfo()))).build();
setId(id).build();
} catch (IOException e) {
throw new ServiceException(e);
}
@ -1048,7 +1050,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throws ServiceException {
try {
server.modifyCacheDirective(
PBHelper.convert(request.getInfo()));
PBHelper.convert(request.getInfo()),
PBHelper.convertCacheFlags(request.getCacheFlags()));
return ModifyCacheDirectiveResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);

View File

@ -21,10 +21,12 @@ import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -1003,24 +1005,32 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public long addCacheDirective(
CacheDirectiveInfo directive) throws IOException {
public long addCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException {
try {
return rpcProxy.addCacheDirective(null,
AddCacheDirectiveRequestProto.newBuilder().
setInfo(PBHelper.convert(directive)).build()).getId();
AddCacheDirectiveRequestProto.Builder builder =
AddCacheDirectiveRequestProto.newBuilder().
setInfo(PBHelper.convert(directive));
if (!flags.isEmpty()) {
builder.setCacheFlags(PBHelper.convertCacheFlags(flags));
}
return rpcProxy.addCacheDirective(null, builder.build()).getId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void modifyCacheDirective(
CacheDirectiveInfo directive) throws IOException {
public void modifyCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException {
try {
rpcProxy.modifyCacheDirective(null,
ModifyCacheDirectiveRequestProto.Builder builder =
ModifyCacheDirectiveRequestProto.newBuilder().
setInfo(PBHelper.convert(directive)).build());
setInfo(PBHelper.convert(directive));
if (!flags.isEmpty()) {
builder.setCacheFlags(PBHelper.convertCacheFlags(flags));
}
rpcProxy.modifyCacheDirective(null, builder.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -27,6 +27,7 @@ import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
@ -64,6 +65,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
@ -1182,7 +1184,7 @@ public class PBHelper {
return value;
}
public static EnumSetWritable<CreateFlag> convert(int flag) {
public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) {
EnumSet<CreateFlag> result =
EnumSet.noneOf(CreateFlag.class);
if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
@ -1197,7 +1199,23 @@ public class PBHelper {
}
return new EnumSetWritable<CreateFlag>(result);
}
public static int convertCacheFlags(EnumSet<CacheFlag> flags) {
int value = 0;
if (flags.contains(CacheFlag.FORCE)) {
value |= CacheFlagProto.FORCE.getNumber();
}
return value;
}
public static EnumSet<CacheFlag> convertCacheFlags(int flags) {
EnumSet<CacheFlag> result = EnumSet.noneOf(CacheFlag.class);
if ((flags & CacheFlagProto.FORCE_VALUE) == CacheFlagProto.FORCE_VALUE) {
result.add(CacheFlag.FORCE);
}
return result;
}
public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
if (fs == null)
return null;
@ -1795,8 +1813,8 @@ public class PBHelper {
if (info.getMode() != null) {
builder.setMode(info.getMode().toShort());
}
if (info.getWeight() != null) {
builder.setWeight(info.getWeight());
if (info.getLimit() != null) {
builder.setLimit(info.getLimit());
}
return builder.build();
}
@ -1814,8 +1832,8 @@ public class PBHelper {
if (proto.hasMode()) {
info.setMode(new FsPermission((short)proto.getMode()));
}
if (proto.hasWeight()) {
info.setWeight(proto.getWeight());
if (proto.hasLimit()) {
info.setLimit(proto.getLimit());
}
return info;
}
@ -1824,6 +1842,7 @@ public class PBHelper {
CachePoolStatsProto.Builder builder = CachePoolStatsProto.newBuilder();
builder.setBytesNeeded(stats.getBytesNeeded());
builder.setBytesCached(stats.getBytesCached());
builder.setBytesOverlimit(stats.getBytesOverlimit());
builder.setFilesNeeded(stats.getFilesNeeded());
builder.setFilesCached(stats.getFilesCached());
return builder.build();
@ -1833,6 +1852,7 @@ public class PBHelper {
CachePoolStats.Builder builder = new CachePoolStats.Builder();
builder.setBytesNeeded(proto.getBytesNeeded());
builder.setBytesCached(proto.getBytesCached());
builder.setBytesOverlimit(proto.getBytesOverlimit());
builder.setFilesNeeded(proto.getFilesNeeded());
builder.setFilesCached(proto.getFilesCached());
return builder.build();

View File

@ -27,6 +27,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -48,6 +51,8 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.Time;
import com.google.common.base.Preconditions;
/**
* Scans the namesystem, scheduling blocks to be cached as appropriate.
*
@ -79,26 +84,53 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
private final long intervalMs;
/**
* True if we should rescan immediately, regardless of how much time
* elapsed since the previous scan.
* The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
* waiting for rescan operations.
*/
private boolean rescanImmediately;
private final ReentrantLock lock = new ReentrantLock();
/**
* Notifies the scan thread that an immediate rescan is needed.
*/
private final Condition doRescan = lock.newCondition();
/**
* Notifies waiting threads that a rescan has finished.
*/
private final Condition scanFinished = lock.newCondition();
/**
* Whether there are pending CacheManager operations that necessitate a
* CacheReplicationMonitor rescan. Protected by the CRM lock.
*/
private boolean needsRescan = true;
/**
* Whether we are currently doing a rescan. Protected by the CRM lock.
*/
private boolean isScanning = false;
/**
* The number of rescans completed. Used to wait for scans to finish.
* Protected by the CacheReplicationMonitor lock.
*/
private long scanCount = 0;
/**
* True if this monitor should terminate. Protected by the CRM lock.
*/
private boolean shutdown = false;
/**
* The monotonic time at which the current scan started.
*/
private long scanTimeMs;
private long startTimeMs;
/**
* Mark status of the current scan.
*/
private boolean mark = false;
/**
* True if this monitor should terminate.
*/
private boolean shutdown;
/**
* Cache directives found in the previous scan.
*/
@ -108,7 +140,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
* Blocks found in the previous scan.
*/
private long scannedBlocks;
public CacheReplicationMonitor(FSNamesystem namesystem,
CacheManager cacheManager, long intervalMs) {
this.namesystem = namesystem;
@ -120,41 +152,60 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
@Override
public void run() {
shutdown = false;
rescanImmediately = true;
scanTimeMs = 0;
startTimeMs = 0;
LOG.info("Starting CacheReplicationMonitor with interval " +
intervalMs + " milliseconds");
try {
long curTimeMs = Time.monotonicNow();
while (true) {
synchronized(this) {
// Not all of the variables accessed here need the CRM lock, but take
// it anyway for simplicity
lock.lock();
try {
while (true) {
if (shutdown) {
LOG.info("Shutting down CacheReplicationMonitor");
return;
}
if (rescanImmediately) {
LOG.info("Rescanning on request");
rescanImmediately = false;
if (needsRescan) {
LOG.info("Rescanning because of pending operations");
break;
}
long delta = (scanTimeMs + intervalMs) - curTimeMs;
long delta = (startTimeMs + intervalMs) - curTimeMs;
if (delta <= 0) {
LOG.info("Rescanning after " + (curTimeMs - scanTimeMs) +
LOG.info("Rescanning after " + (curTimeMs - startTimeMs) +
" milliseconds");
break;
}
this.wait(delta);
doRescan.await(delta, TimeUnit.MILLISECONDS);
curTimeMs = Time.monotonicNow();
}
} finally {
lock.unlock();
}
scanTimeMs = curTimeMs;
// Mark scan as started, clear needsRescan
lock.lock();
try {
isScanning = true;
needsRescan = false;
} finally {
lock.unlock();
}
startTimeMs = curTimeMs;
mark = !mark;
rescan();
curTimeMs = Time.monotonicNow();
// Retake the CRM lock to update synchronization-related variables
lock.lock();
try {
isScanning = false;
scanCount++;
scanFinished.signalAll();
} finally {
lock.unlock();
}
LOG.info("Scanned " + scannedDirectives + " directive(s) and " +
scannedBlocks + " block(s) in " + (curTimeMs - scanTimeMs) + " " +
scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " +
"millisecond(s).");
}
} catch (Throwable t) {
@ -164,15 +215,91 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
}
/**
* Kick the monitor thread.
*
* If it is sleeping, it will wake up and start scanning.
* If it is currently scanning, it will finish the scan and immediately do
* another one.
* Similar to {@link CacheReplicationMonitor#waitForRescan()}, except it only
* waits if there are pending operations that necessitate a rescan as
* indicated by {@link #setNeedsRescan()}.
* <p>
* Note that this call may release the FSN lock, so operations before and
* after are not necessarily atomic.
*/
public synchronized void kick() {
rescanImmediately = true;
this.notifyAll();
public void waitForRescanIfNeeded() {
lock.lock();
try {
if (!needsRescan) {
return;
}
} finally {
lock.unlock();
}
waitForRescan();
}
/**
* Waits for a rescan to complete. This doesn't guarantee consistency with
* pending operations, only relative recency, since it will not force a new
* rescan if a rescan is already underway.
* <p>
* Note that this call will release the FSN lock, so operations before and
* after are not atomic.
*/
public void waitForRescan() {
// Drop the FSN lock temporarily and retake it after we finish waiting
// Need to handle both the read lock and the write lock
boolean retakeWriteLock = false;
if (namesystem.hasWriteLock()) {
namesystem.writeUnlock();
retakeWriteLock = true;
} else if (namesystem.hasReadLock()) {
namesystem.readUnlock();
} else {
// Expected to have at least one of the locks
Preconditions.checkState(false,
"Need to be holding either the read or write lock");
}
// try/finally for retaking FSN lock
try {
lock.lock();
// try/finally for releasing CRM lock
try {
// If no scan is already ongoing, mark the CRM as dirty and kick
if (!isScanning) {
needsRescan = true;
doRescan.signal();
}
// Wait until the scan finishes and the count advances
final long startCount = scanCount;
while (startCount >= scanCount) {
try {
scanFinished.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
+ " rescan", e);
break;
}
}
} finally {
lock.unlock();
}
} finally {
if (retakeWriteLock) {
namesystem.writeLock();
} else {
namesystem.readLock();
}
}
}
/**
* Indicates to the CacheReplicationMonitor that there have been CacheManager
* changes that require a rescan.
*/
public void setNeedsRescan() {
lock.lock();
try {
this.needsRescan = true;
} finally {
lock.unlock();
}
}
/**
@ -180,10 +307,14 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
*/
@Override
public void close() throws IOException {
synchronized(this) {
lock.lock();
try {
if (shutdown) return;
shutdown = true;
this.notifyAll();
doRescan.signalAll();
scanFinished.signalAll();
} finally {
lock.unlock();
}
try {
if (this.isAlive()) {
@ -228,12 +359,14 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
// Reset the directive's statistics
directive.resetStatistics();
// Skip processing this entry if it has expired
LOG.info("Directive expiry is at " + directive.getExpiryTime());
if (LOG.isTraceEnabled()) {
LOG.trace("Directive expiry is at " + directive.getExpiryTime());
}
if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping directive id " + directive.getId()
+ " because it has expired (" + directive.getExpiryTime() + ">="
+ now);
+ now + ")");
}
continue;
}
@ -280,15 +413,27 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
// Increment the "needed" statistics
directive.addFilesNeeded(1);
long neededTotal = 0;
for (BlockInfo blockInfo : blockInfos) {
long neededByBlock =
directive.getReplication() * blockInfo.getNumBytes();
neededTotal += neededByBlock;
}
// We don't cache UC blocks, don't add them to the total here
long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() *
directive.getReplication();
directive.addBytesNeeded(neededTotal);
// TODO: Enforce per-pool quotas
// The pool's bytesNeeded is incremented as we scan. If the demand
// thus far plus the demand of this file would exceed the pool's limit,
// do not cache this file.
CachePool pool = directive.getPool();
if (pool.getBytesNeeded() > pool.getLimit()) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Skipping directive id %d file %s because "
+ "limit of pool %s would be exceeded (%d > %d)",
directive.getId(),
file.getFullPathName(),
pool.getPoolName(),
pool.getBytesNeeded(),
pool.getLimit()));
}
return;
}
long cachedTotal = 0;
for (BlockInfo blockInfo : blockInfos) {

View File

@ -27,11 +27,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -45,13 +46,16 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -68,7 +72,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
@ -341,6 +345,67 @@ public final class CacheManager {
return expiryTime;
}
/**
* Throws an exception if the CachePool does not have enough capacity to
* cache the given path at the replication factor.
*
* @param pool CachePool where the path is being cached
* @param path Path that is being cached
* @param replication Replication factor of the path
* @throws InvalidRequestException if the pool does not have enough capacity
*/
private void checkLimit(CachePool pool, String path,
short replication) throws InvalidRequestException {
CacheDirectiveStats stats = computeNeeded(path, replication);
if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool
.getLimit()) {
throw new InvalidRequestException("Caching path " + path + " of size "
+ stats.getBytesNeeded() / replication + " bytes at replication "
+ replication + " would exceed pool " + pool.getPoolName()
+ "'s remaining capacity of "
+ (pool.getLimit() - pool.getBytesNeeded()) + " bytes.");
}
}
/**
* Computes the needed number of bytes and files for a path.
* @return CacheDirectiveStats describing the needed stats for this path
*/
private CacheDirectiveStats computeNeeded(String path, short replication) {
FSDirectory fsDir = namesystem.getFSDirectory();
INode node;
long requestedBytes = 0;
long requestedFiles = 0;
CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
try {
node = fsDir.getINode(path);
} catch (UnresolvedLinkException e) {
// We don't cache through symlinks
return builder.build();
}
if (node == null) {
return builder.build();
}
if (node.isFile()) {
requestedFiles = 1;
INodeFile file = node.asFile();
requestedBytes = file.computeFileSize();
} else if (node.isDirectory()) {
INodeDirectory dir = node.asDirectory();
ReadOnlyList<INode> children = dir.getChildrenList(null);
requestedFiles = children.size();
for (INode child : children) {
if (child.isFile()) {
requestedBytes += child.asFile().computeFileSize();
}
}
}
return new CacheDirectiveStats.Builder()
.setBytesNeeded(requestedBytes)
.setFilesCached(requestedFiles)
.build();
}
/**
* Get a CacheDirective by ID, validating the ID and that the directive
* exists.
@ -384,6 +449,15 @@ public final class CacheManager {
directivesByPath.put(path, directives);
}
directives.add(directive);
// Fix up pool stats
CacheDirectiveStats stats =
computeNeeded(directive.getPath(), directive.getReplication());
directive.addBytesNeeded(stats.getBytesNeeded());
directive.addFilesNeeded(directive.getFilesNeeded());
if (monitor != null) {
monitor.setNeedsRescan();
}
}
/**
@ -407,7 +481,7 @@ public final class CacheManager {
}
public CacheDirectiveInfo addDirective(
CacheDirectiveInfo info, FSPermissionChecker pc)
CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
throws IOException {
assert namesystem.hasWriteLock();
CacheDirective directive;
@ -418,6 +492,14 @@ public final class CacheManager {
short replication = validateReplication(info, (short)1);
long expiryTime = validateExpiryTime(info,
CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
// Do quota validation if required
if (!flags.contains(CacheFlag.FORCE)) {
// Can't kick and wait if caching is disabled
if (monitor != null) {
monitor.waitForRescan();
}
checkLimit(pool, path, replication);
}
// All validation passed
// Add a new entry with the next available ID.
long id = getNextDirectiveId();
@ -428,14 +510,11 @@ public final class CacheManager {
throw e;
}
LOG.info("addDirective of " + info + " successful.");
if (monitor != null) {
monitor.kick();
}
return directive.toInfo();
}
public void modifyDirective(CacheDirectiveInfo info,
FSPermissionChecker pc) throws IOException {
FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
assert namesystem.hasWriteLock();
String idString =
(info.getId() == null) ?
@ -463,6 +542,13 @@ public final class CacheManager {
if (info.getPool() != null) {
pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool);
if (!flags.contains(CacheFlag.FORCE)) {
// Can't kick and wait if caching is disabled
if (monitor != null) {
monitor.waitForRescan();
}
checkLimit(pool, path, replication);
}
}
removeInternal(prevEntry);
CacheDirective newEntry =
@ -489,9 +575,18 @@ public final class CacheManager {
if (directives.size() == 0) {
directivesByPath.remove(path);
}
// Fix up the stats from removing the pool
final CachePool pool = directive.getPool();
directive.addBytesNeeded(-directive.getBytesNeeded());
directive.addFilesNeeded(-directive.getFilesNeeded());
directivesById.remove(directive.getId());
directive.getPool().getDirectiveList().remove(directive);
pool.getDirectiveList().remove(directive);
assert directive.getPool() == null;
if (monitor != null) {
monitor.setNeedsRescan();
}
}
public void removeDirective(long id, FSPermissionChecker pc)
@ -505,9 +600,6 @@ public final class CacheManager {
LOG.warn("removeDirective of " + id + " failed: ", e);
throw e;
}
if (monitor != null) {
monitor.kick();
}
LOG.info("removeDirective of " + id + " successful.");
}
@ -527,6 +619,9 @@ public final class CacheManager {
if (filter.getReplication() != null) {
throw new IOException("Filtering by replication is unsupported.");
}
if (monitor != null) {
monitor.waitForRescanIfNeeded();
}
ArrayList<CacheDirectiveEntry> replies =
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0;
@ -573,16 +668,22 @@ public final class CacheManager {
public CachePoolInfo addCachePool(CachePoolInfo info)
throws IOException {
assert namesystem.hasWriteLock();
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
CachePool pool = cachePools.get(poolName);
if (pool != null) {
throw new InvalidRequestException("Cache pool " + poolName
+ " already exists.");
CachePool pool;
try {
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
pool = cachePools.get(poolName);
if (pool != null) {
throw new InvalidRequestException("Cache pool " + poolName
+ " already exists.");
}
pool = CachePool.createFromInfoAndDefaults(info);
cachePools.put(pool.getPoolName(), pool);
} catch (IOException e) {
LOG.info("addCachePool of " + info + " failed: ", e);
throw e;
}
pool = CachePool.createFromInfoAndDefaults(info);
cachePools.put(pool.getPoolName(), pool);
LOG.info("Created new cache pool " + pool);
LOG.info("addCachePool of " + info + " successful.");
return pool.getInfo(true);
}
@ -597,42 +698,51 @@ public final class CacheManager {
public void modifyCachePool(CachePoolInfo info)
throws IOException {
assert namesystem.hasWriteLock();
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new InvalidRequestException("Cache pool " + poolName
+ " does not exist.");
}
StringBuilder bld = new StringBuilder();
String prefix = "";
if (info.getOwnerName() != null) {
pool.setOwnerName(info.getOwnerName());
bld.append(prefix).
append("set owner to ").append(info.getOwnerName());
prefix = "; ";
try {
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new InvalidRequestException("Cache pool " + poolName
+ " does not exist.");
}
String prefix = "";
if (info.getOwnerName() != null) {
pool.setOwnerName(info.getOwnerName());
bld.append(prefix).
append("set owner to ").append(info.getOwnerName());
prefix = "; ";
}
if (info.getGroupName() != null) {
pool.setGroupName(info.getGroupName());
bld.append(prefix).
append("set group to ").append(info.getGroupName());
prefix = "; ";
}
if (info.getMode() != null) {
pool.setMode(info.getMode());
bld.append(prefix).append("set mode to " + info.getMode());
prefix = "; ";
}
if (info.getLimit() != null) {
pool.setLimit(info.getLimit());
bld.append(prefix).append("set limit to " + info.getLimit());
prefix = "; ";
// New limit changes stats, need to set needs refresh
if (monitor != null) {
monitor.setNeedsRescan();
}
}
if (prefix.isEmpty()) {
bld.append("no changes.");
}
} catch (IOException e) {
LOG.info("modifyCachePool of " + info + " failed: ", e);
throw e;
}
if (info.getGroupName() != null) {
pool.setGroupName(info.getGroupName());
bld.append(prefix).
append("set group to ").append(info.getGroupName());
prefix = "; ";
}
if (info.getMode() != null) {
pool.setMode(info.getMode());
bld.append(prefix).append("set mode to " + info.getMode());
prefix = "; ";
}
if (info.getWeight() != null) {
pool.setWeight(info.getWeight());
bld.append(prefix).
append("set weight to ").append(info.getWeight());
prefix = "; ";
}
if (prefix.isEmpty()) {
bld.append("no changes.");
}
LOG.info("modified " + poolName + "; " + bld.toString());
LOG.info("modifyCachePool of " + info.getPoolName() + " successful; "
+ bld.toString());
}
/**
@ -646,28 +756,37 @@ public final class CacheManager {
public void removeCachePool(String poolName)
throws IOException {
assert namesystem.hasWriteLock();
CachePoolInfo.validateName(poolName);
CachePool pool = cachePools.remove(poolName);
if (pool == null) {
throw new InvalidRequestException(
"Cannot remove non-existent cache pool " + poolName);
}
// Remove all directives in this pool.
Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
while (iter.hasNext()) {
CacheDirective directive = iter.next();
directivesByPath.remove(directive.getPath());
directivesById.remove(directive.getId());
iter.remove();
}
if (monitor != null) {
monitor.kick();
try {
CachePoolInfo.validateName(poolName);
CachePool pool = cachePools.remove(poolName);
if (pool == null) {
throw new InvalidRequestException(
"Cannot remove non-existent cache pool " + poolName);
}
// Remove all directives in this pool.
Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
while (iter.hasNext()) {
CacheDirective directive = iter.next();
directivesByPath.remove(directive.getPath());
directivesById.remove(directive.getId());
iter.remove();
}
if (monitor != null) {
monitor.setNeedsRescan();
}
} catch (IOException e) {
LOG.info("removeCachePool of " + poolName + " failed: ", e);
throw e;
}
LOG.info("removeCachePool of " + poolName + " successful.");
}
public BatchedListEntries<CachePoolEntry>
listCachePools(FSPermissionChecker pc, String prevKey) {
assert namesystem.hasReadLock();
if (monitor != null) {
monitor.waitForRescanIfNeeded();
}
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<CachePoolEntry> results =
new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@ -782,7 +901,7 @@ public final class CacheManager {
* @param sdPath path of the storage directory
* @throws IOException
*/
public void saveState(DataOutput out, String sdPath)
public void saveState(DataOutputStream out, String sdPath)
throws IOException {
out.writeLong(nextDirectiveId);
savePools(out, sdPath);
@ -805,7 +924,7 @@ public final class CacheManager {
/**
* Save cache pools to fsimage
*/
private void savePools(DataOutput out,
private void savePools(DataOutputStream out,
String sdPath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS, sdPath);
@ -814,7 +933,7 @@ public final class CacheManager {
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(cachePools.size());
for (CachePool pool: cachePools.values()) {
pool.getInfo(true).writeTo(out);
FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@ -823,7 +942,7 @@ public final class CacheManager {
/*
* Save cache entries to fsimage
*/
private void saveDirectives(DataOutput out, String sdPath)
private void saveDirectives(DataOutputStream out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
@ -832,11 +951,7 @@ public final class CacheManager {
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(directivesById.size());
for (CacheDirective directive : directivesById.values()) {
out.writeLong(directive.getId());
Text.writeString(out, directive.getPath());
out.writeShort(directive.getReplication());
Text.writeString(out, directive.getPool().getPoolName());
out.writeLong(directive.getExpiryTime());
FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@ -854,7 +969,7 @@ public final class CacheManager {
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfPools; i++) {
addCachePool(CachePoolInfo.readFrom(in));
addCachePool(FSImageSerialization.readCachePoolInfo(in));
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
@ -871,19 +986,17 @@ public final class CacheManager {
prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numDirectives; i++) {
long directiveId = in.readLong();
String path = Text.readString(in);
short replication = in.readShort();
String poolName = Text.readString(in);
long expiryTime = in.readLong();
CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
// Get pool reference by looking it up in the map
final String poolName = info.getPool();
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new IOException("Directive refers to pool " + poolName +
", which does not exist.");
}
CacheDirective directive =
new CacheDirective(directiveId, path, replication, expiryTime);
new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
info.getReplication(), info.getExpiration().getAbsoluteMillis());
boolean addedDirective = pool.getDirectiveList().add(directive);
assert addedDirective;
if (directivesById.put(directive.getId(), directive) != null) {

View File

@ -49,8 +49,8 @@ import com.google.common.base.Preconditions;
public final class CachePool {
public static final Log LOG = LogFactory.getLog(CachePool.class);
public static final int DEFAULT_WEIGHT = 100;
public static final long DEFAULT_LIMIT = Long.MAX_VALUE;
@Nonnull
private final String poolName;
@ -71,7 +71,10 @@ public final class CachePool {
@Nonnull
private FsPermission mode;
private int weight;
/**
* Maximum number of bytes that can be cached in this pool.
*/
private long limit;
private long bytesNeeded;
private long bytesCached;
@ -118,10 +121,10 @@ public final class CachePool {
}
FsPermission mode = (info.getMode() == null) ?
FsPermission.getCachePoolDefault() : info.getMode();
Integer weight = (info.getWeight() == null) ?
DEFAULT_WEIGHT : info.getWeight();
long limit = info.getLimit() == null ?
DEFAULT_LIMIT : info.getLimit();
return new CachePool(info.getPoolName(),
ownerName, groupName, mode, weight);
ownerName, groupName, mode, limit);
}
/**
@ -131,11 +134,11 @@ public final class CachePool {
static CachePool createFromInfo(CachePoolInfo info) {
return new CachePool(info.getPoolName(),
info.getOwnerName(), info.getGroupName(),
info.getMode(), info.getWeight());
info.getMode(), info.getLimit());
}
CachePool(String poolName, String ownerName, String groupName,
FsPermission mode, int weight) {
FsPermission mode, long limit) {
Preconditions.checkNotNull(poolName);
Preconditions.checkNotNull(ownerName);
Preconditions.checkNotNull(groupName);
@ -144,7 +147,7 @@ public final class CachePool {
this.ownerName = ownerName;
this.groupName = groupName;
this.mode = new FsPermission(mode);
this.weight = weight;
this.limit = limit;
}
public String getPoolName() {
@ -177,16 +180,16 @@ public final class CachePool {
this.mode = new FsPermission(mode);
return this;
}
public int getWeight() {
return weight;
public long getLimit() {
return limit;
}
public CachePool setWeight(int weight) {
this.weight = weight;
public CachePool setLimit(long bytes) {
this.limit = bytes;
return this;
}
/**
* Get either full or partial information about this CachePool.
*
@ -204,7 +207,7 @@ public final class CachePool {
return info.setOwnerName(ownerName).
setGroupName(groupName).
setMode(new FsPermission(mode)).
setWeight(weight);
setLimit(limit);
}
/**
@ -241,6 +244,10 @@ public final class CachePool {
return bytesCached;
}
public long getBytesOverlimit() {
return Math.max(bytesNeeded-limit, 0);
}
public long getFilesNeeded() {
return filesNeeded;
}
@ -258,6 +265,7 @@ public final class CachePool {
return new CachePoolStats.Builder().
setBytesNeeded(bytesNeeded).
setBytesCached(bytesCached).
setBytesOverlimit(getBytesOverlimit()).
setFilesNeeded(filesNeeded).
setFilesCached(filesCached).
build();
@ -291,7 +299,7 @@ public final class CachePool {
append(", ownerName:").append(ownerName).
append(", groupName:").append(groupName).
append(", mode:").append(mode).
append(", weight:").append(weight).
append(", limit:").append(limit).
append(" }").toString();
}

View File

@ -24,12 +24,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -650,7 +652,7 @@ public class FSEditLogLoader {
ModifyCacheDirectiveInfoOp modifyOp =
(ModifyCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().modifyDirective(
modifyOp.directive, null);
modifyOp.directive, null, EnumSet.of(CacheFlag.FORCE));
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}

View File

@ -64,7 +64,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.EnumMap;
import java.util.List;
import java.util.zip.CheckedInputStream;
@ -76,7 +75,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -2895,56 +2893,25 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
long id = FSImageSerialization.readLong(in);
String path = FSImageSerialization.readString(in);
short replication = FSImageSerialization.readShort(in);
String pool = FSImageSerialization.readString(in);
long expiryTime = FSImageSerialization.readLong(in);
directive = new CacheDirectiveInfo.Builder().
setId(id).
setPath(new Path(path)).
setReplication(replication).
setPool(pool).
setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)).
build();
directive = FSImageSerialization.readCacheDirectiveInfo(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(directive.getId(), out);
FSImageSerialization.writeString(directive.getPath().toUri().getPath(), out);
FSImageSerialization.writeShort(directive.getReplication(), out);
FSImageSerialization.writeString(directive.getPool(), out);
FSImageSerialization.writeLong(
directive.getExpiration().getMillis(), out);
FSImageSerialization.writeCacheDirectiveInfo(out, directive);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "ID",
directive.getId().toString());
XMLUtils.addSaxString(contentHandler, "PATH",
directive.getPath().toUri().getPath());
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(directive.getReplication()));
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
XMLUtils.addSaxString(contentHandler, "EXPIRATION",
"" + directive.getExpiration().getMillis());
FSImageSerialization.writeCacheDirectiveInfo(contentHandler, directive);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
directive = new CacheDirectiveInfo.Builder().
setId(Long.parseLong(st.getValue("ID"))).
setPath(new Path(st.getValue("PATH"))).
setReplication(Short.parseShort(st.getValue("REPLICATION"))).
setPool(st.getValue("POOL")).
setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
Long.parseLong(st.getValue("EXPIRATION")))).
build();
directive = FSImageSerialization.readCacheDirectiveInfo(st);
readRpcIdsFromXml(st);
}
@ -2988,104 +2955,25 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(FSImageSerialization.readLong(in));
byte flags = in.readByte();
if ((flags & 0x1) != 0) {
builder.setPath(new Path(FSImageSerialization.readString(in)));
}
if ((flags & 0x2) != 0) {
builder.setReplication(FSImageSerialization.readShort(in));
}
if ((flags & 0x4) != 0) {
builder.setPool(FSImageSerialization.readString(in));
}
if ((flags & 0x8) != 0) {
builder.setExpiration(
CacheDirectiveInfo.Expiration.newAbsolute(
FSImageSerialization.readLong(in)));
}
if ((flags & ~0xF) != 0) {
throw new IOException("unknown flags set in " +
"ModifyCacheDirectiveInfoOp: " + flags);
}
this.directive = builder.build();
this.directive = FSImageSerialization.readCacheDirectiveInfo(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(directive.getId(), out);
byte flags = (byte)(
((directive.getPath() != null) ? 0x1 : 0) |
((directive.getReplication() != null) ? 0x2 : 0) |
((directive.getPool() != null) ? 0x4 : 0) |
((directive.getExpiration() != null) ? 0x8 : 0)
);
out.writeByte(flags);
if (directive.getPath() != null) {
FSImageSerialization.writeString(
directive.getPath().toUri().getPath(), out);
}
if (directive.getReplication() != null) {
FSImageSerialization.writeShort(directive.getReplication(), out);
}
if (directive.getPool() != null) {
FSImageSerialization.writeString(directive.getPool(), out);
}
if (directive.getExpiration() != null) {
FSImageSerialization.writeLong(directive.getExpiration().getMillis(),
out);
}
FSImageSerialization.writeCacheDirectiveInfo(out, directive);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "ID",
Long.toString(directive.getId()));
if (directive.getPath() != null) {
XMLUtils.addSaxString(contentHandler, "PATH",
directive.getPath().toUri().getPath());
}
if (directive.getReplication() != null) {
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(directive.getReplication()));
}
if (directive.getPool() != null) {
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
}
if (directive.getExpiration() != null) {
XMLUtils.addSaxString(contentHandler, "EXPIRATION",
"" + directive.getExpiration().getMillis());
}
FSImageSerialization.writeCacheDirectiveInfo(contentHandler, directive);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(Long.parseLong(st.getValue("ID")));
String path = st.getValueOrNull("PATH");
if (path != null) {
builder.setPath(new Path(path));
}
String replicationString = st.getValueOrNull("REPLICATION");
if (replicationString != null) {
builder.setReplication(Short.parseShort(replicationString));
}
String pool = st.getValueOrNull("POOL");
if (pool != null) {
builder.setPool(pool);
}
String expiryTime = st.getValueOrNull("EXPIRATION");
if (expiryTime != null) {
builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
Long.parseLong(expiryTime)));
}
this.directive = builder.build();
this.directive = FSImageSerialization.readCacheDirectiveInfo(st);
readRpcIdsFromXml(st);
}
@ -3184,30 +3072,35 @@ public abstract class FSEditLogOp {
public AddCachePoolOp setPool(CachePoolInfo info) {
this.info = info;
assert(info.getPoolName() != null);
assert(info.getOwnerName() != null);
assert(info.getGroupName() != null);
assert(info.getMode() != null);
assert(info.getLimit() != null);
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
info = CachePoolInfo.readFrom(in);
info = FSImageSerialization.readCachePoolInfo(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
info.writeTo(out);
FSImageSerialization.writeCachePoolInfo(out, info);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
info.writeXmlTo(contentHandler);
FSImageSerialization.writeCachePoolInfo(contentHandler, info);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.info = CachePoolInfo.readXmlFrom(st);
this.info = FSImageSerialization.readCachePoolInfo(st);
readRpcIdsFromXml(st);
}
@ -3219,7 +3112,7 @@ public abstract class FSEditLogOp {
builder.append("ownerName=" + info.getOwnerName() + ",");
builder.append("groupName=" + info.getGroupName() + ",");
builder.append("mode=" + Short.toString(info.getMode().toShort()) + ",");
builder.append("weight=" + Integer.toString(info.getWeight()));
builder.append("limit=" + Long.toString(info.getLimit()));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
@ -3245,25 +3138,25 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
info = CachePoolInfo.readFrom(in);
info = FSImageSerialization.readCachePoolInfo(in);
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
info.writeTo(out);
FSImageSerialization.writeCachePoolInfo(out, info);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
cachePoolInfoToXml(contentHandler, info);
FSImageSerialization.writeCachePoolInfo(contentHandler, info);
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
this.info = cachePoolInfoFromXml(st);
this.info = FSImageSerialization.readCachePoolInfo(st);
readRpcIdsFromXml(st);
}
@ -3284,8 +3177,8 @@ public abstract class FSEditLogOp {
if (info.getMode() != null) {
fields.add("mode=" + info.getMode().toString());
}
if (info.getWeight() != null) {
fields.add("weight=" + info.getWeight());
if (info.getLimit() != null) {
fields.add("limit=" + info.getLimit());
}
builder.append(Joiner.on(",").join(fields));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
@ -3757,41 +3650,4 @@ public abstract class FSEditLogOp {
short mode = Short.valueOf(st.getValue("MODE"));
return new FsPermission(mode);
}
public static void cachePoolInfoToXml(ContentHandler contentHandler,
CachePoolInfo info) throws SAXException {
XMLUtils.addSaxString(contentHandler, "POOLNAME", info.getPoolName());
if (info.getOwnerName() != null) {
XMLUtils.addSaxString(contentHandler, "OWNERNAME", info.getOwnerName());
}
if (info.getGroupName() != null) {
XMLUtils.addSaxString(contentHandler, "GROUPNAME", info.getGroupName());
}
if (info.getMode() != null) {
fsPermissionToXml(contentHandler, info.getMode());
}
if (info.getWeight() != null) {
XMLUtils.addSaxString(contentHandler, "WEIGHT",
Integer.toString(info.getWeight()));
}
}
public static CachePoolInfo cachePoolInfoFromXml(Stanza st)
throws InvalidXmlException {
String poolName = st.getValue("POOLNAME");
CachePoolInfo info = new CachePoolInfo(poolName);
if (st.hasChildren("OWNERNAME")) {
info.setOwnerName(st.getValue("OWNERNAME"));
}
if (st.hasChildren("GROUPNAME")) {
info.setGroupName(st.getValue("GROUPNAME"));
}
if (st.hasChildren("MODE")) {
info.setMode(FSEditLogOp.fsPermissionFromXml(st));
}
if (st.hasChildren("WEIGHT")) {
info.setWeight(Integer.parseInt(st.getValue("WEIGHT")));
}
return info;
}
}

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@ -38,11 +40,16 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import com.google.common.base.Preconditions;
@ -476,4 +483,202 @@ public class FSImageSerialization {
}
return ret;
}
public static void writeCacheDirectiveInfo(DataOutputStream out,
CacheDirectiveInfo directive) throws IOException {
writeLong(directive.getId(), out);
int flags =
((directive.getPath() != null) ? 0x1 : 0) |
((directive.getReplication() != null) ? 0x2 : 0) |
((directive.getPool() != null) ? 0x4 : 0) |
((directive.getExpiration() != null) ? 0x8 : 0);
out.writeInt(flags);
if (directive.getPath() != null) {
writeString(directive.getPath().toUri().getPath(), out);
}
if (directive.getReplication() != null) {
writeShort(directive.getReplication(), out);
}
if (directive.getPool() != null) {
writeString(directive.getPool(), out);
}
if (directive.getExpiration() != null) {
writeLong(directive.getExpiration().getMillis(), out);
}
}
public static CacheDirectiveInfo readCacheDirectiveInfo(DataInput in)
throws IOException {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(readLong(in));
int flags = in.readInt();
if ((flags & 0x1) != 0) {
builder.setPath(new Path(readString(in)));
}
if ((flags & 0x2) != 0) {
builder.setReplication(readShort(in));
}
if ((flags & 0x4) != 0) {
builder.setPool(readString(in));
}
if ((flags & 0x8) != 0) {
builder.setExpiration(
CacheDirectiveInfo.Expiration.newAbsolute(readLong(in)));
}
if ((flags & ~0xF) != 0) {
throw new IOException("unknown flags set in " +
"ModifyCacheDirectiveInfoOp: " + flags);
}
return builder.build();
}
public static CacheDirectiveInfo readCacheDirectiveInfo(Stanza st)
throws InvalidXmlException {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(Long.parseLong(st.getValue("ID")));
String path = st.getValueOrNull("PATH");
if (path != null) {
builder.setPath(new Path(path));
}
String replicationString = st.getValueOrNull("REPLICATION");
if (replicationString != null) {
builder.setReplication(Short.parseShort(replicationString));
}
String pool = st.getValueOrNull("POOL");
if (pool != null) {
builder.setPool(pool);
}
String expiryTime = st.getValueOrNull("EXPIRATION");
if (expiryTime != null) {
builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
Long.parseLong(expiryTime)));
}
return builder.build();
}
public static void writeCacheDirectiveInfo(ContentHandler contentHandler,
CacheDirectiveInfo directive) throws SAXException {
XMLUtils.addSaxString(contentHandler, "ID",
Long.toString(directive.getId()));
if (directive.getPath() != null) {
XMLUtils.addSaxString(contentHandler, "PATH",
directive.getPath().toUri().getPath());
}
if (directive.getReplication() != null) {
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(directive.getReplication()));
}
if (directive.getPool() != null) {
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
}
if (directive.getExpiration() != null) {
XMLUtils.addSaxString(contentHandler, "EXPIRATION",
"" + directive.getExpiration().getMillis());
}
}
public static void writeCachePoolInfo(DataOutputStream out, CachePoolInfo info)
throws IOException {
writeString(info.getPoolName(), out);
final String ownerName = info.getOwnerName();
final String groupName = info.getGroupName();
final Long limit = info.getLimit();
final FsPermission mode = info.getMode();
boolean hasOwner, hasGroup, hasMode, hasLimit;
hasOwner = ownerName != null;
hasGroup = groupName != null;
hasMode = mode != null;
hasLimit = limit != null;
int flags =
(hasOwner ? 0x1 : 0) |
(hasGroup ? 0x2 : 0) |
(hasMode ? 0x4 : 0) |
(hasLimit ? 0x8 : 0);
writeInt(flags, out);
if (hasOwner) {
writeString(ownerName, out);
}
if (hasGroup) {
writeString(groupName, out);
}
if (hasMode) {
mode.write(out);
}
if (hasLimit) {
writeLong(limit, out);
}
}
public static CachePoolInfo readCachePoolInfo(DataInput in)
throws IOException {
String poolName = readString(in);
CachePoolInfo info = new CachePoolInfo(poolName);
int flags = readInt(in);
if ((flags & 0x1) != 0) {
info.setOwnerName(readString(in));
}
if ((flags & 0x2) != 0) {
info.setGroupName(readString(in));
}
if ((flags & 0x4) != 0) {
info.setMode(FsPermission.read(in));
}
if ((flags & 0x8) != 0) {
info.setLimit(readLong(in));
}
if ((flags & ~0xF) != 0) {
throw new IOException("Unknown flag in CachePoolInfo: " + flags);
}
return info;
}
public static void writeCachePoolInfo(ContentHandler contentHandler,
CachePoolInfo info) throws SAXException {
XMLUtils.addSaxString(contentHandler, "POOLNAME", info.getPoolName());
final String ownerName = info.getOwnerName();
final String groupName = info.getGroupName();
final Long limit = info.getLimit();
final FsPermission mode = info.getMode();
if (ownerName != null) {
XMLUtils.addSaxString(contentHandler, "OWNERNAME", ownerName);
}
if (groupName != null) {
XMLUtils.addSaxString(contentHandler, "GROUPNAME", groupName);
}
if (mode != null) {
FSEditLogOp.fsPermissionToXml(contentHandler, mode);
}
if (limit != null) {
XMLUtils.addSaxString(contentHandler, "LIMIT",
Long.toString(limit));
}
}
public static CachePoolInfo readCachePoolInfo(Stanza st)
throws InvalidXmlException {
String poolName = st.getValue("POOLNAME");
CachePoolInfo info = new CachePoolInfo(poolName);
if (st.hasChildren("OWNERNAME")) {
info.setOwnerName(st.getValue("OWNERNAME"));
}
if (st.hasChildren("GROUPNAME")) {
info.setGroupName(st.getValue("GROUPNAME"));
}
if (st.hasChildren("MODE")) {
info.setMode(FSEditLogOp.fsPermissionFromXml(st));
}
if (st.hasChildren("LIMIT")) {
info.setLimit(Long.parseLong(st.getValue("LIMIT")));
}
return info;
}
}

View File

@ -126,6 +126,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@ -7052,8 +7053,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
long addCacheDirective(
CacheDirectiveInfo directive) throws IOException {
long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)
throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@ -7076,7 +7077,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
"for this operation.");
}
CacheDirectiveInfo effectiveDirective =
cacheManager.addDirective(directive, pc);
cacheManager.addDirective(directive, pc, flags);
getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
cacheEntry != null);
result = effectiveDirective.getId();
@ -7094,8 +7095,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return result;
}
void modifyCacheDirective(
CacheDirectiveInfo directive) throws IOException {
void modifyCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@ -7111,7 +7112,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException(
"Cannot add cache directive", safeMode);
}
cacheManager.modifyDirective(directive, pc);
cacheManager.modifyDirective(directive, pc, flags);
getEditLog().logModifyCacheDirectiveInfo(directive,
cacheEntry != null);
success = true;

View File

@ -29,6 +29,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -36,6 +37,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -1239,14 +1241,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public long addCacheDirective(
CacheDirectiveInfo path) throws IOException {
return namesystem.addCacheDirective(path);
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
return namesystem.addCacheDirective(path, flags);
}
@Override
public void modifyCacheDirective(
CacheDirectiveInfo directive) throws IOException {
namesystem.modifyCacheDirective(directive);
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
namesystem.modifyCacheDirective(directive, flags);
}
@Override

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.tools;
import java.io.IOException;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
@ -25,6 +26,7 @@ import org.apache.commons.lang.WordUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@ -135,6 +137,7 @@ public class CacheAdmin extends Configured implements Tool {
public String getShortUsage() {
return "[" + getName() +
" -path <path> -pool <pool-name> " +
"[-force] " +
"[-replication <replication>] [-ttl <time-to-live>]]\n";
}
@ -146,6 +149,8 @@ public class CacheAdmin extends Configured implements Tool {
listing.addRow("<pool-name>", "The pool to which the directive will be " +
"added. You must have write permission on the cache pool "
+ "in order to add new directives.");
listing.addRow("-force",
"Skips checking of cache pool resource limits.");
listing.addRow("<replication>", "The cache replication factor to use. " +
"Defaults to 1.");
listing.addRow("<time-to-live>", "How long the directive is " +
@ -174,7 +179,7 @@ public class CacheAdmin extends Configured implements Tool {
return 1;
}
builder.setPool(poolName);
boolean force = StringUtils.popOption("-force", args);
String replicationString =
StringUtils.popOptionWithArgument("-replication", args);
if (replicationString != null) {
@ -201,8 +206,12 @@ public class CacheAdmin extends Configured implements Tool {
DistributedFileSystem dfs = getDFS(conf);
CacheDirectiveInfo directive = builder.build();
EnumSet<CacheFlag> flags = EnumSet.noneOf(CacheFlag.class);
if (force) {
flags.add(CacheFlag.FORCE);
}
try {
long id = dfs.addCacheDirective(directive);
long id = dfs.addCacheDirective(directive, flags);
System.out.println("Added cache directive " + id);
} catch (IOException e) {
System.err.println(prettifyException(e));
@ -282,7 +291,7 @@ public class CacheAdmin extends Configured implements Tool {
@Override
public String getShortUsage() {
return "[" + getName() +
" -id <id> [-path <path>] [-replication <replication>] " +
" -id <id> [-path <path>] [-force] [-replication <replication>] " +
"[-pool <pool-name>] [-ttl <time-to-live>]]\n";
}
@ -292,6 +301,8 @@ public class CacheAdmin extends Configured implements Tool {
listing.addRow("<id>", "The ID of the directive to modify (required)");
listing.addRow("<path>", "A path to cache. The path can be " +
"a directory or a file. (optional)");
listing.addRow("-force",
"Skips checking of cache pool resource limits.");
listing.addRow("<replication>", "The cache replication factor to use. " +
"(optional)");
listing.addRow("<pool-name>", "The pool to which the directive will be " +
@ -322,6 +333,7 @@ public class CacheAdmin extends Configured implements Tool {
builder.setPath(new Path(path));
modified = true;
}
boolean force = StringUtils.popOption("-force", args);
String replicationString =
StringUtils.popOptionWithArgument("-replication", args);
if (replicationString != null) {
@ -357,8 +369,12 @@ public class CacheAdmin extends Configured implements Tool {
return 1;
}
DistributedFileSystem dfs = getDFS(conf);
EnumSet<CacheFlag> flags = EnumSet.noneOf(CacheFlag.class);
if (force) {
flags.add(CacheFlag.FORCE);
}
try {
dfs.modifyCacheDirective(builder.build());
dfs.modifyCacheDirective(builder.build(), flags);
System.out.println("Modified cache directive " + idString);
} catch (IOException e) {
System.err.println(prettifyException(e));
@ -536,7 +552,7 @@ public class CacheAdmin extends Configured implements Tool {
@Override
public String getShortUsage() {
return "[" + NAME + " <name> [-owner <owner>] " +
"[-group <group>] [-mode <mode>] [-weight <weight>]]\n";
"[-group <group>] [-mode <mode>] [-limit <limit>]]\n";
}
@Override
@ -551,11 +567,10 @@ public class CacheAdmin extends Configured implements Tool {
listing.addRow("<mode>", "UNIX-style permissions for the pool. " +
"Permissions are specified in octal, e.g. 0755. " +
"By default, this is set to " + String.format("0%03o",
FsPermission.getCachePoolDefault().toShort()));
listing.addRow("<weight>", "Weight of the pool. " +
"This is a relative measure of the importance of the pool used " +
"during cache resource management. By default, it is set to " +
CachePool.DEFAULT_WEIGHT);
FsPermission.getCachePoolDefault().toShort()) + ".");
listing.addRow("<limit>", "The maximum number of bytes that can be " +
"cached by directives in this pool, in aggregate. By default, " +
"no limit is set.");
return getShortUsage() + "\n" +
"Add a new cache pool.\n\n" +
@ -564,34 +579,32 @@ public class CacheAdmin extends Configured implements Tool {
@Override
public int run(Configuration conf, List<String> args) throws IOException {
String owner = StringUtils.popOptionWithArgument("-owner", args);
if (owner == null) {
owner = UserGroupInformation.getCurrentUser().getShortUserName();
}
String group = StringUtils.popOptionWithArgument("-group", args);
if (group == null) {
group = UserGroupInformation.getCurrentUser().getGroupNames()[0];
}
String modeString = StringUtils.popOptionWithArgument("-mode", args);
int mode;
if (modeString == null) {
mode = FsPermission.getCachePoolDefault().toShort();
} else {
mode = Integer.parseInt(modeString, 8);
}
String weightString = StringUtils.popOptionWithArgument("-weight", args);
int weight;
if (weightString == null) {
weight = CachePool.DEFAULT_WEIGHT;
} else {
weight = Integer.parseInt(weightString);
}
String name = StringUtils.popFirstNonOption(args);
if (name == null) {
System.err.println("You must specify a name when creating a " +
"cache pool.");
return 1;
}
CachePoolInfo info = new CachePoolInfo(name);
String owner = StringUtils.popOptionWithArgument("-owner", args);
if (owner != null) {
info.setOwnerName(owner);
}
String group = StringUtils.popOptionWithArgument("-group", args);
if (group != null) {
info.setGroupName(group);
}
String modeString = StringUtils.popOptionWithArgument("-mode", args);
if (modeString != null) {
short mode = Short.parseShort(modeString, 8);
info.setMode(new FsPermission(mode));
}
String limitString = StringUtils.popOptionWithArgument("-limit", args);
if (limitString != null) {
long limit = Long.parseLong(limitString);
info.setLimit(limit);
}
if (!args.isEmpty()) {
System.err.print("Can't understand arguments: " +
Joiner.on(" ").join(args) + "\n");
@ -599,11 +612,6 @@ public class CacheAdmin extends Configured implements Tool {
return 1;
}
DistributedFileSystem dfs = getDFS(conf);
CachePoolInfo info = new CachePoolInfo(name).
setOwnerName(owner).
setGroupName(group).
setMode(new FsPermission((short)mode)).
setWeight(weight);
try {
dfs.addCachePool(info);
} catch (IOException e) {
@ -624,7 +632,7 @@ public class CacheAdmin extends Configured implements Tool {
@Override
public String getShortUsage() {
return "[" + getName() + " <name> [-owner <owner>] " +
"[-group <group>] [-mode <mode>] [-weight <weight>]]\n";
"[-group <group>] [-mode <mode>] [-limit <limit>]]\n";
}
@Override
@ -635,11 +643,12 @@ public class CacheAdmin extends Configured implements Tool {
listing.addRow("<owner>", "Username of the owner of the pool");
listing.addRow("<group>", "Groupname of the group of the pool.");
listing.addRow("<mode>", "Unix-style permissions of the pool in octal.");
listing.addRow("<weight>", "Weight of the pool.");
listing.addRow("<limit>", "Maximum number of bytes that can be cached " +
"by this pool.");
return getShortUsage() + "\n" +
WordUtils.wrap("Modifies the metadata of an existing cache pool. " +
"See usage of " + AddCachePoolCommand.NAME + " for more details",
"See usage of " + AddCachePoolCommand.NAME + " for more details.",
MAX_LINE_WIDTH) + "\n\n" +
listing.toString();
}
@ -651,9 +660,9 @@ public class CacheAdmin extends Configured implements Tool {
String modeString = StringUtils.popOptionWithArgument("-mode", args);
Integer mode = (modeString == null) ?
null : Integer.parseInt(modeString, 8);
String weightString = StringUtils.popOptionWithArgument("-weight", args);
Integer weight = (weightString == null) ?
null : Integer.parseInt(weightString);
String limitString = StringUtils.popOptionWithArgument("-limit", args);
Long limit = (limitString == null) ?
null : Long.parseLong(limitString);
String name = StringUtils.popFirstNonOption(args);
if (name == null) {
System.err.println("You must specify a name when creating a " +
@ -680,8 +689,8 @@ public class CacheAdmin extends Configured implements Tool {
info.setMode(new FsPermission(mode.shortValue()));
changed = true;
}
if (weight != null) {
info.setWeight(weight);
if (limit != null) {
info.setLimit(limit);
changed = true;
}
if (!changed) {
@ -709,8 +718,8 @@ public class CacheAdmin extends Configured implements Tool {
System.out.print(prefix + "mode " + new FsPermission(mode.shortValue()));
prefix = " and ";
}
if (weight != null) {
System.out.print(prefix + "weight " + weight);
if (limit != null) {
System.out.print(prefix + "limit " + limit);
prefix = " and ";
}
System.out.print("\n");
@ -804,11 +813,12 @@ public class CacheAdmin extends Configured implements Tool {
addField("OWNER", Justification.LEFT).
addField("GROUP", Justification.LEFT).
addField("MODE", Justification.LEFT).
addField("WEIGHT", Justification.RIGHT);
addField("LIMIT", Justification.RIGHT);
if (printStats) {
builder.
addField("BYTES_NEEDED", Justification.RIGHT).
addField("BYTES_CACHED", Justification.RIGHT).
addField("BYTES_OVERLIMIT", Justification.RIGHT).
addField("FILES_NEEDED", Justification.RIGHT).
addField("FILES_CACHED", Justification.RIGHT);
}
@ -825,12 +835,19 @@ public class CacheAdmin extends Configured implements Tool {
row.add(info.getOwnerName());
row.add(info.getGroupName());
row.add(info.getMode() != null ? info.getMode().toString() : null);
row.add(
info.getWeight() != null ? info.getWeight().toString() : null);
Long limit = info.getLimit();
String limitString;
if (limit != null && limit.equals(CachePool.DEFAULT_LIMIT)) {
limitString = "unlimited";
} else {
limitString = "" + limit;
}
row.add(limitString);
if (printStats) {
CachePoolStats stats = entry.getStats();
row.add(Long.toString(stats.getBytesNeeded()));
row.add(Long.toString(stats.getBytesCached()));
row.add(Long.toString(stats.getBytesOverlimit()));
row.add(Long.toString(stats.getFilesNeeded()));
row.add(Long.toString(stats.getFilesCached()));
}

View File

@ -385,8 +385,13 @@ message CacheDirectiveStatsProto {
required bool hasExpired = 5;
}
enum CacheFlagProto {
FORCE = 0x01; // Ignore pool resource limits
}
message AddCacheDirectiveRequestProto {
required CacheDirectiveInfoProto info = 1;
optional uint32 cacheFlags = 2; // bits set using CacheFlag
}
message AddCacheDirectiveResponseProto {
@ -395,6 +400,7 @@ message AddCacheDirectiveResponseProto {
message ModifyCacheDirectiveRequestProto {
required CacheDirectiveInfoProto info = 1;
optional uint32 cacheFlags = 2; // bits set using CacheFlag
}
message ModifyCacheDirectiveResponseProto {
@ -427,14 +433,15 @@ message CachePoolInfoProto {
optional string ownerName = 2;
optional string groupName = 3;
optional int32 mode = 4;
optional int32 weight = 5;
optional int64 limit = 5;
}
message CachePoolStatsProto {
required int64 bytesNeeded = 1;
required int64 bytesCached = 2;
required int64 filesNeeded = 3;
required int64 filesCached = 4;
required int64 bytesOverlimit = 3;
required int64 filesNeeded = 4;
required int64 filesCached = 5;
}
message AddCachePoolRequestProto {

View File

@ -1036,20 +1036,20 @@ public class DFSTestUtil {
// OP_ADD_CACHE_POOL
filesystem.addCachePool(new CachePoolInfo("pool1"));
// OP_MODIFY_CACHE_POOL
filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
filesystem.modifyCachePool(new CachePoolInfo("pool1").setLimit(99l));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE
long id = filesystem.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/path")).
setReplication((short)1).
setPool("pool1").
build());
build(), EnumSet.of(CacheFlag.FORCE));
// OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE
filesystem.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)2).
build());
build(), EnumSet.of(CacheFlag.FORCE));
// OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE
filesystem.removeCacheDirective(id);
// OP_REMOVE_CACHE_POOL

View File

@ -239,7 +239,7 @@ public class OfflineEditsViewerHelper {
.setOwnerName("carlton")
.setGroupName("party")
.setMode(new FsPermission((short)0700))
.setWeight(1989));
.setLimit(1989l));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().

View File

@ -34,6 +34,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -43,6 +44,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
@ -92,25 +94,48 @@ public class TestCacheDirectives {
static private MiniDFSCluster cluster;
static private DistributedFileSystem dfs;
static private NamenodeProtocols proto;
static private NameNode namenode;
static private CacheManipulator prevCacheManipulator;
static {
EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
}
@Before
public void setup() throws Exception {
conf = new HdfsConfiguration();
private static final long BLOCK_SIZE = 512;
private static final int NUM_DATANODES = 4;
// Most Linux installs will allow non-root users to lock 64KB.
// In this test though, we stub out mlock so this doesn't matter.
private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
private static HdfsConfiguration createCachingConf() {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
// set low limits here for testing purposes
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
2);
return conf;
}
@Before
public void setup() throws Exception {
conf = createCachingConf();
cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
proto = cluster.getNameNodeRpc();
namenode = cluster.getNameNode();
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
LogManager.getLogger(CacheReplicationMonitor.class).setLevel(Level.TRACE);
LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
Level.TRACE);
}
@After
@ -127,7 +152,7 @@ public class TestCacheDirectives {
final String poolName = "pool1";
CachePoolInfo info = new CachePoolInfo(poolName).
setOwnerName("bob").setGroupName("bobgroup").
setMode(new FsPermission((short)0755)).setWeight(150);
setMode(new FsPermission((short)0755)).setLimit(150l);
// Add a pool
dfs.addCachePool(info);
@ -168,7 +193,7 @@ public class TestCacheDirectives {
// Modify the pool
info.setOwnerName("jane").setGroupName("janegroup")
.setMode(new FsPermission((short)0700)).setWeight(314);
.setMode(new FsPermission((short)0700)).setLimit(314l);
dfs.modifyCachePool(info);
// Do some invalid modify pools
@ -263,10 +288,10 @@ public class TestCacheDirectives {
String ownerName = "abc";
String groupName = "123";
FsPermission mode = new FsPermission((short)0755);
int weight = 150;
long limit = 150;
dfs.addCachePool(new CachePoolInfo(poolName).
setOwnerName(ownerName).setGroupName(groupName).
setMode(mode).setWeight(weight));
setMode(mode).setLimit(limit));
RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
CachePoolInfo info = iter.next().getInfo();
@ -277,10 +302,10 @@ public class TestCacheDirectives {
ownerName = "def";
groupName = "456";
mode = new FsPermission((short)0700);
weight = 151;
limit = 151;
dfs.modifyCachePool(new CachePoolInfo(poolName).
setOwnerName(ownerName).setGroupName(groupName).
setMode(mode).setWeight(weight));
setMode(mode).setLimit(limit));
iter = dfs.listCachePools();
info = iter.next().getInfo();
@ -288,7 +313,7 @@ public class TestCacheDirectives {
assertEquals(ownerName, info.getOwnerName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
assertEquals(Integer.valueOf(weight), info.getWeight());
assertEquals(limit, (long)info.getLimit());
dfs.removeCachePool(poolName);
iter = dfs.listCachePools();
@ -495,30 +520,22 @@ public class TestCacheDirectives {
@Test(timeout=60000)
public void testCacheManagerRestart() throws Exception {
cluster.shutdown();
cluster = null;
HdfsConfiguration conf = createCachingConf();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
// Create and validate a pool
final String pool = "poolparty";
String groupName = "partygroup";
FsPermission mode = new FsPermission((short)0777);
int weight = 747;
long limit = 747;
dfs.addCachePool(new CachePoolInfo(pool)
.setGroupName(groupName)
.setMode(mode)
.setWeight(weight));
.setLimit(limit));
RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
assertTrue("No cache pools found", pit.hasNext());
CachePoolInfo info = pit.next().getInfo();
assertEquals(pool, info.getPoolName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
assertEquals(weight, (int)info.getWeight());
assertEquals(limit, (long)info.getLimit());
assertFalse("Unexpected # of cache pools found", pit.hasNext());
// Create some cache entries
@ -556,7 +573,7 @@ public class TestCacheDirectives {
assertEquals(pool, info.getPoolName());
assertEquals(groupName, info.getGroupName());
assertEquals(mode, info.getMode());
assertEquals(weight, (int)info.getWeight());
assertEquals(limit, (long)info.getLimit());
assertFalse("Unexpected # of cache pools found", pit.hasNext());
dit = dfs.listCacheDirectives(null);
@ -762,91 +779,64 @@ public class TestCacheDirectives {
numCachedReplicas);
}
private static final long BLOCK_SIZE = 512;
private static final int NUM_DATANODES = 4;
// Most Linux installs will allow non-root users to lock 64KB.
private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
private static HdfsConfiguration createCachingConf() {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
return conf;
}
@Test(timeout=120000)
public void testWaitForCachedReplicas() throws Exception {
HdfsConfiguration conf = createCachingConf();
FileSystemTestHelper helper = new FileSystemTestHelper();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return ((namenode.getNamesystem().getCacheCapacity() ==
(NUM_DATANODES * CACHE_CAPACITY)) &&
(namenode.getNamesystem().getCacheUsed() == 0));
}
}, 500, 60000);
try {
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
final NameNode namenode = cluster.getNameNode();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return ((namenode.getNamesystem().getCacheCapacity() ==
(NUM_DATANODES * CACHE_CAPACITY)) &&
(namenode.getNamesystem().getCacheUsed() == 0));
}
}, 500, 60000);
NamenodeProtocols nnRpc = namenode.getRpcServer();
Path rootDir = helper.getDefaultWorkingDirectory(dfs);
// Create the pool
final String pool = "friendlyPool";
nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
// Create some test files
final int numFiles = 2;
final int numBlocksPerFile = 2;
final List<String> paths = new ArrayList<String>(numFiles);
for (int i=0; i<numFiles; i++) {
Path p = new Path(rootDir, "testCachePaths-" + i);
FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
(int)BLOCK_SIZE);
paths.add(p.toUri().getPath());
}
// Check the initial statistics at the namenode
waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
// Cache and check each path in sequence
int expected = 0;
for (int i=0; i<numFiles; i++) {
CacheDirectiveInfo directive =
new CacheDirectiveInfo.Builder().
setPath(new Path(paths.get(i))).
setPool(pool).
build();
nnRpc.addCacheDirective(directive);
expected += numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected,
"testWaitForCachedReplicas:1");
}
// Uncache and check each path in sequence
RemoteIterator<CacheDirectiveEntry> entries =
new CacheDirectiveIterator(nnRpc, null);
for (int i=0; i<numFiles; i++) {
CacheDirectiveEntry entry = entries.next();
nnRpc.removeCacheDirective(entry.getInfo().getId());
expected -= numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected,
"testWaitForCachedReplicas:2");
}
} finally {
cluster.shutdown();
NamenodeProtocols nnRpc = namenode.getRpcServer();
Path rootDir = helper.getDefaultWorkingDirectory(dfs);
// Create the pool
final String pool = "friendlyPool";
nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
// Create some test files
final int numFiles = 2;
final int numBlocksPerFile = 2;
final List<String> paths = new ArrayList<String>(numFiles);
for (int i=0; i<numFiles; i++) {
Path p = new Path(rootDir, "testCachePaths-" + i);
FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
(int)BLOCK_SIZE);
paths.add(p.toUri().getPath());
}
// Check the initial statistics at the namenode
waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
// Cache and check each path in sequence
int expected = 0;
for (int i=0; i<numFiles; i++) {
CacheDirectiveInfo directive =
new CacheDirectiveInfo.Builder().
setPath(new Path(paths.get(i))).
setPool(pool).
build();
nnRpc.addCacheDirective(directive, EnumSet.noneOf(CacheFlag.class));
expected += numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected,
"testWaitForCachedReplicas:1");
}
// Uncache and check each path in sequence
RemoteIterator<CacheDirectiveEntry> entries =
new CacheDirectiveIterator(nnRpc, null);
for (int i=0; i<numFiles; i++) {
CacheDirectiveEntry entry = entries.next();
nnRpc.removeCacheDirective(entry.getInfo().getId());
expected -= numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected,
"testWaitForCachedReplicas:2");
}
}
@Test(timeout=120000)
public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
throws Exception {
cluster.shutdown();
HdfsConfiguration conf = createCachingConf();
conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
MiniDFSCluster cluster =
@ -894,103 +884,92 @@ public class TestCacheDirectives {
@Test(timeout=120000)
public void testWaitForCachedReplicasInDirectory() throws Exception {
HdfsConfiguration conf = createCachingConf();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
try {
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
NameNode namenode = cluster.getNameNode();
// Create the pool
final String pool = "friendlyPool";
final CachePoolInfo poolInfo = new CachePoolInfo(pool);
dfs.addCachePool(poolInfo);
// Create some test files
final List<Path> paths = new LinkedList<Path>();
paths.add(new Path("/foo/bar"));
paths.add(new Path("/foo/baz"));
paths.add(new Path("/foo2/bar2"));
paths.add(new Path("/foo2/baz2"));
dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
final int numBlocksPerFile = 2;
for (Path path : paths) {
FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
(int)BLOCK_SIZE, (short)3, false);
}
waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:0");
// cache entire directory
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
setReplication((short)2).
setPool(pool).
build());
waitForCachedBlocks(namenode, 4, 8,
"testWaitForCachedReplicasInDirectory:1:blocks");
// Verify that listDirectives gives the stats we want.
waitForCacheDirectiveStats(dfs,
4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
2, 2,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build(),
"testWaitForCachedReplicasInDirectory:1:directive");
waitForCachePoolStats(dfs,
4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
2, 2,
poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
long id2 = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo/bar")).
setReplication((short)4).
setPool(pool).
build());
// wait for an additional 2 cached replicas to come up
waitForCachedBlocks(namenode, 4, 10,
"testWaitForCachedReplicasInDirectory:2:blocks");
// the directory directive's stats are unchanged
waitForCacheDirectiveStats(dfs,
4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
2, 2,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build(),
"testWaitForCachedReplicasInDirectory:2:directive-1");
// verify /foo/bar's stats
waitForCacheDirectiveStats(dfs,
4 * numBlocksPerFile * BLOCK_SIZE,
// only 3 because the file only has 3 replicas, not 4 as requested.
3 * numBlocksPerFile * BLOCK_SIZE,
1,
// only 0 because the file can't be fully cached
0,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo/bar")).
build(),
"testWaitForCachedReplicasInDirectory:2:directive-2");
waitForCachePoolStats(dfs,
(4+4) * numBlocksPerFile * BLOCK_SIZE,
(4+3) * numBlocksPerFile * BLOCK_SIZE,
3, 2,
poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
// remove and watch numCached go to 0
dfs.removeCacheDirective(id);
dfs.removeCacheDirective(id2);
waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:3:blocks");
waitForCachePoolStats(dfs,
0, 0,
0, 0,
poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
} finally {
cluster.shutdown();
// Create the pool
final String pool = "friendlyPool";
final CachePoolInfo poolInfo = new CachePoolInfo(pool);
dfs.addCachePool(poolInfo);
// Create some test files
final List<Path> paths = new LinkedList<Path>();
paths.add(new Path("/foo/bar"));
paths.add(new Path("/foo/baz"));
paths.add(new Path("/foo2/bar2"));
paths.add(new Path("/foo2/baz2"));
dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
final int numBlocksPerFile = 2;
for (Path path : paths) {
FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
(int)BLOCK_SIZE, (short)3, false);
}
waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:0");
// cache entire directory
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
setReplication((short)2).
setPool(pool).
build());
waitForCachedBlocks(namenode, 4, 8,
"testWaitForCachedReplicasInDirectory:1:blocks");
// Verify that listDirectives gives the stats we want.
waitForCacheDirectiveStats(dfs,
4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
2, 2,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build(),
"testWaitForCachedReplicasInDirectory:1:directive");
waitForCachePoolStats(dfs,
4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
2, 2,
poolInfo, "testWaitForCachedReplicasInDirectory:1:pool");
long id2 = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo/bar")).
setReplication((short)4).
setPool(pool).
build());
// wait for an additional 2 cached replicas to come up
waitForCachedBlocks(namenode, 4, 10,
"testWaitForCachedReplicasInDirectory:2:blocks");
// the directory directive's stats are unchanged
waitForCacheDirectiveStats(dfs,
4 * numBlocksPerFile * BLOCK_SIZE, 4 * numBlocksPerFile * BLOCK_SIZE,
2, 2,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build(),
"testWaitForCachedReplicasInDirectory:2:directive-1");
// verify /foo/bar's stats
waitForCacheDirectiveStats(dfs,
4 * numBlocksPerFile * BLOCK_SIZE,
// only 3 because the file only has 3 replicas, not 4 as requested.
3 * numBlocksPerFile * BLOCK_SIZE,
1,
// only 0 because the file can't be fully cached
0,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo/bar")).
build(),
"testWaitForCachedReplicasInDirectory:2:directive-2");
waitForCachePoolStats(dfs,
(4+4) * numBlocksPerFile * BLOCK_SIZE,
(4+3) * numBlocksPerFile * BLOCK_SIZE,
3, 2,
poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
// remove and watch numCached go to 0
dfs.removeCacheDirective(id);
dfs.removeCacheDirective(id2);
waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:3:blocks");
waitForCachePoolStats(dfs,
0, 0,
0, 0,
poolInfo, "testWaitForCachedReplicasInDirectory:3:pool");
}
/**
@ -1000,68 +979,57 @@ public class TestCacheDirectives {
*/
@Test(timeout=120000)
public void testReplicationFactor() throws Exception {
HdfsConfiguration conf = createCachingConf();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
try {
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
NameNode namenode = cluster.getNameNode();
// Create the pool
final String pool = "friendlyPool";
dfs.addCachePool(new CachePoolInfo(pool));
// Create some test files
final List<Path> paths = new LinkedList<Path>();
paths.add(new Path("/foo/bar"));
paths.add(new Path("/foo/baz"));
paths.add(new Path("/foo2/bar2"));
paths.add(new Path("/foo2/baz2"));
dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
final int numBlocksPerFile = 2;
for (Path path : paths) {
FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
(int)BLOCK_SIZE, (short)3, false);
}
waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
checkNumCachedReplicas(dfs, paths, 0, 0);
// cache directory
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
setReplication((short)1).
setPool(pool).
build());
waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
checkNumCachedReplicas(dfs, paths, 4, 4);
// step up the replication factor
for (int i=2; i<=3; i++) {
dfs.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)i).
build());
waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
checkNumCachedReplicas(dfs, paths, 4, 4*i);
}
// step it down
for (int i=2; i>=1; i--) {
dfs.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)i).
build());
waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
checkNumCachedReplicas(dfs, paths, 4, 4*i);
}
// remove and watch numCached go to 0
dfs.removeCacheDirective(id);
waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
checkNumCachedReplicas(dfs, paths, 0, 0);
} finally {
cluster.shutdown();
// Create the pool
final String pool = "friendlyPool";
dfs.addCachePool(new CachePoolInfo(pool));
// Create some test files
final List<Path> paths = new LinkedList<Path>();
paths.add(new Path("/foo/bar"));
paths.add(new Path("/foo/baz"));
paths.add(new Path("/foo2/bar2"));
paths.add(new Path("/foo2/baz2"));
dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
final int numBlocksPerFile = 2;
for (Path path : paths) {
FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
(int)BLOCK_SIZE, (short)3, false);
}
waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
checkNumCachedReplicas(dfs, paths, 0, 0);
// cache directory
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
setReplication((short)1).
setPool(pool).
build());
waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
checkNumCachedReplicas(dfs, paths, 4, 4);
// step up the replication factor
for (int i=2; i<=3; i++) {
dfs.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)i).
build());
waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
checkNumCachedReplicas(dfs, paths, 4, 4*i);
}
// step it down
for (int i=2; i>=1; i--) {
dfs.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)i).
build());
waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
checkNumCachedReplicas(dfs, paths, 4, 4*i);
}
// remove and watch numCached go to 0
dfs.removeCacheDirective(id);
waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
checkNumCachedReplicas(dfs, paths, 0, 0);
}
@Test(timeout=60000)
@ -1081,11 +1049,12 @@ public class TestCacheDirectives {
assertNull("Unexpected owner name", info.getOwnerName());
assertNull("Unexpected group name", info.getGroupName());
assertNull("Unexpected mode", info.getMode());
assertNull("Unexpected weight", info.getWeight());
assertNull("Unexpected limit", info.getLimit());
// Modify the pool so myuser is now the owner
final long limit = 99;
dfs.modifyCachePool(new CachePoolInfo(poolName)
.setOwnerName(myUser.getShortUserName())
.setWeight(99));
.setLimit(limit));
// Should see full info
it = myDfs.listCachePools();
info = it.next().getInfo();
@ -1096,60 +1065,127 @@ public class TestCacheDirectives {
assertNotNull("Expected group name", info.getGroupName());
assertEquals("Mismatched mode", (short) 0700,
info.getMode().toShort());
assertEquals("Mismatched weight", 99, (int)info.getWeight());
assertEquals("Mismatched limit", limit, (long)info.getLimit());
}
@Test(timeout=60000)
@Test(timeout=120000)
public void testExpiry() throws Exception {
HdfsConfiguration conf = createCachingConf();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
String pool = "pool1";
dfs.addCachePool(new CachePoolInfo(pool));
Path p = new Path("/mypath");
DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
// Expire after test timeout
Date start = new Date();
Date expiry = DateUtils.addSeconds(start, 120);
final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
.setPath(p)
.setPool(pool)
.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
.setReplication((short)2)
.build());
waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
// Change it to expire sooner
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
.setExpiration(Expiration.newRelative(0)).build());
waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
CacheDirectiveEntry ent = it.next();
assertFalse(it.hasNext());
Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
assertTrue("Directive should have expired",
entryExpiry.before(new Date()));
// Change it back to expire later
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
.setExpiration(Expiration.newRelative(120000)).build());
waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
it = dfs.listCacheDirectives(null);
ent = it.next();
assertFalse(it.hasNext());
entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
assertTrue("Directive should not have expired",
entryExpiry.after(new Date()));
// Verify that setting a negative TTL throws an error
try {
DistributedFileSystem dfs = cluster.getFileSystem();
String pool = "pool1";
dfs.addCachePool(new CachePoolInfo(pool));
Path p = new Path("/mypath");
DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
// Expire after test timeout
Date start = new Date();
Date expiry = DateUtils.addSeconds(start, 120);
final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
.setPath(p)
.setPool(pool)
.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
.setReplication((short)2)
.build());
waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
// Change it to expire sooner
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
.setExpiration(Expiration.newRelative(0)).build());
waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
CacheDirectiveEntry ent = it.next();
assertFalse(it.hasNext());
Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
assertTrue("Directive should have expired",
entryExpiry.before(new Date()));
// Change it back to expire later
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
.setExpiration(Expiration.newRelative(120000)).build());
waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
it = dfs.listCacheDirectives(null);
ent = it.next();
assertFalse(it.hasNext());
entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
assertTrue("Directive should not have expired",
entryExpiry.after(new Date()));
// Verify that setting a negative TTL throws an error
try {
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
.setExpiration(Expiration.newRelative(-1)).build());
} catch (InvalidRequestException e) {
GenericTestUtils
.assertExceptionContains("Cannot set a negative expiration", e);
}
} finally {
cluster.shutdown();
.setExpiration(Expiration.newRelative(-1)).build());
} catch (InvalidRequestException e) {
GenericTestUtils
.assertExceptionContains("Cannot set a negative expiration", e);
}
}
@Test(timeout=120000)
public void testLimit() throws Exception {
try {
dfs.addCachePool(new CachePoolInfo("poolofnegativity").setLimit(-99l));
fail("Should not be able to set a negative limit");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("negative", e);
}
final String destiny = "poolofdestiny";
final Path path1 = new Path("/destiny");
DFSTestUtil.createFile(dfs, path1, 2*BLOCK_SIZE, (short)1, 0x9494);
// Start off with a limit that is too small
final CachePoolInfo poolInfo = new CachePoolInfo(destiny)
.setLimit(2*BLOCK_SIZE-1);
dfs.addCachePool(poolInfo);
final CacheDirectiveInfo info1 = new CacheDirectiveInfo.Builder()
.setPool(destiny).setPath(path1).build();
try {
dfs.addCacheDirective(info1);
fail("Should not be able to cache when there is no more limit");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("remaining capacity", e);
}
// Raise the limit up to fit and it should work this time
poolInfo.setLimit(2*BLOCK_SIZE);
dfs.modifyCachePool(poolInfo);
long id1 = dfs.addCacheDirective(info1);
waitForCachePoolStats(dfs,
2*BLOCK_SIZE, 2*BLOCK_SIZE,
1, 1,
poolInfo, "testLimit:1");
// Adding another file, it shouldn't be cached
final Path path2 = new Path("/failure");
DFSTestUtil.createFile(dfs, path2, BLOCK_SIZE, (short)1, 0x9495);
try {
dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
.setPool(destiny).setPath(path2).build(),
EnumSet.noneOf(CacheFlag.class));
fail("Should not be able to add another cached file");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("remaining capacity", e);
}
// Bring the limit down, the first file should get uncached
poolInfo.setLimit(BLOCK_SIZE);
dfs.modifyCachePool(poolInfo);
waitForCachePoolStats(dfs,
2*BLOCK_SIZE, 0,
1, 0,
poolInfo, "testLimit:2");
RemoteIterator<CachePoolEntry> it = dfs.listCachePools();
assertTrue("Expected a cache pool", it.hasNext());
CachePoolStats stats = it.next().getStats();
assertEquals("Overlimit bytes should be difference of needed and limit",
BLOCK_SIZE, stats.getBytesOverlimit());
// Moving a directive to a pool without enough limit should fail
CachePoolInfo inadequate =
new CachePoolInfo("poolofinadequacy").setLimit(BLOCK_SIZE);
dfs.addCachePool(inadequate);
try {
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1)
.setId(id1).setPool(inadequate.getPoolName()).build(),
EnumSet.noneOf(CacheFlag.class));
} catch(InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("remaining capacity", e);
}
// Succeeds when force=true
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(info1).setId(id1)
.setPool(inadequate.getPoolName()).build(),
EnumSet.of(CacheFlag.FORCE));
// Also can add with force=true
dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().setPool(inadequate.getPoolName())
.setPath(path1).build(), EnumSet.of(CacheFlag.FORCE));
}
}

View File

@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@ -760,7 +761,7 @@ public class TestRetryCacheWithHA {
@Override
void invoke() throws Exception {
result = client.addCacheDirective(directive);
result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
@ -802,7 +803,7 @@ public class TestRetryCacheWithHA {
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
id = client.addCacheDirective(directive);
id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
@ -811,7 +812,7 @@ public class TestRetryCacheWithHA {
new CacheDirectiveInfo.Builder().
setId(id).
setReplication(newReplication).
build());
build(), EnumSet.of(CacheFlag.FORCE));
}
@Override
@ -858,7 +859,7 @@ public class TestRetryCacheWithHA {
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
id = dfs.addCacheDirective(directive);
id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
}
@Override
@ -936,19 +937,19 @@ public class TestRetryCacheWithHA {
@Override
void prepare() throws Exception {
client.addCachePool(new CachePoolInfo(pool).setWeight(10));
client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
}
@Override
void invoke() throws Exception {
client.modifyCachePool(new CachePoolInfo(pool).setWeight(99));
client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
if (iter.hasNext() && iter.next().getInfo().getWeight() == 99) {
if (iter.hasNext() && (long)iter.next().getInfo().getLimit() == 99) {
return true;
}
Thread.sleep(1000);
@ -1216,7 +1217,7 @@ public class TestRetryCacheWithHA {
CacheDirectiveInfo directiveInfo =
new CacheDirectiveInfo.Builder().setPool(poolName).setPath(path).build();
dfs.addCachePool(new CachePoolInfo(poolName));
dfs.addCacheDirective(directiveInfo);
dfs.addCacheDirective(directiveInfo, EnumSet.of(CacheFlag.FORCE));
poolNames.add(poolName);
}
listCacheDirectives(poolNames, 0);

View File

@ -13,8 +13,8 @@
<TXID>2</TXID>
<DELEGATION_KEY>
<KEY_ID>1</KEY_ID>
<EXPIRY_DATE>1386695013416</EXPIRY_DATE>
<KEY>360a10c6ecac725e</KEY>
<EXPIRY_DATE>1387701670577</EXPIRY_DATE>
<KEY>7bb5467995769b59</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -24,8 +24,8 @@
<TXID>3</TXID>
<DELEGATION_KEY>
<KEY_ID>2</KEY_ID>
<EXPIRY_DATE>1386695013425</EXPIRY_DATE>
<KEY>9b110c0b83225f7d</KEY>
<EXPIRY_DATE>1387701670580</EXPIRY_DATE>
<KEY>a5a3a2755e36827b</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@ -37,17 +37,17 @@
<INODEID>16386</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814612</MTIME>
<ATIME>1386003814612</ATIME>
<MTIME>1387010471220</MTIME>
<ATIME>1387010471220</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1253204429_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>7</RPC_CALLID>
</DATA>
</RECORD>
@ -59,13 +59,13 @@
<INODEID>0</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814665</MTIME>
<ATIME>1386003814612</ATIME>
<MTIME>1387010471276</MTIME>
<ATIME>1387010471220</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -78,8 +78,8 @@
<LENGTH>0</LENGTH>
<SRC>/file_create_u\0001;F431</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1386003814671</TIMESTAMP>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<TIMESTAMP>1387010471286</TIMESTAMP>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>9</RPC_CALLID>
</DATA>
</RECORD>
@ -89,8 +89,8 @@
<TXID>7</TXID>
<LENGTH>0</LENGTH>
<PATH>/file_moved</PATH>
<TIMESTAMP>1386003814678</TIMESTAMP>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<TIMESTAMP>1387010471299</TIMESTAMP>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>10</RPC_CALLID>
</DATA>
</RECORD>
@ -101,9 +101,9 @@
<LENGTH>0</LENGTH>
<INODEID>16387</INODEID>
<PATH>/directory_mkdir</PATH>
<TIMESTAMP>1386003814686</TIMESTAMP>
<TIMESTAMP>1387010471312</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
@ -136,7 +136,7 @@
<TXID>12</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot1</SNAPSHOTNAME>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>15</RPC_CALLID>
</DATA>
</RECORD>
@ -147,7 +147,7 @@
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME>
<SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID>
</DATA>
</RECORD>
@ -157,7 +157,7 @@
<TXID>14</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot2</SNAPSHOTNAME>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID>
</DATA>
</RECORD>
@ -169,17 +169,17 @@
<INODEID>16388</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814712</MTIME>
<ATIME>1386003814712</ATIME>
<MTIME>1387010471373</MTIME>
<ATIME>1387010471373</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1253204429_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>18</RPC_CALLID>
</DATA>
</RECORD>
@ -191,13 +191,13 @@
<INODEID>0</INODEID>
<PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814714</MTIME>
<ATIME>1386003814712</ATIME>
<MTIME>1387010471380</MTIME>
<ATIME>1387010471373</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -253,9 +253,9 @@
<LENGTH>0</LENGTH>
<SRC>/file_create_u\0001;F431</SRC>
<DST>/file_moved</DST>
<TIMESTAMP>1386003814732</TIMESTAMP>
<TIMESTAMP>1387010471428</TIMESTAMP>
<OPTIONS>NONE</OPTIONS>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>25</RPC_CALLID>
</DATA>
</RECORD>
@ -267,17 +267,17 @@
<INODEID>16389</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814737</MTIME>
<ATIME>1386003814737</ATIME>
<MTIME>1387010471438</MTIME>
<ATIME>1387010471438</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1253204429_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>27</RPC_CALLID>
</DATA>
</RECORD>
@ -388,8 +388,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814889</MTIME>
<ATIME>1386003814737</ATIME>
<MTIME>1387010471540</MTIME>
<ATIME>1387010471438</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -409,7 +409,7 @@
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -423,17 +423,17 @@
<INODEID>16390</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814891</MTIME>
<ATIME>1386003814891</ATIME>
<MTIME>1387010471547</MTIME>
<ATIME>1387010471547</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1253204429_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>40</RPC_CALLID>
</DATA>
</RECORD>
@ -544,8 +544,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814914</MTIME>
<ATIME>1386003814891</ATIME>
<MTIME>1387010471588</MTIME>
<ATIME>1387010471547</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -565,7 +565,7 @@
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -579,17 +579,17 @@
<INODEID>16391</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814916</MTIME>
<ATIME>1386003814916</ATIME>
<MTIME>1387010471595</MTIME>
<ATIME>1387010471595</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1253204429_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>52</RPC_CALLID>
</DATA>
</RECORD>
@ -700,8 +700,8 @@
<INODEID>0</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003814938</MTIME>
<ATIME>1386003814916</ATIME>
<MTIME>1387010471651</MTIME>
<ATIME>1387010471595</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -721,7 +721,7 @@
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@ -733,12 +733,12 @@
<TXID>56</TXID>
<LENGTH>0</LENGTH>
<TRG>/file_concat_target</TRG>
<TIMESTAMP>1386003814940</TIMESTAMP>
<TIMESTAMP>1387010471663</TIMESTAMP>
<SOURCES>
<SOURCE1>/file_concat_0</SOURCE1>
<SOURCE2>/file_concat_1</SOURCE2>
</SOURCES>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>63</RPC_CALLID>
</DATA>
</RECORD>
@ -750,14 +750,14 @@
<INODEID>16392</INODEID>
<PATH>/file_symlink</PATH>
<VALUE>/file_concat_target</VALUE>
<MTIME>1386003814956</MTIME>
<ATIME>1386003814956</ATIME>
<MTIME>1387010471674</MTIME>
<ATIME>1387010471674</ATIME>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>511</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID>
</DATA>
</RECORD>
@ -768,14 +768,14 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>andrew</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1386003814961</ISSUE_DATE>
<MAX_DATE>1386608614961</MAX_DATE>
<ISSUE_DATE>1387010471682</ISSUE_DATE>
<MAX_DATE>1387615271682</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
<EXPIRY_TIME>1386090214961</EXPIRY_TIME>
<EXPIRY_TIME>1387096871682</EXPIRY_TIME>
</DATA>
</RECORD>
<RECORD>
@ -785,14 +785,14 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>andrew</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1386003814961</ISSUE_DATE>
<MAX_DATE>1386608614961</MAX_DATE>
<ISSUE_DATE>1387010471682</ISSUE_DATE>
<MAX_DATE>1387615271682</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
<EXPIRY_TIME>1386090215078</EXPIRY_TIME>
<EXPIRY_TIME>1387096871717</EXPIRY_TIME>
</DATA>
</RECORD>
<RECORD>
@ -802,11 +802,11 @@
<DELEGATION_TOKEN_IDENTIFIER>
<KIND>HDFS_DELEGATION_TOKEN</KIND>
<SEQUENCE_NUMBER>1</SEQUENCE_NUMBER>
<OWNER>aagarwal</OWNER>
<OWNER>andrew</OWNER>
<RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER>
<ISSUE_DATE>1386003814961</ISSUE_DATE>
<MAX_DATE>1386608614961</MAX_DATE>
<ISSUE_DATE>1387010471682</ISSUE_DATE>
<MAX_DATE>1387615271682</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER>
</DATA>
@ -816,13 +816,11 @@
<DATA>
<TXID>61</TXID>
<POOLNAME>poolparty</POOLNAME>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<GROUPNAME>staff</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
<WEIGHT>100</WEIGHT>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<OWNERNAME>andrew</OWNERNAME>
<GROUPNAME>andrew</GROUPNAME>
<MODE>493</MODE>
<LIMIT>9223372036854775807</LIMIT>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>68</RPC_CALLID>
</DATA>
</RECORD>
@ -834,8 +832,8 @@
<OWNERNAME>carlton</OWNERNAME>
<GROUPNAME>party</GROUPNAME>
<MODE>448</MODE>
<WEIGHT>1989</WEIGHT>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<LIMIT>1989</LIMIT>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>69</RPC_CALLID>
</DATA>
</RECORD>
@ -848,7 +846,7 @@
<REPLICATION>1</REPLICATION>
<POOL>poolparty</POOL>
<EXPIRATION>-1</EXPIRATION>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>70</RPC_CALLID>
</DATA>
</RECORD>
@ -858,7 +856,7 @@
<TXID>64</TXID>
<ID>1</ID>
<PATH>/bar2</PATH>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>71</RPC_CALLID>
</DATA>
</RECORD>
@ -867,7 +865,7 @@
<DATA>
<TXID>65</TXID>
<ID>1</ID>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>72</RPC_CALLID>
</DATA>
</RECORD>
@ -876,7 +874,7 @@
<DATA>
<TXID>66</TXID>
<POOLNAME>poolparty</POOLNAME>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>73</RPC_CALLID>
</DATA>
</RECORD>
@ -888,17 +886,17 @@
<INODEID>16393</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003815135</MTIME>
<ATIME>1386003815135</ATIME>
<MTIME>1387010471802</MTIME>
<ATIME>1387010471802</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-1253204429_1</CLIENT_NAME>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>f583267a-ef8c-4f3f-9014-b067b83945ad</RPC_CLIENTID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID>
<RPC_CALLID>74</RPC_CALLID>
</DATA>
</RECORD>
@ -955,7 +953,7 @@
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
<TXID>73</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-1253204429_1</LEASEHOLDER>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-52011019_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
</DATA>
@ -968,8 +966,8 @@
<INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
<MTIME>1386003817462</MTIME>
<ATIME>1386003815135</ATIME>
<MTIME>1387010474126</MTIME>
<ATIME>1387010471802</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@ -979,7 +977,7 @@
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
<USERNAME>aagarwal</USERNAME>
<USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>

View File

@ -80,8 +80,8 @@
<test> <!--Tested -->
<description>Testing modifying a cache pool</description>
<test-commands>
<cache-admin-command>-addPool poolparty -owner alice -group alicegroup -mode 0000 -weight 50</cache-admin-command>
<cache-admin-command>-modifyPool poolparty -owner bob -group bobgroup -mode 0777 -weight 51</cache-admin-command>
<cache-admin-command>-addPool poolparty -owner alice -group alicegroup -mode 0000 -limit 50</cache-admin-command>
<cache-admin-command>-modifyPool poolparty -owner bob -group bobgroup -mode 0777 -limit 51</cache-admin-command>
<cache-admin-command>-listPools</cache-admin-command>
</test-commands>
<cleanup-commands>
@ -90,7 +90,7 @@
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>poolparty bob bobgroup rwxrwxrwx 51</expected-output>
<expected-output>poolparty bob bobgroup rwxrwxrwx 51</expected-output>
</comparator>
</comparators>
</test>
@ -129,11 +129,11 @@
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>bar alice alicegroup rwxr-xr-x 100</expected-output>
<expected-output>bar alice alicegroup rwxr-xr-x unlimited</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>foo bob bob rw-rw-r-- 100</expected-output>
<expected-output>foo bob bob rw-rw-r-- unlimited</expected-output>
</comparator>
</comparators>
</test>
@ -156,7 +156,7 @@
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>foo bob bob rw-rw-r-- 100</expected-output>
<expected-output>foo bob bob rw-rw-r-- unlimited</expected-output>
</comparator>
</comparators>
</test>
@ -417,11 +417,11 @@
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>bar alice alicegroup rwxr-xr-x 100 0 0 0 0</expected-output>
<expected-output>bar alice alicegroup rwxr-xr-x unlimited 0 0 0 0 0</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>foo bob bob rw-rw-r-- 100 0 0 0 0</expected-output>
<expected-output>foo bob bob rw-rw-r-- unlimited 0 0 0 0 0</expected-output>
</comparator>
</comparators>
</test>