commit correct version of HDFS-5121

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1520090 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-09-04 18:23:51 +00:00
parent 97b7267977
commit d56d0b46e1
16 changed files with 828 additions and 684 deletions

View File

@ -29,7 +29,7 @@ public abstract class BatchedRemoteIterator<K, E> implements RemoteIterator<E> {
public E get(int i);
public int size();
}
public static class BatchedListEntries<E> implements BatchedEntries<E> {
private final List<E> entries;
@ -39,7 +39,6 @@ public abstract class BatchedRemoteIterator<K, E> implements RemoteIterator<E> {
public E get(int i) {
return entries.get(i);
}
public int size() {
@ -47,13 +46,13 @@ public abstract class BatchedRemoteIterator<K, E> implements RemoteIterator<E> {
}
}
private K nextKey;
private K prevKey;
private final int maxRepliesPerRequest;
private BatchedEntries<E> entries;
private int idx;
public BatchedRemoteIterator(K nextKey, int maxRepliesPerRequest) {
this.nextKey = nextKey;
public BatchedRemoteIterator(K prevKey, int maxRepliesPerRequest) {
this.prevKey = prevKey;
this.maxRepliesPerRequest = maxRepliesPerRequest;
this.entries = null;
this.idx = -1;
@ -66,13 +65,13 @@ public abstract class BatchedRemoteIterator<K, E> implements RemoteIterator<E> {
* @param maxRepliesPerRequest The maximum number of replies to allow.
* @return A list of replies.
*/
public abstract BatchedEntries<E> makeRequest(K nextKey, int maxRepliesPerRequest)
throws IOException;
public abstract BatchedEntries<E> makeRequest(K prevKey,
int maxRepliesPerRequest) throws IOException;
private void makeRequest() throws IOException {
idx = 0;
entries = null;
entries = makeRequest(nextKey, maxRepliesPerRequest);
entries = makeRequest(prevKey, maxRepliesPerRequest);
if (entries.size() > maxRepliesPerRequest) {
throw new IOException("invalid number of replies returned: got " +
entries.size() + ", expected " + maxRepliesPerRequest +
@ -106,7 +105,7 @@ public abstract class BatchedRemoteIterator<K, E> implements RemoteIterator<E> {
/**
* Return the next list key associated with an element.
*/
public abstract K elementToNextKey(E element);
public abstract K elementToPrevKey(E element);
@Override
public E next() throws IOException {
@ -115,7 +114,7 @@ public abstract class BatchedRemoteIterator<K, E> implements RemoteIterator<E> {
throw new NoSuchElementException();
}
E entry = entries.get(idx++);
nextKey = elementToNextKey(entry);
prevKey = elementToPrevKey(entry);
return entry;
}
}

View File

@ -56,12 +56,12 @@ public abstract class AddPathCacheDirectiveException extends IOException {
}
}
public static class InvalidPoolNameError
public static class InvalidPoolError
extends AddPathCacheDirectiveException {
private static final long serialVersionUID = 1L;
public InvalidPoolNameError(PathCacheDirective directive) {
super("invalid pool name '" + directive.getPool() + "'", directive);
public InvalidPoolError(PathCacheDirective directive) {
super("invalid pool id " + directive.getPoolId(), directive);
}
}
@ -70,7 +70,7 @@ public abstract class AddPathCacheDirectiveException extends IOException {
private static final long serialVersionUID = 1L;
public PoolWritePermissionDeniedError(PathCacheDirective directive) {
super("write permission denied for pool '" + directive.getPool() + "'",
super("write permission denied for pool id " + directive.getPoolId(),
directive);
}
}
@ -82,7 +82,9 @@ public abstract class AddPathCacheDirectiveException extends IOException {
public UnexpectedAddPathCacheDirectiveException(
PathCacheDirective directive) {
super("encountered an unexpected error when trying to " +
"add path cache directive " + directive, directive);
"add path cache directive to pool id " + directive.getPoolId() +
" " + directive,
directive);
}
}
};

View File

@ -18,35 +18,45 @@
package org.apache.hadoop.hdfs.protocol;
import javax.annotation.Nullable;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.FsPermission;
import com.google.common.base.Preconditions;
/**
* Information about a cache pool.
*
* CachePoolInfo permissions roughly map to Unix file permissions.
* Write permissions allow addition and removal of a {@link PathCacheEntry} from
* the pool. Execute permissions allow listing of PathCacheEntries in a pool.
* Read permissions have no associated meaning.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CachePoolInfo {
final String poolName;
@Nullable
String ownerName;
private String poolName;
private String ownerName;
private String groupName;
private FsPermission mode;
private Integer weight;
@Nullable
String groupName;
@Nullable
Integer mode;
@Nullable
Integer weight;
/**
* For Builder use
*/
private CachePoolInfo() {}
/**
* Use a CachePoolInfo {@link Builder} to create a new CachePoolInfo with
* more parameters
*/
public CachePoolInfo(String poolName) {
this.poolName = poolName;
}
public String getPoolName() {
return poolName;
}
@ -55,35 +65,103 @@ public class CachePoolInfo {
return ownerName;
}
public CachePoolInfo setOwnerName(String ownerName) {
this.ownerName = ownerName;
return this;
}
public String getGroupName() {
return groupName;
}
public CachePoolInfo setGroupName(String groupName) {
this.groupName = groupName;
return this;
}
public Integer getMode() {
public FsPermission getMode() {
return mode;
}
public CachePoolInfo setMode(Integer mode) {
this.mode = mode;
return this;
}
public Integer getWeight() {
return weight;
}
public CachePoolInfo setWeight(Integer weight) {
this.weight = weight;
return this;
public String toString() {
return new StringBuilder().
append("{ ").append("poolName:").append(poolName).
append(", ownerName:").append(ownerName).
append(", groupName:").append(groupName).
append(", mode:").append(mode).
append(", weight:").append(weight).
append(" }").toString();
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(poolName).append(ownerName)
.append(groupName).append(mode.toShort()).append(weight).hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null) { return false; }
if (obj == this) { return true; }
if (obj.getClass() != getClass()) {
return false;
}
CachePoolInfo rhs = (CachePoolInfo)obj;
return new EqualsBuilder()
.append(poolName, rhs.poolName)
.append(ownerName, rhs.ownerName)
.append(groupName, rhs.groupName)
.append(mode, rhs.mode)
.append(weight, rhs.weight)
.isEquals();
}
public static Builder newBuilder() {
return new Builder();
}
public static Builder newBuilder(CachePoolInfo info) {
return new Builder(info);
}
/**
* CachePoolInfo Builder
*/
public static class Builder {
private CachePoolInfo info;
public Builder() {
this.info = new CachePoolInfo();
}
public Builder(CachePoolInfo info) {
this.info = info;
}
public CachePoolInfo build() {
Preconditions.checkNotNull(info.poolName,
"Cannot create a CachePoolInfo without a pool name");
return info;
}
public Builder setPoolName(String poolName) {
info.poolName = poolName;
return this;
}
public Builder setOwnerName(String ownerName) {
info.ownerName = ownerName;
return this;
}
public Builder setGroupName(String groupName) {
info.groupName = groupName;
return this;
}
public Builder setMode(FsPermission mode) {
info.mode = mode;
return this;
}
public Builder setWeight(Integer weight) {
info.weight = weight;
return this;
}
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
@ -1099,98 +1100,82 @@ public interface ClientProtocol {
/**
* Add some path cache directives to the CacheManager.
*
* @param directives
* A list of all the path cache directives we want to add.
* @return
* An list where each element is either a path cache entry that was
* added, or an IOException exception describing why the directive
* could not be added.
*
* @param directives A list of path cache directives to be added.
* @return A Fallible list, where each element is either a successfully addded
* path cache entry, or an IOException describing why the directive
* could not be added.
*/
@AtMostOnce
public List<Fallible<PathCacheEntry>>
addPathCacheDirectives(List<PathCacheDirective> directives)
throws IOException;
public List<Fallible<PathCacheEntry>> addPathCacheDirectives(
List<PathCacheDirective> directives) throws IOException;
/**
* Remove some path cache entries from the CacheManager.
*
* @param ids
* A list of all the IDs we want to remove from the CacheManager.
* @return
* An list where each element is either an ID that was removed,
* or an IOException exception describing why the ID could not be
* removed.
*
* @param ids A list of all the entry IDs to be removed from the CacheManager.
* @return A Fallible list where each element is either a successfully removed
* ID, or an IOException describing why the ID could not be removed.
*/
@AtMostOnce
@Idempotent
public List<Fallible<Long>> removePathCacheEntries(List<Long> ids)
throws IOException;
/**
* List cached paths on the server.
*
* @param prevId
* The previous ID that we listed, or 0 if this is the first call
* to listPathCacheEntries.
* @param pool
* The pool ID to list. If this is the empty string, all pool ids
* will be listed.
* @param maxRepliesPerRequest
* The maximum number of replies to make in each request.
* @return
* A RemoteIterator from which you can get PathCacheEntry objects.
* Requests will be made as needed.
* List the set of cached paths of a cache pool. Incrementally fetches results
* from the server.
*
* @param prevId The last listed entry ID, or -1 if this is the first call to
* listPathCacheEntries.
* @param pool The cache pool to list, or -1 to list all pools
* @param maxRepliesPerRequest The maximum number of entries to return per
* request
* @return A RemoteIterator which returns PathCacheEntry objects.
*/
@Idempotent
public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
String pool, int maxRepliesPerRequest) throws IOException;
/**
* Modify a cache pool.
*
* @param req
* The request to modify a cache pool.
* @throws IOException
* If the request could not be completed.
*/
@AtMostOnce
public void addCachePool(CachePoolInfo info) throws IOException;
long poolId, int maxRepliesPerRequest) throws IOException;
/**
* Modify a cache pool.
*
* @param req
* The request to modify a cache pool.
* @throws IOException
* If the request could not be completed.
* Add a new cache pool.
*
* @param info Description of the new cache pool
* @throws IOException If the request could not be completed.
*/
@Idempotent
public void modifyCachePool(CachePoolInfo req) throws IOException;
@AtMostOnce
public CachePool addCachePool(CachePoolInfo info) throws IOException;
/**
* Modify a cache pool, e.g. pool name, permissions, owner, group.
*
* @param poolId ID of the cache pool to modify
* @param info New metadata for the cache pool
* @throws IOException If the request could not be completed.
*/
@AtMostOnce
public void modifyCachePool(long poolId, CachePoolInfo info)
throws IOException;
/**
* Remove a cache pool.
*
* @param cachePoolName
* Name of the cache pool to remove.
* @throws IOException
* if the cache pool did not exist, or could not be removed.
*/
@AtMostOnce
public void removeCachePool(String cachePoolName) throws IOException;
/**
* List some cache pools.
*
* @param prevKey
* The previous key we listed. We will list keys greater than this.
* @param maxRepliesPerRequest
* Maximum number of cache pools to list.
* @return A remote iterator from which you can get CachePool objects.
* Requests will be made as needed.
* @throws IOException
* If there was an error listing cache pools.
*
* @param poolId ID of the cache pool to remove.
* @throws IOException if the cache pool did not exist, or could not be
* removed.
*/
@Idempotent
public RemoteIterator<CachePoolInfo> listCachePools(String prevKey,
public void removeCachePool(long poolId) throws IOException;
/**
* List the set of cache pools. Incrementally fetches results from the server.
*
* @param prevPoolId ID of the last pool listed, or -1 if this is the first
* invocation of listCachePools
* @param maxRepliesPerRequest Maximum number of cache pools to return per
* server request.
* @return A RemoteIterator which returns CachePool objects.
*/
@Idempotent
public RemoteIterator<CachePool> listCachePools(long prevPoolId,
int maxRepliesPerRequest) throws IOException;
}

View File

@ -25,7 +25,7 @@ import com.google.common.collect.ComparisonChain;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
/**
@ -33,14 +33,13 @@ import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPat
*/
public class PathCacheDirective implements Comparable<PathCacheDirective> {
private final String path;
private final long poolId;
private final String pool;
public PathCacheDirective(String path, String pool) {
public PathCacheDirective(String path, long poolId) {
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(pool);
Preconditions.checkArgument(poolId > 0);
this.path = path;
this.pool = pool;
this.poolId = poolId;
}
/**
@ -53,8 +52,8 @@ public class PathCacheDirective implements Comparable<PathCacheDirective> {
/**
* @return The pool used in this request.
*/
public String getPool() {
return pool;
public long getPoolId() {
return poolId;
}
/**
@ -70,22 +69,22 @@ public class PathCacheDirective implements Comparable<PathCacheDirective> {
if (!DFSUtil.isValidName(path)) {
throw new InvalidPathNameError(this);
}
if (pool.isEmpty()) {
throw new InvalidPoolNameError(this);
if (poolId <= 0) {
throw new InvalidPoolError(this);
}
}
@Override
public int compareTo(PathCacheDirective rhs) {
return ComparisonChain.start().
compare(pool, rhs.getPool()).
compare(poolId, rhs.getPoolId()).
compare(path, rhs.getPath()).
result();
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(path).append(pool).hashCode();
return new HashCodeBuilder().append(path).append(poolId).hashCode();
}
@Override
@ -102,7 +101,7 @@ public class PathCacheDirective implements Comparable<PathCacheDirective> {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("{ path:").append(path).
append(", pool:").append(pool).
append(", poolId:").append(poolId).
append(" }");
return builder.toString();
}

View File

@ -29,8 +29,7 @@ import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolError;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -113,7 +112,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
@ -173,7 +171,6 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
@ -1038,16 +1035,19 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
@Override
public AddPathCacheDirectivesResponseProto addPathCacheDirectives(RpcController controller,
AddPathCacheDirectivesRequestProto request) throws ServiceException {
public AddPathCacheDirectivesResponseProto addPathCacheDirectives(
RpcController controller, AddPathCacheDirectivesRequestProto request)
throws ServiceException {
try {
ArrayList<PathCacheDirective> input =
new ArrayList<PathCacheDirective>(request.getElementsCount());
for (int i = 0; i < request.getElementsCount(); i++) {
PathCacheDirectiveProto proto = request.getElements(i);
input.add(new PathCacheDirective(proto.getPath(), proto.getPool()));
input.add(new PathCacheDirective(proto.getPath(),
proto.getPool().getId()));
}
List<Fallible<PathCacheEntry>> output = server.addPathCacheDirectives(input);
List<Fallible<PathCacheEntry>> output = server
.addPathCacheDirectives(input);
AddPathCacheDirectivesResponseProto.Builder builder =
AddPathCacheDirectivesResponseProto.newBuilder();
for (int idx = 0; idx < output.size(); idx++) {
@ -1060,7 +1060,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
} catch (InvalidPathNameError ioe) {
builder.addResults(AddPathCacheDirectiveErrorProto.
INVALID_PATH_NAME_ERROR_VALUE);
} catch (InvalidPoolNameError ioe) {
} catch (InvalidPoolError ioe) {
builder.addResults(AddPathCacheDirectiveErrorProto.
INVALID_POOL_NAME_ERROR_VALUE);
} catch (IOException ioe) {
@ -1108,22 +1108,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
@Override
public ListPathCacheEntriesResponseProto listPathCacheEntries(RpcController controller,
ListPathCacheEntriesRequestProto request) throws ServiceException {
public ListPathCacheEntriesResponseProto listPathCacheEntries(
RpcController controller, ListPathCacheEntriesRequestProto request)
throws ServiceException {
try {
CachePool pool = PBHelper.convert(request.getPool());
RemoteIterator<PathCacheEntry> iter =
server.listPathCacheEntries(request.getPrevId(),
request.getPool(),
server.listPathCacheEntries(
PBHelper.convert(request.getPrevEntry()).getEntryId(),
pool.getId(),
request.getMaxReplies());
ListPathCacheEntriesResponseProto.Builder builder =
ListPathCacheEntriesResponseProto.newBuilder();
while (iter.hasNext()) {
PathCacheEntry entry = iter.next();
builder.addElements(
ListPathCacheEntriesElementProto.newBuilder().
setId(entry.getEntryId()).
setPath(entry.getDirective().getPath()).
setPool(entry.getDirective().getPool()));
builder.addEntries(PBHelper.convert(entry));
}
return builder.build();
} catch (IOException e) {
@ -1135,46 +1134,20 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public AddCachePoolResponseProto addCachePool(RpcController controller,
AddCachePoolRequestProto request) throws ServiceException {
try {
CachePoolInfo info =
new CachePoolInfo(request.getPoolName());
if (request.hasOwnerName()) {
info.setOwnerName(request.getOwnerName());
}
if (request.hasGroupName()) {
info.setGroupName(request.getGroupName());
}
if (request.hasMode()) {
info.setMode(request.getMode());
}
if (request.hasWeight()) {
info.setWeight(request.getWeight());
}
server.addCachePool(info);
server.addCachePool(PBHelper.convert(request.getInfo()));
return AddCachePoolResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ModifyCachePoolResponseProto modifyCachePool(RpcController controller,
ModifyCachePoolRequestProto request) throws ServiceException {
try {
CachePoolInfo info =
new CachePoolInfo(request.getPoolName());
if (request.hasOwnerName()) {
info.setOwnerName(request.getOwnerName());
}
if (request.hasGroupName()) {
info.setGroupName(request.getGroupName());
}
if (request.hasMode()) {
info.setMode(request.getMode());
}
if (request.hasWeight()) {
info.setWeight(request.getWeight());
}
server.modifyCachePool(info);
server.modifyCachePool(
PBHelper.convert(request.getPool()).getId(),
PBHelper.convert(request.getInfo()));
return ModifyCachePoolResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
@ -1185,7 +1158,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public RemoveCachePoolResponseProto removeCachePool(RpcController controller,
RemoveCachePoolRequestProto request) throws ServiceException {
try {
server.removeCachePool(request.getPoolName());
server.removeCachePool(PBHelper.convert(request.getPool()).getId());
return RemoveCachePoolResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
@ -1196,28 +1169,16 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public ListCachePoolsResponseProto listCachePools(RpcController controller,
ListCachePoolsRequestProto request) throws ServiceException {
try {
RemoteIterator<CachePoolInfo> iter =
server.listCachePools(request.getPrevPoolName(),
RemoteIterator<CachePool> iter =
server.listCachePools(PBHelper.convert(request.getPrevPool()).getId(),
request.getMaxReplies());
ListCachePoolsResponseProto.Builder responseBuilder =
ListCachePoolsResponseProto.newBuilder();
while (iter.hasNext()) {
CachePoolInfo pool = iter.next();
ListCachePoolsResponseElementProto.Builder elemBuilder =
CachePool pool = iter.next();
ListCachePoolsResponseElementProto.Builder elemBuilder =
ListCachePoolsResponseElementProto.newBuilder();
elemBuilder.setPoolName(pool.getPoolName());
if (pool.getOwnerName() != null) {
elemBuilder.setOwnerName(pool.getOwnerName());
}
if (pool.getGroupName() != null) {
elemBuilder.setGroupName(pool.getGroupName());
}
if (pool.getMode() != null) {
elemBuilder.setMode(pool.getMode());
}
if (pool.getWeight() != null) {
elemBuilder.setWeight(pool.getWeight());
}
elemBuilder.setPool(PBHelper.convert(pool));
responseBuilder.addElements(elemBuilder.build());
}
return responseBuilder.build();

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -38,17 +37,12 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@ -61,14 +55,18 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesResponseProto;
@ -109,23 +107,23 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@ -146,6 +144,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
@ -1027,7 +1026,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
return new InvalidPathNameError(directive);
} else if (code == AddPathCacheDirectiveErrorProto.
INVALID_POOL_NAME_ERROR_VALUE) {
return new InvalidPoolNameError(directive);
return new InvalidPoolError(directive);
} else {
return new UnexpectedAddPathCacheDirectiveException(directive);
}
@ -1042,7 +1041,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
for (PathCacheDirective directive : directives) {
builder.addElements(PathCacheDirectiveProto.newBuilder().
setPath(directive.getPath()).
setPool(directive.getPool()).
setPool(PBHelper.convert(new CachePool(directive.getPoolId()))).
build());
}
AddPathCacheDirectivesResponseProto result =
@ -1121,42 +1120,40 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public PathCacheEntry get(int i) {
ListPathCacheEntriesElementProto elementProto =
response.getElements(i);
return new PathCacheEntry(elementProto.getId(),
new PathCacheDirective(elementProto.getPath(),
elementProto.getPool()));
PathCacheEntryProto entryProto = response.getEntries(i);
return PBHelper.convert(entryProto);
}
@Override
public int size() {
return response.getElementsCount();
return response.getEntriesCount();
}
}
private class PathCacheEntriesIterator
extends BatchedRemoteIterator<Long, PathCacheEntry> {
private final String pool;
private final long poolId;
public PathCacheEntriesIterator(long prevKey, int maxRepliesPerRequest,
String pool) {
long poolId) {
super(prevKey, maxRepliesPerRequest);
this.pool = pool;
this.poolId = poolId;
}
@Override
public BatchedEntries<PathCacheEntry> makeRequest(
Long nextKey, int maxRepliesPerRequest) throws IOException {
Long prevEntryId, int maxRepliesPerRequest) throws IOException {
ListPathCacheEntriesResponseProto response;
try {
ListPathCacheEntriesRequestProto req =
ListPathCacheEntriesRequestProto.newBuilder().
setPrevId(nextKey).
setPool(pool).
setPrevEntry(
PBHelper.convert(new PathCacheEntry(prevEntryId, null))).
setPool(PBHelper.convert(new CachePool(poolId))).
setMaxReplies(maxRepliesPerRequest).
build();
response = rpcProxy.listPathCacheEntries(null, req);
if (response.getElementsCount() == 0) {
if (response.getEntriesCount() == 0) {
response = null;
}
} catch (ServiceException e) {
@ -1166,58 +1163,37 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public Long elementToNextKey(PathCacheEntry element) {
public Long elementToPrevKey(PathCacheEntry element) {
return element.getEntryId();
}
}
@Override
public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
String pool, int repliesPerRequest) throws IOException {
return new PathCacheEntriesIterator(prevId, repliesPerRequest, pool);
long poolId, int repliesPerRequest) throws IOException {
return new PathCacheEntriesIterator(prevId, repliesPerRequest, poolId);
}
@Override
public void addCachePool(CachePoolInfo info) throws IOException {
AddCachePoolRequestProto.Builder builder =
public CachePool addCachePool(CachePoolInfo info) throws IOException {
AddCachePoolRequestProto.Builder builder =
AddCachePoolRequestProto.newBuilder();
builder.setPoolName(info.getPoolName());
if (info.getOwnerName() != null) {
builder.setOwnerName(info.getOwnerName());
}
if (info.getGroupName() != null) {
builder.setGroupName(info.getGroupName());
}
if (info.getMode() != null) {
builder.setMode(info.getMode());
}
if (info.getWeight() != null) {
builder.setWeight(info.getWeight());
}
builder.setInfo(PBHelper.convert(info));
try {
rpcProxy.addCachePool(null, builder.build());
return PBHelper.convert(
rpcProxy.addCachePool(null, builder.build()).getPool());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void modifyCachePool(CachePoolInfo req) throws IOException {
ModifyCachePoolRequestProto.Builder builder =
ModifyCachePoolRequestProto.newBuilder();
builder.setPoolName(req.getPoolName());
if (req.getOwnerName() != null) {
builder.setOwnerName(req.getOwnerName());
}
if (req.getGroupName() != null) {
builder.setGroupName(req.getGroupName());
}
if (req.getMode() != null) {
builder.setMode(req.getMode());
}
if (req.getWeight() != null) {
builder.setWeight(req.getWeight());
}
public void modifyCachePool(long poolId, CachePoolInfo info)
throws IOException {
ModifyCachePoolRequestProto.Builder builder =
ModifyCachePoolRequestProto.newBuilder()
.setPool(PBHelper.convert(new CachePool(poolId)))
.setInfo(PBHelper.convert(info));
try {
rpcProxy.modifyCachePool(null, builder.build());
} catch (ServiceException e) {
@ -1226,32 +1202,30 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public void removeCachePool(String cachePoolName) throws IOException {
public void removeCachePool(long poolId) throws IOException {
try {
rpcProxy.removeCachePool(null,
rpcProxy.removeCachePool(null,
RemoveCachePoolRequestProto.newBuilder().
setPoolName(cachePoolName).build());
setPool(PBHelper.convert(new CachePool(poolId))).
build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
private static class BatchedPathDirectiveEntries
implements BatchedEntries<CachePoolInfo> {
implements BatchedEntries<CachePool> {
private final ListCachePoolsResponseProto proto;
public BatchedPathDirectiveEntries(ListCachePoolsResponseProto proto) {
this.proto = proto;
}
@Override
public CachePoolInfo get(int i) {
public CachePool get(int i) {
ListCachePoolsResponseElementProto elem = proto.getElements(i);
return new CachePoolInfo(elem.getPoolName()).
setOwnerName(elem.getOwnerName()).
setGroupName(elem.getGroupName()).
setMode(elem.getMode()).
setWeight(elem.getWeight());
return PBHelper.convert(elem.getPool());
}
@Override
@ -1259,37 +1233,38 @@ public class ClientNamenodeProtocolTranslatorPB implements
return proto.getElementsCount();
}
}
private class CachePoolIterator
extends BatchedRemoteIterator<String, CachePoolInfo> {
public CachePoolIterator(String prevKey, int maxRepliesPerRequest) {
private class CachePoolIterator
extends BatchedRemoteIterator<Long, CachePool> {
public CachePoolIterator(Long prevKey, int maxRepliesPerRequest) {
super(prevKey, maxRepliesPerRequest);
}
@Override
public BatchedEntries<CachePoolInfo> makeRequest(String prevKey,
public BatchedEntries<CachePool> makeRequest(Long prevKey,
int maxRepliesPerRequest) throws IOException {
try {
return new BatchedPathDirectiveEntries(
rpcProxy.listCachePools(null,
rpcProxy.listCachePools(null,
ListCachePoolsRequestProto.newBuilder().
setPrevPoolName(prevKey).
setMaxReplies(maxRepliesPerRequest).build()));
setPrevPool(PBHelper.convert(new CachePool(prevKey))).
setMaxReplies(maxRepliesPerRequest).
build()));
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public String elementToNextKey(CachePoolInfo element) {
return element.getPoolName();
public Long elementToPrevKey(CachePool element) {
return element.getId();
}
}
@Override
public RemoteIterator<CachePoolInfo> listCachePools(String prevKey,
public RemoteIterator<CachePool> listCachePools(long prevPoolId,
int maxRepliesPerRequest) throws IOException {
return new CachePoolIterator(prevKey, maxRepliesPerRequest);
return new CachePoolIterator(prevPoolId, maxRepliesPerRequest);
}
}

View File

@ -32,10 +32,13 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -50,9 +53,15 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
@ -114,6 +123,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
@ -1493,6 +1503,74 @@ public class PBHelper {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
public static PathCacheDirective convert(
PathCacheDirectiveProto directiveProto) {
CachePool pool = convert(directiveProto.getPool());
return new PathCacheDirective(directiveProto.getPath(), pool.getId());
}
public static PathCacheDirectiveProto convert(PathCacheDirective directive) {
PathCacheDirectiveProto.Builder builder =
PathCacheDirectiveProto.newBuilder()
.setPath(directive.getPath())
.setPool(PBHelper.convert(new CachePool(directive.getPoolId())));
return builder.build();
}
public static PathCacheEntry convert(PathCacheEntryProto entryProto) {
long entryId = entryProto.getId();
PathCacheDirective directive = convert(entryProto.getDirective());
return new PathCacheEntry(entryId, directive);
}
public static PathCacheEntryProto convert(PathCacheEntry entry) {
PathCacheEntryProto.Builder builder = PathCacheEntryProto.newBuilder()
.setId(entry.getEntryId())
.setDirective(PBHelper.convert(entry.getDirective()));
return builder.build();
}
public static CachePoolInfo convert(CachePoolInfoProto infoProto) {
CachePoolInfo.Builder builder =
CachePoolInfo.newBuilder().setPoolName(infoProto.getPoolName());
if (infoProto.hasOwnerName()) {
builder.setOwnerName(infoProto.getOwnerName());
}
if (infoProto.hasGroupName()) {
builder.setGroupName(infoProto.getGroupName());
}
if (infoProto.hasMode()) {
builder.setMode(new FsPermission((short) infoProto.getMode()));
}
if (infoProto.hasWeight()) {
builder.setWeight(infoProto.getWeight());
}
return builder.build();
}
public static CachePoolInfoProto convert(CachePoolInfo info) {
CachePoolInfoProto.Builder builder = CachePoolInfoProto.newBuilder()
.setPoolName(info.getPoolName())
.setOwnerName(info.getOwnerName())
.setGroupName(info.getGroupName())
.setMode(info.getMode().toShort())
.setWeight(info.getWeight());
return builder.build();
}
public static CachePool convert(CachePoolProto poolProto) {
CachePoolInfo info = convert(poolProto.getInfo());
CachePool pool = new CachePool(poolProto.getId(), info);
return pool;
}
public static CachePoolProto convert(CachePool pool) {
CachePoolProto.Builder builder = CachePoolProto.newBuilder()
.setId(pool.getId())
.setInfo(convert(pool.getInfo()));
return builder.build();
}
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();

View File

@ -19,25 +19,26 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
import org.apache.hadoop.util.Fallible;
/**
@ -64,14 +65,25 @@ final class CacheManager {
/**
* Cache pools, sorted by name.
*/
private final TreeMap<String, CachePool> cachePools =
private final TreeMap<String, CachePool> cachePoolsByName =
new TreeMap<String, CachePool>();
/**
* Cache pools, sorted by ID
*/
private final TreeMap<Long, CachePool> cachePoolsById =
new TreeMap<Long, CachePool>();
/**
* The entry ID to use for a new entry.
*/
private long nextEntryId;
/**
* The pool ID to use for a new pool.
*/
private long nextPoolId;
CacheManager(FSDirectory dir, Configuration conf) {
// TODO: support loading and storing of the CacheManager state
clear();
@ -80,26 +92,35 @@ final class CacheManager {
synchronized void clear() {
entriesById.clear();
entriesByDirective.clear();
cachePoolsByName.clear();
cachePoolsById.clear();
nextEntryId = 1;
nextPoolId = 1;
}
synchronized long getNextEntryId() throws IOException {
if (nextEntryId == Long.MAX_VALUE) {
throw new IOException("no more available IDs");
throw new IOException("no more available entry IDs");
}
return nextEntryId++;
}
synchronized long getNextPoolId() throws IOException {
if (nextPoolId == Long.MAX_VALUE) {
throw new IOException("no more available pool IDs");
}
return nextPoolId++;
}
private synchronized Fallible<PathCacheEntry> addDirective(
PathCacheDirective directive, FSPermissionChecker pc) {
CachePool pool = cachePools.get(directive.getPool());
FSPermissionChecker pc, PathCacheDirective directive) {
CachePool pool = cachePoolsById.get(directive.getPoolId());
if (pool == null) {
LOG.info("addDirective " + directive + ": pool not found.");
return new Fallible<PathCacheEntry>(
new InvalidPoolNameError(directive));
new InvalidPoolError(directive));
}
if (!pc.checkWritePermission(pool.getOwnerName(),
pool.getGroupName(), pool.getMode())) {
if (!pc.checkPermission(pool, FsAction.WRITE)) {
LOG.info("addDirective " + directive + ": write permission denied.");
return new Fallible<PathCacheEntry>(
new PoolWritePermissionDeniedError(directive));
@ -134,17 +155,17 @@ final class CacheManager {
}
public synchronized List<Fallible<PathCacheEntry>> addDirectives(
List<PathCacheDirective> directives, FSPermissionChecker pc) {
FSPermissionChecker pc, List<PathCacheDirective> directives) {
ArrayList<Fallible<PathCacheEntry>> results =
new ArrayList<Fallible<PathCacheEntry>>(directives.size());
for (PathCacheDirective directive: directives) {
results.add(addDirective(directive, pc));
results.add(addDirective(pc, directive));
}
return results;
}
private synchronized Fallible<Long> removeEntry(long entryId,
FSPermissionChecker pc) {
private synchronized Fallible<Long> removeEntry(FSPermissionChecker pc,
long entryId) {
// Check for invalid IDs.
if (entryId <= 0) {
LOG.info("removeEntry " + entryId + ": invalid non-positive entry ID.");
@ -156,23 +177,20 @@ final class CacheManager {
LOG.info("removeEntry " + entryId + ": entry not found.");
return new Fallible<Long>(new NoSuchIdException(entryId));
}
CachePool pool = cachePools.get(existing.getDirective().getPool());
CachePool pool = cachePoolsById.get(existing.getDirective().getPoolId());
if (pool == null) {
LOG.info("removeEntry " + entryId + ": pool not found for directive " +
existing.getDirective());
return new Fallible<Long>(
new UnexpectedRemovePathCacheEntryException(entryId));
}
if (!pc.isSuperUser()) {
if (!pc.checkWritePermission(pool.getOwnerName(),
pool.getGroupName(), pool.getMode())) {
LOG.info("removeEntry " + entryId + ": write permission denied to " +
"pool " + pool + " for entry " + existing);
return new Fallible<Long>(
new RemovePermissionDeniedException(entryId));
}
if (!pc.checkPermission(pool, FsAction.WRITE)) {
LOG.info("removeEntry " + entryId + ": write permission denied to " +
"pool " + pool + " for entry " + existing);
return new Fallible<Long>(
new RemovePermissionDeniedException(entryId));
}
// Remove the corresponding entry in entriesByDirective.
if (entriesByDirective.remove(existing.getDirective()) == null) {
LOG.warn("removeEntry " + entryId + ": failed to find existing entry " +
@ -184,36 +202,43 @@ final class CacheManager {
return new Fallible<Long>(entryId);
}
public synchronized List<Fallible<Long>> removeEntries(List<Long> entryIds,
FSPermissionChecker pc) {
public synchronized List<Fallible<Long>> removeEntries(FSPermissionChecker pc,
List<Long> entryIds) {
ArrayList<Fallible<Long>> results =
new ArrayList<Fallible<Long>>(entryIds.size());
for (Long entryId : entryIds) {
results.add(removeEntry(entryId, pc));
results.add(removeEntry(pc, entryId));
}
return results;
}
public synchronized List<PathCacheEntry> listPathCacheEntries(long prevId,
String pool, int maxReplies) {
public synchronized List<PathCacheEntry> listPathCacheEntries(
FSPermissionChecker pc, long prevId, Long poolId, int maxReplies) {
final int MAX_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<PathCacheEntry> replies =
new ArrayList<PathCacheEntry>(Math.min(MAX_PRE_ALLOCATED_ENTRIES, maxReplies));
ArrayList<PathCacheEntry> replies = new ArrayList<PathCacheEntry>(
Math.min(MAX_PRE_ALLOCATED_ENTRIES, maxReplies));
int numReplies = 0;
SortedMap<Long, PathCacheEntry> tailMap = entriesById.tailMap(prevId + 1);
for (Entry<Long, PathCacheEntry> cur : tailMap.entrySet()) {
for (PathCacheEntry entry : tailMap.values()) {
if (numReplies >= maxReplies) {
return replies;
}
if (pool.isEmpty() || cur.getValue().getDirective().
getPool().equals(pool)) {
replies.add(cur.getValue());
numReplies++;
long entryPoolId = entry.getDirective().getPoolId();
if (poolId == null || poolId <= 0 || entryPoolId == poolId) {
if (pc.checkPermission(
cachePoolsById.get(entryPoolId), FsAction.EXECUTE)) {
replies.add(entry);
numReplies++;
}
}
}
return replies;
}
synchronized CachePool getCachePool(long id) {
return cachePoolsById.get(id);
}
/**
* Create a cache pool.
*
@ -221,22 +246,24 @@ final class CacheManager {
*
* @param info
* The info for the cache pool to create.
* @return created CachePool
*/
public synchronized void addCachePool(CachePoolInfo info)
public synchronized CachePool addCachePool(CachePoolInfo info)
throws IOException {
String poolName = info.getPoolName();
if (poolName.isEmpty()) {
if (poolName == null || poolName.isEmpty()) {
throw new IOException("invalid empty cache pool name");
}
CachePool pool = cachePools.get(poolName);
if (pool != null) {
if (cachePoolsByName.containsKey(poolName)) {
throw new IOException("cache pool " + poolName + " already exists.");
}
CachePool cachePool = new CachePool(poolName,
CachePool cachePool = new CachePool(getNextPoolId(), poolName,
info.getOwnerName(), info.getGroupName(), info.getMode(),
info.getWeight());
cachePools.put(poolName, cachePool);
cachePoolsById.put(cachePool.getId(), cachePool);
cachePoolsByName.put(poolName, cachePool);
LOG.info("created new cache pool " + cachePool);
return cachePool;
}
/**
@ -247,46 +274,62 @@ final class CacheManager {
* @param info
* The info for the cache pool to modify.
*/
public synchronized void modifyCachePool(CachePoolInfo info)
public synchronized void modifyCachePool(long poolId, CachePoolInfo info)
throws IOException {
String poolName = info.getPoolName();
if (poolName.isEmpty()) {
throw new IOException("invalid empty cache pool name");
if (poolId <= 0) {
throw new IOException("invalid pool id " + poolId);
}
CachePool pool = cachePools.get(poolName);
if (pool == null) {
throw new IOException("cache pool " + poolName + " does not exist.");
if (!cachePoolsById.containsKey(poolId)) {
throw new IOException("cache pool id " + poolId + " does not exist.");
}
CachePool pool = cachePoolsById.get(poolId);
// Remove the old CachePoolInfo
removeCachePool(poolId);
// Build up the new CachePoolInfo
CachePoolInfo.Builder newInfo = CachePoolInfo.newBuilder(pool.getInfo());
StringBuilder bld = new StringBuilder();
String prefix = "";
if (info.getPoolName() != null) {
newInfo.setPoolName(info.getPoolName());
bld.append(prefix).
append("set name to ").append(info.getOwnerName());
prefix = "; ";
}
if (info.getOwnerName() != null) {
pool.setOwnerName(info.getOwnerName());
newInfo.setOwnerName(info.getOwnerName());
bld.append(prefix).
append("set owner to ").append(info.getOwnerName());
prefix = "; ";
}
if (info.getGroupName() != null) {
pool.setGroupName(info.getGroupName());
newInfo.setGroupName(info.getGroupName());
bld.append(prefix).
append("set group to ").append(info.getGroupName());
prefix = "; ";
}
if (info.getMode() != null) {
pool.setMode(info.getMode());
newInfo.setMode(info.getMode());
bld.append(prefix).
append(String.format("set mode to 0%3o", info.getMode()));
append(String.format("set mode to ", info.getMode()));
prefix = "; ";
}
if (info.getWeight() != null) {
pool.setWeight(info.getWeight());
newInfo.setWeight(info.getWeight());
bld.append(prefix).
append("set weight to ").append(info.getWeight());
prefix = "; ";
}
if (prefix.isEmpty()) {
bld.append("no changes.");
} else {
pool.setInfo(newInfo.build());
}
LOG.info("modified " + poolName + "; " + bld.toString());
// Put the newly modified info back in
cachePoolsById.put(poolId, pool);
cachePoolsByName.put(info.getPoolName(), pool);
LOG.info("modified pool id " + pool.getId()
+ " (" + pool.getInfo().getPoolName() + "); "
+ bld.toString());
}
/**
@ -294,27 +337,38 @@ final class CacheManager {
*
* Only the superuser should be able to call this function.
*
* @param poolName
* The name for the cache pool to remove.
* @param poolId
* The id of the cache pool to remove.
*/
public synchronized void removeCachePool(String poolName)
throws IOException {
CachePool pool = cachePools.remove(poolName);
if (pool == null) {
throw new IOException("can't remove nonexistent cache pool " + poolName);
public synchronized void removeCachePool(long poolId) throws IOException {
if (!cachePoolsById.containsKey(poolId)) {
throw new IOException("can't remove nonexistent cache pool id " + poolId);
}
// Remove all the entries associated with the pool
Iterator<Map.Entry<Long, PathCacheEntry>> it =
entriesById.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Long, PathCacheEntry> entry = it.next();
if (entry.getValue().getDirective().getPoolId() == poolId) {
it.remove();
entriesByDirective.remove(entry.getValue().getDirective());
}
}
// Remove the pool
CachePool pool = cachePoolsById.remove(poolId);
cachePoolsByName.remove(pool.getInfo().getPoolName());
}
public synchronized List<CachePoolInfo>
listCachePools(FSPermissionChecker pc, String prevKey,
int maxRepliesPerRequest) {
public synchronized List<CachePool> listCachePools(Long prevKey,
int maxRepliesPerRequest) {
final int MAX_PREALLOCATED_REPLIES = 16;
ArrayList<CachePoolInfo> results =
new ArrayList<CachePoolInfo>(Math.min(MAX_PREALLOCATED_REPLIES,
ArrayList<CachePool> results =
new ArrayList<CachePool>(Math.min(MAX_PREALLOCATED_REPLIES,
maxRepliesPerRequest));
SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
for (Entry<String, CachePool> cur : tailMap.entrySet()) {
results.add(cur.getValue().getInfo(pc));
SortedMap<Long, CachePool> tailMap =
cachePoolsById.tailMap(prevKey, false);
for (CachePool pool : tailMap.values()) {
results.add(pool);
}
return results;
}

View File

@ -19,123 +19,119 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo.Builder;
import org.apache.hadoop.security.UserGroupInformation;
/**
* The NameNode uses CachePools to manage cache resources on the DataNodes.
* A CachePool describes a set of cache resources being managed by the NameNode.
* User caching requests are billed to the cache pool specified in the request.
*
* CachePools are uniquely identified by a numeric id as well as the
* {@link CachePoolInfo} pool name. Mutable metadata is contained in
* CachePoolInfo, including pool name, owner, group, and permissions.
* See this class for more details.
*/
public final class CachePool {
public static final Log LOG = LogFactory.getLog(CachePool.class);
@Nonnull
private final String poolName;
private final long id;
@Nonnull
private String ownerName;
private CachePoolInfo info;
@Nonnull
private String groupName;
private int mode;
private int weight;
public static String getCurrentUserPrimaryGroupName() throws IOException {
UserGroupInformation ugi= NameNode.getRemoteUser();
String[] groups = ugi.getGroupNames();
if (groups.length == 0) {
throw new IOException("failed to get group names from UGI " + ugi);
public CachePool(long id) {
this.id = id;
this.info = null;
}
CachePool(long id, String poolName, String ownerName, String groupName,
FsPermission mode, Integer weight) throws IOException {
this.id = id;
// Set CachePoolInfo default fields if null
if (poolName == null || poolName.isEmpty()) {
throw new IOException("invalid empty cache pool name");
}
return groups[0];
}
public CachePool(String poolName, String ownerName, String groupName,
Integer mode, Integer weight) throws IOException {
this.poolName = poolName;
this.ownerName = ownerName != null ? ownerName :
NameNode.getRemoteUser().getShortUserName();
this.groupName = groupName != null ? groupName :
getCurrentUserPrimaryGroupName();
this.mode = mode != null ? mode : 0644;
this.weight = weight != null ? weight : 100;
UserGroupInformation ugi = null;
if (ownerName == null) {
ugi = NameNode.getRemoteUser();
ownerName = ugi.getShortUserName();
}
if (groupName == null) {
if (ugi == null) {
ugi = NameNode.getRemoteUser();
}
String[] groups = ugi.getGroupNames();
if (groups.length == 0) {
throw new IOException("failed to get group names from UGI " + ugi);
}
groupName = groups[0];
}
if (mode == null) {
mode = FsPermission.getDirDefault();
}
if (weight == null) {
weight = 100;
}
CachePoolInfo.Builder builder = CachePoolInfo.newBuilder();
builder.setPoolName(poolName).setOwnerName(ownerName)
.setGroupName(groupName).setMode(mode).setWeight(weight);
this.info = builder.build();
}
public String getName() {
return poolName;
public CachePool(long id, CachePoolInfo info) {
this.id = id;
this.info = info;
}
public String getOwnerName() {
return ownerName;
/**
* @return id of the pool
*/
public long getId() {
return id;
}
public CachePool setOwnerName(String ownerName) {
this.ownerName = ownerName;
return this;
}
public String getGroupName() {
return groupName;
}
public CachePool setGroupName(String groupName) {
this.groupName = groupName;
return this;
}
public int getMode() {
return mode;
}
public CachePool setMode(int mode) {
this.mode = mode;
return this;
}
public int getWeight() {
return weight;
}
public CachePool setWeight(int weight) {
this.weight = weight;
return this;
}
/**
* Get information about this cache pool.
*
* @param fullInfo
* If true, only the name will be returned (i.e., what you
* would get if you didn't have read permission for this pool.)
* @return
* Cache pool information.
*/
public CachePoolInfo getInfo(boolean fullInfo) {
CachePoolInfo info = new CachePoolInfo(poolName);
if (!fullInfo) {
return info;
}
return info.setOwnerName(ownerName).
setGroupName(groupName).
setMode(mode).
setWeight(weight);
public CachePoolInfo getInfo() {
return info;
}
public CachePoolInfo getInfo(FSPermissionChecker pc) {
return getInfo(pc.checkReadPermission(ownerName, groupName, mode));
void setInfo(CachePoolInfo info) {
this.info = info;
}
public String toString() {
return new StringBuilder().
append("{ ").append("poolName:").append(poolName).
append(", ownerName:").append(ownerName).
append(", groupName:").append(groupName).
append(", mode:").append(String.format("%3o", mode)).
append(", weight:").append(weight).
append("{ ").append("id:").append(id).
append(", info:").append(info.toString()).
append(" }").toString();
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(id).append(info).hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null) { return false; }
if (obj == this) { return true; }
if (obj.getClass() != getClass()) {
return false;
}
CachePool rhs = (CachePool)obj;
return new EqualsBuilder()
.append(id, rhs.id)
.append(info, rhs.info)
.isEquals();
}
}

View File

@ -6701,7 +6701,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return; // Return previous response
}
boolean success = false;
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -6749,6 +6748,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
@SuppressWarnings("unchecked")
List<Fallible<PathCacheEntry>> addPathCacheDirectives(
List<PathCacheDirective> directives) throws IOException {
CacheEntryWithPayload retryCacheEntry =
@ -6759,7 +6759,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final FSPermissionChecker pc = getPermissionChecker();
boolean success = false;
List<Fallible<PathCacheEntry>> results = null;
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -6767,7 +6766,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException(
"Cannot add path cache directive", safeMode);
}
results = cacheManager.addDirectives(directives, pc);
results = cacheManager.addDirectives(pc, directives);
//getEditLog().logAddPathCacheDirectives(results); FIXME: HDFS-5119
success = true;
} finally {
@ -6775,7 +6774,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (success) {
getEditLog().logSync();
}
if (isAuditEnabled() && isExternalInvocation()) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addPathCacheDirectives", null, null, null);
}
RetryCache.setState(retryCacheEntry, success, results);
@ -6783,58 +6782,50 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return results;
}
List<Fallible<Long>> removePathCacheEntries(List<Long> ids) throws IOException {
CacheEntryWithPayload retryCacheEntry =
RetryCache.waitForCompletion(retryCache, null);
if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
return (List<Fallible<Long>>) retryCacheEntry.getPayload();
}
@SuppressWarnings("unchecked")
List<Fallible<Long>> removePathCacheEntries(List<Long> ids)
throws IOException {
final FSPermissionChecker pc = getPermissionChecker();
boolean success = false;
List<Fallible<Long>> results = null;
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot remove path cache directives", safeMode);
"Cannot add path cache directive", safeMode);
}
results = cacheManager.removeEntries(ids, pc);
results = cacheManager.removeEntries(pc, ids);
//getEditLog().logRemovePathCacheEntries(results); FIXME: HDFS-5119
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
if (success) {
getEditLog().logSync();
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(success, "removePathCacheEntries", null, null, null);
}
RetryCache.setState(retryCacheEntry, success, results);
}
getEditLog().logSync();
return results;
}
List<PathCacheEntry> listPathCacheEntries(long startId, String pool,
int maxReplies) throws IOException {
checkOperation(OperationCategory.READ);
readLock();
try {
checkOperation(OperationCategory.READ);
return cacheManager.listPathCacheEntries(startId, pool, maxReplies);
} finally {
readUnlock();
}
List<PathCacheEntry> listPathCacheEntries(long startId,
Long poolId, int maxReplies) throws IOException {
LOG.info("listPathCacheEntries with " + startId + " " + poolId);
final FSPermissionChecker pc = getPermissionChecker();
return cacheManager.listPathCacheEntries(pc, startId, poolId, maxReplies);
}
public void addCachePool(CachePoolInfo req) throws IOException {
public CachePool addCachePool(CachePoolInfo req) throws IOException {
final FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntryWithPayload cacheEntry =
RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
return (CachePool)cacheEntry.getPayload(); // Return previous response
}
checkOperation(OperationCategory.WRITE);
writeLock();
boolean success = false;
CachePool pool = null;
try {
checkOperation(OperationCategory.WRITE);
if (!pc.isSuperUser()) {
@ -6845,29 +6836,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException(
"Cannot add cache pool " + req.getPoolName(), safeMode);
}
cacheManager.addCachePool(req);
pool = cacheManager.addCachePool(req);
RetryCache.setState(cacheEntry, true);
//getEditLog().logAddCachePool(req); // FIXME: HDFS-5119
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addCachePool", req.getPoolName(), null, null);
}
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "addCachePool", req.getPoolName(), null, null);
}
return pool;
}
public void modifyCachePool(CachePoolInfo req) throws IOException {
public void modifyCachePool(long poolId, CachePoolInfo info)
throws IOException {
final FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
checkOperation(OperationCategory.WRITE);
writeLock();
boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
if (!pc.isSuperUser()) {
@ -6876,64 +6866,62 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot modify cache pool " + req.getPoolName(), safeMode);
"Cannot modify cache pool " + info.getPoolName(), safeMode);
}
cacheManager.modifyCachePool(req);
cacheManager.modifyCachePool(poolId, info);
RetryCache.setState(cacheEntry, true);
//getEditLog().logModifyCachePool(req); // FIXME: HDFS-5119
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "modifyCachePool", req.getPoolName(), null, null);
}
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "modifyCachePool", info.getPoolName(), null, null);
}
}
public void removeCachePool(String cachePoolName) throws IOException {
public void removeCachePool(long poolId) throws IOException {
final FSPermissionChecker pc = getPermissionChecker();
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
checkOperation(OperationCategory.WRITE);
writeLock();
boolean success = false;
CachePool pool;
try {
checkOperation(OperationCategory.WRITE);
if (!pc.isSuperUser()) {
throw new AccessControlException("Non-super users cannot " +
"remove cache pools.");
}
pool = cacheManager.getCachePool(poolId);
if (isInSafeMode()) {
String identifier;
if (pool == null) {
identifier = "with id " + Long.toString(poolId);
} else {
identifier = pool.getInfo().getPoolName();
}
throw new SafeModeException(
"Cannot remove cache pool " + cachePoolName, safeMode);
"Cannot remove cache pool " + identifier, safeMode);
}
cacheManager.removeCachePool(cachePoolName);
cacheManager.removeCachePool(poolId);
//getEditLog().logRemoveCachePool(req); // FIXME: HDFS-5119
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "removeCachePool", cachePoolName, null, null);
}
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(true, "removeCachePool", pool.getInfo().getPoolName(),
null, null);
}
}
public List<CachePoolInfo> listCachePools(String prevKey,
public List<CachePool> listCachePools(long prevKey,
int maxRepliesPerRequest) throws IOException {
final FSPermissionChecker pc = getPermissionChecker();
List<CachePoolInfo> results;
checkOperation(OperationCategory.READ);
List<CachePool> results;
readLock();
try {
checkOperation(OperationCategory.READ);
results = cacheManager.listCachePools(pc, prevKey, maxRepliesPerRequest);
results = cacheManager.listCachePools(prevKey, maxRepliesPerRequest);
} finally {
readUnlock();
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -29,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@ -257,38 +257,29 @@ class FSPermissionChecker {
}
/**
* Check if this CachePool can be accessed.
* Whether a cache pool can be accessed by the current context
*
* @param pc
* Permission checker object with user name and groups.
* @param write
* True if we care about write access; false otherwise.
* @return
* True only if the cache pool is accessible.
* @param pool CachePool being accessed
* @param access type of action being performed on the cache pool
* @return if the pool can be accessed
*/
private boolean checkPermission(String userName,
String groupName, int mode, int mask) {
if ((mode & mask) != 0) {
public boolean checkPermission(CachePool pool, FsAction access) {
CachePoolInfo info = pool.getInfo();
FsPermission mode = info.getMode();
if (isSuperUser()) {
return true;
}
if (((mode & (mask << 6)) != 0)
&& (getUser().equals(userName))) {
if (user.equals(info.getOwnerName())
&& mode.getUserAction().implies(access)) {
return true;
}
if (((mode & (mask << 6)) != 0)
&& (containsGroup(groupName))) {
if (groups.contains(info.getGroupName())
&& mode.getGroupAction().implies(access)) {
return true;
}
if (mode.getOtherAction().implies(access)) {
return true;
}
return false;
}
public boolean checkWritePermission(String userName,
String groupName, int mode) {
return checkPermission(userName, groupName, mode, 02);
}
public boolean checkReadPermission(String userName,
String groupName, int mode) {
return checkPermission(userName, groupName, mode, 04);
}
}

View File

@ -31,13 +31,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -62,9 +60,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -1225,72 +1223,73 @@ class NameNodeRpcServer implements NamenodeProtocols {
private class ServerSidePathCacheEntriesIterator
extends BatchedRemoteIterator<Long, PathCacheEntry> {
private final String pool;
private final Long poolId;
public ServerSidePathCacheEntriesIterator(Long firstKey,
int maxRepliesPerRequest, String pool) {
int maxRepliesPerRequest, Long poolId) {
super(firstKey, maxRepliesPerRequest);
this.pool = pool;
this.poolId = poolId;
}
@Override
public BatchedEntries<PathCacheEntry> makeRequest(
Long nextKey, int maxRepliesPerRequest) throws IOException {
Long prevKey, int maxRepliesPerRequest) throws IOException {
return new BatchedListEntries<PathCacheEntry>(
namesystem.listPathCacheEntries(nextKey, pool,
namesystem.listPathCacheEntries(prevKey, poolId,
maxRepliesPerRequest));
}
@Override
public Long elementToNextKey(PathCacheEntry entry) {
public Long elementToPrevKey(PathCacheEntry entry) {
return entry.getEntryId();
}
}
@Override
public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId, String pool,
int maxReplies) throws IOException {
return new ServerSidePathCacheEntriesIterator(prevId, maxReplies, pool);
public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
long poolId, int maxReplies) throws IOException {
return new ServerSidePathCacheEntriesIterator(prevId, maxReplies, poolId);
}
@Override
public void addCachePool(CachePoolInfo info) throws IOException {
namesystem.addCachePool(info);
public CachePool addCachePool(CachePoolInfo info) throws IOException {
return namesystem.addCachePool(info);
}
@Override
public void modifyCachePool(CachePoolInfo info) throws IOException {
namesystem.modifyCachePool(info);
public void modifyCachePool(long poolId, CachePoolInfo info)
throws IOException {
namesystem.modifyCachePool(poolId, info);
}
@Override
public void removeCachePool(String cachePoolName) throws IOException {
namesystem.removeCachePool(cachePoolName);
public void removeCachePool(long poolId) throws IOException {
namesystem.removeCachePool(poolId);
}
private class ServerSideCachePoolIterator
extends BatchedRemoteIterator<String, CachePoolInfo> {
extends BatchedRemoteIterator<Long, CachePool> {
public ServerSideCachePoolIterator(String prevKey, int maxRepliesPerRequest) {
super(prevKey, maxRepliesPerRequest);
public ServerSideCachePoolIterator(long prevId, int maxRepliesPerRequest) {
super(prevId, maxRepliesPerRequest);
}
@Override
public BatchedEntries<CachePoolInfo> makeRequest(String prevKey,
public BatchedEntries<CachePool> makeRequest(Long prevId,
int maxRepliesPerRequest) throws IOException {
return new BatchedListEntries<CachePoolInfo>(
namesystem.listCachePools(prevKey, maxRepliesPerRequest));
return new BatchedListEntries<CachePool>(
namesystem.listCachePools(prevId, maxRepliesPerRequest));
}
@Override
public String elementToNextKey(CachePoolInfo element) {
return element.getPoolName();
public Long elementToPrevKey(CachePool element) {
return element.getId();
}
}
@Override
public RemoteIterator<CachePoolInfo> listCachePools(String prevKey,
public RemoteIterator<CachePool> listCachePools(long prevPoolId,
int maxRepliesPerRequest) throws IOException {
return new ServerSideCachePoolIterator(prevKey, maxRepliesPerRequest);
return new ServerSideCachePoolIterator(prevPoolId, maxRepliesPerRequest);
}
}

View File

@ -363,9 +363,27 @@ message IsFileClosedResponseProto {
required bool result = 1;
}
message CachePoolInfoProto {
optional string poolName = 1;
optional string ownerName = 2;
optional string groupName = 3;
optional int32 mode = 4;
optional int32 weight = 5;
}
message CachePoolProto {
optional int64 id = 1;
optional CachePoolInfoProto info = 2;
}
message PathCacheDirectiveProto {
required string path = 1;
required string pool = 2;
required CachePoolProto pool = 2;
}
message PathCacheEntryProto {
required int64 id = 1;
optional PathCacheDirectiveProto directive = 2;
}
message AddPathCacheDirectivesRequestProto {
@ -399,53 +417,41 @@ enum RemovePathCacheEntryErrorProto {
}
message ListPathCacheEntriesRequestProto {
required int64 prevId = 1;
required string pool = 2;
required PathCacheEntryProto prevEntry = 1;
required CachePoolProto pool = 2;
optional int32 maxReplies = 3;
}
message ListPathCacheEntriesElementProto {
required int64 id = 1;
required string path = 2;
required string pool = 3;
}
message ListPathCacheEntriesResponseProto {
repeated ListPathCacheEntriesElementProto elements = 1;
repeated PathCacheEntryProto entries = 1;
required bool hasMore = 2;
}
message AddCachePoolRequestProto {
required string poolName = 1;
optional string ownerName = 2;
optional string groupName = 3;
optional int32 mode = 4;
optional int32 weight = 5;
required CachePoolInfoProto info = 1;
}
message AddCachePoolResponseProto { // void response
message AddCachePoolResponseProto {
required CachePoolProto pool = 1;
}
message ModifyCachePoolRequestProto {
required string poolName = 1;
optional string ownerName = 2;
optional string groupName = 3;
optional int32 mode = 4;
optional int32 weight = 5;
required CachePoolProto pool = 1;
required CachePoolInfoProto info = 2;
}
message ModifyCachePoolResponseProto { // void response
}
message RemoveCachePoolRequestProto {
required string poolName = 1;
required CachePoolProto pool = 1;
}
message RemoveCachePoolResponseProto { // void response
}
message ListCachePoolsRequestProto {
required string prevPoolName = 1;
required CachePoolProto prevPool = 1;
required int32 maxReplies = 2;
}
@ -455,11 +461,7 @@ message ListCachePoolsResponseProto {
}
message ListCachePoolsResponseElementProto {
required string poolName = 1;
required string ownerName = 2;
required string groupName = 3;
required int32 mode = 4;
required int32 weight = 5;
required CachePoolProto pool = 1;
}
message GetFileLinkInfoRequestProto {

View File

@ -92,6 +92,9 @@ public class TestFsDatasetCache {
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
@ -159,13 +162,11 @@ public class TestFsDatasetCache {
}
/**
* Blocks until cache usage changes from the current value, then verifies
* against the expected new value.
* Blocks until cache usage hits the expected new value.
*/
private long verifyExpectedCacheUsage(final long current,
final long expected) throws Exception {
private long verifyExpectedCacheUsage(final long expected) throws Exception {
long cacheUsed = fsd.getCacheUsed();
while (cacheUsed == current) {
while (cacheUsed != expected) {
cacheUsed = fsd.getCacheUsed();
Thread.sleep(100);
}
@ -202,13 +203,13 @@ public class TestFsDatasetCache {
// Cache each block in succession, checking each time
for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(cacheBlock(locs[i]));
current = verifyExpectedCacheUsage(current, current + blockSizes[i]);
current = verifyExpectedCacheUsage(current + blockSizes[i]);
}
// Uncache each block in succession, again checking each time
for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(uncacheBlock(locs[i]));
current = verifyExpectedCacheUsage(current, current - blockSizes[i]);
current = verifyExpectedCacheUsage(current - blockSizes[i]);
}
}
@ -237,7 +238,7 @@ public class TestFsDatasetCache {
long current = 0;
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(cacheBlocks(fileLocs[i]));
current = verifyExpectedCacheUsage(current, current + fileSizes[i]);
current = verifyExpectedCacheUsage(current + fileSizes[i]);
}
final long oldCurrent = current;
@ -262,7 +263,7 @@ public class TestFsDatasetCache {
// Uncache the n-1 files
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
current = verifyExpectedCacheUsage(current, current - fileSizes[i]);
current = verifyExpectedCacheUsage(current - fileSizes[i]);
}
}
}

View File

@ -17,9 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
@ -29,53 +30,65 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolError;
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Fallible;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestPathCacheRequests {
static final Log LOG = LogFactory.getLog(TestPathCacheRequests.class);
@Test
public void testCreateAndRemovePools() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
private static Configuration conf = new HdfsConfiguration();
private static MiniDFSCluster cluster = null;
private static NamenodeProtocols proto = null;
@Before
public void setUp() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
NamenodeProtocols proto = cluster.getNameNodeRpc();
CachePoolInfo req = new CachePoolInfo("pool1").
setOwnerName("bob").setGroupName("bobgroup").
setMode(0755).setWeight(150);
proto.addCachePool(req);
try {
proto.removeCachePool("pool99");
Assert.fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("can't remove " +
"nonexistent cache pool", ioe);
proto = cluster.getNameNodeRpc();
}
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
proto.removeCachePool("pool1");
}
@Test
public void testCreateAndRemovePools() throws Exception {
CachePoolInfo req =
CachePoolInfo.newBuilder().setPoolName("pool1").setOwnerName("bob")
.setGroupName("bobgroup").setMode(new FsPermission((short) 0755))
.setWeight(150).build();
CachePool pool = proto.addCachePool(req);
try {
proto.removeCachePool("pool1");
proto.removeCachePool(909);
Assert.fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
}
proto.removeCachePool(pool.getId());
try {
proto.removeCachePool(pool.getId());
Assert.fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("can't remove " +
"nonexistent cache pool", ioe);
}
req = new CachePoolInfo("pool2");
proto.addCachePool(req);
@ -83,34 +96,42 @@ public class TestPathCacheRequests {
@Test
public void testCreateAndModifyPools() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
NamenodeProtocols proto = cluster.getNameNodeRpc();
proto.addCachePool(new CachePoolInfo("pool1").
setOwnerName("abc").setGroupName("123").
setMode(0755).setWeight(150));
proto.modifyCachePool(new CachePoolInfo("pool1").
setOwnerName("def").setGroupName("456"));
RemoteIterator<CachePoolInfo> iter = proto.listCachePools("", 1);
CachePoolInfo info = iter.next();
assertEquals("pool1", info.getPoolName());
assertEquals("def", info.getOwnerName());
assertEquals("456", info.getGroupName());
assertEquals(Integer.valueOf(0755), info.getMode());
assertEquals(Integer.valueOf(150), info.getWeight());
// Create a new pool
CachePoolInfo info = CachePoolInfo.newBuilder().
setPoolName("pool1").
setOwnerName("abc").
setGroupName("123").
setMode(new FsPermission((short)0755)).
setWeight(150).
build();
CachePool pool = proto.addCachePool(info);
CachePoolInfo actualInfo = pool.getInfo();
assertEquals("Expected info to match create time settings",
info, actualInfo);
// Modify the pool
info = CachePoolInfo.newBuilder().
setPoolName("pool2").
setOwnerName("def").
setGroupName("456").
setMode(new FsPermission((short)0644)).
setWeight(200).
build();
proto.modifyCachePool(pool.getId(), info);
// Check via listing this time
RemoteIterator<CachePool> iter = proto.listCachePools(0, 1);
CachePool listedPool = iter.next();
actualInfo = listedPool.getInfo();
assertEquals("Expected info to match modified settings", info, actualInfo);
try {
proto.removeCachePool("pool99");
proto.removeCachePool(808);
Assert.fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
}
proto.removeCachePool("pool1");
proto.removeCachePool(pool.getId());
try {
proto.removeCachePool("pool1");
proto.removeCachePool(pool.getId());
Assert.fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
@ -121,13 +142,13 @@ public class TestPathCacheRequests {
RemoteIterator<PathCacheEntry> iter,
long id0, long id1, long id2) throws Exception {
Assert.assertEquals(new PathCacheEntry(id0,
new PathCacheDirective("/alpha", "pool1")),
new PathCacheDirective("/alpha", 1)),
iter.next());
Assert.assertEquals(new PathCacheEntry(id1,
new PathCacheDirective("/beta", "pool2")),
new PathCacheDirective("/beta", 2)),
iter.next());
Assert.assertEquals(new PathCacheEntry(id2,
new PathCacheDirective("/gamma", "pool1")),
new PathCacheDirective("/gamma", 1)),
iter.next());
Assert.assertFalse(iter.hasNext());
}
@ -140,23 +161,36 @@ public class TestPathCacheRequests {
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
NamenodeProtocols proto = cluster.getNameNodeRpc();
proto.addCachePool(new CachePoolInfo("pool1"));
proto.addCachePool(new CachePoolInfo("pool2"));
proto.addCachePool(new CachePoolInfo("pool3"));
proto.addCachePool(new CachePoolInfo("pool4").setMode(0));
List<Fallible<PathCacheEntry>> addResults1 =
proto.addPathCacheDirectives(Arrays.asList(
new PathCacheDirective[] {
new PathCacheDirective("/alpha", "pool1"),
new PathCacheDirective("/beta", "pool2"),
new PathCacheDirective("", "pool3"),
new PathCacheDirective("/zeta", "nonexistent_pool"),
new PathCacheDirective("/zeta", "pool4")
}));
final CachePool pool1 = proto.addCachePool(new CachePoolInfo("pool1"));
final CachePool pool2 = proto.addCachePool(new CachePoolInfo("pool2"));
final CachePool pool3 = proto.addCachePool(new CachePoolInfo("pool3"));
final CachePool pool4 = proto.addCachePool(CachePoolInfo.newBuilder()
.setPoolName("pool4")
.setMode(new FsPermission((short)0)).build());
UserGroupInformation testUgi = UserGroupInformation
.createUserForTesting("myuser", new String[]{"mygroup"});
List<Fallible<PathCacheEntry>> addResults1 = testUgi.doAs(
new PrivilegedExceptionAction<List<Fallible<PathCacheEntry>>>() {
@Override
public List<Fallible<PathCacheEntry>> run() throws IOException {
List<Fallible<PathCacheEntry>> entries;
entries = proto.addPathCacheDirectives(
Arrays.asList(new PathCacheDirective[] {
new PathCacheDirective("/alpha", pool1.getId()),
new PathCacheDirective("/beta", pool2.getId()),
new PathCacheDirective("", pool3.getId()),
new PathCacheDirective("/zeta", 404),
new PathCacheDirective("/zeta", pool4.getId())
}));
return entries;
}
});
// Save the successful additions
long ids1[] = new long[2];
ids1[0] = addResults1.get(0).get().getEntryId();
ids1[1] = addResults1.get(1).get().getEntryId();
for (int i=0; i<2; i++) {
ids1[i] = addResults1.get(i).get().getEntryId();
}
// Verify that the unsuccessful additions failed properly
try {
addResults1.get(2).get();
Assert.fail("expected an error when adding an empty path");
@ -167,7 +201,7 @@ public class TestPathCacheRequests {
addResults1.get(3).get();
Assert.fail("expected an error when adding to a nonexistent pool.");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError);
Assert.assertTrue(ioe.getCause() instanceof InvalidPoolError);
}
try {
addResults1.get(4).get();
@ -181,10 +215,10 @@ public class TestPathCacheRequests {
List<Fallible<PathCacheEntry>> addResults2 =
proto.addPathCacheDirectives(Arrays.asList(
new PathCacheDirective[] {
new PathCacheDirective("/alpha", "pool1"),
new PathCacheDirective("/theta", ""),
new PathCacheDirective("bogus", "pool1"),
new PathCacheDirective("/gamma", "pool1")
new PathCacheDirective("/alpha", pool1.getId()),
new PathCacheDirective("/theta", 404),
new PathCacheDirective("bogus", pool1.getId()),
new PathCacheDirective("/gamma", pool1.getId())
}));
long id = addResults2.get(0).get().getEntryId();
Assert.assertEquals("expected to get back the same ID as last time " +
@ -194,7 +228,7 @@ public class TestPathCacheRequests {
Assert.fail("expected an error when adding a path cache " +
"directive with an empty pool name.");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError);
Assert.assertTrue(ioe.getCause() instanceof InvalidPoolError);
}
try {
addResults2.get(2).get();
@ -206,14 +240,16 @@ public class TestPathCacheRequests {
long ids2[] = new long[1];
ids2[0] = addResults2.get(3).get().getEntryId();
// Validate listing all entries
RemoteIterator<PathCacheEntry> iter =
proto.listPathCacheEntries(0, "", 100);
proto.listPathCacheEntries(-1l, -1l, 100);
validateListAll(iter, ids1[0], ids1[1], ids2[0]);
iter = proto.listPathCacheEntries(0, "", 1);
iter = proto.listPathCacheEntries(-1l, -1l, 1);
validateListAll(iter, ids1[0], ids1[1], ids2[0]);
iter = proto.listPathCacheEntries(0, "pool3", 1);
// Validate listing certain pools
iter = proto.listPathCacheEntries(0, pool3.getId(), 1);
Assert.assertFalse(iter.hasNext());
iter = proto.listPathCacheEntries(0, "pool2", 4444);
iter = proto.listPathCacheEntries(0, pool2.getId(), 4444);
Assert.assertEquals(addResults1.get(1).get(),
iter.next());
Assert.assertFalse(iter.hasNext());
@ -235,7 +271,7 @@ public class TestPathCacheRequests {
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof NoSuchIdException);
}
iter = proto.listPathCacheEntries(0, "pool2", 4444);
iter = proto.listPathCacheEntries(0, pool2.getId(), 4444);
Assert.assertFalse(iter.hasNext());
} finally {
if (cluster != null) { cluster.shutdown(); }