From 920b4cc06f1bc15809902bdd1968cc434a694a08 Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Thu, 22 Aug 2013 23:37:51 +0000 Subject: [PATCH] HDFS-5052. Add cacheRequest/uncacheRequest support to NameNode. (Contributed by Colin Patrick McCabe.) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1516669 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/hadoop/util/Fallible.java | 53 +++++ .../hadoop-hdfs/CHANGES-HDFS-4949.txt | 3 + .../AddPathCacheDirectiveException.java | 78 +++++++ .../hadoop/hdfs/protocol/ClientProtocol.java | 51 +++++ .../hdfs/protocol/PathCacheDirective.java | 110 ++++++++++ .../hadoop/hdfs/protocol/PathCacheEntry.java | 75 +++++++ .../RemovePathCacheEntryException.java | 68 +++++++ ...amenodeProtocolServerSideTranslatorPB.java | 113 ++++++++++- .../ClientNamenodeProtocolTranslatorPB.java | 191 ++++++++++++++++++ .../hdfs/server/namenode/CacheManager.java | 165 +++++++++++++++ .../hdfs/server/namenode/FSNamesystem.java | 24 ++- .../server/namenode/NameNodeRpcServer.java | 82 ++++++++ .../main/proto/ClientNamenodeProtocol.proto | 57 ++++++ .../namenode/TestPathCacheRequests.java | 150 ++++++++++++++ 14 files changed, 1218 insertions(+), 2 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Fallible.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheEntry.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Fallible.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Fallible.java new file mode 100644 index 00000000000..fe343d9eeaf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Fallible.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Contains either a value of type T, or an IOException. + * + * This can be useful as a return value for batch APIs that need granular + * error reporting. + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Unstable +public class Fallible { + private final T val; + private final IOException ioe; + + public Fallible(T val) { + this.val = val; + this.ioe = null; + } + + public Fallible(IOException ioe) { + this.val = null; + this.ioe = ioe; + } + + public T get() throws IOException { + if (ioe != null) { + throw new IOException(ioe); + } + return this.val; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt index d12d273abea..0e1805ee3ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt @@ -12,6 +12,9 @@ HDFS-4949 (Unreleased) HDFS-5051. Propagate cache status information from the DataNode to the NameNode (Andrew Wang via Colin Patrick McCabe) + HDFS-5052. Add cacheRequest/uncacheRequest support to NameNode. + (contributed by Colin Patrick McCabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java new file mode 100644 index 00000000000..3e0531c20c8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/AddPathCacheDirectiveException.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +/** + * An exception which occurred when trying to add a path cache directive. + */ +public abstract class AddPathCacheDirectiveException extends IOException { + private static final long serialVersionUID = 1L; + + private final PathCacheDirective directive; + + public AddPathCacheDirectiveException(String description, + PathCacheDirective directive) { + super(description); + this.directive = directive; + } + + public PathCacheDirective getDirective() { + return directive; + } + + public static final class EmptyPathError + extends AddPathCacheDirectiveException { + private static final long serialVersionUID = 1L; + + public EmptyPathError(PathCacheDirective directive) { + super("empty path in directive " + directive, directive); + } + } + + public static class InvalidPathNameError + extends AddPathCacheDirectiveException { + private static final long serialVersionUID = 1L; + + public InvalidPathNameError(PathCacheDirective directive) { + super("can't handle non-absolute path name " + directive.getPath(), + directive); + } + } + + public static class InvalidPoolNameError + extends AddPathCacheDirectiveException { + private static final long serialVersionUID = 1L; + + public InvalidPoolNameError(PathCacheDirective directive) { + super("invalid pool name '" + directive.getPool() + "'", directive); + } + } + + public static class UnexpectedAddPathCacheDirectiveException + extends AddPathCacheDirectiveException { + private static final long serialVersionUID = 1L; + + public UnexpectedAddPathCacheDirectiveException( + PathCacheDirective directive) { + super("encountered an unexpected error when trying to " + + "add path cache directive " + directive, directive); + } + } +}; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 5789c3615eb..165d0673f47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.protocol; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -46,6 +48,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.util.Fallible; /********************************************************************** * ClientProtocol is used by user code via @@ -1093,5 +1096,53 @@ public interface ClientProtocol { @Idempotent public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot, String fromSnapshot, String toSnapshot) throws IOException; + + /** + * Add some path cache directives to the CacheManager. + * + * @param directives + * A list of all the path cache directives we want to add. + * @return + * An list where each element is either a path cache entry that was + * added, or an IOException exception describing why the directive + * could not be added. + */ + @AtMostOnce + public List> + addPathCacheDirectives(List directives) + throws IOException; + + /** + * Remove some path cache entries from the CacheManager. + * + * @param ids + * A list of all the IDs we want to remove from the CacheManager. + * @return + * An list where each element is either an ID that was removed, + * or an IOException exception describing why the ID could not be + * removed. + */ + @AtMostOnce + public List> removePathCacheEntries(List ids) + throws IOException; + + /** + * List cached paths on the server. + * + * @param prevId + * The previous ID that we listed, or 0 if this is the first call + * to listPathCacheEntries. + * @param pool + * The pool ID to list. If this is the empty string, all pool ids + * will be listed. + * @param maxRepliesPerRequest + * The maximum number of replies to make in each request. + * @return + * A RemoteIterator from which you can get PathCacheEntry objects. + * Requests will be made as needed. + */ + @Idempotent + public RemoteIterator listPathCacheEntries(long prevId, + String pool, int maxRepliesPerRequest) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java new file mode 100644 index 00000000000..8045186a6c2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheDirective.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; + +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError; + +/** + * A directive to add a path to a cache pool. + */ +public class PathCacheDirective implements Comparable { + private final String path; + + private final String pool; + + public PathCacheDirective(String path, String pool) throws IOException { + Preconditions.checkNotNull(path); + Preconditions.checkNotNull(pool); + this.path = path; + this.pool = pool; + } + + /** + * @return The path used in this request. + */ + public String getPath() { + return path; + } + + /** + * @return The pool used in this request. + */ + public String getPool() { + return pool; + } + + /** + * Check if this PathCacheDirective is valid. + * + * @throws IOException + * If this PathCacheDirective is not valid. + */ + public void validate() throws IOException { + if (path.isEmpty()) { + throw new EmptyPathError(this); + } + if (DFSUtil.isValidName(path)) { + throw new InvalidPathNameError(this); + } + + if (pool.isEmpty()) { + throw new InvalidPoolNameError(this); + } + } + + @Override + public int compareTo(PathCacheDirective rhs) { + return ComparisonChain.start(). + compare(pool, rhs.getPool()). + compare(path, rhs.getPath()). + result(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(path).append(pool).hashCode(); + } + + @Override + public boolean equals(Object o) { + try { + PathCacheDirective other = (PathCacheDirective)o; + return other.compareTo(this) == 0; + } catch (ClassCastException e) { + return false; + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("{ path:").append(path). + append(", pool:").append(pool). + append(" }"); + return builder.toString(); + } +}; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheEntry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheEntry.java new file mode 100644 index 00000000000..62b8b0968b5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathCacheEntry.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; + +import com.google.common.base.Preconditions; + +/** + * An entry in the NameNode's path cache. + */ +public final class PathCacheEntry { + private final long entryId; + private final PathCacheDirective directive; + + public PathCacheEntry(long entryId, PathCacheDirective directive) { + Preconditions.checkArgument(entryId > 0); + this.entryId = entryId; + this.directive = directive; + } + + public long getEntryId() { + return entryId; + } + + public PathCacheDirective getDirective() { + return directive; + } + + @Override + public boolean equals(Object o) { + try { + PathCacheEntry other = (PathCacheEntry)o; + return new EqualsBuilder(). + append(this.entryId, other.entryId). + append(this.directive, other.directive). + isEquals(); + } catch (ClassCastException e) { + return false; + } + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(entryId). + append(directive). + hashCode(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("{ entryId:").append(entryId). + append(", directive:").append(directive.toString()). + append(" }"); + return builder.toString(); + } +}; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java new file mode 100644 index 00000000000..41f7269cdd1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/RemovePathCacheEntryException.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +/** + * An exception which occurred when trying to remove a path cache entry. + */ +public abstract class RemovePathCacheEntryException extends IOException { + private static final long serialVersionUID = 1L; + + private final long entryId; + + public RemovePathCacheEntryException(String description, long entryId) { + super(description); + this.entryId = entryId; + } + + public long getEntryId() { + return this.entryId; + } + + public final static class InvalidIdException + extends RemovePathCacheEntryException { + private static final long serialVersionUID = 1L; + + public InvalidIdException(long entryId) { + super("invalid cache path entry id " + entryId, entryId); + } + } + + public final static class NoSuchIdException + extends RemovePathCacheEntryException { + private static final long serialVersionUID = 1L; + + public NoSuchIdException(long entryId) { + super("there is no path cache entry with id " + entryId, entryId); + } + } + + public final static class UnexpectedRemovePathCacheEntryException + extends RemovePathCacheEntryException { + private static final long serialVersionUID = 1L; + + public UnexpectedRemovePathCacheEntryException(long id) { + super("encountered an unexpected error when trying to " + + "remove path cache entry id " + id, id); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index d7a18a60ac8..c02bcecbe61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; @@ -25,6 +26,14 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.protocol.PathCacheDirective; +import org.apache.hadoop.hdfs.protocol.PathCacheEntry; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DirectoryListing; @@ -37,6 +46,10 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Abando import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; @@ -92,6 +105,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesElementProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; @@ -102,6 +118,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Recove import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; @@ -142,6 +161,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.namenode.INodeId; +import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto; @@ -150,6 +170,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRespons import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Fallible; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -1003,5 +1024,95 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } } - + + @Override + public AddPathCacheDirectivesResponseProto addPathCacheDirectives(RpcController controller, + AddPathCacheDirectivesRequestProto request) throws ServiceException { + try { + ArrayList input = + new ArrayList(request.getElementsCount()); + for (int i = 0; i < request.getElementsCount(); i++) { + PathCacheDirectiveProto proto = request.getElements(i); + input.add(new PathCacheDirective(proto.getPath(), proto.getPool())); + } + List> output = server.addPathCacheDirectives(input); + AddPathCacheDirectivesResponseProto.Builder builder = + AddPathCacheDirectivesResponseProto.newBuilder(); + for (int idx = 0; idx < output.size(); idx++) { + try { + PathCacheEntry entry = output.get(idx).get(); + builder.addResults(entry.getEntryId()); + } catch (EmptyPathError ioe) { + builder.addResults(AddPathCacheDirectiveErrorProto. + EMPTY_PATH_ERROR_VALUE); + } catch (InvalidPathNameError ioe) { + builder.addResults(AddPathCacheDirectiveErrorProto. + INVALID_PATH_NAME_ERROR_VALUE); + } catch (InvalidPoolNameError ioe) { + builder.addResults(AddPathCacheDirectiveErrorProto. + INVALID_POOL_NAME_ERROR_VALUE); + } catch (IOException ioe) { + builder.addResults(AddPathCacheDirectiveErrorProto. + UNEXPECTED_ADD_ERROR_VALUE); + } + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RemovePathCacheEntriesResponseProto removePathCacheEntries( + RpcController controller, RemovePathCacheEntriesRequestProto request) + throws ServiceException { + try { + List> output = + server.removePathCacheEntries(request.getElementsList()); + RemovePathCacheEntriesResponseProto.Builder builder = + RemovePathCacheEntriesResponseProto.newBuilder(); + for (int idx = 0; idx < output.size(); idx++) { + try { + long id = output.get(idx).get(); + builder.addResults(id); + } catch (InvalidIdException ioe) { + builder.addResults(RemovePathCacheEntryErrorProto. + INVALID_CACHED_PATH_ID_ERROR_VALUE); + } catch (NoSuchIdException ioe) { + builder.addResults(RemovePathCacheEntryErrorProto. + NO_SUCH_CACHED_PATH_ID_ERROR_VALUE); + } catch (IOException ioe) { + builder.addResults(RemovePathCacheEntryErrorProto. + UNEXPECTED_REMOVE_ERROR_VALUE); + } + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ListPathCacheEntriesResponseProto listPathCacheEntries(RpcController controller, + ListPathCacheEntriesRequestProto request) throws ServiceException { + try { + RemoteIterator iter = + server.listPathCacheEntries(request.getPrevId(), + request.getPool(), + request.getMaxReplies()); + ListPathCacheEntriesResponseProto.Builder builder = + ListPathCacheEntriesResponseProto.newBuilder(); + while (iter.hasNext()) { + PathCacheEntry entry = iter.next(); + builder.addElements( + ListPathCacheEntriesElementProto.newBuilder(). + setId(entry.getEntryId()). + setPath(entry.getDirective().getPath()). + setPool(entry.getDirective().getPool())); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index b5be61a3bcd..eb9845e849b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -20,7 +20,10 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -30,9 +33,19 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hdfs.protocol.PathCacheDirective; +import org.apache.hadoop.hdfs.protocol.PathCacheEntry; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; @@ -50,6 +63,10 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; @@ -87,11 +104,18 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesElementProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto; @@ -127,6 +151,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequest import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Fallible; import com.google.protobuf.ByteString; import com.google.protobuf.ServiceException; @@ -982,4 +1007,170 @@ public class ClientNamenodeProtocolTranslatorPB implements throw ProtobufHelper.getRemoteException(e); } } + + private static IOException addPathCacheDirectivesError(long code, + PathCacheDirective directive) { + if (code == AddPathCacheDirectiveErrorProto.EMPTY_PATH_ERROR_VALUE) { + return new EmptyPathError(directive); + } else if (code == AddPathCacheDirectiveErrorProto. + INVALID_PATH_NAME_ERROR_VALUE) { + return new InvalidPathNameError(directive); + } else if (code == AddPathCacheDirectiveErrorProto. + INVALID_POOL_NAME_ERROR_VALUE) { + return new InvalidPoolNameError(directive); + } else { + return new UnexpectedAddPathCacheDirectiveException(directive); + } + } + + @Override + public List> addPathCacheDirectives( + List directives) throws IOException { + try { + AddPathCacheDirectivesRequestProto.Builder builder = + AddPathCacheDirectivesRequestProto.newBuilder(); + for (PathCacheDirective directive : directives) { + builder.addElements(PathCacheDirectiveProto.newBuilder(). + setPath(directive.getPath()). + setPool(directive.getPool()). + build()); + } + AddPathCacheDirectivesResponseProto result = + rpcProxy.addPathCacheDirectives(null, builder.build()); + int resultsCount = result.getResultsCount(); + ArrayList> results = + new ArrayList>(resultsCount); + for (int i = 0; i < resultsCount; i++) { + PathCacheDirective directive = directives.get(i); + long code = result.getResults(i); + if (code > 0) { + results.add(new Fallible( + new PathCacheEntry(code, directive))); + } else { + results.add(new Fallible( + addPathCacheDirectivesError(code, directive))); + } + } + return results; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + private static IOException removePathCacheEntriesError(long code, long id) { + if (code == RemovePathCacheEntryErrorProto. + INVALID_CACHED_PATH_ID_ERROR_VALUE) { + return new InvalidIdException(id); + } else if (code == RemovePathCacheEntryErrorProto. + NO_SUCH_CACHED_PATH_ID_ERROR_VALUE) { + return new NoSuchIdException(id); + } else { + return new UnexpectedRemovePathCacheEntryException(id); + } + } + + @Override + public List> removePathCacheEntries(List ids) + throws IOException { + try { + RemovePathCacheEntriesRequestProto.Builder builder = + RemovePathCacheEntriesRequestProto.newBuilder(); + for (Long id : ids) { + builder.addElements(id); + } + RemovePathCacheEntriesResponseProto result = + rpcProxy.removePathCacheEntries(null, builder.build()); + int resultsCount = result.getResultsCount(); + ArrayList> results = + new ArrayList>(resultsCount); + for (int i = 0; i < resultsCount; i++) { + long code = result.getResults(i); + if (code > 0) { + results.add(new Fallible(code)); + } else { + results.add(new Fallible( + removePathCacheEntriesError(code, ids.get(i)))); + } + } + return results; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + private class PathCacheEntriesIterator + implements RemoteIterator { + private long prevId; + private final String pool; + private final int repliesPerRequest; + private ListPathCacheEntriesResponseProto response; + private int idx; + + public PathCacheEntriesIterator(long prevId, String pool, + int repliesPerRequest) { + this.prevId = prevId; + this.pool = pool; + this.repliesPerRequest = repliesPerRequest; + this.response = null; + this.idx = -1; + } + + private void makeRequest() throws IOException { + idx = 0; + response = null; + try { + ListPathCacheEntriesRequestProto req = + ListPathCacheEntriesRequestProto.newBuilder(). + setPrevId(prevId). + setPool(pool). + setMaxReplies(repliesPerRequest). + build(); + response = rpcProxy.listPathCacheEntries(null, req); + if (response.getElementsCount() == 0) { + response = null; + } + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + private void makeRequestIfNeeded() throws IOException { + if (idx == -1) { + makeRequest(); + } else if ((response != null) && (idx >= response.getElementsCount())) { + if (response.getHasMore()) { + makeRequest(); + } else { + response = null; + } + } + } + + @Override + public boolean hasNext() throws IOException { + makeRequestIfNeeded(); + return (response != null); + } + + @Override + public PathCacheEntry next() throws IOException { + makeRequestIfNeeded(); + if (response == null) { + throw new NoSuchElementException(); + } + ListPathCacheEntriesElementProto elementProto = + response.getElements(idx); + prevId = elementProto.getId(); + idx++; + return new PathCacheEntry(elementProto.getId(), + new PathCacheDirective(elementProto.getPath(), + elementProto.getPool())); + } + } + + @Override + public RemoteIterator listPathCacheEntries(long prevId, + String pool, int repliesPerRequest) throws IOException { + return new PathCacheEntriesIterator(prevId, pool, repliesPerRequest); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java new file mode 100644 index 00000000000..8be575a9701 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.PathCacheDirective; +import org.apache.hadoop.hdfs.protocol.PathCacheEntry; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException; +import org.apache.hadoop.util.Fallible; + +/** + * The Cache Manager handles caching on DataNodes. + */ +final class CacheManager { + public static final Log LOG = LogFactory.getLog(CacheManager.class); + + /** + * Cache entries, sorted by ID. + * + * listPathCacheEntries relies on the ordering of elements in this map + * to track what has already been listed by the client. + */ + private final TreeMap entriesById = + new TreeMap(); + + /** + * Cache entries, sorted by directive. + */ + private final TreeMap entriesByDirective = + new TreeMap(); + + /** + * The entry ID to use for a new entry. + */ + private long nextEntryId; + + CacheManager(FSDirectory dir, Configuration conf) { + // TODO: support loading and storing of the CacheManager state + clear(); + } + + synchronized void clear() { + entriesById.clear(); + entriesByDirective.clear(); + nextEntryId = 1; + } + + synchronized long getNextEntryId() throws IOException { + if (nextEntryId == Long.MAX_VALUE) { + throw new IOException("no more available IDs"); + } + return nextEntryId++; + } + + private synchronized Fallible addDirective( + PathCacheDirective directive) { + try { + directive.validate(); + } catch (IOException ioe) { + return new Fallible(ioe); + } + // Check if we already have this entry. + PathCacheEntry existing = entriesByDirective.get(directive); + if (existing != null) { + // Entry already exists: return existing entry. + return new Fallible(existing); + } + // Add a new entry with the next available ID. + PathCacheEntry entry; + try { + entry = new PathCacheEntry(getNextEntryId(), directive); + } catch (IOException ioe) { + return new Fallible( + new UnexpectedAddPathCacheDirectiveException(directive)); + } + entriesByDirective.put(directive, entry); + entriesById.put(entry.getEntryId(), entry); + return new Fallible(entry); + } + + public synchronized List> addDirectives( + List directives) { + ArrayList> results = + new ArrayList>(directives.size()); + for (PathCacheDirective directive: directives) { + results.add(addDirective(directive)); + } + return results; + } + + private synchronized Fallible removeEntry(long entryId) { + // Check for invalid IDs. + if (entryId <= 0) { + return new Fallible(new InvalidIdException(entryId)); + } + // Find the entry. + PathCacheEntry existing = entriesById.get(entryId); + if (existing == null) { + return new Fallible(new NoSuchIdException(entryId)); + } + // Remove the corresponding entry in entriesByDirective. + if (entriesByDirective.remove(existing.getDirective()) == null) { + return new Fallible( + new UnexpectedRemovePathCacheEntryException(entryId)); + } + entriesById.remove(entryId); + return new Fallible(entryId); + } + + public synchronized List> removeEntries(List entryIds) { + ArrayList> results = + new ArrayList>(entryIds.size()); + for (Long entryId : entryIds) { + results.add(removeEntry(entryId)); + } + return results; + } + + public synchronized List listPathCacheEntries(long prevId, + String pool, int maxReplies) { + final int MAX_PRE_ALLOCATED_ENTRIES = 16; + ArrayList replies = + new ArrayList(Math.min(MAX_PRE_ALLOCATED_ENTRIES, maxReplies)); + int numReplies = 0; + SortedMap tailMap = entriesById.tailMap(prevId + 1); + for (Entry cur : tailMap.entrySet()) { + if (numReplies >= maxReplies) { + return replies; + } + if (pool.isEmpty() || cur.getValue().getDirective(). + getPool().equals(pool)) { + replies.add(cur.getValue()); + numReplies++; + } + } + return replies; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 989f688a0fd..b93d75c1256 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -129,6 +129,7 @@ import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; @@ -141,6 +142,8 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.PathCacheDirective; +import org.apache.hadoop.hdfs.protocol.PathCacheEntry; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -223,6 +226,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Fallible; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; @@ -360,6 +364,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, FSDirectory dir; private final BlockManager blockManager; private final SnapshotManager snapshotManager; + private final CacheManager cacheManager; private final DatanodeStatistics datanodeStatistics; // Block pool ID used by this namenode @@ -687,6 +692,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, this.dtSecretManager = createDelegationTokenSecretManager(conf); this.dir = new FSDirectory(fsImage, this, conf); this.snapshotManager = new SnapshotManager(dir); + this.cacheManager= new CacheManager(dir, conf); this.safeMode = new SafeModeInfo(conf); this.auditLoggers = initAuditLoggers(conf); this.isDefaultAuditLogger = auditLoggers.size() == 1 && @@ -6741,6 +6747,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } + List> addPathCacheDirectives( + List directives) { + return cacheManager.addDirectives(directives); + } + + List> removePathCacheEntries(List ids) { + return cacheManager.removeEntries(ids); + } + + List listPathCacheEntries(long startId, String pool, + int maxReplies) { + return cacheManager.listPathCacheEntries(startId, pool, maxReplies); + } + /** * Default AuditLogger implementation; used when no access logger is * defined in the config file. It can also be explicitly listed in the @@ -6777,7 +6797,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, auditLog.info(sb); } } - } + public CacheManager getCacheManager() { + return cacheManager; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 3fd76a695bf..875f81642e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.NoSuchElementException; import org.apache.commons.logging.Log; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -44,6 +45,7 @@ import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -58,6 +60,8 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.PathCacheDirective; +import org.apache.hadoop.hdfs.protocol.PathCacheEntry; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -133,6 +137,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; +import org.apache.hadoop.util.Fallible; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionUtil; @@ -1200,4 +1205,81 @@ class NameNodeRpcServer implements NamenodeProtocols { metrics.incrSnapshotDiffReportOps(); return report; } + + @Override + public List> addPathCacheDirectives( + List paths) throws IOException { + return namesystem.addPathCacheDirectives(paths); + } + + @Override + public List> removePathCacheEntries(List ids) + throws IOException { + return namesystem.removePathCacheEntries(ids); + } + + private class PathCacheEntriesIterator + implements RemoteIterator { + private long prevId; + private final String pool; + private final int repliesPerRequest; + private List entries; + private int idx; + + public PathCacheEntriesIterator(long prevId, String pool, + int repliesPerRequest) { + this.prevId = prevId; + this.pool = pool; + this.repliesPerRequest = repliesPerRequest; + this.entries = null; + this.idx = -1; + } + + private void makeRequest() throws IOException { + idx = 0; + entries = null; + entries = namesystem.listPathCacheEntries(prevId, pool, + repliesPerRequest); + if (entries.isEmpty()) { + entries = null; + } + } + + private void makeRequestIfNeeded() throws IOException { + if (idx == -1) { + makeRequest(); + } else if ((entries != null) && (idx >= entries.size())) { + if (entries.size() < repliesPerRequest) { + // Last time, we got fewer entries than requested. + // So we should be at the end. + entries = null; + } else { + makeRequest(); + } + } + } + + @Override + public boolean hasNext() throws IOException { + makeRequestIfNeeded(); + return (entries != null); + } + + @Override + public PathCacheEntry next() throws IOException { + makeRequestIfNeeded(); + if (entries == null) { + throw new NoSuchElementException(); + } + PathCacheEntry entry = entries.get(idx++); + prevId = entry.getEntryId(); + return entry; + } + } + + @Override + public RemoteIterator listPathCacheEntries(long prevId, String pool, + int maxReplies) throws IOException { + return new PathCacheEntriesIterator(prevId, pool, maxReplies); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 95fcc50ebd3..9d1bfd5a354 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -363,6 +363,57 @@ message IsFileClosedResponseProto { required bool result = 1; } +message PathCacheDirectiveProto { + required string path = 1; + required string pool = 2; +} + +message AddPathCacheDirectivesRequestProto { + repeated PathCacheDirectiveProto elements = 1; +} + +message AddPathCacheDirectivesResponseProto { + repeated int64 results = 1 [packed=true]; +} + +enum AddPathCacheDirectiveErrorProto { + EMPTY_PATH_ERROR = -1; + INVALID_PATH_NAME_ERROR = -2; + INVALID_POOL_NAME_ERROR = -3; + UNEXPECTED_ADD_ERROR = -4; +} + +message RemovePathCacheEntriesRequestProto { + repeated int64 elements = 1 [packed=true]; +} + +message RemovePathCacheEntriesResponseProto { + repeated int64 results = 1 [packed=true]; +} + +enum RemovePathCacheEntryErrorProto { + INVALID_CACHED_PATH_ID_ERROR = -1; + NO_SUCH_CACHED_PATH_ID_ERROR = -2; + UNEXPECTED_REMOVE_ERROR = -3; +} + +message ListPathCacheEntriesRequestProto { + required int64 prevId = 1; + required string pool = 2; + optional int32 maxReplies = 3; +} + +message ListPathCacheEntriesElementProto { + required int64 id = 1; + required string path = 2; + required string pool = 3; +} + +message ListPathCacheEntriesResponseProto { + repeated ListPathCacheEntriesElementProto elements = 1; + required bool hasMore = 2; +} + message GetFileLinkInfoRequestProto { required string src = 1; } @@ -544,6 +595,12 @@ service ClientNamenodeProtocol { returns(ListCorruptFileBlocksResponseProto); rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto); rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto); + rpc addPathCacheDirectives(AddPathCacheDirectivesRequestProto) + returns (AddPathCacheDirectivesResponseProto); + rpc removePathCacheEntries(RemovePathCacheEntriesRequestProto) + returns (RemovePathCacheEntriesResponseProto); + rpc listPathCacheEntries(ListPathCacheEntriesRequestProto) + returns (ListPathCacheEntriesResponseProto); rpc getFileLinkInfo(GetFileLinkInfoRequestProto) returns(GetFileLinkInfoResponseProto); rpc getContentSummary(GetContentSummaryRequestProto) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java new file mode 100644 index 00000000000..fe7ae38d7b5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathCacheRequests.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError; +import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException; +import org.apache.hadoop.hdfs.protocol.PathCacheDirective; +import org.apache.hadoop.hdfs.protocol.PathCacheEntry; +import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.util.Fallible; +import org.junit.Test; + +public class TestPathCacheRequests { + static final Log LOG = LogFactory.getLog(TestPathCacheRequests.class); + + private static void validateListAll( + RemoteIterator iter, + long id0, long id1, long id2) throws Exception { + Assert.assertEquals(new PathCacheEntry(id0, + new PathCacheDirective("/alpha", "pool1")), + iter.next()); + Assert.assertEquals(new PathCacheEntry(id1, + new PathCacheDirective("/beta", "pool2")), + iter.next()); + Assert.assertEquals(new PathCacheEntry(id2, + new PathCacheDirective("/gamma", "pool1")), + iter.next()); + Assert.assertFalse(iter.hasNext()); + } + + @Test + public void testSetAndGet() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + NamenodeProtocols proto = cluster.getNameNodeRpc(); + List> addResults1 = + proto.addPathCacheDirectives(Arrays.asList( + new PathCacheDirective[] { + new PathCacheDirective("/alpha", "pool1"), + new PathCacheDirective("/beta", "pool2"), + new PathCacheDirective("", "pool3") + })); + long ids1[] = new long[2]; + ids1[0] = addResults1.get(0).get().getEntryId(); + ids1[1] = addResults1.get(1).get().getEntryId(); + try { + addResults1.get(2).get(); + Assert.fail("expected an error when adding an empty path"); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getCause() instanceof EmptyPathError); + } + + List> addResults2 = + proto.addPathCacheDirectives(Arrays.asList( + new PathCacheDirective[] { + new PathCacheDirective("/alpha", "pool1"), + new PathCacheDirective("/theta", ""), + new PathCacheDirective("bogus", "pool1"), + new PathCacheDirective("/gamma", "pool1") + })); + long id = addResults2.get(0).get().getEntryId(); + Assert.assertEquals("expected to get back the same ID as last time " + + "when re-adding an existing path cache directive.", ids1[0], id); + try { + addResults2.get(1).get(); + Assert.fail("expected an error when adding a path cache " + + "directive with an empty pool name."); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError); + } + try { + addResults2.get(2).get(); + Assert.fail("expected an error when adding a path cache " + + "directive with a non-absolute path name."); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getCause() instanceof InvalidPathNameError); + } + long ids2[] = new long[1]; + ids2[0] = addResults2.get(3).get().getEntryId(); + + RemoteIterator iter = + proto.listPathCacheEntries(0, "", 100); + validateListAll(iter, ids1[0], ids1[1], ids2[0]); + iter = proto.listPathCacheEntries(0, "", 1); + validateListAll(iter, ids1[0], ids1[1], ids2[0]); + iter = proto.listPathCacheEntries(0, "pool3", 1); + Assert.assertFalse(iter.hasNext()); + iter = proto.listPathCacheEntries(0, "pool2", 4444); + Assert.assertEquals(addResults1.get(1).get(), + iter.next()); + Assert.assertFalse(iter.hasNext()); + + List> removeResults1 = + proto.removePathCacheEntries(Arrays.asList( + new Long[] { ids1[1], -42L, 999999L })); + Assert.assertEquals(Long.valueOf(ids1[1]), + removeResults1.get(0).get()); + try { + removeResults1.get(1).get(); + Assert.fail("expected an error when removing a negative ID"); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getCause() instanceof InvalidIdException); + } + try { + removeResults1.get(2).get(); + Assert.fail("expected an error when removing a nonexistent ID"); + } catch (IOException ioe) { + Assert.assertTrue(ioe.getCause() instanceof NoSuchIdException); + } + iter = proto.listPathCacheEntries(0, "pool2", 4444); + Assert.assertFalse(iter.hasNext()); + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } +}