HDFS-5236. Change PathBasedCacheDirective APIs to be a single value rather than batch. (Contributed by Andrew Wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1525183 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang 2013-09-21 00:20:36 +00:00
parent aae86e4f3f
commit a0d9a155a4
17 changed files with 652 additions and 638 deletions
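In summary, the batch-oriented caching calls (which returned Fallible lists) are replaced by single-value calls that either succeed or throw an IOException. A minimal, hypothetical usage sketch against the new DistributedFileSystem methods in this diff (the pool name and path are made up for illustration, and the cast assumes fs.defaultFS points at HDFS):

    // Hedged sketch, not part of this commit: exercising the single-value API.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
    import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
    import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;

    public class CacheDirectiveExample {
      public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        dfs.addCachePool(new CachePoolInfo("pool1"));             // create a pool first
        PathBasedCacheDescriptor descriptor = dfs.addPathBasedCacheDirective(
            new PathBasedCacheDirective("/foo/bar", "pool1"));    // one directive per call
        dfs.removePathBasedCacheDescriptor(descriptor);           // failures now throw IOException
      }
    }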

View File

@@ -42,6 +42,9 @@ HDFS-4949 (Unreleased)
HDFS-5213. Separate PathBasedCacheEntry and PathBasedCacheDirectiveWithId.
(Contributed by Colin Patrick McCabe)
+HDFS-5236. Change PathBasedCacheDirective APIs to be a single value
+rather than batch. (Contributed by Andrew Wang)
OPTIMIZATIONS
BUG FIXES

View File

@@ -98,6 +98,7 @@ import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
@@ -107,6 +108,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -115,6 +117,8 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -2279,7 +2283,73 @@ public class DFSClient implements java.io.Closeable {
throw re.unwrapRemoteException();
}
}
public PathBasedCacheDescriptor addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
checkOpen();
try {
return namenode.addPathBasedCacheDirective(directive);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public void removePathBasedCacheDescriptor(PathBasedCacheDescriptor descriptor)
throws IOException {
checkOpen();
try {
namenode.removePathBasedCacheDescriptor(descriptor.getEntryId());
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
String pool, String path) throws IOException {
checkOpen();
try {
return namenode.listPathBasedCacheDescriptors(0, pool, path);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public void addCachePool(CachePoolInfo info) throws IOException {
checkOpen();
try {
namenode.addCachePool(info);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public void modifyCachePool(CachePoolInfo info) throws IOException {
checkOpen();
try {
namenode.modifyCachePool(info);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public void removeCachePool(String poolName) throws IOException {
checkOpen();
try {
namenode.removeCachePool(poolName);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
checkOpen();
try {
return namenode.listCachePools("");
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
/**
* Save namespace image.
*

View File

@@ -79,7 +79,6 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Fallible;
import org.apache.hadoop.util.Progressable;
import com.google.common.annotations.VisibleForTesting;
@@ -1584,29 +1583,26 @@ public class DistributedFileSystem extends FileSystem {
}
/**
-* Add some PathBasedCache directives.
+* Add a new PathBasedCacheDirective.
*
-* @param directives A list of PathBasedCache directives to be added.
-* @return A Fallible list, where each element is either a successfully addded
-* PathBasedCache entry, or an IOException describing why the directive
-* could not be added.
+* @param directive A PathBasedCacheDirectives to add
+* @return PathBasedCacheDescriptor associated with the added directive
+* @throws IOException if the directive could not be added
*/
-public List<Fallible<PathBasedCacheDescriptor>>
-addPathBasedCacheDirective(List<PathBasedCacheDirective> directives)
-throws IOException {
-return dfs.namenode.addPathBasedCacheDirectives(directives);
+public PathBasedCacheDescriptor addPathBasedCacheDirective(
+PathBasedCacheDirective directive) throws IOException {
+return dfs.addPathBasedCacheDirective(directive);
}
/**
-* Remove some PathBasedCache entries.
+* Remove a PathBasedCacheDescriptor.
*
-* @param ids A list of all the entry IDs to be removed.
-* @return A Fallible list where each element is either a successfully removed
-* ID, or an IOException describing why the ID could not be removed.
+* @param descriptor PathBasedCacheDescriptor to remove
+* @throws IOException if the descriptor could not be removed
*/
-public List<Fallible<Long>>
-removePathBasedCacheDescriptors(List<Long> ids) throws IOException {
-return dfs.namenode.removePathBasedCacheDescriptors(ids);
+public void removePathBasedCacheDescriptor(PathBasedCacheDescriptor descriptor)
+throws IOException {
+dfs.removePathBasedCacheDescriptor(descriptor);
}
/**
@@ -1619,43 +1615,46 @@
*/
public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
String pool, String path) throws IOException {
-return dfs.namenode.listPathBasedCacheDescriptors(0, pool, path);
+return dfs.listPathBasedCacheDescriptors(pool, path);
}
/**
* Add a cache pool.
*
-* @param req
+* @param info
* The request to add a cache pool.
* @throws IOException
* If the request could not be completed.
*/
public void addCachePool(CachePoolInfo info) throws IOException {
-dfs.namenode.addCachePool(info);
+CachePoolInfo.validate(info);
+dfs.addCachePool(info);
}
/**
* Modify an existing cache pool.
*
-* @param req
+* @param info
* The request to modify a cache pool.
* @throws IOException
* If the request could not be completed.
*/
public void modifyCachePool(CachePoolInfo info) throws IOException {
-dfs.namenode.modifyCachePool(info);
+CachePoolInfo.validate(info);
+dfs.modifyCachePool(info);
}
/**
* Remove a cache pool.
*
-* @param cachePoolName
+* @param poolName
* Name of the cache pool to remove.
* @throws IOException
* if the cache pool did not exist, or could not be removed.
*/
-public void removeCachePool(String name) throws IOException {
-dfs.namenode.removeCachePool(name);
+public void removeCachePool(String poolName) throws IOException {
+CachePoolInfo.validateName(poolName);
+dfs.removeCachePool(poolName);
}
/**
@@ -1667,6 +1666,6 @@
* If there was an error listing cache pools.
*/
public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
-return dfs.namenode.listCachePools("");
+return dfs.listCachePools();
}
}
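Because the DistributedFileSystem methods above now delegate to single-value DFSClient calls, per-directive failures surface as exceptions (the AddPathBasedCacheDirectiveException and RemovePathBasedCacheDescriptorException subtypes reworked later in this diff) rather than as Fallible entries. A hedged sketch of what caller-side handling could look like, assuming the unwrapped exception subtype reaches the client:

    // Hypothetical error handling with the single-value API (not part of this commit).
    try {
      dfs.addPathBasedCacheDirective(
          new PathBasedCacheDirective("/foo/bar", "no-such-pool"));
    } catch (AddPathBasedCacheDirectiveException.InvalidPoolNameError e) {
      // the target cache pool does not exist
    } catch (IOException e) {
      // any other reason the directive could not be added
    }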

View File

@@ -25,24 +25,20 @@ import java.io.IOException;
public abstract class AddPathBasedCacheDirectiveException extends IOException {
private static final long serialVersionUID = 1L;
-private final PathBasedCacheDirective directive;
-public AddPathBasedCacheDirectiveException(String description,
-PathBasedCacheDirective directive) {
+public AddPathBasedCacheDirectiveException(String description) {
super(description);
-this.directive = directive;
-}
-public PathBasedCacheDirective getDirective() {
-return directive;
}
public static final class EmptyPathError
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
+public EmptyPathError(String msg) {
+super(msg);
+}
public EmptyPathError(PathBasedCacheDirective directive) {
-super("empty path in directive " + directive, directive);
+this("empty path in directive " + directive);
}
}
@@ -50,9 +46,12 @@ public abstract class AddPathBasedCacheDirectiveException extends IOException {
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
+public InvalidPathNameError(String msg) {
+super(msg);
+}
public InvalidPathNameError(PathBasedCacheDirective directive) {
-super("can't handle non-absolute path name " + directive.getPath(),
-directive);
+this("can't handle invalid path name " + directive.getPath());
}
}
@@ -60,8 +59,12 @@ public abstract class AddPathBasedCacheDirectiveException extends IOException {
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
+public InvalidPoolNameError(String msg) {
+super(msg);
+}
public InvalidPoolNameError(PathBasedCacheDirective directive) {
-super("invalid pool name '" + directive.getPool() + "'", directive);
+this("invalid pool name '" + directive.getPool() + "'");
}
}
@@ -69,9 +72,12 @@ public abstract class AddPathBasedCacheDirectiveException extends IOException {
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
+public PoolWritePermissionDeniedError(String msg) {
+super(msg);
+}
public PoolWritePermissionDeniedError(PathBasedCacheDirective directive) {
-super("write permission denied for pool '" + directive.getPool() + "'",
-directive);
+this("write permission denied for pool '" + directive.getPool() + "'");
}
}
@@ -79,9 +85,13 @@ public abstract class AddPathBasedCacheDirectiveException extends IOException {
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
+public PathAlreadyExistsInPoolError(String msg) {
+super(msg);
+}
public PathAlreadyExistsInPoolError(PathBasedCacheDirective directive) {
-super("path " + directive.getPath() + " already exists in pool " +
-directive.getPool(), directive);
+this("path " + directive.getPath() + " already exists in pool " +
+directive.getPool());
}
}
@@ -89,10 +99,14 @@ public abstract class AddPathBasedCacheDirectiveException extends IOException {
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
+public UnexpectedAddPathBasedCacheDirectiveException(String msg) {
+super(msg);
+}
public UnexpectedAddPathBasedCacheDirectiveException(
PathBasedCacheDirective directive) {
-super("encountered an unexpected error when trying to " +
-"add PathBasedCache directive " + directive, directive);
+this("encountered an unexpected error when trying to " +
+"add PathBasedCache directive " + directive);
}
}
-};
+}

View File

@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.protocol;
+import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.commons.lang.builder.EqualsBuilder;
@@ -127,4 +129,20 @@ public class CachePoolInfo {
append(weight).
hashCode();
}
public static void validate(CachePoolInfo info) throws IOException {
if (info == null) {
throw new IOException("CachePoolInfo is null");
}
validateName(info.poolName);
}
public static void validateName(String poolName) throws IOException {
if (poolName == null || poolName.isEmpty()) {
// Empty pool names are not allowed because they would be highly
// confusing. They would also break the ability to list all pools
// by starting with prevKey = ""
throw new IOException("invalid empty cache pool name");
}
}
}

View File

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.protocol;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -48,7 +47,6 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.util.Fallible;
/**********************************************************************
* ClientProtocol is used by user code via
@@ -1098,28 +1096,24 @@
String fromSnapshot, String toSnapshot) throws IOException;
/**
-* Add some PathBasedCache directives to the CacheManager.
+* Add a PathBasedCache entry to the CacheManager.
*
-* @param directives A list of PathBasedCache directives to be added.
-* @return A Fallible list, where each element is either a successfully addded
-* PathBasedCache entry, or an IOException describing why the directive
-* could not be added.
+* @param directive A PathBasedCacheDirective to be added
+* @return A PathBasedCacheDescriptor associated with the added directive
+* @throws IOException if the directive could not be added
*/
@AtMostOnce
-public List<Fallible<PathBasedCacheDescriptor>>
-addPathBasedCacheDirectives(List<PathBasedCacheDirective> directives)
-throws IOException;
+public PathBasedCacheDescriptor addPathBasedCacheDirective(
+PathBasedCacheDirective directive) throws IOException;
/**
-* Remove some PathBasedCache entries from the CacheManager.
+* Remove a PathBasedCacheDescriptor from the CacheManager.
*
-* @param ids A list of all the entry IDs to be removed from the CacheManager.
-* @return A Fallible list where each element is either a successfully removed
-* ID, or an IOException describing why the ID could not be removed.
+* @param id of a PathBasedCacheDescriptor
+* @throws IOException if the cache descriptor could not be removed
*/
@AtMostOnce
-public List<Fallible<Long>> removePathBasedCacheDescriptors(List<Long> ids)
-throws IOException;
+public void removePathBasedCacheDescriptor(Long id) throws IOException;
/**
* List the set of cached paths of a cache pool. Incrementally fetches results
@@ -1132,9 +1126,9 @@
* @return A RemoteIterator which returns PathBasedCacheDescriptor objects.
*/
@Idempotent
-public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(long prevId,
-String pool, String path) throws IOException;
+public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
+long prevId, String pool, String path) throws IOException;
/**
* Add a new cache pool.
*

View File

@@ -19,31 +19,26 @@ package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
-import com.google.common.base.Preconditions;
/**
* An exception which occurred when trying to remove a PathBasedCache entry.
*/
public abstract class RemovePathBasedCacheDescriptorException extends IOException {
private static final long serialVersionUID = 1L;
-private final long entryId;
-public RemovePathBasedCacheDescriptorException(String description, long entryId) {
+public RemovePathBasedCacheDescriptorException(String description) {
super(description);
-this.entryId = entryId;
-}
-public long getEntryId() {
-return this.entryId;
}
public final static class InvalidIdException
extends RemovePathBasedCacheDescriptorException {
private static final long serialVersionUID = 1L;
+public InvalidIdException(String msg) {
+super(msg);
+}
public InvalidIdException(long entryId) {
-super("invalid PathBasedCacheDescriptor id " + entryId, entryId);
+this("invalid PathBasedCacheDescriptor id " + entryId);
}
}
@@ -51,9 +46,13 @@ public abstract class RemovePathBasedCacheDescriptorException extends IOExceptio
extends RemovePathBasedCacheDescriptorException {
private static final long serialVersionUID = 1L;
+public RemovePermissionDeniedException(String msg) {
+super(msg);
+}
public RemovePermissionDeniedException(long entryId) {
-super("permission denied when trying to remove " +
-"PathBasedCacheDescriptor id " + entryId, entryId);
+this("permission denied when trying to remove " +
+"PathBasedCacheDescriptor id " + entryId);
}
}
@@ -61,9 +60,12 @@ public abstract class RemovePathBasedCacheDescriptorException extends IOExceptio
extends RemovePathBasedCacheDescriptorException {
private static final long serialVersionUID = 1L;
+public NoSuchIdException(String msg) {
+super(msg);
+}
public NoSuchIdException(long entryId) {
-super("there is no PathBasedCacheDescriptor with id " + entryId,
-entryId);
+this("there is no PathBasedCacheDescriptor with id " + entryId);
}
}
@@ -71,9 +73,13 @@ public abstract class RemovePathBasedCacheDescriptorException extends IOExceptio
extends RemovePathBasedCacheDescriptorException {
private static final long serialVersionUID = 1L;
+public UnexpectedRemovePathBasedCacheDescriptorException(String msg) {
+super(msg);
+}
public UnexpectedRemovePathBasedCacheDescriptorException(long id) {
-super("encountered an unexpected error when trying to " +
-"remove PathBasedCacheDescriptor with id " + id, id);
+this("encountered an unexpected error when trying to " +
+"remove PathBasedCacheDescriptor with id " + id);
}
}
}

View File

@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -28,11 +27,6 @@ import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PathAlreadyExistsInPoolError;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -40,8 +34,8 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
@@ -53,9 +47,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlo
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveErrorProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectivesRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectivesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
@@ -83,8 +76,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncR
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
@@ -132,9 +125,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Refres
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorsResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorErrorProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
@@ -174,7 +166,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
@@ -184,7 +175,6 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRespons
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Fallible;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -1040,44 +1030,18 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
@Override
-public AddPathBasedCacheDirectivesResponseProto addPathBasedCacheDirectives(RpcController controller,
-AddPathBasedCacheDirectivesRequestProto request) throws ServiceException {
+public AddPathBasedCacheDirectiveResponseProto addPathBasedCacheDirective(
+RpcController controller, AddPathBasedCacheDirectiveRequestProto request)
+throws ServiceException {
try {
-ArrayList<PathBasedCacheDirective> input =
-new ArrayList<PathBasedCacheDirective>(request.getElementsCount());
-for (int i = 0; i < request.getElementsCount(); i++) {
-PathBasedCacheDirectiveProto proto = request.getElements(i);
-input.add(new PathBasedCacheDirective(proto.getPath(), proto.getPool()));
-}
-List<Fallible<PathBasedCacheDescriptor>> output = server.addPathBasedCacheDirectives(input);
-AddPathBasedCacheDirectivesResponseProto.Builder builder =
-AddPathBasedCacheDirectivesResponseProto.newBuilder();
-for (int idx = 0; idx < output.size(); idx++) {
-try {
-PathBasedCacheDescriptor directive = output.get(idx).get();
-builder.addResults(directive.getEntryId());
-} catch (IOException ioe) {
-if (ioe.getCause() instanceof EmptyPathError) {
-builder.addResults(AddPathBasedCacheDirectiveErrorProto.
-EMPTY_PATH_ERROR_VALUE);
-} else if (ioe.getCause() instanceof InvalidPathNameError) {
-builder.addResults(AddPathBasedCacheDirectiveErrorProto.
-INVALID_PATH_NAME_ERROR_VALUE);
-} else if (ioe.getCause() instanceof InvalidPoolNameError) {
-builder.addResults(AddPathBasedCacheDirectiveErrorProto.
-INVALID_POOL_NAME_ERROR_VALUE);
-} else if (ioe.getCause() instanceof PoolWritePermissionDeniedError) {
-builder.addResults(AddPathBasedCacheDirectiveErrorProto.
-ADD_PERMISSION_DENIED_ERROR_VALUE);
-} else if (ioe.getCause() instanceof PathAlreadyExistsInPoolError) {
-builder.addResults(AddPathBasedCacheDirectiveErrorProto.
-PATH_ALREADY_EXISTS_IN_POOL_ERROR_VALUE);
-} else {
-builder.addResults(AddPathBasedCacheDirectiveErrorProto.
-UNEXPECTED_ADD_ERROR_VALUE);
-}
-}
-}
+PathBasedCacheDirectiveProto proto = request.getDirective();
+PathBasedCacheDirective directive =
+new PathBasedCacheDirective(proto.getPath(), proto.getPool());
+PathBasedCacheDescriptor descriptor =
+server.addPathBasedCacheDirective(directive);
+AddPathBasedCacheDirectiveResponseProto.Builder builder =
+AddPathBasedCacheDirectiveResponseProto.newBuilder();
+builder.setDescriptorId(descriptor.getEntryId());
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
@@ -1085,32 +1049,14 @@
}
@Override
-public RemovePathBasedCacheDescriptorsResponseProto removePathBasedCacheDescriptors(
-RpcController controller, RemovePathBasedCacheDescriptorsRequestProto request)
+public RemovePathBasedCacheDescriptorResponseProto removePathBasedCacheDescriptor(
+RpcController controller,
+RemovePathBasedCacheDescriptorRequestProto request)
throws ServiceException {
try {
-List<Fallible<Long>> output =
-server.removePathBasedCacheDescriptors(request.getElementsList());
-RemovePathBasedCacheDescriptorsResponseProto.Builder builder =
-RemovePathBasedCacheDescriptorsResponseProto.newBuilder();
-for (int idx = 0; idx < output.size(); idx++) {
-try {
-long id = output.get(idx).get();
-builder.addResults(id);
-} catch (InvalidIdException ioe) {
-builder.addResults(RemovePathBasedCacheDescriptorErrorProto.
-INVALID_CACHED_PATH_ID_ERROR_VALUE);
-} catch (NoSuchIdException ioe) {
-builder.addResults(RemovePathBasedCacheDescriptorErrorProto.
-NO_SUCH_CACHED_PATH_ID_ERROR_VALUE);
-} catch (RemovePermissionDeniedException ioe) {
-builder.addResults(RemovePathBasedCacheDescriptorErrorProto.
-REMOVE_PERMISSION_DENIED_ERROR_VALUE);
-} catch (IOException ioe) {
-builder.addResults(RemovePathBasedCacheDescriptorErrorProto.
-UNEXPECTED_REMOVE_ERROR_VALUE);
-}
-}
+server.removePathBasedCacheDescriptor(request.getDescriptorId());
+RemovePathBasedCacheDescriptorResponseProto.Builder builder =
+RemovePathBasedCacheDescriptorResponseProto.newBuilder();
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);

View File

@@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -38,18 +36,7 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PathAlreadyExistsInPoolError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.UnexpectedAddPathBasedCacheDirectiveException;
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -62,17 +49,16 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveErrorProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectivesRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectivesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
@@ -110,23 +96,21 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsElementProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsElementProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorsResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorErrorProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@@ -147,7 +131,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
@@ -163,7 +146,6 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequest
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Fallible;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
@@ -1020,101 +1002,33 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
-private static IOException addPathBasedCacheDirectivesError(long code,
-PathBasedCacheDirective directive) {
-if (code == AddPathBasedCacheDirectiveErrorProto.EMPTY_PATH_ERROR_VALUE) {
-return new EmptyPathError(directive);
-} else if (code == AddPathBasedCacheDirectiveErrorProto.
-INVALID_PATH_NAME_ERROR_VALUE) {
-return new InvalidPathNameError(directive);
-} else if (code == AddPathBasedCacheDirectiveErrorProto.
-INVALID_POOL_NAME_ERROR_VALUE) {
-return new InvalidPoolNameError(directive);
-} else if (code == AddPathBasedCacheDirectiveErrorProto.
-ADD_PERMISSION_DENIED_ERROR_VALUE) {
-return new PoolWritePermissionDeniedError(directive);
-} else if (code == AddPathBasedCacheDirectiveErrorProto.
-PATH_ALREADY_EXISTS_IN_POOL_ERROR_VALUE) {
-return new PathAlreadyExistsInPoolError(directive);
-} else {
-return new UnexpectedAddPathBasedCacheDirectiveException(directive);
-}
-}
@Override
-public List<Fallible<PathBasedCacheDescriptor>> addPathBasedCacheDirectives(
-List<PathBasedCacheDirective> directives) throws IOException {
+public PathBasedCacheDescriptor addPathBasedCacheDirective(
+PathBasedCacheDirective directive) throws IOException {
try {
-AddPathBasedCacheDirectivesRequestProto.Builder builder =
-AddPathBasedCacheDirectivesRequestProto.newBuilder();
-for (PathBasedCacheDirective directive : directives) {
-builder.addElements(PathBasedCacheDirectiveProto.newBuilder().
-setPath(directive.getPath()).
-setPool(directive.getPool()).
-build());
-}
-AddPathBasedCacheDirectivesResponseProto result =
-rpcProxy.addPathBasedCacheDirectives(null, builder.build());
-int resultsCount = result.getResultsCount();
-ArrayList<Fallible<PathBasedCacheDescriptor>> results =
-new ArrayList<Fallible<PathBasedCacheDescriptor>>(resultsCount);
-for (int i = 0; i < resultsCount; i++) {
-PathBasedCacheDirective directive = directives.get(i);
-long code = result.getResults(i);
-if (code > 0) {
-results.add(new Fallible<PathBasedCacheDescriptor>(
-new PathBasedCacheDescriptor(code,
-directive.getPath(), directive.getPool())));
-} else {
-results.add(new Fallible<PathBasedCacheDescriptor>(
-addPathBasedCacheDirectivesError(code, directive)));
-}
-}
-return results;
+AddPathBasedCacheDirectiveRequestProto.Builder builder =
+AddPathBasedCacheDirectiveRequestProto.newBuilder();
+builder.setDirective(PathBasedCacheDirectiveProto.newBuilder()
+.setPath(directive.getPath())
+.setPool(directive.getPool())
+.build());
+AddPathBasedCacheDirectiveResponseProto result =
+rpcProxy.addPathBasedCacheDirective(null, builder.build());
+return new PathBasedCacheDescriptor(result.getDescriptorId(),
+directive.getPath(), directive.getPool());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
-private static IOException removePathBasedCacheDescriptorsError(long code, long id) {
-if (code == RemovePathBasedCacheDescriptorErrorProto.
-INVALID_CACHED_PATH_ID_ERROR_VALUE) {
-return new InvalidIdException(id);
-} else if (code == RemovePathBasedCacheDescriptorErrorProto.
-NO_SUCH_CACHED_PATH_ID_ERROR_VALUE) {
-return new NoSuchIdException(id);
-} else if (code == RemovePathBasedCacheDescriptorErrorProto.
-REMOVE_PERMISSION_DENIED_ERROR_VALUE) {
-return new RemovePermissionDeniedException(id);
-} else {
-return new UnexpectedRemovePathBasedCacheDescriptorException(id);
-}
-}
@Override
-public List<Fallible<Long>> removePathBasedCacheDescriptors(List<Long> ids)
+public void removePathBasedCacheDescriptor(Long id)
throws IOException {
try {
-RemovePathBasedCacheDescriptorsRequestProto.Builder builder =
-RemovePathBasedCacheDescriptorsRequestProto.newBuilder();
-for (Long id : ids) {
-builder.addElements(id);
-}
-RemovePathBasedCacheDescriptorsResponseProto result =
-rpcProxy.removePathBasedCacheDescriptors(null, builder.build());
-int resultsCount = result.getResultsCount();
-ArrayList<Fallible<Long>> results =
-new ArrayList<Fallible<Long>>(resultsCount);
-for (int i = 0; i < resultsCount; i++) {
-long code = result.getResults(i);
-if (code > 0) {
-results.add(new Fallible<Long>(code));
-} else {
-results.add(new Fallible<Long>(
-removePathBasedCacheDescriptorsError(code, ids.get(i))));
-}
-}
-return results;
+RemovePathBasedCacheDescriptorRequestProto.Builder builder =
+RemovePathBasedCacheDescriptorRequestProto.newBuilder();
+builder.setDescriptorId(id);
+rpcProxy.removePathBasedCacheDescriptor(null, builder.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@@ -47,7 +47,6 @@ import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.I
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
-import org.apache.hadoop.util.Fallible;
/**
* The Cache Manager handles caching on DataNodes.
@@ -136,25 +135,24 @@
return null;
}
-private synchronized Fallible<PathBasedCacheDescriptor> addDirective(
-PathBasedCacheDirective directive, FSPermissionChecker pc) {
+public synchronized PathBasedCacheDescriptor addDirective(
+PathBasedCacheDirective directive, FSPermissionChecker pc)
+throws IOException {
CachePool pool = cachePools.get(directive.getPool());
if (pool == null) {
LOG.info("addDirective " + directive + ": pool not found.");
-return new Fallible<PathBasedCacheDescriptor>(
-new InvalidPoolNameError(directive));
+throw new InvalidPoolNameError(directive);
}
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
LOG.info("addDirective " + directive + ": write permission denied.");
-return new Fallible<PathBasedCacheDescriptor>(
-new PoolWritePermissionDeniedError(directive));
+throw new PoolWritePermissionDeniedError(directive);
}
try {
directive.validate();
} catch (IOException ioe) {
LOG.info("addDirective " + directive + ": validation failed: "
+ ioe.getClass().getName() + ": " + ioe.getMessage());
-return new Fallible<PathBasedCacheDescriptor>(ioe);
+throw ioe;
}
// Check if we already have this entry.
@@ -162,8 +160,7 @@
if (existing != null) {
LOG.info("addDirective " + directive + ": there is an " +
"existing directive " + existing + " in this pool.");
-return new Fallible<PathBasedCacheDescriptor>(
-existing.getDescriptor());
+return existing.getDescriptor();
}
// Add a new entry with the next available ID.
PathBasedCacheEntry entry;
@@ -171,8 +168,7 @@
entry = new PathBasedCacheEntry(getNextEntryId(),
directive.getPath(), pool);
} catch (IOException ioe) {
-return new Fallible<PathBasedCacheDescriptor>(
-new UnexpectedAddPathBasedCacheDirectiveException(directive));
+throw new UnexpectedAddPathBasedCacheDirectiveException(directive);
}
LOG.info("addDirective " + directive + ": added cache directive "
+ directive);
@@ -202,56 +198,42 @@
} catch (IOException ioe) {
LOG.info("addDirective " + directive +": failed to cache file: " +
ioe.getClass().getName() +": " + ioe.getMessage());
-return new Fallible<PathBasedCacheDescriptor>(ioe);
+throw ioe;
}
-return new Fallible<PathBasedCacheDescriptor>(
-entry.getDescriptor());
+return entry.getDescriptor();
}
-public synchronized List<Fallible<PathBasedCacheDescriptor>> addDirectives(
-List<PathBasedCacheDirective> directives, FSPermissionChecker pc) {
-ArrayList<Fallible<PathBasedCacheDescriptor>> results =
-new ArrayList<Fallible<PathBasedCacheDescriptor>>(directives.size());
-for (PathBasedCacheDirective directive: directives) {
-results.add(addDirective(directive, pc));
-}
-return results;
-}
-private synchronized Fallible<Long> removeDescriptor(long id,
-FSPermissionChecker pc) {
+public synchronized void removeDescriptor(long id, FSPermissionChecker pc)
+throws IOException {
// Check for invalid IDs.
if (id <= 0) {
LOG.info("removeDescriptor " + id + ": invalid non-positive " +
"descriptor ID.");
-return new Fallible<Long>(new InvalidIdException(id));
+throw new InvalidIdException(id);
}
// Find the entry.
PathBasedCacheEntry existing = entriesById.get(id);
if (existing == null) {
LOG.info("removeDescriptor " + id + ": entry not found.");
-return new Fallible<Long>(new NoSuchIdException(id));
+throw new NoSuchIdException(id);
}
CachePool pool = cachePools.get(existing.getDescriptor().getPool());
if (pool == null) {
LOG.info("removeDescriptor " + id + ": pool not found for directive " +
existing.getDescriptor());
-return new Fallible<Long>(
-new UnexpectedRemovePathBasedCacheDescriptorException(id));
+throw new UnexpectedRemovePathBasedCacheDescriptorException(id);
}
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
LOG.info("removeDescriptor " + id + ": write permission denied to " +
"pool " + pool + " for entry " + existing);
-return new Fallible<Long>(
-new RemovePermissionDeniedException(id));
+throw new RemovePermissionDeniedException(id);
}
// Remove the corresponding entry in entriesByPath.
String path = existing.getDescriptor().getPath();
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
if (entries == null || !entries.remove(existing)) { if (entries == null || !entries.remove(existing)) {
return new Fallible<Long>( throw new UnexpectedRemovePathBasedCacheDescriptorException(id);
new UnexpectedRemovePathBasedCacheDescriptorException(id));
} }
if (entries.size() == 0) { if (entries.size() == 0) {
entriesByPath.remove(path); entriesByPath.remove(path);
@ -268,20 +250,9 @@ public final class CacheManager {
} catch (IOException e) { } catch (IOException e) {
LOG.warn("removeDescriptor " + id + ": failure while setting cache" LOG.warn("removeDescriptor " + id + ": failure while setting cache"
+ " replication factor", e); + " replication factor", e);
return new Fallible<Long>(e); throw e;
} }
LOG.info("removeDescriptor successful for PathCacheEntry id " + id); LOG.info("removeDescriptor successful for PathCacheEntry id " + id);
return new Fallible<Long>(id);
}
public synchronized List<Fallible<Long>> removeDescriptors(List<Long> ids,
FSPermissionChecker pc) {
ArrayList<Fallible<Long>> results =
new ArrayList<Fallible<Long>>(ids.size());
for (Long id : ids) {
results.add(removeDescriptor(id, pc));
}
return results;
} }
public synchronized BatchedListEntries<PathBasedCacheDescriptor> public synchronized BatchedListEntries<PathBasedCacheDescriptor>
@ -329,8 +300,8 @@ public final class CacheManager {
*/ */
public synchronized void addCachePool(CachePoolInfo info) public synchronized void addCachePool(CachePoolInfo info)
throws IOException { throws IOException {
CachePoolInfo.validate(info);
String poolName = info.getPoolName(); String poolName = info.getPoolName();
CachePool.validateName(poolName);
CachePool pool = cachePools.get(poolName); CachePool pool = cachePools.get(poolName);
if (pool != null) { if (pool != null) {
throw new IOException("cache pool " + poolName + " already exists."); throw new IOException("cache pool " + poolName + " already exists.");
@ -352,10 +323,8 @@ public final class CacheManager {
*/ */
public synchronized void modifyCachePool(CachePoolInfo info) public synchronized void modifyCachePool(CachePoolInfo info)
throws IOException { throws IOException {
CachePoolInfo.validate(info);
String poolName = info.getPoolName(); String poolName = info.getPoolName();
if (poolName.isEmpty()) {
throw new IOException("invalid empty cache pool name");
}
CachePool pool = cachePools.get(poolName); CachePool pool = cachePools.get(poolName);
if (pool == null) { if (pool == null) {
throw new IOException("cache pool " + poolName + " does not exist."); throw new IOException("cache pool " + poolName + " does not exist.");
@ -401,9 +370,10 @@ public final class CacheManager {
*/ */
public synchronized void removeCachePool(String poolName) public synchronized void removeCachePool(String poolName)
throws IOException { throws IOException {
CachePoolInfo.validateName(poolName);
CachePool pool = cachePools.remove(poolName); CachePool pool = cachePools.remove(poolName);
if (pool == null) { if (pool == null) {
throw new IOException("can't remove nonexistent cache pool " + poolName); throw new IOException("can't remove non-existent cache pool " + poolName);
} }
// Remove entries using this pool // Remove entries using this pool
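The hunks above replace the Fallible-wrapped batch API in CacheManager with a single-value, exception-throwing one: addDirective now returns the PathBasedCacheDescriptor directly and throws the specific AddPathBasedCacheDirectiveException subtype on failure, and removeDescriptor becomes a void call that throws instead of returning a Fallible<Long>. A minimal caller-side sketch of the new contract follows; the class, method, path, and pool names are illustrative and not part of the patch, and a null permission checker is passed, as FSNamesystem does when permissions are disabled.

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;

class CacheManagerCallerSketch {
  // Cache a single path and then uncache it with the one-call-per-action API.
  static void cacheAndUncache(CacheManager cacheManager) throws IOException {
    PathBasedCacheDirective directive =
        new PathBasedCacheDirective("/warm/data", "pool1");
    try {
      // One directive in, one descriptor out; errors are thrown rather than
      // returned inside a Fallible.
      PathBasedCacheDescriptor descriptor =
          cacheManager.addDirective(directive, null);
      // Removal is likewise a single call that throws on failure.
      cacheManager.removeDescriptor(descriptor.getEntryId(), null);
    } catch (InvalidPoolNameError e) {
      // Before this patch the same condition surfaced as the cause of the
      // exception thrown by Fallible.get().
      System.err.println("no such cache pool: " + e.getMessage());
    }
  }
}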

View File

@ -24,9 +24,9 @@ import javax.annotation.Nonnull;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
/** /**
@ -162,13 +162,4 @@ public final class CachePool {
append(", weight:").append(weight). append(", weight:").append(weight).
append(" }").toString(); append(" }").toString();
} }
public static void validateName(String name) throws IOException {
if (name.isEmpty()) {
// Empty pool names are not allowed because they would be highly
// confusing. They would also break the ability to list all pools
// by starting with prevKey = ""
throw new IOException("invalid empty cache pool name");
}
}
} }
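With validateName removed from CachePool, pool-name checking is centralized in CachePoolInfo: the CacheManager hunks above now call CachePoolInfo.validate(info) in addCachePool/modifyCachePool and CachePoolInfo.validateName(poolName) in removeCachePool. The CachePoolInfo side is not part of this diff; what follows is a plausible reconstruction of those two helpers, inferred only from the call sites and from the error strings asserted in TestPathBasedCacheRequests further down ("CachePoolInfo is null", "invalid empty cache pool name").

// Hypothetical reconstruction of the validation helpers as they would appear
// inside CachePoolInfo; not copied from the patch.
public static void validate(CachePoolInfo info) throws IOException {
  if (info == null) {
    throw new IOException("CachePoolInfo is null");
  }
  validateName(info.getPoolName());
}

public static void validateName(String poolName) throws IOException {
  if (poolName == null || poolName.isEmpty()) {
    // Empty names are rejected for the reason given in the comment removed
    // from CachePool above: they would break listing all pools starting
    // from prevKey = "".
    throw new IOException("invalid empty cache pool name");
  }
}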

View File

@ -132,7 +132,6 @@ import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -232,7 +231,6 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Fallible;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
@ -6884,18 +6882,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
} }
@SuppressWarnings("unchecked") PathBasedCacheDescriptor addPathBasedCacheDirective(
List<Fallible<PathBasedCacheDescriptor>> addPathBasedCacheDirectives( PathBasedCacheDirective directive) throws IOException {
List<PathBasedCacheDirective> directives) throws IOException {
CacheEntryWithPayload retryCacheEntry = CacheEntryWithPayload retryCacheEntry =
RetryCache.waitForCompletion(retryCache, null); RetryCache.waitForCompletion(retryCache, null);
if (retryCacheEntry != null && retryCacheEntry.isSuccess()) { if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
return (List<Fallible<PathBasedCacheDescriptor>>) retryCacheEntry.getPayload(); return (PathBasedCacheDescriptor) retryCacheEntry.getPayload();
} }
final FSPermissionChecker pc = isPermissionEnabled ? final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null; getPermissionChecker() : null;
boolean success = false; boolean success = false;
List<Fallible<PathBasedCacheDescriptor>> results = null; PathBasedCacheDescriptor result = null;
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
writeLock(); writeLock();
try { try {
@ -6904,8 +6901,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException( throw new SafeModeException(
"Cannot add PathBasedCache directive", safeMode); "Cannot add PathBasedCache directive", safeMode);
} }
results = cacheManager.addDirectives(directives, pc); result = cacheManager.addDirective(directive, pc);
//getEditLog().logAddPathBasedCacheDirectives(results); FIXME: HDFS-5119 //getEditLog().logAddPathBasedCacheDirective(result); FIXME: HDFS-5119
success = true; success = true;
} finally { } finally {
writeUnlock(); writeUnlock();
@ -6913,24 +6910,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
getEditLog().logSync(); getEditLog().logSync();
} }
if (isAuditEnabled() && isExternalInvocation()) { if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addPathBasedCacheDirectives", null, null, null); logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
} }
RetryCache.setState(retryCacheEntry, success, results); RetryCache.setState(retryCacheEntry, success, result);
} }
return results; return result;
} }
@SuppressWarnings("unchecked") void removePathBasedCacheDescriptor(Long id) throws IOException {
List<Fallible<Long>> removePathBasedCacheDescriptors(List<Long> ids) throws IOException { CacheEntry retryCacheEntry = RetryCache.waitForCompletion(retryCache);
CacheEntryWithPayload retryCacheEntry =
RetryCache.waitForCompletion(retryCache, null);
if (retryCacheEntry != null && retryCacheEntry.isSuccess()) { if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
return (List<Fallible<Long>>) retryCacheEntry.getPayload(); return;
} }
final FSPermissionChecker pc = isPermissionEnabled ? final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null; getPermissionChecker() : null;
boolean success = false; boolean success = false;
List<Fallible<Long>> results = null;
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
writeLock(); writeLock();
try { try {
@ -6939,22 +6933,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException( throw new SafeModeException(
"Cannot remove PathBasedCache directives", safeMode); "Cannot remove PathBasedCache directives", safeMode);
} }
results = cacheManager.removeDescriptors(ids, pc); cacheManager.removeDescriptor(id, pc);
//getEditLog().logRemovePathBasedCacheEntries(results); FIXME: HDFS-5119 //getEditLog().logRemovePathBasedCacheEntries(results); FIXME: HDFS-5119
success = true; success = true;
} finally { } finally {
writeUnlock(); writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) { if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "removePathBasedCacheDescriptors", null, null, null); logAuditEvent(success, "removePathBasedCacheDescriptors", null, null,
null);
} }
RetryCache.setState(retryCacheEntry, success, results); RetryCache.setState(retryCacheEntry, success);
} }
getEditLog().logSync(); getEditLog().logSync();
return results;
} }
BatchedListEntries<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(long startId, BatchedListEntries<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
String pool, String path) throws IOException { long startId, String pool, String path) throws IOException {
final FSPermissionChecker pc = isPermissionEnabled ? final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null; getPermissionChecker() : null;
BatchedListEntries<PathBasedCacheDescriptor> results; BatchedListEntries<PathBasedCacheDescriptor> results;
@ -6963,12 +6957,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean success = false; boolean success = false;
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
results = cacheManager.listPathBasedCacheDescriptors(startId, pool, path, pc); results =
cacheManager.listPathBasedCacheDescriptors(startId, pool, path, pc);
success = true; success = true;
} finally { } finally {
readUnlock(); readUnlock();
if (isAuditEnabled() && isExternalInvocation()) { if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "listPathBasedCacheDescriptors", null, null, null); logAuditEvent(success, "listPathBasedCacheDescriptors", null, null,
null);
} }
} }
return results; return results;
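Because the side-by-side rendering above is hard to follow, here is the shape of the two FSNamesystem methods after this hunk, restated and simplified rather than copied verbatim: locking, safe-mode checks, audit logging, and the still-commented-out edit-log calls are omitted, and retryCache, cacheManager, isPermissionEnabled, and getPermissionChecker() are the existing FSNamesystem members. The add path keeps a CacheEntryWithPayload so a retried RPC replays the same descriptor; the remove path needs only a plain CacheEntry since it returns nothing.

PathBasedCacheDescriptor addPathBasedCacheDirective(
    PathBasedCacheDirective directive) throws IOException {
  CacheEntryWithPayload entry = RetryCache.waitForCompletion(retryCache, null);
  if (entry != null && entry.isSuccess()) {
    // A retry of an already-applied request returns the cached descriptor.
    return (PathBasedCacheDescriptor) entry.getPayload();
  }
  final FSPermissionChecker pc = isPermissionEnabled ?
      getPermissionChecker() : null;
  boolean success = false;
  PathBasedCacheDescriptor result = null;
  try {
    result = cacheManager.addDirective(directive, pc);
    success = true;
  } finally {
    RetryCache.setState(entry, success, result);
  }
  return result;
}

void removePathBasedCacheDescriptor(Long id) throws IOException {
  CacheEntry entry = RetryCache.waitForCompletion(retryCache);
  if (entry != null && entry.isSuccess()) {
    // Nothing to replay for a void operation.
    return;
  }
  final FSPermissionChecker pc = isPermissionEnabled ?
      getPermissionChecker() : null;
  boolean success = false;
  try {
    cacheManager.removeDescriptor(id, pc);
    success = true;
  } finally {
    RetryCache.setState(entry, success);
  }
}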

View File

@ -139,7 +139,6 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService; import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.Fallible;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.util.VersionUtil;
@ -1238,15 +1237,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
@Override @Override
public List<Fallible<PathBasedCacheDescriptor>> addPathBasedCacheDirectives( public PathBasedCacheDescriptor addPathBasedCacheDirective(
List<PathBasedCacheDirective> paths) throws IOException { PathBasedCacheDirective path) throws IOException {
return namesystem.addPathBasedCacheDirectives(paths); return namesystem.addPathBasedCacheDirective(path);
} }
@Override @Override
public List<Fallible<Long>> removePathBasedCacheDescriptors(List<Long> ids) public void removePathBasedCacheDescriptor(Long id) throws IOException {
throws IOException { namesystem.removePathBasedCacheDescriptor(id);
return namesystem.removePathBasedCacheDescriptors(ids);
} }
private class ServerSidePathBasedCacheEntriesIterator private class ServerSidePathBasedCacheEntriesIterator

View File

@ -22,15 +22,13 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor; import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.tools.TableListing.Justification; import org.apache.hadoop.hdfs.tools.TableListing.Justification;
import org.apache.hadoop.util.Fallible;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
/** /**
@ -96,21 +94,14 @@ public class CacheAdmin {
} }
DistributedFileSystem dfs = getDFS(); DistributedFileSystem dfs = getDFS();
List<PathBasedCacheDirective> directives = PathBasedCacheDirective directive =
new LinkedList<PathBasedCacheDirective>(); new PathBasedCacheDirective(path, poolName);
PathBasedCacheDirective directive = new PathBasedCacheDirective(path, poolName);
directives.add(directive); PathBasedCacheDescriptor descriptor =
List<Fallible<PathBasedCacheDescriptor>> results = dfs.addPathBasedCacheDirective(directive);
dfs.addPathBasedCacheDirective(directives); System.out.println("Added PathBasedCache entry "
try { + descriptor.getEntryId());
PathBasedCacheDescriptor entry = results.get(0).get(); return 0;
System.out.println("Added PathBasedCache entry " + entry.getEntryId());
return 0;
} catch (IOException e) {
System.err.println("Error adding cache directive " + directive + ": " +
e.getMessage());
return 1;
}
} }
} }
@ -153,18 +144,10 @@ public class CacheAdmin {
return 1; return 1;
} }
DistributedFileSystem dfs = getDFS(); DistributedFileSystem dfs = getDFS();
List<Long> ids = new LinkedList<Long>(); dfs.removePathBasedCacheDescriptor(new PathBasedCacheDescriptor(id, null,
ids.add(id); null));
List<Fallible<Long>> results = dfs.removePathBasedCacheDescriptors(ids); System.out.println("Removed PathBasedCache directive " + id);
try { return 0;
Long resultId = results.get(0).get();
System.out.println("Removed PathBasedCache entry " + resultId);
return 0;
} catch (IOException e) {
System.err.println("Error removing cache directive " + id + ": " +
e.getMessage());
return 1;
}
} }
} }
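The CacheAdmin commands above now make exactly one DistributedFileSystem call per action and rely on exceptions for error reporting. A small end-to-end sketch of the client-facing API after this patch, using only calls that appear elsewhere in this diff; the pool name, path, weight, and Configuration setup are illustrative, and a running HDFS cluster is assumed to be the default filesystem.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;

public class CacheClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

    // Create a pool, cache one path under it, then clean everything up.
    dfs.addCachePool(new CachePoolInfo("pool1")
        .setMode(new FsPermission((short) 0755)).setWeight(100));

    PathBasedCacheDescriptor descriptor = dfs.addPathBasedCacheDirective(
        new PathBasedCacheDirective("/warm/data", "pool1"));
    System.out.println("Added PathBasedCache entry " + descriptor.getEntryId());

    // Pools are still enumerated through the batched listing iterator.
    RemoteIterator<CachePoolInfo> pools = dfs.listCachePools();
    while (pools.hasNext()) {
      System.out.println("pool: " + pools.next().getPoolName());
    }

    dfs.removePathBasedCacheDescriptor(descriptor);
    dfs.removeCachePool("pool1");
  }
}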

View File

@ -368,36 +368,19 @@ message PathBasedCacheDirectiveProto {
required string pool = 2; required string pool = 2;
} }
message AddPathBasedCacheDirectivesRequestProto { message AddPathBasedCacheDirectiveRequestProto {
repeated PathBasedCacheDirectiveProto elements = 1; required PathBasedCacheDirectiveProto directive = 1;
} }
message AddPathBasedCacheDirectivesResponseProto { message AddPathBasedCacheDirectiveResponseProto {
repeated int64 results = 1 [packed=true]; required int64 descriptorId = 1;
} }
enum AddPathBasedCacheDirectiveErrorProto { message RemovePathBasedCacheDescriptorRequestProto {
UNEXPECTED_ADD_ERROR = -1; required int64 descriptorId = 1;
EMPTY_PATH_ERROR = -2;
INVALID_PATH_NAME_ERROR = -3;
INVALID_POOL_NAME_ERROR = -4;
ADD_PERMISSION_DENIED_ERROR = -5;
PATH_ALREADY_EXISTS_IN_POOL_ERROR = -6;
} }
message RemovePathBasedCacheDescriptorsRequestProto { message RemovePathBasedCacheDescriptorResponseProto {
repeated int64 elements = 1 [packed=true];
}
message RemovePathBasedCacheDescriptorsResponseProto {
repeated int64 results = 1 [packed=true];
}
enum RemovePathBasedCacheDescriptorErrorProto {
UNEXPECTED_REMOVE_ERROR = -1;
INVALID_CACHED_PATH_ID_ERROR = -2;
NO_SUCH_CACHED_PATH_ID_ERROR = -3;
REMOVE_PERMISSION_DENIED_ERROR = -4;
} }
message ListPathBasedCacheDescriptorsRequestProto { message ListPathBasedCacheDescriptorsRequestProto {
@ -644,10 +627,10 @@ service ClientNamenodeProtocol {
returns(ListCorruptFileBlocksResponseProto); returns(ListCorruptFileBlocksResponseProto);
rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto); rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto);
rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto); rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto);
rpc addPathBasedCacheDirectives(AddPathBasedCacheDirectivesRequestProto) rpc addPathBasedCacheDirective(AddPathBasedCacheDirectiveRequestProto)
returns (AddPathBasedCacheDirectivesResponseProto); returns (AddPathBasedCacheDirectiveResponseProto);
rpc removePathBasedCacheDescriptors(RemovePathBasedCacheDescriptorsRequestProto) rpc removePathBasedCacheDescriptor(RemovePathBasedCacheDescriptorRequestProto)
returns (RemovePathBasedCacheDescriptorsResponseProto); returns (RemovePathBasedCacheDescriptorResponseProto);
rpc listPathBasedCacheDescriptors(ListPathBasedCacheDescriptorsRequestProto) rpc listPathBasedCacheDescriptors(ListPathBasedCacheDescriptorsRequestProto)
returns (ListPathBasedCacheDescriptorsResponseProto); returns (ListPathBasedCacheDescriptorsResponseProto);
rpc addCachePool(AddCachePoolRequestProto) rpc addCachePool(AddCachePoolRequestProto)
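On the wire, the repeated elements/results fields and the two per-item error enums are replaced by single-value request/response messages; failures now travel back as RPC exceptions instead of error codes. As a rough illustration only, a request would be built roughly as below with the standard protoc-generated builders. The generated classes are assumed to be nested in the ClientNamenodeProtocolProtos outer class, the helper callAddDirective is hypothetical, and the directive's path field name is assumed by analogy with the Java PathBasedCacheDirective; none of these are spelled out in this hunk.

// Illustrative only; assumes protoc-generated builders for the messages above.
AddPathBasedCacheDirectiveRequestProto req =
    AddPathBasedCacheDirectiveRequestProto.newBuilder()
        .setDirective(PathBasedCacheDirectiveProto.newBuilder()
            .setPath("/warm/data")   // assumed field name, mirroring the Java class
            .setPool("pool1")
            .build())
        .build();
// The response carries a single required descriptorId rather than a
// repeated results list.
AddPathBasedCacheDirectiveResponseProto resp = callAddDirective(req); // hypothetical RPC call
long descriptorId = resp.getDescriptorId();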

View File

@ -36,12 +36,11 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor; import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Fallible;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -148,17 +147,13 @@ public class TestCacheReplicationManager {
waitForExpectedNumCachedBlocks(expected); waitForExpectedNumCachedBlocks(expected);
// Cache and check each path in sequence // Cache and check each path in sequence
for (int i=0; i<numFiles; i++) { for (int i=0; i<numFiles; i++) {
List<PathBasedCacheDirective> toAdd = PathBasedCacheDirective directive = new PathBasedCacheDirective(paths
new ArrayList<PathBasedCacheDirective>(); .get(i), pool);
toAdd.add(new PathBasedCacheDirective(paths.get(i), pool)); PathBasedCacheDescriptor descriptor =
List<Fallible<PathBasedCacheDescriptor>> fallibles = nnRpc.addPathBasedCacheDirective(directive);
nnRpc.addPathBasedCacheDirectives(toAdd); assertEquals("Descriptor does not match requested path", paths.get(i),
assertEquals("Unexpected number of fallibles",
1, fallibles.size());
PathBasedCacheDescriptor directive = fallibles.get(0).get();
assertEquals("Directive does not match requested path", paths.get(i),
directive.getPath()); directive.getPath());
assertEquals("Directive does not match requested pool", pool, assertEquals("Descriptor does not match requested pool", pool,
directive.getPool()); directive.getPool());
expected += numBlocksPerFile; expected += numBlocksPerFile;
waitForExpectedNumCachedBlocks(expected); waitForExpectedNumCachedBlocks(expected);
@ -167,14 +162,8 @@ public class TestCacheReplicationManager {
RemoteIterator<PathBasedCacheDescriptor> entries = RemoteIterator<PathBasedCacheDescriptor> entries =
nnRpc.listPathBasedCacheDescriptors(0, null, null); nnRpc.listPathBasedCacheDescriptors(0, null, null);
for (int i=0; i<numFiles; i++) { for (int i=0; i<numFiles; i++) {
PathBasedCacheDescriptor entry = entries.next(); PathBasedCacheDescriptor descriptor = entries.next();
List<Long> toRemove = new ArrayList<Long>(); nnRpc.removePathBasedCacheDescriptor(descriptor.getEntryId());
toRemove.add(entry.getEntryId());
List<Fallible<Long>> fallibles = nnRpc.removePathBasedCacheDescriptors(toRemove);
assertEquals("Unexpected number of fallibles", 1, fallibles.size());
Long l = fallibles.get(0).get();
assertEquals("Removed entryId does not match requested",
entry.getEntryId(), l.longValue());
expected -= numBlocksPerFile; expected -= numBlocksPerFile;
waitForExpectedNumCachedBlocks(expected); waitForExpectedNumCachedBlocks(expected);
} }

View File

@ -17,36 +17,40 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*; import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError; import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError; import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError; import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor; import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException; import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Fallible; import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestPathBasedCacheRequests { public class TestPathBasedCacheRequests {
@ -55,221 +59,357 @@ public class TestPathBasedCacheRequests {
private static final UserGroupInformation unprivilegedUser = private static final UserGroupInformation unprivilegedUser =
UserGroupInformation.createRemoteUser("unprivilegedUser"); UserGroupInformation.createRemoteUser("unprivilegedUser");
@Test static private Configuration conf;
public void testCreateAndRemovePools() throws Exception { static private MiniDFSCluster cluster;
Configuration conf = new HdfsConfiguration(); static private DistributedFileSystem dfs;
MiniDFSCluster cluster = null; static private NamenodeProtocols proto;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); @Before
cluster.waitActive(); public void setup() throws Exception {
NamenodeProtocols proto = cluster.getNameNodeRpc(); conf = new HdfsConfiguration();
CachePoolInfo req = new CachePoolInfo("pool1").
setOwnerName("bob").setGroupName("bobgroup").
setMode(new FsPermission((short)0755)).setWeight(150);
proto.addCachePool(req);
try {
proto.removeCachePool("pool99");
Assert.fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("can't remove " +
"nonexistent cache pool", ioe);
}
proto.removeCachePool("pool1");
try {
proto.removeCachePool("pool1");
Assert.fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("can't remove " +
"nonexistent cache pool", ioe);
}
req = new CachePoolInfo("pool2");
proto.addCachePool(req);
}
@Test
public void testCreateAndModifyPools() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
// set low limits here for testing purposes // set low limits here for testing purposes
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2); conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES, 2); conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES, 2);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(); cluster.waitActive();
NamenodeProtocols proto = cluster.getNameNodeRpc(); dfs = cluster.getFileSystem();
proto.addCachePool(new CachePoolInfo("pool1"). proto = cluster.getNameNodeRpc();
setOwnerName("abc").setGroupName("123"). }
setMode(new FsPermission((short)0755)).setWeight(150));
RemoteIterator<CachePoolInfo> iter = proto.listCachePools(""); @After
public void teardown() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testBasicPoolOperations() throws Exception {
final String poolName = "pool1";
CachePoolInfo info = new CachePoolInfo(poolName).
setOwnerName("bob").setGroupName("bobgroup").
setMode(new FsPermission((short)0755)).setWeight(150);
// Add a pool
dfs.addCachePool(info);
// Do some bad addCachePools
try {
dfs.addCachePool(info);
fail("added the pool with the same name twice");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("pool1 already exists", ioe);
}
try {
dfs.addCachePool(new CachePoolInfo(""));
fail("added empty pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
ioe);
}
try {
dfs.addCachePool(null);
fail("added null pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
}
try {
proto.addCachePool(new CachePoolInfo(""));
fail("added empty pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
ioe);
}
try {
proto.addCachePool(null);
fail("added null pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
}
// Modify the pool
info.setOwnerName("jane").setGroupName("janegroup")
.setMode(new FsPermission((short)0700)).setWeight(314);
dfs.modifyCachePool(info);
// Do some invalid modify pools
try {
dfs.modifyCachePool(new CachePoolInfo("fool"));
fail("modified non-existent cache pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("fool does not exist", ioe);
}
try {
dfs.modifyCachePool(new CachePoolInfo(""));
fail("modified empty pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
ioe);
}
try {
dfs.modifyCachePool(null);
fail("modified null pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
}
try {
proto.modifyCachePool(new CachePoolInfo(""));
fail("modified empty pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
ioe);
}
try {
proto.modifyCachePool(null);
fail("modified null pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
}
// Remove the pool
dfs.removeCachePool(poolName);
// Do some bad removePools
try {
dfs.removeCachePool("pool99");
fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("can't remove " +
"non-existent cache pool", ioe);
}
try {
dfs.removeCachePool(poolName);
Assert.fail("expected to get an exception when " +
"removing a non-existent pool.");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("can't remove " +
"non-existent cache pool", ioe);
}
try {
dfs.removeCachePool("");
fail("removed empty pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
ioe);
}
try {
dfs.removeCachePool(null);
fail("removed null pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
ioe);
}
try {
proto.removeCachePool("");
fail("removed empty pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
ioe);
}
try {
proto.removeCachePool(null);
fail("removed null pool");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
ioe);
}
info = new CachePoolInfo("pool2");
dfs.addCachePool(info);
}
@Test
public void testCreateAndModifyPools() throws Exception {
String poolName = "pool1";
String ownerName = "abc";
String groupName = "123";
FsPermission mode = new FsPermission((short)0755);
int weight = 150;
dfs.addCachePool(new CachePoolInfo(poolName).
setOwnerName(ownerName).setGroupName(groupName).
setMode(mode).setWeight(weight));
RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
CachePoolInfo info = iter.next(); CachePoolInfo info = iter.next();
assertEquals("pool1", info.getPoolName()); assertEquals(poolName, info.getPoolName());
assertEquals("abc", info.getOwnerName()); assertEquals(ownerName, info.getOwnerName());
assertEquals("123", info.getGroupName()); assertEquals(groupName, info.getGroupName());
proto.modifyCachePool(new CachePoolInfo("pool1").
setOwnerName("def").setGroupName("456")); ownerName = "def";
iter = proto.listCachePools(""); groupName = "456";
mode = new FsPermission((short)0700);
weight = 151;
dfs.modifyCachePool(new CachePoolInfo(poolName).
setOwnerName(ownerName).setGroupName(groupName).
setMode(mode).setWeight(weight));
iter = dfs.listCachePools();
info = iter.next(); info = iter.next();
assertEquals("pool1", info.getPoolName()); assertEquals(poolName, info.getPoolName());
assertEquals("def", info.getOwnerName()); assertEquals(ownerName, info.getOwnerName());
assertEquals("456", info.getGroupName()); assertEquals(groupName, info.getGroupName());
assertEquals(new FsPermission((short)0755), info.getMode()); assertEquals(mode, info.getMode());
assertEquals(Integer.valueOf(150), info.getWeight()); assertEquals(Integer.valueOf(weight), info.getWeight());
dfs.removeCachePool(poolName);
iter = dfs.listCachePools();
assertFalse("expected no cache pools after deleting pool", iter.hasNext());
proto.listCachePools(null);
try { try {
proto.removeCachePool("pool99"); proto.removeCachePool("pool99");
Assert.fail("expected to get an exception when " + Assert.fail("expected to get an exception when " +
"removing a non-existent pool."); "removing a non-existent pool.");
} catch (IOException ioe) { } catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("can't remove non-existent",
ioe);
} }
proto.removeCachePool("pool1");
try { try {
proto.removeCachePool("pool1"); proto.removeCachePool(poolName);
Assert.fail("expected to get an exception when " + Assert.fail("expected to get an exception when " +
"removing a non-existent pool."); "removing a non-existent pool.");
} catch (IOException ioe) { } catch (IOException ioe) {
GenericTestUtils.assertExceptionContains("can't remove non-existent",
ioe);
} }
iter = dfs.listCachePools();
assertFalse("expected no cache pools after deleting pool", iter.hasNext());
} }
private static void validateListAll( private static void validateListAll(
RemoteIterator<PathBasedCacheDescriptor> iter, RemoteIterator<PathBasedCacheDescriptor> iter,
long id0, long id1, long id2) throws Exception { PathBasedCacheDescriptor... descriptors) throws Exception {
Assert.assertEquals(new PathBasedCacheDescriptor(id0, for (PathBasedCacheDescriptor descriptor: descriptors) {
"/alpha", "pool1"), iter.next()); assertTrue("Unexpectedly few elements", iter.hasNext());
Assert.assertEquals(new PathBasedCacheDescriptor(id1, assertEquals("Unexpected descriptor", descriptor, iter.next());
"/beta", "pool2"), iter.next()); }
Assert.assertEquals(new PathBasedCacheDescriptor(id2, assertFalse("Unexpectedly many list elements", iter.hasNext());
"/gamma", "pool1"), iter.next()); }
Assert.assertFalse(iter.hasNext());
private static PathBasedCacheDescriptor addAsUnprivileged(
final PathBasedCacheDirective directive) throws Exception {
return unprivilegedUser
.doAs(new PrivilegedExceptionAction<PathBasedCacheDescriptor>() {
@Override
public PathBasedCacheDescriptor run() throws IOException {
DistributedFileSystem myDfs =
(DistributedFileSystem) FileSystem.get(conf);
return myDfs.addPathBasedCacheDirective(directive);
}
});
} }
@Test @Test
public void testSetAndGet() throws Exception { public void testAddRemoveDirectives() throws Exception {
Configuration conf = new HdfsConfiguration(); proto.addCachePool(new CachePoolInfo("pool1").
MiniDFSCluster cluster = null; setMode(new FsPermission((short)0777)));
proto.addCachePool(new CachePoolInfo("pool2").
setMode(new FsPermission((short)0777)));
proto.addCachePool(new CachePoolInfo("pool3").
setMode(new FsPermission((short)0777)));
proto.addCachePool(new CachePoolInfo("pool4").
setMode(new FsPermission((short)0)));
PathBasedCacheDirective alpha =
new PathBasedCacheDirective("/alpha", "pool1");
PathBasedCacheDirective beta =
new PathBasedCacheDirective("/beta", "pool2");
PathBasedCacheDirective delta =
new PathBasedCacheDirective("/delta", "pool1");
PathBasedCacheDescriptor alphaD = addAsUnprivileged(alpha);
PathBasedCacheDescriptor alphaD2 = addAsUnprivileged(alpha);
assertEquals("Expected to get the same descriptor when re-adding"
+ "an existing PathBasedCacheDirective", alphaD, alphaD2);
PathBasedCacheDescriptor betaD = addAsUnprivileged(beta);
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); addAsUnprivileged(new PathBasedCacheDirective("", "pool3"));
cluster.waitActive(); fail("expected an error when adding an empty path");
final NamenodeProtocols proto = cluster.getNameNodeRpc(); } catch (IOException ioe) {
proto.addCachePool(new CachePoolInfo("pool1"). assertTrue(ioe instanceof EmptyPathError);
setMode(new FsPermission((short)0777)));
proto.addCachePool(new CachePoolInfo("pool2").
setMode(new FsPermission((short)0777)));
proto.addCachePool(new CachePoolInfo("pool3").
setMode(new FsPermission((short)0777)));
proto.addCachePool(new CachePoolInfo("pool4").
setMode(new FsPermission((short)0)));
List<Fallible<PathBasedCacheDescriptor>> addResults1 =
unprivilegedUser.doAs(new PrivilegedExceptionAction<
List<Fallible<PathBasedCacheDescriptor>>>() {
@Override
public List<Fallible<PathBasedCacheDescriptor>> run() throws IOException {
return proto.addPathBasedCacheDirectives(Arrays.asList(
new PathBasedCacheDirective[] {
new PathBasedCacheDirective("/alpha", "pool1"),
new PathBasedCacheDirective("/beta", "pool2"),
new PathBasedCacheDirective("", "pool3"),
new PathBasedCacheDirective("/zeta", "nonexistent_pool"),
new PathBasedCacheDirective("/zeta", "pool4"),
new PathBasedCacheDirective("//illegal/path/", "pool1")
}));
}
});
long ids1[] = new long[2];
ids1[0] = addResults1.get(0).get().getEntryId();
ids1[1] = addResults1.get(1).get().getEntryId();
try {
addResults1.get(2).get();
Assert.fail("expected an error when adding an empty path");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof EmptyPathError);
}
try {
addResults1.get(3).get();
Assert.fail("expected an error when adding to a nonexistent pool.");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError);
}
try {
addResults1.get(4).get();
Assert.fail("expected an error when adding to a pool with " +
"mode 0 (no permissions for anyone).");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause()
instanceof PoolWritePermissionDeniedError);
}
try {
addResults1.get(5).get();
Assert.fail("expected an error when adding a malformed path " +
"to the cache directives.");
} catch (IOException ioe) {
//Assert.assertTrue(ioe.getCause()
//instanceof PoolWritePermissionDeniedError);
}
List<Fallible<PathBasedCacheDescriptor>> addResults2 =
proto.addPathBasedCacheDirectives(Arrays.asList(
new PathBasedCacheDirective[] {
new PathBasedCacheDirective("/alpha", "pool1"),
new PathBasedCacheDirective("/theta", ""),
new PathBasedCacheDirective("bogus", "pool1"),
new PathBasedCacheDirective("/gamma", "pool1")
}));
long id = addResults2.get(0).get().getEntryId();
Assert.assertEquals("expected to get back the same ID as last time " +
"when re-adding an existing PathBasedCache directive.", ids1[0], id);
try {
addResults2.get(1).get();
Assert.fail("expected an error when adding a PathBasedCache " +
"directive with an empty pool name.");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof InvalidPoolNameError);
}
try {
addResults2.get(2).get();
Assert.fail("expected an error when adding a PathBasedCache " +
"directive with a non-absolute path name.");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof InvalidPathNameError);
}
long ids2[] = new long[1];
ids2[0] = addResults2.get(3).get().getEntryId();
RemoteIterator<PathBasedCacheDescriptor> iter =
proto.listPathBasedCacheDescriptors(0, null, null);
validateListAll(iter, ids1[0], ids1[1], ids2[0]);
iter = proto.listPathBasedCacheDescriptors(0, null, null);
validateListAll(iter, ids1[0], ids1[1], ids2[0]);
iter = proto.listPathBasedCacheDescriptors(0, "pool3", null);
Assert.assertFalse(iter.hasNext());
iter = proto.listPathBasedCacheDescriptors(0, "pool2", null);
Assert.assertEquals(addResults1.get(1).get(),
iter.next());
Assert.assertFalse(iter.hasNext());
List<Fallible<Long>> removeResults1 =
proto.removePathBasedCacheDescriptors(Arrays.asList(
new Long[] { ids1[1], -42L, 999999L }));
Assert.assertEquals(Long.valueOf(ids1[1]),
removeResults1.get(0).get());
try {
removeResults1.get(1).get();
Assert.fail("expected an error when removing a negative ID");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof InvalidIdException);
}
try {
removeResults1.get(2).get();
Assert.fail("expected an error when removing a nonexistent ID");
} catch (IOException ioe) {
Assert.assertTrue(ioe.getCause() instanceof NoSuchIdException);
}
iter = proto.listPathBasedCacheDescriptors(0, "pool2", null);
Assert.assertFalse(iter.hasNext());
} finally {
if (cluster != null) { cluster.shutdown(); }
} }
try {
addAsUnprivileged(new PathBasedCacheDirective("/unicorn", "no_such_pool"));
fail("expected an error when adding to a non-existent pool.");
} catch (IOException ioe) {
assertTrue(ioe instanceof InvalidPoolNameError);
}
try {
addAsUnprivileged(new PathBasedCacheDirective("/blackhole", "pool4"));
fail("expected an error when adding to a pool with " +
"mode 0 (no permissions for anyone).");
} catch (IOException ioe) {
assertTrue(ioe instanceof PoolWritePermissionDeniedError);
}
try {
addAsUnprivileged(new PathBasedCacheDirective("//illegal/path/", "pool1"));
fail("expected an error when adding a malformed path " +
"to the cache directives.");
} catch (IOException ioe) {
assertTrue(ioe instanceof InvalidPathNameError);
}
try {
addAsUnprivileged(new PathBasedCacheDirective("/emptypoolname", ""));
Assert.fail("expected an error when adding a PathBasedCache " +
"directive with an empty pool name.");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof InvalidPoolNameError);
}
try {
addAsUnprivileged(new PathBasedCacheDirective("bogus", "pool1"));
Assert.fail("expected an error when adding a PathBasedCache " +
"directive with a non-absolute path name.");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof InvalidPathNameError);
}
PathBasedCacheDescriptor deltaD = addAsUnprivileged(delta);
RemoteIterator<PathBasedCacheDescriptor> iter;
iter = proto.listPathBasedCacheDescriptors(0, null, null);
validateListAll(iter, alphaD, betaD, deltaD);
iter = proto.listPathBasedCacheDescriptors(0, "pool3", null);
Assert.assertFalse(iter.hasNext());
iter = proto.listPathBasedCacheDescriptors(0, "pool1", null);
validateListAll(iter, alphaD, deltaD);
iter = proto.listPathBasedCacheDescriptors(0, "pool2", null);
validateListAll(iter, betaD);
dfs.removePathBasedCacheDescriptor(betaD);
iter = proto.listPathBasedCacheDescriptors(0, "pool2", null);
Assert.assertFalse(iter.hasNext());
try {
dfs.removePathBasedCacheDescriptor(betaD);
Assert.fail("expected an error when removing a non-existent ID");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof NoSuchIdException);
}
try {
proto.removePathBasedCacheDescriptor(-42l);
Assert.fail("expected an error when removing a negative ID");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof InvalidIdException);
}
try {
proto.removePathBasedCacheDescriptor(43l);
Assert.fail("expected an error when removing a non-existent ID");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof NoSuchIdException);
}
dfs.removePathBasedCacheDescriptor(alphaD);
dfs.removePathBasedCacheDescriptor(deltaD);
iter = proto.listPathBasedCacheDescriptors(0, null, null);
assertFalse(iter.hasNext());
} }
} }