diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchedRemoteIterator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchedRemoteIterator.java new file mode 100644 index 00000000000..4c682c6b18b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BatchedRemoteIterator.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * A RemoteIterator that fetches elements in batches. + */ +public abstract class BatchedRemoteIterator implements RemoteIterator { + public interface BatchedEntries { + public E get(int i); + public int size(); + } + + public static class BatchedListEntries implements BatchedEntries { + private final List entries; + + public BatchedListEntries(List entries) { + this.entries = entries; + } + + public E get(int i) { + return entries.get(i); + + } + + public int size() { + return entries.size(); + } + } + + private K nextKey; + private final int maxRepliesPerRequest; + private BatchedEntries entries; + private int idx; + + public BatchedRemoteIterator(K nextKey, int maxRepliesPerRequest) { + this.nextKey = nextKey; + this.maxRepliesPerRequest = maxRepliesPerRequest; + this.entries = null; + this.idx = -1; + } + + /** + * Perform the actual remote request. + * + * @param key The key to send. + * @param maxRepliesPerRequest The maximum number of replies to allow. + * @return A list of replies. + */ + public abstract BatchedEntries makeRequest(K nextKey, int maxRepliesPerRequest) + throws IOException; + + private void makeRequest() throws IOException { + idx = 0; + entries = null; + entries = makeRequest(nextKey, maxRepliesPerRequest); + if (entries.size() > maxRepliesPerRequest) { + throw new IOException("invalid number of replies returned: got " + + entries.size() + ", expected " + maxRepliesPerRequest + + " at most."); + } + if (entries.size() == 0) { + entries = null; + } + } + + private void makeRequestIfNeeded() throws IOException { + if (idx == -1) { + makeRequest(); + } else if ((entries != null) && (idx >= entries.size())) { + if (entries.size() < maxRepliesPerRequest) { + // Last time, we got fewer entries than requested. + // So we should be at the end. + entries = null; + } else { + makeRequest(); + } + } + } + + @Override + public boolean hasNext() throws IOException { + makeRequestIfNeeded(); + return (entries != null); + } + + /** + * Return the next list key associated with an element. + */ + public abstract K elementToNextKey(E element); + + @Override + public E next() throws IOException { + makeRequestIfNeeded(); + if (entries == null) { + throw new NoSuchElementException(); + } + E entry = entries.get(idx++); + nextKey = elementToNextKey(entry); + return entry; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt index 769996d46a8..acc949680f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt @@ -21,6 +21,10 @@ HDFS-4949 (Unreleased) HDFS-5141. Add cache status information to datanode heartbeat. (Contributed by Andrew Wang) + HDFS-5121. Add RPCs for creating and manipulating cache pools. + (Contributed by Colin Patrick McCabe) + + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java index 3e0531c20c8..e162463d8d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java @@ -65,6 +65,16 @@ public InvalidPoolNameError(PathCacheDirective directive) { } } + public static class PoolWritePermissionDeniedError + extends AddPathCacheDirectiveException { + private static final long serialVersionUID = 1L; + + public PoolWritePermissionDeniedError(PathCacheDirective directive) { + super("write permission denied for pool '" + directive.getPool() + "'", + directive); + } + } + public static class UnexpectedAddPathCacheDirectiveException extends AddPathCacheDirectiveException { private static final long serialVersionUID = 1L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java new file mode 100644 index 00000000000..20006059a7a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Information about a cache pool. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class CachePoolInfo { + final String poolName; + + @Nullable + String ownerName; + + @Nullable + String groupName; + + @Nullable + Integer mode; + + @Nullable + Integer weight; + + public CachePoolInfo(String poolName) { + this.poolName = poolName; + } + + public String getPoolName() { + return poolName; + } + + public String getOwnerName() { + return ownerName; + } + + public CachePoolInfo setOwnerName(String ownerName) { + this.ownerName = ownerName; + return this; + } + + public String getGroupName() { + return groupName; + } + + public CachePoolInfo setGroupName(String groupName) { + this.groupName = groupName; + return this; + } + + public Integer getMode() { + return mode; + } + + public CachePoolInfo setMode(Integer mode) { + this.mode = mode; + return this; + } + + public Integer getWeight() { + return weight; + } + + public CachePoolInfo setWeight(Integer weight) { + this.weight = weight; + return this; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 165d0673f47..f07c950d215 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1144,5 +1144,53 @@ public List> removePathCacheEntries(List ids) @Idempotent public RemoteIterator listPathCacheEntries(long prevId, String pool, int maxRepliesPerRequest) throws IOException; -} + + /** + * Modify a cache pool. + * + * @param req + * The request to modify a cache pool. + * @throws IOException + * If the request could not be completed. + */ + @AtMostOnce + public void addCachePool(CachePoolInfo info) throws IOException; + /** + * Modify a cache pool. + * + * @param req + * The request to modify a cache pool. + * @throws IOException + * If the request could not be completed. + */ + @Idempotent + public void modifyCachePool(CachePoolInfo req) throws IOException; + + /** + * Remove a cache pool. + * + * @param cachePoolName + * Name of the cache pool to remove. + * @throws IOException + * if the cache pool did not exist, or could not be removed. + */ + @AtMostOnce + public void removeCachePool(String cachePoolName) throws IOException; + + /** + * List some cache pools. + * + * @param prevKey + * The previous key we listed. We will list keys greater than this. + * @param maxRepliesPerRequest + * Maximum number of cache pools to list. + * @return A remote iterator from which you can get CachePool objects. + * Requests will be made as needed. + * @throws IOException + * If there was an error listing cache pools. + */ + @Idempotent + public RemoteIterator listCachePools(String prevKey, + int maxRepliesPerRequest) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java index 8045186a6c2..8c6d742d4cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java @@ -36,7 +36,7 @@ public class PathCacheDirective implements Comparable { private final String pool; - public PathCacheDirective(String path, String pool) throws IOException { + public PathCacheDirective(String path, String pool) { Preconditions.checkNotNull(path); Preconditions.checkNotNull(pool); this.path = path; @@ -67,10 +67,9 @@ public void validate() throws IOException { if (path.isEmpty()) { throw new EmptyPathError(this); } - if (DFSUtil.isValidName(path)) { + if (!DFSUtil.isValidName(path)) { throw new InvalidPathNameError(this); } - if (pool.isEmpty()) { throw new InvalidPoolNameError(this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java index 41f7269cdd1..04e88dfe6c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java @@ -47,6 +47,16 @@ public InvalidIdException(long entryId) { } } + public final static class RemovePermissionDeniedException + extends RemovePathCacheEntryException { + private static final long serialVersionUID = 1L; + + public RemovePermissionDeniedException(long entryId) { + super("permission denied when trying to remove path cache entry id " + + entryId, entryId); + } + } + public final static class NoSuchIdException extends RemovePathCacheEntryException { private static final long serialVersionUID = 1L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index c02bcecbe61..f9a5bfbc914 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -27,26 +27,29 @@ import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.protocol.PathCacheDirective; -import org.apache.hadoop.hdfs.protocol.PathCacheEntry; import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError; import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError; import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError; -import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; -import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.PathCacheDirective; +import org.apache.hadoop.hdfs.protocol.PathCacheEntry; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesResponseProto; @@ -77,8 +80,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; @@ -105,22 +108,30 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesElementProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; @@ -160,6 +171,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.namenode.CachePool; import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.io.Text; @@ -1081,6 +1093,9 @@ public RemovePathCacheEntriesResponseProto removePathCacheEntries( } catch (NoSuchIdException ioe) { builder.addResults(RemovePathCacheEntryErrorProto. NO_SUCH_CACHED_PATH_ID_ERROR_VALUE); + } catch (RemovePermissionDeniedException ioe) { + builder.addResults(RemovePathCacheEntryErrorProto. + REMOVE_PERMISSION_DENIED_ERROR_VALUE); } catch (IOException ioe) { builder.addResults(RemovePathCacheEntryErrorProto. UNEXPECTED_REMOVE_ERROR_VALUE); @@ -1115,4 +1130,99 @@ public ListPathCacheEntriesResponseProto listPathCacheEntries(RpcController cont throw new ServiceException(e); } } + + @Override + public AddCachePoolResponseProto addCachePool(RpcController controller, + AddCachePoolRequestProto request) throws ServiceException { + try { + CachePoolInfo info = + new CachePoolInfo(request.getPoolName()); + if (request.hasOwnerName()) { + info.setOwnerName(request.getOwnerName()); + } + if (request.hasGroupName()) { + info.setGroupName(request.getGroupName()); + } + if (request.hasMode()) { + info.setMode(request.getMode()); + } + if (request.hasWeight()) { + info.setWeight(request.getWeight()); + } + server.addCachePool(info); + return AddCachePoolResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ModifyCachePoolResponseProto modifyCachePool(RpcController controller, + ModifyCachePoolRequestProto request) throws ServiceException { + try { + CachePoolInfo info = + new CachePoolInfo(request.getPoolName()); + if (request.hasOwnerName()) { + info.setOwnerName(request.getOwnerName()); + } + if (request.hasGroupName()) { + info.setGroupName(request.getGroupName()); + } + if (request.hasMode()) { + info.setMode(request.getMode()); + } + if (request.hasWeight()) { + info.setWeight(request.getWeight()); + } + server.modifyCachePool(info); + return ModifyCachePoolResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RemoveCachePoolResponseProto removeCachePool(RpcController controller, + RemoveCachePoolRequestProto request) throws ServiceException { + try { + server.removeCachePool(request.getPoolName()); + return RemoveCachePoolResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ListCachePoolsResponseProto listCachePools(RpcController controller, + ListCachePoolsRequestProto request) throws ServiceException { + try { + RemoteIterator iter = + server.listCachePools(request.getPrevPoolName(), + request.getMaxReplies()); + ListCachePoolsResponseProto.Builder responseBuilder = + ListCachePoolsResponseProto.newBuilder(); + while (iter.hasNext()) { + CachePoolInfo pool = iter.next(); + ListCachePoolsResponseElementProto.Builder elemBuilder = + ListCachePoolsResponseElementProto.newBuilder(); + elemBuilder.setPoolName(pool.getPoolName()); + if (pool.getOwnerName() != null) { + elemBuilder.setOwnerName(pool.getOwnerName()); + } + if (pool.getGroupName() != null) { + elemBuilder.setGroupName(pool.getGroupName()); + } + if (pool.getMode() != null) { + elemBuilder.setMode(pool.getMode()); + } + if (pool.getWeight() != null) { + elemBuilder.setWeight(pool.getWeight()); + } + responseBuilder.addElements(elemBuilder.build()); + } + return responseBuilder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index eb9845e849b..4b8687e1d99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException; import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException; import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; @@ -58,11 +61,13 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto; @@ -108,14 +113,19 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto; @@ -1064,6 +1074,9 @@ private static IOException removePathCacheEntriesError(long code, long id) { } else if (code == RemovePathCacheEntryErrorProto. NO_SUCH_CACHED_PATH_ID_ERROR_VALUE) { return new NoSuchIdException(id); + } else if (code == RemovePathCacheEntryErrorProto. + REMOVE_PERMISSION_DENIED_ERROR_VALUE) { + return new RemovePermissionDeniedException(id); } else { return new UnexpectedRemovePathCacheEntryException(id); } @@ -1098,32 +1111,49 @@ public List> removePathCacheEntries(List ids) } } - private class PathCacheEntriesIterator - implements RemoteIterator { - private long prevId; - private final String pool; - private final int repliesPerRequest; + private static class BatchedPathCacheEntries + implements BatchedEntries { private ListPathCacheEntriesResponseProto response; - private int idx; - public PathCacheEntriesIterator(long prevId, String pool, - int repliesPerRequest) { - this.prevId = prevId; - this.pool = pool; - this.repliesPerRequest = repliesPerRequest; - this.response = null; - this.idx = -1; + BatchedPathCacheEntries(ListPathCacheEntriesResponseProto response) { + this.response = response; } - private void makeRequest() throws IOException { - idx = 0; - response = null; + @Override + public PathCacheEntry get(int i) { + ListPathCacheEntriesElementProto elementProto = + response.getElements(i); + return new PathCacheEntry(elementProto.getId(), + new PathCacheDirective(elementProto.getPath(), + elementProto.getPool())); + } + + @Override + public int size() { + return response.getElementsCount(); + } + } + + private class PathCacheEntriesIterator + extends BatchedRemoteIterator { + private final String pool; + + public PathCacheEntriesIterator(long prevKey, int maxRepliesPerRequest, + String pool) { + super(prevKey, maxRepliesPerRequest); + this.pool = pool; + } + + @Override + public BatchedEntries makeRequest( + Long nextKey, int maxRepliesPerRequest) throws IOException { + ListPathCacheEntriesResponseProto response; try { ListPathCacheEntriesRequestProto req = ListPathCacheEntriesRequestProto.newBuilder(). - setPrevId(prevId). + setPrevId(nextKey). setPool(pool). - setMaxReplies(repliesPerRequest). + setMaxReplies(maxRepliesPerRequest). build(); response = rpcProxy.listPathCacheEntries(null, req); if (response.getElementsCount() == 0) { @@ -1132,45 +1162,134 @@ private void makeRequest() throws IOException { } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } - } - - private void makeRequestIfNeeded() throws IOException { - if (idx == -1) { - makeRequest(); - } else if ((response != null) && (idx >= response.getElementsCount())) { - if (response.getHasMore()) { - makeRequest(); - } else { - response = null; - } - } + return new BatchedPathCacheEntries(response); } @Override - public boolean hasNext() throws IOException { - makeRequestIfNeeded(); - return (response != null); - } - - @Override - public PathCacheEntry next() throws IOException { - makeRequestIfNeeded(); - if (response == null) { - throw new NoSuchElementException(); - } - ListPathCacheEntriesElementProto elementProto = - response.getElements(idx); - prevId = elementProto.getId(); - idx++; - return new PathCacheEntry(elementProto.getId(), - new PathCacheDirective(elementProto.getPath(), - elementProto.getPool())); + public Long elementToNextKey(PathCacheEntry element) { + return element.getEntryId(); } } @Override public RemoteIterator listPathCacheEntries(long prevId, String pool, int repliesPerRequest) throws IOException { - return new PathCacheEntriesIterator(prevId, pool, repliesPerRequest); + return new PathCacheEntriesIterator(prevId, repliesPerRequest, pool); + } + + @Override + public void addCachePool(CachePoolInfo info) throws IOException { + AddCachePoolRequestProto.Builder builder = + AddCachePoolRequestProto.newBuilder(); + builder.setPoolName(info.getPoolName()); + if (info.getOwnerName() != null) { + builder.setOwnerName(info.getOwnerName()); + } + if (info.getGroupName() != null) { + builder.setGroupName(info.getGroupName()); + } + if (info.getMode() != null) { + builder.setMode(info.getMode()); + } + if (info.getWeight() != null) { + builder.setWeight(info.getWeight()); + } + try { + rpcProxy.addCachePool(null, builder.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void modifyCachePool(CachePoolInfo req) throws IOException { + ModifyCachePoolRequestProto.Builder builder = + ModifyCachePoolRequestProto.newBuilder(); + builder.setPoolName(req.getPoolName()); + if (req.getOwnerName() != null) { + builder.setOwnerName(req.getOwnerName()); + } + if (req.getGroupName() != null) { + builder.setGroupName(req.getGroupName()); + } + if (req.getMode() != null) { + builder.setMode(req.getMode()); + } + if (req.getWeight() != null) { + builder.setWeight(req.getWeight()); + } + try { + rpcProxy.modifyCachePool(null, builder.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void removeCachePool(String cachePoolName) throws IOException { + try { + rpcProxy.removeCachePool(null, + RemoveCachePoolRequestProto.newBuilder(). + setPoolName(cachePoolName).build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + private static class BatchedPathDirectiveEntries + implements BatchedEntries { + private final ListCachePoolsResponseProto proto; + + public BatchedPathDirectiveEntries(ListCachePoolsResponseProto proto) { + this.proto = proto; + } + + @Override + public CachePoolInfo get(int i) { + ListCachePoolsResponseElementProto elem = proto.getElements(i); + return new CachePoolInfo(elem.getPoolName()). + setOwnerName(elem.getOwnerName()). + setGroupName(elem.getGroupName()). + setMode(elem.getMode()). + setWeight(elem.getWeight()); + } + + @Override + public int size() { + return proto.getElementsCount(); + } + } + + private class CachePoolIterator + extends BatchedRemoteIterator { + + public CachePoolIterator(String prevKey, int maxRepliesPerRequest) { + super(prevKey, maxRepliesPerRequest); + } + + @Override + public BatchedEntries makeRequest(String prevKey, + int maxRepliesPerRequest) throws IOException { + try { + return new BatchedPathDirectiveEntries( + rpcProxy.listCachePools(null, + ListCachePoolsRequestProto.newBuilder(). + setPrevPoolName(prevKey). + setMaxReplies(maxRepliesPerRequest).build())); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public String elementToNextKey(CachePoolInfo element) { + return element.getPoolName(); + } + } + + @Override + public RemoteIterator listCachePools(String prevKey, + int maxRepliesPerRequest) throws IOException { + return new CachePoolIterator(prevKey, maxRepliesPerRequest); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index 8be575a9701..06475802c61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -27,12 +27,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.PathCacheDirective; import org.apache.hadoop.hdfs.protocol.PathCacheEntry; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError; import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError; import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.RemovePermissionDeniedException; import org.apache.hadoop.util.Fallible; /** @@ -56,6 +61,12 @@ final class CacheManager { private final TreeMap entriesByDirective = new TreeMap(); + /** + * Cache pools, sorted by name. + */ + private final TreeMap cachePools = + new TreeMap(); + /** * The entry ID to use for a new entry. */ @@ -80,16 +91,31 @@ synchronized long getNextEntryId() throws IOException { } private synchronized Fallible addDirective( - PathCacheDirective directive) { + PathCacheDirective directive, FSPermissionChecker pc) { + CachePool pool = cachePools.get(directive.getPool()); + if (pool == null) { + LOG.info("addDirective " + directive + ": pool not found."); + return new Fallible( + new InvalidPoolNameError(directive)); + } + if (!pc.checkWritePermission(pool.getOwnerName(), + pool.getGroupName(), pool.getMode())) { + LOG.info("addDirective " + directive + ": write permission denied."); + return new Fallible( + new PoolWritePermissionDeniedError(directive)); + } try { directive.validate(); } catch (IOException ioe) { + LOG.info("addDirective " + directive + ": validation failed."); return new Fallible(ioe); } // Check if we already have this entry. PathCacheEntry existing = entriesByDirective.get(directive); if (existing != null) { // Entry already exists: return existing entry. + LOG.info("addDirective " + directive + ": there is an " + + "existing directive " + existing); return new Fallible(existing); } // Add a new entry with the next available ID. @@ -100,33 +126,57 @@ private synchronized Fallible addDirective( return new Fallible( new UnexpectedAddPathCacheDirectiveException(directive)); } + LOG.info("addDirective " + directive + ": added cache directive " + + directive); entriesByDirective.put(directive, entry); entriesById.put(entry.getEntryId(), entry); return new Fallible(entry); } public synchronized List> addDirectives( - List directives) { + List directives, FSPermissionChecker pc) { ArrayList> results = new ArrayList>(directives.size()); for (PathCacheDirective directive: directives) { - results.add(addDirective(directive)); + results.add(addDirective(directive, pc)); } return results; } - private synchronized Fallible removeEntry(long entryId) { + private synchronized Fallible removeEntry(long entryId, + FSPermissionChecker pc) { // Check for invalid IDs. if (entryId <= 0) { + LOG.info("removeEntry " + entryId + ": invalid non-positive entry ID."); return new Fallible(new InvalidIdException(entryId)); } // Find the entry. PathCacheEntry existing = entriesById.get(entryId); if (existing == null) { + LOG.info("removeEntry " + entryId + ": entry not found."); return new Fallible(new NoSuchIdException(entryId)); } + CachePool pool = cachePools.get(existing.getDirective().getPool()); + if (pool == null) { + LOG.info("removeEntry " + entryId + ": pool not found for directive " + + existing.getDirective()); + return new Fallible( + new UnexpectedRemovePathCacheEntryException(entryId)); + } + if (!pc.isSuperUser()) { + if (!pc.checkWritePermission(pool.getOwnerName(), + pool.getGroupName(), pool.getMode())) { + LOG.info("removeEntry " + entryId + ": write permission denied to " + + "pool " + pool + " for entry " + existing); + return new Fallible( + new RemovePermissionDeniedException(entryId)); + } + } + // Remove the corresponding entry in entriesByDirective. if (entriesByDirective.remove(existing.getDirective()) == null) { + LOG.warn("removeEntry " + entryId + ": failed to find existing entry " + + existing + " in entriesByDirective"); return new Fallible( new UnexpectedRemovePathCacheEntryException(entryId)); } @@ -134,11 +184,12 @@ private synchronized Fallible removeEntry(long entryId) { return new Fallible(entryId); } - public synchronized List> removeEntries(List entryIds) { + public synchronized List> removeEntries(List entryIds, + FSPermissionChecker pc) { ArrayList> results = new ArrayList>(entryIds.size()); for (Long entryId : entryIds) { - results.add(removeEntry(entryId)); + results.add(removeEntry(entryId, pc)); } return results; } @@ -162,4 +213,109 @@ public synchronized List listPathCacheEntries(long prevId, } return replies; } + + /** + * Create a cache pool. + * + * Only the superuser should be able to call this function. + * + * @param info + * The info for the cache pool to create. + */ + public synchronized void addCachePool(CachePoolInfo info) + throws IOException { + String poolName = info.getPoolName(); + if (poolName.isEmpty()) { + throw new IOException("invalid empty cache pool name"); + } + CachePool pool = cachePools.get(poolName); + if (pool != null) { + throw new IOException("cache pool " + poolName + " already exists."); + } + CachePool cachePool = new CachePool(poolName, + info.getOwnerName(), info.getGroupName(), info.getMode(), + info.getWeight()); + cachePools.put(poolName, cachePool); + LOG.info("created new cache pool " + cachePool); + } + + /** + * Modify a cache pool. + * + * Only the superuser should be able to call this function. + * + * @param info + * The info for the cache pool to modify. + */ + public synchronized void modifyCachePool(CachePoolInfo info) + throws IOException { + String poolName = info.getPoolName(); + if (poolName.isEmpty()) { + throw new IOException("invalid empty cache pool name"); + } + CachePool pool = cachePools.get(poolName); + if (pool == null) { + throw new IOException("cache pool " + poolName + " does not exist."); + } + StringBuilder bld = new StringBuilder(); + String prefix = ""; + if (info.getOwnerName() != null) { + pool.setOwnerName(info.getOwnerName()); + bld.append(prefix). + append("set owner to ").append(info.getOwnerName()); + prefix = "; "; + } + if (info.getGroupName() != null) { + pool.setGroupName(info.getGroupName()); + bld.append(prefix). + append("set group to ").append(info.getGroupName()); + prefix = "; "; + } + if (info.getMode() != null) { + pool.setMode(info.getMode()); + bld.append(prefix). + append(String.format("set mode to 0%3o", info.getMode())); + prefix = "; "; + } + if (info.getWeight() != null) { + pool.setWeight(info.getWeight()); + bld.append(prefix). + append("set weight to ").append(info.getWeight()); + prefix = "; "; + } + if (prefix.isEmpty()) { + bld.append("no changes."); + } + LOG.info("modified " + poolName + "; " + bld.toString()); + } + + /** + * Remove a cache pool. + * + * Only the superuser should be able to call this function. + * + * @param poolName + * The name for the cache pool to remove. + */ + public synchronized void removeCachePool(String poolName) + throws IOException { + CachePool pool = cachePools.remove(poolName); + if (pool == null) { + throw new IOException("can't remove nonexistent cache pool " + poolName); + } + } + + public synchronized List + listCachePools(FSPermissionChecker pc, String prevKey, + int maxRepliesPerRequest) { + final int MAX_PREALLOCATED_REPLIES = 16; + ArrayList results = + new ArrayList(Math.min(MAX_PREALLOCATED_REPLIES, + maxRepliesPerRequest)); + SortedMap tailMap = cachePools.tailMap(prevKey, false); + for (Entry cur : tailMap.entrySet()) { + results.add(cur.getValue().getInfo(pc)); + } + return results; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java new file mode 100644 index 00000000000..8a8f30b8121 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; + +import javax.annotation.Nonnull; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * The NameNode uses CachePools to manage cache resources on the DataNodes. + */ +public final class CachePool { + public static final Log LOG = LogFactory.getLog(CachePool.class); + + @Nonnull + private final String poolName; + + @Nonnull + private String ownerName; + + @Nonnull + private String groupName; + + private int mode; + + private int weight; + + public static String getCurrentUserPrimaryGroupName() throws IOException { + UserGroupInformation ugi= NameNode.getRemoteUser(); + String[] groups = ugi.getGroupNames(); + if (groups.length == 0) { + throw new IOException("failed to get group names from UGI " + ugi); + } + return groups[0]; + } + + public CachePool(String poolName, String ownerName, String groupName, + Integer mode, Integer weight) throws IOException { + this.poolName = poolName; + this.ownerName = ownerName != null ? ownerName : + NameNode.getRemoteUser().getShortUserName(); + this.groupName = groupName != null ? groupName : + getCurrentUserPrimaryGroupName(); + this.mode = mode != null ? mode : 0644; + this.weight = weight != null ? weight : 100; + } + + public String getName() { + return poolName; + } + + public String getOwnerName() { + return ownerName; + } + + public CachePool setOwnerName(String ownerName) { + this.ownerName = ownerName; + return this; + } + + public String getGroupName() { + return groupName; + } + + public CachePool setGroupName(String groupName) { + this.groupName = groupName; + return this; + } + + public int getMode() { + return mode; + } + + public CachePool setMode(int mode) { + this.mode = mode; + return this; + } + + public int getWeight() { + return weight; + } + + public CachePool setWeight(int weight) { + this.weight = weight; + return this; + } + + /** + * Get information about this cache pool. + * + * @param fullInfo + * If true, only the name will be returned (i.e., what you + * would get if you didn't have read permission for this pool.) + * @return + * Cache pool information. + */ + public CachePoolInfo getInfo(boolean fullInfo) { + CachePoolInfo info = new CachePoolInfo(poolName); + if (!fullInfo) { + return info; + } + return info.setOwnerName(ownerName). + setGroupName(groupName). + setMode(mode). + setWeight(weight); + } + + public CachePoolInfo getInfo(FSPermissionChecker pc) { + return getInfo(pc.checkReadPermission(ownerName, groupName, mode)); + } + + public String toString() { + return new StringBuilder(). + append("{ ").append("poolName:").append(poolName). + append(", ownerName:").append(ownerName). + append(", groupName:").append(groupName). + append(", mode:").append(String.format("%3o", mode)). + append(", weight:").append(weight). + append(" }").toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 9e817629d10..ca287ab7dcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; @@ -6700,6 +6701,7 @@ void deleteSnapshot(String snapshotRoot, String snapshotName) return; // Return previous response } boolean success = false; + checkOperation(OperationCategory.WRITE); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -6748,17 +6750,198 @@ void removeSnapshottableDirs(List toRemove) { } List> addPathCacheDirectives( - List directives) { - return cacheManager.addDirectives(directives); + List directives) throws IOException { + CacheEntryWithPayload retryCacheEntry = + RetryCache.waitForCompletion(retryCache, null); + if (retryCacheEntry != null && retryCacheEntry.isSuccess()) { + return (List>) retryCacheEntry.getPayload(); + } + final FSPermissionChecker pc = getPermissionChecker(); + boolean success = false; + List> results = null; + checkOperation(OperationCategory.WRITE); + writeLock(); + try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { + throw new SafeModeException( + "Cannot add path cache directive", safeMode); + } + results = cacheManager.addDirectives(directives, pc); + //getEditLog().logAddPathCacheDirectives(results); FIXME: HDFS-5119 + success = true; + } finally { + writeUnlock(); + if (success) { + getEditLog().logSync(); + } + if (isAuditEnabled() && isExternalInvocation()) { + logAuditEvent(success, "addPathCacheDirectives", null, null, null); + } + RetryCache.setState(retryCacheEntry, success, results); + } + return results; } - List> removePathCacheEntries(List ids) { - return cacheManager.removeEntries(ids); + List> removePathCacheEntries(List ids) throws IOException { + CacheEntryWithPayload retryCacheEntry = + RetryCache.waitForCompletion(retryCache, null); + if (retryCacheEntry != null && retryCacheEntry.isSuccess()) { + return (List>) retryCacheEntry.getPayload(); + } + final FSPermissionChecker pc = getPermissionChecker(); + boolean success = false; + List> results = null; + checkOperation(OperationCategory.WRITE); + writeLock(); + try { + checkOperation(OperationCategory.WRITE); + if (isInSafeMode()) { + throw new SafeModeException( + "Cannot remove path cache directives", safeMode); + } + results = cacheManager.removeEntries(ids, pc); + //getEditLog().logRemovePathCacheEntries(results); FIXME: HDFS-5119 + success = true; + } finally { + writeUnlock(); + if (isAuditEnabled() && isExternalInvocation()) { + logAuditEvent(success, "removePathCacheEntries", null, null, null); + } + RetryCache.setState(retryCacheEntry, success, results); + } + getEditLog().logSync(); + return results; } List listPathCacheEntries(long startId, String pool, - int maxReplies) { - return cacheManager.listPathCacheEntries(startId, pool, maxReplies); + int maxReplies) throws IOException { + checkOperation(OperationCategory.READ); + readLock(); + try { + checkOperation(OperationCategory.READ); + return cacheManager.listPathCacheEntries(startId, pool, maxReplies); + } finally { + readUnlock(); + } + } + + public void addCachePool(CachePoolInfo req) throws IOException { + final FSPermissionChecker pc = getPermissionChecker(); + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } + checkOperation(OperationCategory.WRITE); + writeLock(); + boolean success = false; + try { + checkOperation(OperationCategory.WRITE); + if (!pc.isSuperUser()) { + throw new AccessControlException("Non-super users cannot " + + "add cache pools."); + } + if (isInSafeMode()) { + throw new SafeModeException( + "Cannot add cache pool " + req.getPoolName(), safeMode); + } + cacheManager.addCachePool(req); + //getEditLog().logAddCachePool(req); // FIXME: HDFS-5119 + success = true; + } finally { + writeUnlock(); + if (isAuditEnabled() && isExternalInvocation()) { + logAuditEvent(success, "addCachePool", req.getPoolName(), null, null); + } + RetryCache.setState(cacheEntry, success); + } + + getEditLog().logSync(); + } + + public void modifyCachePool(CachePoolInfo req) throws IOException { + final FSPermissionChecker pc = getPermissionChecker(); + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } + checkOperation(OperationCategory.WRITE); + writeLock(); + boolean success = false; + try { + checkOperation(OperationCategory.WRITE); + if (!pc.isSuperUser()) { + throw new AccessControlException("Non-super users cannot " + + "modify cache pools."); + } + if (isInSafeMode()) { + throw new SafeModeException( + "Cannot modify cache pool " + req.getPoolName(), safeMode); + } + cacheManager.modifyCachePool(req); + //getEditLog().logModifyCachePool(req); // FIXME: HDFS-5119 + success = true; + } finally { + writeUnlock(); + if (isAuditEnabled() && isExternalInvocation()) { + logAuditEvent(success, "modifyCachePool", req.getPoolName(), null, null); + } + RetryCache.setState(cacheEntry, success); + } + + getEditLog().logSync(); + } + + public void removeCachePool(String cachePoolName) throws IOException { + final FSPermissionChecker pc = getPermissionChecker(); + CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); + if (cacheEntry != null && cacheEntry.isSuccess()) { + return; // Return previous response + } + checkOperation(OperationCategory.WRITE); + writeLock(); + boolean success = false; + try { + checkOperation(OperationCategory.WRITE); + if (!pc.isSuperUser()) { + throw new AccessControlException("Non-super users cannot " + + "remove cache pools."); + } + if (isInSafeMode()) { + throw new SafeModeException( + "Cannot remove cache pool " + cachePoolName, safeMode); + } + cacheManager.removeCachePool(cachePoolName); + //getEditLog().logRemoveCachePool(req); // FIXME: HDFS-5119 + success = true; + } finally { + writeUnlock(); + if (isAuditEnabled() && isExternalInvocation()) { + logAuditEvent(success, "removeCachePool", cachePoolName, null, null); + } + RetryCache.setState(cacheEntry, success); + } + + getEditLog().logSync(); + } + + public List listCachePools(String prevKey, + int maxRepliesPerRequest) throws IOException { + final FSPermissionChecker pc = getPermissionChecker(); + List results; + checkOperation(OperationCategory.READ); + readLock(); + try { + checkOperation(OperationCategory.READ); + results = cacheManager.listCachePools(pc, prevKey, maxRepliesPerRequest); + } finally { + readUnlock(); + } + return results; + } + + public CacheManager getCacheManager() { + return cacheManager; } /** @@ -6798,8 +6981,4 @@ public void logAuditEvent(boolean succeeded, String userName, } } } - - public CacheManager getCacheManager() { - return cacheManager; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java index a02bc4044de..c516a73e57f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java @@ -255,4 +255,40 @@ private void checkStickyBit(INode parent, INode inode, Snapshot snapshot throw new AccessControlException("Permission denied by sticky bit setting:" + " user=" + user + ", inode=" + inode); } + + /** + * Check if this CachePool can be accessed. + * + * @param pc + * Permission checker object with user name and groups. + * @param write + * True if we care about write access; false otherwise. + * @return + * True only if the cache pool is accessible. + */ + private boolean checkPermission(String userName, + String groupName, int mode, int mask) { + if ((mode & mask) != 0) { + return true; + } + if (((mode & (mask << 6)) != 0) + && (getUser().equals(userName))) { + return true; + } + if (((mode & (mask << 6)) != 0) + && (containsGroup(groupName))) { + return true; + } + return false; + } + + public boolean checkWritePermission(String userName, + String groupName, int mode) { + return checkPermission(userName, groupName, mode, 02); + } + + public boolean checkReadPermission(String userName, + String groupName, int mode) { + return checkPermission(userName, groupName, mode, 04); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index b96df2a6aa0..9eb09bb43af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -36,6 +36,8 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -62,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.PathCacheDirective; import org.apache.hadoop.hdfs.protocol.PathCacheEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -1219,68 +1222,75 @@ public List> removePathCacheEntries(List ids) return namesystem.removePathCacheEntries(ids); } - private class PathCacheEntriesIterator - implements RemoteIterator { - private long prevId; + private class ServerSidePathCacheEntriesIterator + extends BatchedRemoteIterator { + private final String pool; - private final int repliesPerRequest; - private List entries; - private int idx; - public PathCacheEntriesIterator(long prevId, String pool, - int repliesPerRequest) { - this.prevId = prevId; + public ServerSidePathCacheEntriesIterator(Long firstKey, + int maxRepliesPerRequest, String pool) { + super(firstKey, maxRepliesPerRequest); this.pool = pool; - this.repliesPerRequest = repliesPerRequest; - this.entries = null; - this.idx = -1; - } - - private void makeRequest() throws IOException { - idx = 0; - entries = null; - entries = namesystem.listPathCacheEntries(prevId, pool, - repliesPerRequest); - if (entries.isEmpty()) { - entries = null; - } - } - - private void makeRequestIfNeeded() throws IOException { - if (idx == -1) { - makeRequest(); - } else if ((entries != null) && (idx >= entries.size())) { - if (entries.size() < repliesPerRequest) { - // Last time, we got fewer entries than requested. - // So we should be at the end. - entries = null; - } else { - makeRequest(); - } - } } @Override - public boolean hasNext() throws IOException { - makeRequestIfNeeded(); - return (entries != null); + public BatchedEntries makeRequest( + Long nextKey, int maxRepliesPerRequest) throws IOException { + return new BatchedListEntries( + namesystem.listPathCacheEntries(nextKey, pool, + maxRepliesPerRequest)); } @Override - public PathCacheEntry next() throws IOException { - makeRequestIfNeeded(); - if (entries == null) { - throw new NoSuchElementException(); - } - PathCacheEntry entry = entries.get(idx++); - prevId = entry.getEntryId(); - return entry; + public Long elementToNextKey(PathCacheEntry entry) { + return entry.getEntryId(); } } @Override public RemoteIterator listPathCacheEntries(long prevId, String pool, int maxReplies) throws IOException { - return new PathCacheEntriesIterator(prevId, pool, maxReplies); + return new ServerSidePathCacheEntriesIterator(prevId, maxReplies, pool); + } + + @Override + public void addCachePool(CachePoolInfo info) throws IOException { + namesystem.addCachePool(info); + } + + @Override + public void modifyCachePool(CachePoolInfo info) throws IOException { + namesystem.modifyCachePool(info); + } + + @Override + public void removeCachePool(String cachePoolName) throws IOException { + namesystem.removeCachePool(cachePoolName); + } + + private class ServerSideCachePoolIterator + extends BatchedRemoteIterator { + + public ServerSideCachePoolIterator(String prevKey, int maxRepliesPerRequest) { + super(prevKey, maxRepliesPerRequest); + } + + @Override + public BatchedEntries makeRequest(String prevKey, + int maxRepliesPerRequest) throws IOException { + return new BatchedListEntries( + namesystem.listCachePools(prevKey, maxRepliesPerRequest)); + } + + @Override + public String elementToNextKey(CachePoolInfo element) { + return element.getPoolName(); + } + } + + @Override + public RemoteIterator listCachePools(String prevKey, + int maxRepliesPerRequest) throws IOException { + return new ServerSideCachePoolIterator(prevKey, maxRepliesPerRequest); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 9d1bfd5a354..e799ebf413a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -394,7 +394,8 @@ message RemovePathCacheEntriesResponseProto { enum RemovePathCacheEntryErrorProto { INVALID_CACHED_PATH_ID_ERROR = -1; NO_SUCH_CACHED_PATH_ID_ERROR = -2; - UNEXPECTED_REMOVE_ERROR = -3; + REMOVE_PERMISSION_DENIED_ERROR = -3; + UNEXPECTED_REMOVE_ERROR = -4; } message ListPathCacheEntriesRequestProto { @@ -414,6 +415,53 @@ message ListPathCacheEntriesResponseProto { required bool hasMore = 2; } +message AddCachePoolRequestProto { + required string poolName = 1; + optional string ownerName = 2; + optional string groupName = 3; + optional int32 mode = 4; + optional int32 weight = 5; +} + +message AddCachePoolResponseProto { // void response +} + +message ModifyCachePoolRequestProto { + required string poolName = 1; + optional string ownerName = 2; + optional string groupName = 3; + optional int32 mode = 4; + optional int32 weight = 5; +} + +message ModifyCachePoolResponseProto { // void response +} + +message RemoveCachePoolRequestProto { + required string poolName = 1; +} + +message RemoveCachePoolResponseProto { // void response +} + +message ListCachePoolsRequestProto { + required string prevPoolName = 1; + required int32 maxReplies = 2; +} + +message ListCachePoolsResponseProto { + repeated ListCachePoolsResponseElementProto elements = 1; + optional bool hasMore = 2; +} + +message ListCachePoolsResponseElementProto { + required string poolName = 1; + required string ownerName = 2; + required string groupName = 3; + required int32 mode = 4; + required int32 weight = 5; +} + message GetFileLinkInfoRequestProto { required string src = 1; } @@ -601,6 +649,14 @@ service ClientNamenodeProtocol { returns (RemovePathCacheEntriesResponseProto); rpc listPathCacheEntries(ListPathCacheEntriesRequestProto) returns (ListPathCacheEntriesResponseProto); + rpc addCachePool(AddCachePoolRequestProto) + returns(AddCachePoolResponseProto); + rpc modifyCachePool(ModifyCachePoolRequestProto) + returns(ModifyCachePoolResponseProto); + rpc removeCachePool(RemoveCachePoolRequestProto) + returns(RemoveCachePoolResponseProto); + rpc listCachePools(ListCachePoolsRequestProto) + returns(ListCachePoolsResponseProto); rpc getFileLinkInfo(GetFileLinkInfoRequestProto) returns(GetFileLinkInfoResponseProto); rpc getContentSummary(GetContentSummaryRequestProto) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java index fe7ae38d7b5..66aba064537 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.junit.Assert.*; + import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -32,17 +34,89 @@ import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError; import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError; import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.PoolWritePermissionDeniedError; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; import org.apache.hadoop.hdfs.protocol.PathCacheDirective; import org.apache.hadoop.hdfs.protocol.PathCacheEntry; import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Fallible; import org.junit.Test; public class TestPathCacheRequests { static final Log LOG = LogFactory.getLog(TestPathCacheRequests.class); + @Test + public void testCreateAndRemovePools() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + NamenodeProtocols proto = cluster.getNameNodeRpc(); + CachePoolInfo req = new CachePoolInfo("pool1"). + setOwnerName("bob").setGroupName("bobgroup"). + setMode(0755).setWeight(150); + proto.addCachePool(req); + try { + proto.removeCachePool("pool99"); + Assert.fail("expected to get an exception when " + + "removing a non-existent pool."); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("can't remove " + + "nonexistent cache pool", ioe); + } + proto.removeCachePool("pool1"); + try { + proto.removeCachePool("pool1"); + Assert.fail("expected to get an exception when " + + "removing a non-existent pool."); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("can't remove " + + "nonexistent cache pool", ioe); + } + req = new CachePoolInfo("pool2"); + proto.addCachePool(req); + } + + @Test + public void testCreateAndModifyPools() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + NamenodeProtocols proto = cluster.getNameNodeRpc(); + proto.addCachePool(new CachePoolInfo("pool1"). + setOwnerName("abc").setGroupName("123"). + setMode(0755).setWeight(150)); + proto.modifyCachePool(new CachePoolInfo("pool1"). + setOwnerName("def").setGroupName("456")); + RemoteIterator iter = proto.listCachePools("", 1); + CachePoolInfo info = iter.next(); + assertEquals("pool1", info.getPoolName()); + assertEquals("def", info.getOwnerName()); + assertEquals("456", info.getGroupName()); + assertEquals(Integer.valueOf(0755), info.getMode()); + assertEquals(Integer.valueOf(150), info.getWeight()); + + try { + proto.removeCachePool("pool99"); + Assert.fail("expected to get an exception when " + + "removing a non-existent pool."); + } catch (IOException ioe) { + } + proto.removeCachePool("pool1"); + try { + proto.removeCachePool("pool1"); + Assert.fail("expected to get an exception when " + + "removing a non-existent pool."); + } catch (IOException ioe) { + } + } + private static void validateListAll( RemoteIterator iter, long id0, long id1, long id2) throws Exception { @@ -67,12 +141,18 @@ public void testSetAndGet() throws Exception { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); NamenodeProtocols proto = cluster.getNameNodeRpc(); + proto.addCachePool(new CachePoolInfo("pool1")); + proto.addCachePool(new CachePoolInfo("pool2")); + proto.addCachePool(new CachePoolInfo("pool3")); + proto.addCachePool(new CachePoolInfo("pool4").setMode(0)); List> addResults1 = proto.addPathCacheDirectives(Arrays.asList( new PathCacheDirective[] { new PathCacheDirective("/alpha", "pool1"), new PathCacheDirective("/beta", "pool2"), - new PathCacheDirective("", "pool3") + new PathCacheDirective("", "pool3"), + new PathCacheDirective("/zeta", "nonexistent_pool"), + new PathCacheDirective("/zeta", "pool4") })); long ids1[] = new long[2]; ids1[0] = addResults1.get(0).get().getEntryId(); @@ -83,6 +163,20 @@ public void testSetAndGet() throws Exception { } catch (IOException ioe) { Assert.assertTrue(ioe.getCause() instanceof EmptyPathError); } + try { + addResults1.get(3).get(); + Assert.fail("expected an error when adding to a nonexistent pool."); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError); + } + try { + addResults1.get(4).get(); + Assert.fail("expected an error when adding to a pool with " + + "mode 0 (no permissions for anyone)."); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getCause() + instanceof PoolWritePermissionDeniedError); + } List> addResults2 = proto.addPathCacheDirectives(Arrays.asList(