HDFS-5052. Add cacheRequest/uncacheRequest support to NameNode. (Contributed by Colin Patrick McCabe.)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1516669 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a99edd1f40
commit
920b4cc06f
|
@ -0,0 +1,53 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Contains either a value of type T, or an IOException.
|
||||
*
|
||||
* This can be useful as a return value for batch APIs that need granular
|
||||
* error reporting.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS"})
|
||||
@InterfaceStability.Unstable
|
||||
public class Fallible<T> {
|
||||
private final T val;
|
||||
private final IOException ioe;
|
||||
|
||||
public Fallible(T val) {
|
||||
this.val = val;
|
||||
this.ioe = null;
|
||||
}
|
||||
|
||||
public Fallible(IOException ioe) {
|
||||
this.val = null;
|
||||
this.ioe = ioe;
|
||||
}
|
||||
|
||||
public T get() throws IOException {
|
||||
if (ioe != null) {
|
||||
throw new IOException(ioe);
|
||||
}
|
||||
return this.val;
|
||||
}
|
||||
}
|
|
@ -12,6 +12,9 @@ HDFS-4949 (Unreleased)
|
|||
HDFS-5051. Propagate cache status information from the DataNode to the
|
||||
NameNode (Andrew Wang via Colin Patrick McCabe)
|
||||
|
||||
HDFS-5052. Add cacheRequest/uncacheRequest support to NameNode.
|
||||
(contributed by Colin Patrick McCabe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An exception which occurred when trying to add a path cache directive.
|
||||
*/
|
||||
public abstract class AddPathCacheDirectiveException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final PathCacheDirective directive;
|
||||
|
||||
public AddPathCacheDirectiveException(String description,
|
||||
PathCacheDirective directive) {
|
||||
super(description);
|
||||
this.directive = directive;
|
||||
}
|
||||
|
||||
public PathCacheDirective getDirective() {
|
||||
return directive;
|
||||
}
|
||||
|
||||
public static final class EmptyPathError
|
||||
extends AddPathCacheDirectiveException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public EmptyPathError(PathCacheDirective directive) {
|
||||
super("empty path in directive " + directive, directive);
|
||||
}
|
||||
}
|
||||
|
||||
public static class InvalidPathNameError
|
||||
extends AddPathCacheDirectiveException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public InvalidPathNameError(PathCacheDirective directive) {
|
||||
super("can't handle non-absolute path name " + directive.getPath(),
|
||||
directive);
|
||||
}
|
||||
}
|
||||
|
||||
public static class InvalidPoolNameError
|
||||
extends AddPathCacheDirectiveException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public InvalidPoolNameError(PathCacheDirective directive) {
|
||||
super("invalid pool name '" + directive.getPool() + "'", directive);
|
||||
}
|
||||
}
|
||||
|
||||
public static class UnexpectedAddPathCacheDirectiveException
|
||||
extends AddPathCacheDirectiveException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public UnexpectedAddPathCacheDirectiveException(
|
||||
PathCacheDirective directive) {
|
||||
super("encountered an unexpected error when trying to " +
|
||||
"add path cache directive " + directive, directive);
|
||||
}
|
||||
}
|
||||
};
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.protocol;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -30,6 +31,7 @@ import org.apache.hadoop.fs.InvalidPathException;
|
|||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -46,6 +48,7 @@ import org.apache.hadoop.security.AccessControlException;
|
|||
import org.apache.hadoop.security.KerberosInfo;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenInfo;
|
||||
import org.apache.hadoop.util.Fallible;
|
||||
|
||||
/**********************************************************************
|
||||
* ClientProtocol is used by user code via
|
||||
|
@ -1093,5 +1096,53 @@ public interface ClientProtocol {
|
|||
@Idempotent
|
||||
public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
|
||||
String fromSnapshot, String toSnapshot) throws IOException;
|
||||
|
||||
/**
|
||||
* Add some path cache directives to the CacheManager.
|
||||
*
|
||||
* @param directives
|
||||
* A list of all the path cache directives we want to add.
|
||||
* @return
|
||||
* An list where each element is either a path cache entry that was
|
||||
* added, or an IOException exception describing why the directive
|
||||
* could not be added.
|
||||
*/
|
||||
@AtMostOnce
|
||||
public List<Fallible<PathCacheEntry>>
|
||||
addPathCacheDirectives(List<PathCacheDirective> directives)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Remove some path cache entries from the CacheManager.
|
||||
*
|
||||
* @param ids
|
||||
* A list of all the IDs we want to remove from the CacheManager.
|
||||
* @return
|
||||
* An list where each element is either an ID that was removed,
|
||||
* or an IOException exception describing why the ID could not be
|
||||
* removed.
|
||||
*/
|
||||
@AtMostOnce
|
||||
public List<Fallible<Long>> removePathCacheEntries(List<Long> ids)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* List cached paths on the server.
|
||||
*
|
||||
* @param prevId
|
||||
* The previous ID that we listed, or 0 if this is the first call
|
||||
* to listPathCacheEntries.
|
||||
* @param pool
|
||||
* The pool ID to list. If this is the empty string, all pool ids
|
||||
* will be listed.
|
||||
* @param maxRepliesPerRequest
|
||||
* The maximum number of replies to make in each request.
|
||||
* @return
|
||||
* A RemoteIterator from which you can get PathCacheEntry objects.
|
||||
* Requests will be made as needed.
|
||||
*/
|
||||
@Idempotent
|
||||
public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
|
||||
String pool, int maxRepliesPerRequest) throws IOException;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ComparisonChain;
|
||||
|
||||
import org.apache.commons.lang.builder.HashCodeBuilder;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
|
||||
|
||||
/**
|
||||
* A directive to add a path to a cache pool.
|
||||
*/
|
||||
public class PathCacheDirective implements Comparable<PathCacheDirective> {
|
||||
private final String path;
|
||||
|
||||
private final String pool;
|
||||
|
||||
public PathCacheDirective(String path, String pool) throws IOException {
|
||||
Preconditions.checkNotNull(path);
|
||||
Preconditions.checkNotNull(pool);
|
||||
this.path = path;
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The path used in this request.
|
||||
*/
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The pool used in this request.
|
||||
*/
|
||||
public String getPool() {
|
||||
return pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this PathCacheDirective is valid.
|
||||
*
|
||||
* @throws IOException
|
||||
* If this PathCacheDirective is not valid.
|
||||
*/
|
||||
public void validate() throws IOException {
|
||||
if (path.isEmpty()) {
|
||||
throw new EmptyPathError(this);
|
||||
}
|
||||
if (DFSUtil.isValidName(path)) {
|
||||
throw new InvalidPathNameError(this);
|
||||
}
|
||||
|
||||
if (pool.isEmpty()) {
|
||||
throw new InvalidPoolNameError(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(PathCacheDirective rhs) {
|
||||
return ComparisonChain.start().
|
||||
compare(pool, rhs.getPool()).
|
||||
compare(path, rhs.getPath()).
|
||||
result();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder().append(path).append(pool).hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
try {
|
||||
PathCacheDirective other = (PathCacheDirective)o;
|
||||
return other.compareTo(this) == 0;
|
||||
} catch (ClassCastException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("{ path:").append(path).
|
||||
append(", pool:").append(pool).
|
||||
append(" }");
|
||||
return builder.toString();
|
||||
}
|
||||
};
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import org.apache.commons.lang.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang.builder.HashCodeBuilder;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* An entry in the NameNode's path cache.
|
||||
*/
|
||||
public final class PathCacheEntry {
|
||||
private final long entryId;
|
||||
private final PathCacheDirective directive;
|
||||
|
||||
public PathCacheEntry(long entryId, PathCacheDirective directive) {
|
||||
Preconditions.checkArgument(entryId > 0);
|
||||
this.entryId = entryId;
|
||||
this.directive = directive;
|
||||
}
|
||||
|
||||
public long getEntryId() {
|
||||
return entryId;
|
||||
}
|
||||
|
||||
public PathCacheDirective getDirective() {
|
||||
return directive;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
try {
|
||||
PathCacheEntry other = (PathCacheEntry)o;
|
||||
return new EqualsBuilder().
|
||||
append(this.entryId, other.entryId).
|
||||
append(this.directive, other.directive).
|
||||
isEquals();
|
||||
} catch (ClassCastException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder().
|
||||
append(entryId).
|
||||
append(directive).
|
||||
hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("{ entryId:").append(entryId).
|
||||
append(", directive:").append(directive.toString()).
|
||||
append(" }");
|
||||
return builder.toString();
|
||||
}
|
||||
};
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* An exception which occurred when trying to remove a path cache entry.
|
||||
*/
|
||||
public abstract class RemovePathCacheEntryException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final long entryId;
|
||||
|
||||
public RemovePathCacheEntryException(String description, long entryId) {
|
||||
super(description);
|
||||
this.entryId = entryId;
|
||||
}
|
||||
|
||||
public long getEntryId() {
|
||||
return this.entryId;
|
||||
}
|
||||
|
||||
public final static class InvalidIdException
|
||||
extends RemovePathCacheEntryException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public InvalidIdException(long entryId) {
|
||||
super("invalid cache path entry id " + entryId, entryId);
|
||||
}
|
||||
}
|
||||
|
||||
public final static class NoSuchIdException
|
||||
extends RemovePathCacheEntryException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public NoSuchIdException(long entryId) {
|
||||
super("there is no path cache entry with id " + entryId, entryId);
|
||||
}
|
||||
}
|
||||
|
||||
public final static class UnexpectedRemovePathCacheEntryException
|
||||
extends RemovePathCacheEntryException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public UnexpectedRemovePathCacheEntryException(long id) {
|
||||
super("encountered an unexpected error when trying to " +
|
||||
"remove path cache entry id " + id, id);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hdfs.protocolPB;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -25,6 +26,14 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
|
@ -37,6 +46,10 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Abando
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
|
||||
|
@ -92,6 +105,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesElementProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
||||
|
@ -102,6 +118,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Recove
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
|
||||
|
@ -142,6 +161,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
|
|||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INodeId;
|
||||
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
|
||||
|
@ -150,6 +170,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRespons
|
|||
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Fallible;
|
||||
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -1003,5 +1024,95 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public AddPathCacheDirectivesResponseProto addPathCacheDirectives(RpcController controller,
|
||||
AddPathCacheDirectivesRequestProto request) throws ServiceException {
|
||||
try {
|
||||
ArrayList<PathCacheDirective> input =
|
||||
new ArrayList<PathCacheDirective>(request.getElementsCount());
|
||||
for (int i = 0; i < request.getElementsCount(); i++) {
|
||||
PathCacheDirectiveProto proto = request.getElements(i);
|
||||
input.add(new PathCacheDirective(proto.getPath(), proto.getPool()));
|
||||
}
|
||||
List<Fallible<PathCacheEntry>> output = server.addPathCacheDirectives(input);
|
||||
AddPathCacheDirectivesResponseProto.Builder builder =
|
||||
AddPathCacheDirectivesResponseProto.newBuilder();
|
||||
for (int idx = 0; idx < output.size(); idx++) {
|
||||
try {
|
||||
PathCacheEntry entry = output.get(idx).get();
|
||||
builder.addResults(entry.getEntryId());
|
||||
} catch (EmptyPathError ioe) {
|
||||
builder.addResults(AddPathCacheDirectiveErrorProto.
|
||||
EMPTY_PATH_ERROR_VALUE);
|
||||
} catch (InvalidPathNameError ioe) {
|
||||
builder.addResults(AddPathCacheDirectiveErrorProto.
|
||||
INVALID_PATH_NAME_ERROR_VALUE);
|
||||
} catch (InvalidPoolNameError ioe) {
|
||||
builder.addResults(AddPathCacheDirectiveErrorProto.
|
||||
INVALID_POOL_NAME_ERROR_VALUE);
|
||||
} catch (IOException ioe) {
|
||||
builder.addResults(AddPathCacheDirectiveErrorProto.
|
||||
UNEXPECTED_ADD_ERROR_VALUE);
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemovePathCacheEntriesResponseProto removePathCacheEntries(
|
||||
RpcController controller, RemovePathCacheEntriesRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
List<Fallible<Long>> output =
|
||||
server.removePathCacheEntries(request.getElementsList());
|
||||
RemovePathCacheEntriesResponseProto.Builder builder =
|
||||
RemovePathCacheEntriesResponseProto.newBuilder();
|
||||
for (int idx = 0; idx < output.size(); idx++) {
|
||||
try {
|
||||
long id = output.get(idx).get();
|
||||
builder.addResults(id);
|
||||
} catch (InvalidIdException ioe) {
|
||||
builder.addResults(RemovePathCacheEntryErrorProto.
|
||||
INVALID_CACHED_PATH_ID_ERROR_VALUE);
|
||||
} catch (NoSuchIdException ioe) {
|
||||
builder.addResults(RemovePathCacheEntryErrorProto.
|
||||
NO_SUCH_CACHED_PATH_ID_ERROR_VALUE);
|
||||
} catch (IOException ioe) {
|
||||
builder.addResults(RemovePathCacheEntryErrorProto.
|
||||
UNEXPECTED_REMOVE_ERROR_VALUE);
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListPathCacheEntriesResponseProto listPathCacheEntries(RpcController controller,
|
||||
ListPathCacheEntriesRequestProto request) throws ServiceException {
|
||||
try {
|
||||
RemoteIterator<PathCacheEntry> iter =
|
||||
server.listPathCacheEntries(request.getPrevId(),
|
||||
request.getPool(),
|
||||
request.getMaxReplies());
|
||||
ListPathCacheEntriesResponseProto.Builder builder =
|
||||
ListPathCacheEntriesResponseProto.newBuilder();
|
||||
while (iter.hasNext()) {
|
||||
PathCacheEntry entry = iter.next();
|
||||
builder.addElements(
|
||||
ListPathCacheEntriesElementProto.newBuilder().
|
||||
setId(entry.getEntryId()).
|
||||
setPath(entry.getDirective().getPath()).
|
||||
setPool(entry.getDirective().getPool()));
|
||||
}
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,10 @@ package org.apache.hadoop.hdfs.protocolPB;
|
|||
import java.io.Closeable;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -30,9 +33,19 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
|
@ -50,6 +63,10 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
|||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathCacheDirectiveProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectiveErrorProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathCacheDirectivesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
|
||||
|
@ -87,11 +104,18 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
|
|||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesElementProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathCacheEntriesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntriesResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathCacheEntryErrorProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
|
||||
|
@ -127,6 +151,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequest
|
|||
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto;
|
||||
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Fallible;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.ServiceException;
|
||||
|
@ -982,4 +1007,170 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
|||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static IOException addPathCacheDirectivesError(long code,
|
||||
PathCacheDirective directive) {
|
||||
if (code == AddPathCacheDirectiveErrorProto.EMPTY_PATH_ERROR_VALUE) {
|
||||
return new EmptyPathError(directive);
|
||||
} else if (code == AddPathCacheDirectiveErrorProto.
|
||||
INVALID_PATH_NAME_ERROR_VALUE) {
|
||||
return new InvalidPathNameError(directive);
|
||||
} else if (code == AddPathCacheDirectiveErrorProto.
|
||||
INVALID_POOL_NAME_ERROR_VALUE) {
|
||||
return new InvalidPoolNameError(directive);
|
||||
} else {
|
||||
return new UnexpectedAddPathCacheDirectiveException(directive);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Fallible<PathCacheEntry>> addPathCacheDirectives(
|
||||
List<PathCacheDirective> directives) throws IOException {
|
||||
try {
|
||||
AddPathCacheDirectivesRequestProto.Builder builder =
|
||||
AddPathCacheDirectivesRequestProto.newBuilder();
|
||||
for (PathCacheDirective directive : directives) {
|
||||
builder.addElements(PathCacheDirectiveProto.newBuilder().
|
||||
setPath(directive.getPath()).
|
||||
setPool(directive.getPool()).
|
||||
build());
|
||||
}
|
||||
AddPathCacheDirectivesResponseProto result =
|
||||
rpcProxy.addPathCacheDirectives(null, builder.build());
|
||||
int resultsCount = result.getResultsCount();
|
||||
ArrayList<Fallible<PathCacheEntry>> results =
|
||||
new ArrayList<Fallible<PathCacheEntry>>(resultsCount);
|
||||
for (int i = 0; i < resultsCount; i++) {
|
||||
PathCacheDirective directive = directives.get(i);
|
||||
long code = result.getResults(i);
|
||||
if (code > 0) {
|
||||
results.add(new Fallible<PathCacheEntry>(
|
||||
new PathCacheEntry(code, directive)));
|
||||
} else {
|
||||
results.add(new Fallible<PathCacheEntry>(
|
||||
addPathCacheDirectivesError(code, directive)));
|
||||
}
|
||||
}
|
||||
return results;
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static IOException removePathCacheEntriesError(long code, long id) {
|
||||
if (code == RemovePathCacheEntryErrorProto.
|
||||
INVALID_CACHED_PATH_ID_ERROR_VALUE) {
|
||||
return new InvalidIdException(id);
|
||||
} else if (code == RemovePathCacheEntryErrorProto.
|
||||
NO_SUCH_CACHED_PATH_ID_ERROR_VALUE) {
|
||||
return new NoSuchIdException(id);
|
||||
} else {
|
||||
return new UnexpectedRemovePathCacheEntryException(id);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Fallible<Long>> removePathCacheEntries(List<Long> ids)
|
||||
throws IOException {
|
||||
try {
|
||||
RemovePathCacheEntriesRequestProto.Builder builder =
|
||||
RemovePathCacheEntriesRequestProto.newBuilder();
|
||||
for (Long id : ids) {
|
||||
builder.addElements(id);
|
||||
}
|
||||
RemovePathCacheEntriesResponseProto result =
|
||||
rpcProxy.removePathCacheEntries(null, builder.build());
|
||||
int resultsCount = result.getResultsCount();
|
||||
ArrayList<Fallible<Long>> results =
|
||||
new ArrayList<Fallible<Long>>(resultsCount);
|
||||
for (int i = 0; i < resultsCount; i++) {
|
||||
long code = result.getResults(i);
|
||||
if (code > 0) {
|
||||
results.add(new Fallible<Long>(code));
|
||||
} else {
|
||||
results.add(new Fallible<Long>(
|
||||
removePathCacheEntriesError(code, ids.get(i))));
|
||||
}
|
||||
}
|
||||
return results;
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private class PathCacheEntriesIterator
|
||||
implements RemoteIterator<PathCacheEntry> {
|
||||
private long prevId;
|
||||
private final String pool;
|
||||
private final int repliesPerRequest;
|
||||
private ListPathCacheEntriesResponseProto response;
|
||||
private int idx;
|
||||
|
||||
public PathCacheEntriesIterator(long prevId, String pool,
|
||||
int repliesPerRequest) {
|
||||
this.prevId = prevId;
|
||||
this.pool = pool;
|
||||
this.repliesPerRequest = repliesPerRequest;
|
||||
this.response = null;
|
||||
this.idx = -1;
|
||||
}
|
||||
|
||||
private void makeRequest() throws IOException {
|
||||
idx = 0;
|
||||
response = null;
|
||||
try {
|
||||
ListPathCacheEntriesRequestProto req =
|
||||
ListPathCacheEntriesRequestProto.newBuilder().
|
||||
setPrevId(prevId).
|
||||
setPool(pool).
|
||||
setMaxReplies(repliesPerRequest).
|
||||
build();
|
||||
response = rpcProxy.listPathCacheEntries(null, req);
|
||||
if (response.getElementsCount() == 0) {
|
||||
response = null;
|
||||
}
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void makeRequestIfNeeded() throws IOException {
|
||||
if (idx == -1) {
|
||||
makeRequest();
|
||||
} else if ((response != null) && (idx >= response.getElementsCount())) {
|
||||
if (response.getHasMore()) {
|
||||
makeRequest();
|
||||
} else {
|
||||
response = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
makeRequestIfNeeded();
|
||||
return (response != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PathCacheEntry next() throws IOException {
|
||||
makeRequestIfNeeded();
|
||||
if (response == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
ListPathCacheEntriesElementProto elementProto =
|
||||
response.getElements(idx);
|
||||
prevId = elementProto.getId();
|
||||
idx++;
|
||||
return new PathCacheEntry(elementProto.getId(),
|
||||
new PathCacheDirective(elementProto.getPath(),
|
||||
elementProto.getPool()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId,
|
||||
String pool, int repliesPerRequest) throws IOException {
|
||||
return new PathCacheEntriesIterator(prevId, pool, repliesPerRequest);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,165 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.UnexpectedAddPathCacheDirectiveException;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.UnexpectedRemovePathCacheEntryException;
|
||||
import org.apache.hadoop.util.Fallible;
|
||||
|
||||
/**
|
||||
* The Cache Manager handles caching on DataNodes.
|
||||
*/
|
||||
final class CacheManager {
|
||||
public static final Log LOG = LogFactory.getLog(CacheManager.class);
|
||||
|
||||
/**
|
||||
* Cache entries, sorted by ID.
|
||||
*
|
||||
* listPathCacheEntries relies on the ordering of elements in this map
|
||||
* to track what has already been listed by the client.
|
||||
*/
|
||||
private final TreeMap<Long, PathCacheEntry> entriesById =
|
||||
new TreeMap<Long, PathCacheEntry>();
|
||||
|
||||
/**
|
||||
* Cache entries, sorted by directive.
|
||||
*/
|
||||
private final TreeMap<PathCacheDirective, PathCacheEntry> entriesByDirective =
|
||||
new TreeMap<PathCacheDirective, PathCacheEntry>();
|
||||
|
||||
/**
|
||||
* The entry ID to use for a new entry.
|
||||
*/
|
||||
private long nextEntryId;
|
||||
|
||||
CacheManager(FSDirectory dir, Configuration conf) {
|
||||
// TODO: support loading and storing of the CacheManager state
|
||||
clear();
|
||||
}
|
||||
|
||||
synchronized void clear() {
|
||||
entriesById.clear();
|
||||
entriesByDirective.clear();
|
||||
nextEntryId = 1;
|
||||
}
|
||||
|
||||
synchronized long getNextEntryId() throws IOException {
|
||||
if (nextEntryId == Long.MAX_VALUE) {
|
||||
throw new IOException("no more available IDs");
|
||||
}
|
||||
return nextEntryId++;
|
||||
}
|
||||
|
||||
private synchronized Fallible<PathCacheEntry> addDirective(
|
||||
PathCacheDirective directive) {
|
||||
try {
|
||||
directive.validate();
|
||||
} catch (IOException ioe) {
|
||||
return new Fallible<PathCacheEntry>(ioe);
|
||||
}
|
||||
// Check if we already have this entry.
|
||||
PathCacheEntry existing = entriesByDirective.get(directive);
|
||||
if (existing != null) {
|
||||
// Entry already exists: return existing entry.
|
||||
return new Fallible<PathCacheEntry>(existing);
|
||||
}
|
||||
// Add a new entry with the next available ID.
|
||||
PathCacheEntry entry;
|
||||
try {
|
||||
entry = new PathCacheEntry(getNextEntryId(), directive);
|
||||
} catch (IOException ioe) {
|
||||
return new Fallible<PathCacheEntry>(
|
||||
new UnexpectedAddPathCacheDirectiveException(directive));
|
||||
}
|
||||
entriesByDirective.put(directive, entry);
|
||||
entriesById.put(entry.getEntryId(), entry);
|
||||
return new Fallible<PathCacheEntry>(entry);
|
||||
}
|
||||
|
||||
public synchronized List<Fallible<PathCacheEntry>> addDirectives(
|
||||
List<PathCacheDirective> directives) {
|
||||
ArrayList<Fallible<PathCacheEntry>> results =
|
||||
new ArrayList<Fallible<PathCacheEntry>>(directives.size());
|
||||
for (PathCacheDirective directive: directives) {
|
||||
results.add(addDirective(directive));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private synchronized Fallible<Long> removeEntry(long entryId) {
|
||||
// Check for invalid IDs.
|
||||
if (entryId <= 0) {
|
||||
return new Fallible<Long>(new InvalidIdException(entryId));
|
||||
}
|
||||
// Find the entry.
|
||||
PathCacheEntry existing = entriesById.get(entryId);
|
||||
if (existing == null) {
|
||||
return new Fallible<Long>(new NoSuchIdException(entryId));
|
||||
}
|
||||
// Remove the corresponding entry in entriesByDirective.
|
||||
if (entriesByDirective.remove(existing.getDirective()) == null) {
|
||||
return new Fallible<Long>(
|
||||
new UnexpectedRemovePathCacheEntryException(entryId));
|
||||
}
|
||||
entriesById.remove(entryId);
|
||||
return new Fallible<Long>(entryId);
|
||||
}
|
||||
|
||||
public synchronized List<Fallible<Long>> removeEntries(List<Long> entryIds) {
|
||||
ArrayList<Fallible<Long>> results =
|
||||
new ArrayList<Fallible<Long>>(entryIds.size());
|
||||
for (Long entryId : entryIds) {
|
||||
results.add(removeEntry(entryId));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
public synchronized List<PathCacheEntry> listPathCacheEntries(long prevId,
|
||||
String pool, int maxReplies) {
|
||||
final int MAX_PRE_ALLOCATED_ENTRIES = 16;
|
||||
ArrayList<PathCacheEntry> replies =
|
||||
new ArrayList<PathCacheEntry>(Math.min(MAX_PRE_ALLOCATED_ENTRIES, maxReplies));
|
||||
int numReplies = 0;
|
||||
SortedMap<Long, PathCacheEntry> tailMap = entriesById.tailMap(prevId + 1);
|
||||
for (Entry<Long, PathCacheEntry> cur : tailMap.entrySet()) {
|
||||
if (numReplies >= maxReplies) {
|
||||
return replies;
|
||||
}
|
||||
if (pool.isEmpty() || cur.getValue().getDirective().
|
||||
getPool().equals(pool)) {
|
||||
replies.add(cur.getValue());
|
||||
numReplies++;
|
||||
}
|
||||
}
|
||||
return replies;
|
||||
}
|
||||
}
|
|
@ -129,6 +129,7 @@ import org.apache.hadoop.fs.Options;
|
|||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -141,6 +142,8 @@ import org.apache.hadoop.hdfs.HAUtil;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
|
@ -223,6 +226,7 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Fallible;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
|
@ -360,6 +364,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
FSDirectory dir;
|
||||
private final BlockManager blockManager;
|
||||
private final SnapshotManager snapshotManager;
|
||||
private final CacheManager cacheManager;
|
||||
private final DatanodeStatistics datanodeStatistics;
|
||||
|
||||
// Block pool ID used by this namenode
|
||||
|
@ -687,6 +692,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
this.dtSecretManager = createDelegationTokenSecretManager(conf);
|
||||
this.dir = new FSDirectory(fsImage, this, conf);
|
||||
this.snapshotManager = new SnapshotManager(dir);
|
||||
this.cacheManager= new CacheManager(dir, conf);
|
||||
this.safeMode = new SafeModeInfo(conf);
|
||||
this.auditLoggers = initAuditLoggers(conf);
|
||||
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
|
||||
|
@ -6741,6 +6747,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
}
|
||||
|
||||
List<Fallible<PathCacheEntry>> addPathCacheDirectives(
|
||||
List<PathCacheDirective> directives) {
|
||||
return cacheManager.addDirectives(directives);
|
||||
}
|
||||
|
||||
List<Fallible<Long>> removePathCacheEntries(List<Long> ids) {
|
||||
return cacheManager.removeEntries(ids);
|
||||
}
|
||||
|
||||
List<PathCacheEntry> listPathCacheEntries(long startId, String pool,
|
||||
int maxReplies) {
|
||||
return cacheManager.listPathCacheEntries(startId, pool, maxReplies);
|
||||
}
|
||||
|
||||
/**
|
||||
* Default AuditLogger implementation; used when no access logger is
|
||||
* defined in the config file. It can also be explicitly listed in the
|
||||
|
@ -6777,7 +6797,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
auditLog.info(sb);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public CacheManager getCacheManager() {
|
||||
return cacheManager;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.Arrays;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
|
@ -44,6 +45,7 @@ import org.apache.hadoop.fs.InvalidPathException;
|
|||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
|
@ -58,6 +60,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
|
@ -133,6 +137,7 @@ import org.apache.hadoop.security.token.Token;
|
|||
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
|
||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
|
||||
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.util.Fallible;
|
||||
import org.apache.hadoop.util.VersionInfo;
|
||||
import org.apache.hadoop.util.VersionUtil;
|
||||
|
||||
|
@ -1200,4 +1205,81 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
metrics.incrSnapshotDiffReportOps();
|
||||
return report;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Fallible<PathCacheEntry>> addPathCacheDirectives(
|
||||
List<PathCacheDirective> paths) throws IOException {
|
||||
return namesystem.addPathCacheDirectives(paths);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Fallible<Long>> removePathCacheEntries(List<Long> ids)
|
||||
throws IOException {
|
||||
return namesystem.removePathCacheEntries(ids);
|
||||
}
|
||||
|
||||
private class PathCacheEntriesIterator
|
||||
implements RemoteIterator<PathCacheEntry> {
|
||||
private long prevId;
|
||||
private final String pool;
|
||||
private final int repliesPerRequest;
|
||||
private List<PathCacheEntry> entries;
|
||||
private int idx;
|
||||
|
||||
public PathCacheEntriesIterator(long prevId, String pool,
|
||||
int repliesPerRequest) {
|
||||
this.prevId = prevId;
|
||||
this.pool = pool;
|
||||
this.repliesPerRequest = repliesPerRequest;
|
||||
this.entries = null;
|
||||
this.idx = -1;
|
||||
}
|
||||
|
||||
private void makeRequest() throws IOException {
|
||||
idx = 0;
|
||||
entries = null;
|
||||
entries = namesystem.listPathCacheEntries(prevId, pool,
|
||||
repliesPerRequest);
|
||||
if (entries.isEmpty()) {
|
||||
entries = null;
|
||||
}
|
||||
}
|
||||
|
||||
private void makeRequestIfNeeded() throws IOException {
|
||||
if (idx == -1) {
|
||||
makeRequest();
|
||||
} else if ((entries != null) && (idx >= entries.size())) {
|
||||
if (entries.size() < repliesPerRequest) {
|
||||
// Last time, we got fewer entries than requested.
|
||||
// So we should be at the end.
|
||||
entries = null;
|
||||
} else {
|
||||
makeRequest();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() throws IOException {
|
||||
makeRequestIfNeeded();
|
||||
return (entries != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PathCacheEntry next() throws IOException {
|
||||
makeRequestIfNeeded();
|
||||
if (entries == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
PathCacheEntry entry = entries.get(idx++);
|
||||
prevId = entry.getEntryId();
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteIterator<PathCacheEntry> listPathCacheEntries(long prevId, String pool,
|
||||
int maxReplies) throws IOException {
|
||||
return new PathCacheEntriesIterator(prevId, pool, maxReplies);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -363,6 +363,57 @@ message IsFileClosedResponseProto {
|
|||
required bool result = 1;
|
||||
}
|
||||
|
||||
message PathCacheDirectiveProto {
|
||||
required string path = 1;
|
||||
required string pool = 2;
|
||||
}
|
||||
|
||||
message AddPathCacheDirectivesRequestProto {
|
||||
repeated PathCacheDirectiveProto elements = 1;
|
||||
}
|
||||
|
||||
message AddPathCacheDirectivesResponseProto {
|
||||
repeated int64 results = 1 [packed=true];
|
||||
}
|
||||
|
||||
enum AddPathCacheDirectiveErrorProto {
|
||||
EMPTY_PATH_ERROR = -1;
|
||||
INVALID_PATH_NAME_ERROR = -2;
|
||||
INVALID_POOL_NAME_ERROR = -3;
|
||||
UNEXPECTED_ADD_ERROR = -4;
|
||||
}
|
||||
|
||||
message RemovePathCacheEntriesRequestProto {
|
||||
repeated int64 elements = 1 [packed=true];
|
||||
}
|
||||
|
||||
message RemovePathCacheEntriesResponseProto {
|
||||
repeated int64 results = 1 [packed=true];
|
||||
}
|
||||
|
||||
enum RemovePathCacheEntryErrorProto {
|
||||
INVALID_CACHED_PATH_ID_ERROR = -1;
|
||||
NO_SUCH_CACHED_PATH_ID_ERROR = -2;
|
||||
UNEXPECTED_REMOVE_ERROR = -3;
|
||||
}
|
||||
|
||||
message ListPathCacheEntriesRequestProto {
|
||||
required int64 prevId = 1;
|
||||
required string pool = 2;
|
||||
optional int32 maxReplies = 3;
|
||||
}
|
||||
|
||||
message ListPathCacheEntriesElementProto {
|
||||
required int64 id = 1;
|
||||
required string path = 2;
|
||||
required string pool = 3;
|
||||
}
|
||||
|
||||
message ListPathCacheEntriesResponseProto {
|
||||
repeated ListPathCacheEntriesElementProto elements = 1;
|
||||
required bool hasMore = 2;
|
||||
}
|
||||
|
||||
message GetFileLinkInfoRequestProto {
|
||||
required string src = 1;
|
||||
}
|
||||
|
@ -544,6 +595,12 @@ service ClientNamenodeProtocol {
|
|||
returns(ListCorruptFileBlocksResponseProto);
|
||||
rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto);
|
||||
rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto);
|
||||
rpc addPathCacheDirectives(AddPathCacheDirectivesRequestProto)
|
||||
returns (AddPathCacheDirectivesResponseProto);
|
||||
rpc removePathCacheEntries(RemovePathCacheEntriesRequestProto)
|
||||
returns (RemovePathCacheEntriesResponseProto);
|
||||
rpc listPathCacheEntries(ListPathCacheEntriesRequestProto)
|
||||
returns (ListPathCacheEntriesResponseProto);
|
||||
rpc getFileLinkInfo(GetFileLinkInfoRequestProto)
|
||||
returns(GetFileLinkInfoResponseProto);
|
||||
rpc getContentSummary(GetContentSummaryRequestProto)
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.EmptyPathError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPoolNameError;
|
||||
import org.apache.hadoop.hdfs.protocol.AddPathCacheDirectiveException.InvalidPathNameError;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.InvalidIdException;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheDirective;
|
||||
import org.apache.hadoop.hdfs.protocol.PathCacheEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.RemovePathCacheEntryException.NoSuchIdException;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.util.Fallible;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestPathCacheRequests {
|
||||
static final Log LOG = LogFactory.getLog(TestPathCacheRequests.class);
|
||||
|
||||
private static void validateListAll(
|
||||
RemoteIterator<PathCacheEntry> iter,
|
||||
long id0, long id1, long id2) throws Exception {
|
||||
Assert.assertEquals(new PathCacheEntry(id0,
|
||||
new PathCacheDirective("/alpha", "pool1")),
|
||||
iter.next());
|
||||
Assert.assertEquals(new PathCacheEntry(id1,
|
||||
new PathCacheDirective("/beta", "pool2")),
|
||||
iter.next());
|
||||
Assert.assertEquals(new PathCacheEntry(id2,
|
||||
new PathCacheDirective("/gamma", "pool1")),
|
||||
iter.next());
|
||||
Assert.assertFalse(iter.hasNext());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetAndGet() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
NamenodeProtocols proto = cluster.getNameNodeRpc();
|
||||
List<Fallible<PathCacheEntry>> addResults1 =
|
||||
proto.addPathCacheDirectives(Arrays.asList(
|
||||
new PathCacheDirective[] {
|
||||
new PathCacheDirective("/alpha", "pool1"),
|
||||
new PathCacheDirective("/beta", "pool2"),
|
||||
new PathCacheDirective("", "pool3")
|
||||
}));
|
||||
long ids1[] = new long[2];
|
||||
ids1[0] = addResults1.get(0).get().getEntryId();
|
||||
ids1[1] = addResults1.get(1).get().getEntryId();
|
||||
try {
|
||||
addResults1.get(2).get();
|
||||
Assert.fail("expected an error when adding an empty path");
|
||||
} catch (IOException ioe) {
|
||||
Assert.assertTrue(ioe.getCause() instanceof EmptyPathError);
|
||||
}
|
||||
|
||||
List<Fallible<PathCacheEntry>> addResults2 =
|
||||
proto.addPathCacheDirectives(Arrays.asList(
|
||||
new PathCacheDirective[] {
|
||||
new PathCacheDirective("/alpha", "pool1"),
|
||||
new PathCacheDirective("/theta", ""),
|
||||
new PathCacheDirective("bogus", "pool1"),
|
||||
new PathCacheDirective("/gamma", "pool1")
|
||||
}));
|
||||
long id = addResults2.get(0).get().getEntryId();
|
||||
Assert.assertEquals("expected to get back the same ID as last time " +
|
||||
"when re-adding an existing path cache directive.", ids1[0], id);
|
||||
try {
|
||||
addResults2.get(1).get();
|
||||
Assert.fail("expected an error when adding a path cache " +
|
||||
"directive with an empty pool name.");
|
||||
} catch (IOException ioe) {
|
||||
Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError);
|
||||
}
|
||||
try {
|
||||
addResults2.get(2).get();
|
||||
Assert.fail("expected an error when adding a path cache " +
|
||||
"directive with a non-absolute path name.");
|
||||
} catch (IOException ioe) {
|
||||
Assert.assertTrue(ioe.getCause() instanceof InvalidPathNameError);
|
||||
}
|
||||
long ids2[] = new long[1];
|
||||
ids2[0] = addResults2.get(3).get().getEntryId();
|
||||
|
||||
RemoteIterator<PathCacheEntry> iter =
|
||||
proto.listPathCacheEntries(0, "", 100);
|
||||
validateListAll(iter, ids1[0], ids1[1], ids2[0]);
|
||||
iter = proto.listPathCacheEntries(0, "", 1);
|
||||
validateListAll(iter, ids1[0], ids1[1], ids2[0]);
|
||||
iter = proto.listPathCacheEntries(0, "pool3", 1);
|
||||
Assert.assertFalse(iter.hasNext());
|
||||
iter = proto.listPathCacheEntries(0, "pool2", 4444);
|
||||
Assert.assertEquals(addResults1.get(1).get(),
|
||||
iter.next());
|
||||
Assert.assertFalse(iter.hasNext());
|
||||
|
||||
List<Fallible<Long>> removeResults1 =
|
||||
proto.removePathCacheEntries(Arrays.asList(
|
||||
new Long[] { ids1[1], -42L, 999999L }));
|
||||
Assert.assertEquals(Long.valueOf(ids1[1]),
|
||||
removeResults1.get(0).get());
|
||||
try {
|
||||
removeResults1.get(1).get();
|
||||
Assert.fail("expected an error when removing a negative ID");
|
||||
} catch (IOException ioe) {
|
||||
Assert.assertTrue(ioe.getCause() instanceof InvalidIdException);
|
||||
}
|
||||
try {
|
||||
removeResults1.get(2).get();
|
||||
Assert.fail("expected an error when removing a nonexistent ID");
|
||||
} catch (IOException ioe) {
|
||||
Assert.assertTrue(ioe.getCause() instanceof NoSuchIdException);
|
||||
}
|
||||
iter = proto.listPathCacheEntries(0, "pool2", 4444);
|
||||
Assert.assertFalse(iter.hasNext());
|
||||
} finally {
|
||||
if (cluster != null) { cluster.shutdown(); }
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue