HDFS-5326. add modifyDirective to cacheAdmin (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1539839 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-11-07 22:07:16 +00:00
parent 3fccdec6e0
commit f79b3e6b17
34 changed files with 1178 additions and 1047 deletions

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
/**
* Exception corresponding to ID not found - EINVAL
*/
public class IdNotFoundException extends IOException {
static final long serialVersionUID = 0L;
public IdNotFoundException(String str) {
super(str);
}
}

View File

@ -188,6 +188,8 @@ Trunk (Unreleased)
HDFS-5386. Add feature documentation for datanode caching.
(Colin Patrick McCabe via cnauroth)
HDFS-5326. add modifyDirective to cacheAdmin. (cmccabe)
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@ -117,7 +117,6 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@ -2291,7 +2290,7 @@ public class DFSClient implements java.io.Closeable {
}
}
public PathBasedCacheDescriptor addPathBasedCacheDirective(
public long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
checkOpen();
try {
@ -2301,21 +2300,31 @@ public class DFSClient implements java.io.Closeable {
}
}
public void removePathBasedCacheDescriptor(long id)
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
checkOpen();
try {
namenode.modifyPathBasedCacheDirective(directive);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public void removePathBasedCacheDirective(long id)
throws IOException {
checkOpen();
try {
namenode.removePathBasedCacheDescriptor(id);
namenode.removePathBasedCacheDirective(id);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
String pool, String path) throws IOException {
public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(
PathBasedCacheDirective filter) throws IOException {
checkOpen();
try {
return namenode.listPathBasedCacheDescriptors(0, pool, path);
return namenode.listPathBasedCacheDirectives(0, filter);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}

View File

@ -211,9 +211,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES =
"dfs.namenode.list.cache.pools.num.responses";
public static final int DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES =
"dfs.namenode.list.cache.descriptors.num.responses";
public static final int DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES =
"dfs.namenode.list.cache.directives.num.responses";
public static final int DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS =
"dfs.namenode.path.based.cache.refresh.interval.ms";
public static final long DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 300000L;

View File

@ -68,7 +68,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@ -82,6 +81,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/****************************************************************
@ -1586,57 +1586,74 @@ public class DistributedFileSystem extends FileSystem {
/**
* Add a new PathBasedCacheDirective.
*
* @param directive A PathBasedCacheDirectives to add
* @return PathBasedCacheDescriptor associated with the added directive
* @param directive A directive to add.
* @return the ID of the directive that was created.
* @throws IOException if the directive could not be added
*/
public PathBasedCacheDescriptor addPathBasedCacheDirective(
public long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
Preconditions.checkNotNull(directive.getPath());
Path path = new Path(getPathName(fixRelativePart(directive.getPath()))).
makeQualified(getUri(), getWorkingDirectory());
return dfs.addPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
setPath(path).
setReplication(directive.getReplication()).
setPool(directive.getPool()).
build());
return dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder(directive).
setPath(path).
build());
}
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
if (directive.getPath() != null) {
directive = new PathBasedCacheDirective.Builder(directive).
setPath(new Path(getPathName(fixRelativePart(directive.getPath()))).
makeQualified(getUri(), getWorkingDirectory())).build();
}
dfs.modifyPathBasedCacheDirective(directive);
}
/**
* Remove a PathBasedCacheDescriptor.
* Remove a PathBasedCacheDirective.
*
* @param descriptor PathBasedCacheDescriptor to remove
* @throws IOException if the descriptor could not be removed
* @param id identifier of the PathBasedCacheDirective to remove
* @throws IOException if the directive could not be removed
*/
public void removePathBasedCacheDescriptor(PathBasedCacheDescriptor descriptor)
public void removePathBasedCacheDirective(long id)
throws IOException {
dfs.removePathBasedCacheDescriptor(descriptor.getEntryId());
dfs.removePathBasedCacheDirective(id);
}
/**
* List the set of cached paths of a cache pool. Incrementally fetches results
* from the server.
*
* @param pool The cache pool to list, or null to list all pools.
* @param path The path name to list, or null to list all paths.
* @return A RemoteIterator which returns PathBasedCacheDescriptor objects.
* @param filter Filter parameters to use when listing the directives, null to
* list all directives visible to us.
* @return A RemoteIterator which returns PathBasedCacheDirective objects.
*/
public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
String pool, final Path path) throws IOException {
String pathName = path != null ? getPathName(fixRelativePart(path)) : null;
final RemoteIterator<PathBasedCacheDescriptor> iter =
dfs.listPathBasedCacheDescriptors(pool, pathName);
return new RemoteIterator<PathBasedCacheDescriptor>() {
public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(
PathBasedCacheDirective filter) throws IOException {
if (filter == null) {
filter = new PathBasedCacheDirective.Builder().build();
}
if (filter.getPath() != null) {
filter = new PathBasedCacheDirective.Builder(filter).
setPath(filter.getPath().
makeQualified(getUri(), filter.getPath())).
build();
}
final RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(filter);
return new RemoteIterator<PathBasedCacheDirective>() {
@Override
public boolean hasNext() throws IOException {
return iter.hasNext();
}
@Override
public PathBasedCacheDescriptor next() throws IOException {
PathBasedCacheDescriptor desc = iter.next();
Path qualPath = desc.getPath().makeQualified(getUri(), path);
return new PathBasedCacheDescriptor(desc.getEntryId(), qualPath,
desc.getReplication(), desc.getPool());
public PathBasedCacheDirective next() throws IOException {
PathBasedCacheDirective desc = iter.next();
Path p = desc.getPath().makeQualified(getUri(), desc.getPath());
return new PathBasedCacheDirective.Builder(desc).setPath(p).build();
}
};
}

View File

@ -1,108 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
/**
* An exception which occurred when trying to add a PathBasedCache directive.
*/
public abstract class AddPathBasedCacheDirectiveException extends IOException {
private static final long serialVersionUID = 1L;
public AddPathBasedCacheDirectiveException(String description) {
super(description);
}
public static final class EmptyPathError
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
public EmptyPathError() {
super("empty path in directive");
}
}
public static class InvalidPathNameError
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
public InvalidPathNameError(String msg) {
super(msg);
}
public InvalidPathNameError(PathBasedCacheDirective directive) {
this("can't handle invalid path name " + directive.getPath());
}
}
public static class InvalidPoolNameError
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
public InvalidPoolNameError(String msg) {
super(msg);
}
public InvalidPoolNameError(PathBasedCacheDirective directive) {
this("invalid pool name '" + directive.getPool() + "'");
}
}
public static class PoolWritePermissionDeniedError
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
public PoolWritePermissionDeniedError(String msg) {
super(msg);
}
public PoolWritePermissionDeniedError(PathBasedCacheDirective directive) {
this("write permission denied for pool '" + directive.getPool() + "'");
}
}
public static class PathAlreadyExistsInPoolError
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
public PathAlreadyExistsInPoolError(String msg) {
super(msg);
}
public PathAlreadyExistsInPoolError(PathBasedCacheDirective directive) {
this("path " + directive.getPath() + " already exists in pool " +
directive.getPool());
}
}
public static class UnexpectedAddPathBasedCacheDirectiveException
extends AddPathBasedCacheDirectiveException {
private static final long serialVersionUID = 1L;
public UnexpectedAddPathBasedCacheDirectiveException(String msg) {
super(msg);
}
public UnexpectedAddPathBasedCacheDirectiveException(
PathBasedCacheDirective directive) {
this("encountered an unexpected error when trying to " +
"add PathBasedCache directive " + directive);
}
}
}

View File

@ -1099,35 +1099,46 @@ public interface ClientProtocol {
* Add a PathBasedCache entry to the CacheManager.
*
* @param directive A PathBasedCacheDirective to be added
* @return A PathBasedCacheDescriptor associated with the added directive
* @return A PathBasedCacheDirective associated with the added directive
* @throws IOException if the directive could not be added
*/
@AtMostOnce
public PathBasedCacheDescriptor addPathBasedCacheDirective(
public long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException;
/**
* Remove a PathBasedCacheDescriptor from the CacheManager.
* Modify a PathBasedCache entry in the CacheManager.
*
* @param id of a PathBasedCacheDescriptor
* @throws IOException if the cache descriptor could not be removed
* @return directive The directive to modify. Must contain
* a directive ID.
* @throws IOException if the directive could not be modified
*/
@AtMostOnce
public void removePathBasedCacheDescriptor(Long id) throws IOException;
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException;
/**
* Remove a PathBasedCacheDirective from the CacheManager.
*
* @param id of a PathBasedCacheDirective
* @throws IOException if the cache directive could not be removed
*/
@AtMostOnce
public void removePathBasedCacheDirective(long id) throws IOException;
/**
* List the set of cached paths of a cache pool. Incrementally fetches results
* from the server.
*
* @param prevId The last listed entry ID, or -1 if this is the first call to
* listPathBasedCacheDescriptors.
* @param pool The cache pool to list, or null to list all pools.
* @param path The path name to list, or null to list all paths.
* @return A RemoteIterator which returns PathBasedCacheDescriptor objects.
* listPathBasedCacheDirectives.
* @param filter Parameters to use to filter the list results,
* or null to display all directives visible to us.
* @return A RemoteIterator which returns PathBasedCacheDirective objects.
*/
@Idempotent
public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
long prevId, String pool, String path) throws IOException;
public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(
long prevId, PathBasedCacheDirective filter) throws IOException;
/**
* Add a new cache pool.

View File

@ -1,82 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import com.google.common.base.Preconditions;
/**
* A directive in a cache pool that includes an identifying ID number.
*/
@InterfaceStability.Evolving
@InterfaceAudience.Public
public final class PathBasedCacheDescriptor extends PathBasedCacheDirective {
private final long entryId;
public PathBasedCacheDescriptor(long entryId, Path path,
short replication, String pool) {
super(path, replication, pool);
Preconditions.checkArgument(entryId > 0);
this.entryId = entryId;
}
public long getEntryId() {
return entryId;
}
@Override
public boolean equals(Object o) {
if (o == null) {
return false;
}
if (getClass() != o.getClass()) {
return false;
}
PathBasedCacheDescriptor other = (PathBasedCacheDescriptor)o;
return new EqualsBuilder().append(entryId, other.entryId).
append(getPath(), other.getPath()).
append(getReplication(), other.getReplication()).
append(getPool(), other.getPool()).
isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(entryId).
append(getPath()).
append(getReplication()).
append(getPool()).
hashCode();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("{ entryId:").append(entryId).
append(", path:").append(getPath()).
append(", replication:").append(getReplication()).
append(", pool:").append(getPool()).
append(" }");
return builder.toString();
}
};

View File

@ -17,32 +17,27 @@
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import com.google.common.base.Preconditions;
import java.net.URI;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError;
/**
* A directive to add a path to a cache pool.
* Describes a path-based cache directive.
*/
@InterfaceStability.Evolving
@InterfaceAudience.Public
public class PathBasedCacheDirective {
/**
* A builder for creating new PathBasedCacheDirective instances.
*/
public static class Builder {
private Long id;
private Path path;
private short replication = (short)1;
private Short replication;
private String pool;
/**
@ -51,7 +46,37 @@ public class PathBasedCacheDirective {
* @return New PathBasedCacheDirective.
*/
public PathBasedCacheDirective build() {
return new PathBasedCacheDirective(path, replication, pool);
return new PathBasedCacheDirective(id, path, replication, pool);
}
/**
* Creates an empty builder.
*/
public Builder() {
}
/**
* Creates a builder with all elements set to the same values as the
* given PathBasedCacheDirective.
*/
public Builder(PathBasedCacheDirective directive) {
this.id = directive.getId();
// deep-copy URI
URI uri = directive.getPath().toUri();
this.path = new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
this.replication = directive.getReplication();
this.pool = directive.getPool();
}
/**
* Sets the id used in this request.
*
* @param id The id used in this request.
* @return This builder, for call chaining.
*/
public Builder setId(Long id) {
this.id = id;
return this;
}
/**
@ -71,7 +96,7 @@ public class PathBasedCacheDirective {
* @param replication The replication used in this request.
* @return This builder, for call chaining.
*/
public Builder setReplication(short replication) {
public Builder setReplication(Short replication) {
this.replication = replication;
return this;
}
@ -88,10 +113,25 @@ public class PathBasedCacheDirective {
}
}
private final Long id;
private final Path path;
private final short replication;
private final Short replication;
private final String pool;
PathBasedCacheDirective(Long id, Path path, Short replication, String pool) {
this.id = id;
this.path = path;
this.replication = replication;
this.pool = pool;
}
/**
* @return The ID of this directive.
*/
public Long getId() {
return id;
}
/**
* @return The path used in this request.
*/
@ -102,7 +142,7 @@ public class PathBasedCacheDirective {
/**
* @return The number of times the block should be cached.
*/
public short getReplication() {
public Short getReplication() {
return replication;
}
@ -113,25 +153,6 @@ public class PathBasedCacheDirective {
return pool;
}
/**
* Check if this PathBasedCacheDirective is valid.
*
* @throws IOException
* If this PathBasedCacheDirective is not valid.
*/
public void validate() throws IOException {
if (!DFSUtil.isValidName(path.toUri().getPath())) {
throw new InvalidPathNameError(this);
}
if (replication <= 0) {
throw new IOException("Tried to request a cache replication " +
"factor of " + replication + ", but that is less than 1.");
}
if (pool.isEmpty()) {
throw new InvalidPoolNameError(this);
}
}
@Override
public boolean equals(Object o) {
if (o == null) {
@ -141,7 +162,8 @@ public class PathBasedCacheDirective {
return false;
}
PathBasedCacheDirective other = (PathBasedCacheDirective)o;
return new EqualsBuilder().append(getPath(), other.getPath()).
return new EqualsBuilder().append(getId(), other.getId()).
append(getPath(), other.getPath()).
append(getReplication(), other.getReplication()).
append(getPool(), other.getPool()).
isEquals();
@ -149,34 +171,35 @@ public class PathBasedCacheDirective {
@Override
public int hashCode() {
return new HashCodeBuilder().append(getPath()).
return new HashCodeBuilder().append(id).
append(path).
append(replication).
append(getPool()).
append(pool).
hashCode();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("{ path:").append(path).
append(", replication:").append(replication).
append(", pool:").append(pool).
append(" }");
builder.append("{");
String prefix = "";
if (id != null) {
builder.append(prefix).append("id: ").append(id);
prefix = ",";
}
if (path != null) {
builder.append(prefix).append("path: ").append(path);
prefix = ",";
}
if (replication != null) {
builder.append(prefix).append("replication: ").append(replication);
prefix = ",";
}
if (pool != null) {
builder.append(prefix).append("pool: ").append(pool);
prefix = ",";
}
builder.append("}");
return builder.toString();
}
/**
* Protected constructor. Callers use Builder to create new instances.
*
* @param path The path used in this request.
* @param replication The replication used in this request.
* @param pool The pool used in this request.
*/
protected PathBasedCacheDirective(Path path, short replication, String pool) {
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(pool);
this.path = path;
this.replication = replication;
this.pool = pool;
}
};

View File

@ -64,6 +64,15 @@ public final class PathBasedCacheEntry {
return replication;
}
public PathBasedCacheDirective toDirective() {
return new PathBasedCacheDirective.Builder().
setId(entryId).
setPath(new Path(path)).
setReplication(replication).
setPool(pool.getPoolName()).
build();
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@ -75,11 +84,6 @@ public final class PathBasedCacheEntry {
return builder.toString();
}
public PathBasedCacheDescriptor getDescriptor() {
return new PathBasedCacheDescriptor(entryId, new Path(path), replication,
pool.getPoolName());
}
@Override
public boolean equals(Object o) {
if (o == null) { return false; }

View File

@ -1,85 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
/**
* An exception which occurred when trying to remove a PathBasedCache entry.
*/
public abstract class RemovePathBasedCacheDescriptorException extends IOException {
private static final long serialVersionUID = 1L;
public RemovePathBasedCacheDescriptorException(String description) {
super(description);
}
public final static class InvalidIdException
extends RemovePathBasedCacheDescriptorException {
private static final long serialVersionUID = 1L;
public InvalidIdException(String msg) {
super(msg);
}
public InvalidIdException(long entryId) {
this("invalid PathBasedCacheDescriptor id " + entryId);
}
}
public final static class RemovePermissionDeniedException
extends RemovePathBasedCacheDescriptorException {
private static final long serialVersionUID = 1L;
public RemovePermissionDeniedException(String msg) {
super(msg);
}
public RemovePermissionDeniedException(long entryId) {
this("permission denied when trying to remove " +
"PathBasedCacheDescriptor id " + entryId);
}
}
public final static class NoSuchIdException
extends RemovePathBasedCacheDescriptorException {
private static final long serialVersionUID = 1L;
public NoSuchIdException(String msg) {
super(msg);
}
public NoSuchIdException(long entryId) {
this("there is no PathBasedCacheDescriptor with id " + entryId);
}
}
public final static class UnexpectedRemovePathBasedCacheDescriptorException
extends RemovePathBasedCacheDescriptorException {
private static final long serialVersionUID = 1L;
public UnexpectedRemovePathBasedCacheDescriptorException(String msg) {
super(msg);
}
public UnexpectedRemovePathBasedCacheDescriptorException(long id) {
this("encountered an unexpected error when trying to " +
"remove PathBasedCacheDescriptor with id " + id);
}
}
}

View File

@ -1,78 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import com.google.common.base.Preconditions;
/**
* An exception which occurred when trying to remove a PathBasedCache entry.
*/
public abstract class RemovePathBasedCacheEntryException extends IOException {
private static final long serialVersionUID = 1L;
private final long entryId;
public RemovePathBasedCacheEntryException(String description, long entryId) {
super(description);
this.entryId = entryId;
}
public long getEntryId() {
return this.entryId;
}
public final static class InvalidIdException
extends RemovePathBasedCacheEntryException {
private static final long serialVersionUID = 1L;
public InvalidIdException(long entryId) {
super("invalid cache path entry id " + entryId, entryId);
}
}
public final static class RemovePermissionDeniedException
extends RemovePathBasedCacheEntryException {
private static final long serialVersionUID = 1L;
public RemovePermissionDeniedException(long entryId) {
super("permission denied when trying to remove PathBasedCache entry id " +
entryId, entryId);
}
}
public final static class NoSuchIdException
extends RemovePathBasedCacheEntryException {
private static final long serialVersionUID = 1L;
public NoSuchIdException(long entryId) {
super("there is no PathBasedCache entry with id " + entryId, entryId);
}
}
public final static class UnexpectedRemovePathBasedCacheEntryException
extends RemovePathBasedCacheEntryException {
private static final long serialVersionUID = 1L;
public UnexpectedRemovePathBasedCacheEntryException(long id) {
super("encountered an unexpected error when trying to " +
"remove PathBasedCache entry id " + id, id);
}
}
}

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@ -36,11 +35,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
@ -111,24 +106,25 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyPathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyPathBasedCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
@ -177,7 +173,6 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRespons
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.security.token.Token;
import org.apache.commons.lang.StringUtils;
import com.google.common.primitives.Shorts;
@ -1039,69 +1034,64 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, AddPathBasedCacheDirectiveRequestProto request)
throws ServiceException {
try {
PathBasedCacheDirectiveProto proto = request.getDirective();
if (StringUtils.isEmpty(proto.getPath())) {
throw new EmptyPathError();
}
PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
setPath(new Path(proto.getPath())).
setReplication(Shorts.checkedCast(proto.getReplication())).
setPool(proto.getPool()).
build();
PathBasedCacheDescriptor descriptor =
server.addPathBasedCacheDirective(directive);
AddPathBasedCacheDirectiveResponseProto.Builder builder =
AddPathBasedCacheDirectiveResponseProto.newBuilder();
builder.setDescriptorId(descriptor.getEntryId());
return builder.build();
return AddPathBasedCacheDirectiveResponseProto.newBuilder().
setId(server.addPathBasedCacheDirective(
PBHelper.convert(request.getInfo()))).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public RemovePathBasedCacheDescriptorResponseProto removePathBasedCacheDescriptor(
RpcController controller,
RemovePathBasedCacheDescriptorRequestProto request)
public ModifyPathBasedCacheDirectiveResponseProto modifyPathBasedCacheDirective(
RpcController controller, ModifyPathBasedCacheDirectiveRequestProto request)
throws ServiceException {
try {
server.removePathBasedCacheDescriptor(request.getDescriptorId());
RemovePathBasedCacheDescriptorResponseProto.Builder builder =
RemovePathBasedCacheDescriptorResponseProto.newBuilder();
return builder.build();
server.modifyPathBasedCacheDirective(
PBHelper.convert(request.getInfo()));
return ModifyPathBasedCacheDirectiveResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ListPathBasedCacheDescriptorsResponseProto listPathBasedCacheDescriptors(
RpcController controller, ListPathBasedCacheDescriptorsRequestProto request)
public RemovePathBasedCacheDirectiveResponseProto
removePathBasedCacheDirective(RpcController controller,
RemovePathBasedCacheDirectiveRequestProto request)
throws ServiceException {
try {
server.removePathBasedCacheDirective(request.getId());
return RemovePathBasedCacheDirectiveResponseProto.
newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ListPathBasedCacheDirectivesResponseProto listPathBasedCacheDirectives(
RpcController controller, ListPathBasedCacheDirectivesRequestProto request)
throws ServiceException {
try {
RemoteIterator<PathBasedCacheDescriptor> iter =
server.listPathBasedCacheDescriptors(request.getPrevId(),
request.hasPool() ? request.getPool() : null,
request.hasPath() ? request.getPath() : null);
ListPathBasedCacheDescriptorsResponseProto.Builder builder =
ListPathBasedCacheDescriptorsResponseProto.newBuilder();
PathBasedCacheDirective filter =
PBHelper.convert(request.getFilter());
RemoteIterator<PathBasedCacheDirective> iter =
server.listPathBasedCacheDirectives(request.getPrevId(), filter);
ListPathBasedCacheDirectivesResponseProto.Builder builder =
ListPathBasedCacheDirectivesResponseProto.newBuilder();
long prevId = 0;
while (iter.hasNext()) {
PathBasedCacheDescriptor directive = iter.next();
PathBasedCacheDirective directive = iter.next();
builder.addElements(
ListPathBasedCacheDescriptorsElementProto.newBuilder().
setId(directive.getEntryId()).
setPath(directive.getPath().toUri().getPath()).
setReplication(directive.getReplication()).
setPool(directive.getPool()));
prevId = directive.getEntryId();
ListPathBasedCacheDirectivesElementProto.newBuilder().
setInfo(PBHelper.convert(directive)));
prevId = directive.getId();
}
if (prevId == 0) {
builder.setHasMore(false);
} else {
iter = server.listPathBasedCacheDescriptors(prevId,
request.hasPool() ? request.getPool() : null,
request.hasPath() ? request.getPath() : null);
iter = server.listPathBasedCacheDirectives(prevId, filter);
builder.setHasMore(iter.hasNext());
}
return builder.build();

View File

@ -51,7 +51,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@ -101,17 +100,16 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCa
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDescriptorsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyPathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDescriptorRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@ -1005,55 +1003,53 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public PathBasedCacheDescriptor addPathBasedCacheDirective(
public long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
try {
AddPathBasedCacheDirectiveRequestProto.Builder builder =
AddPathBasedCacheDirectiveRequestProto.newBuilder();
builder.setDirective(PathBasedCacheDirectiveProto.newBuilder()
.setPath(directive.getPath().toUri().getPath())
.setReplication(directive.getReplication())
.setPool(directive.getPool())
.build());
AddPathBasedCacheDirectiveResponseProto result =
rpcProxy.addPathBasedCacheDirective(null, builder.build());
return new PathBasedCacheDescriptor(result.getDescriptorId(),
directive.getPath(), directive.getReplication(),
directive.getPool());
return rpcProxy.addPathBasedCacheDirective(null,
AddPathBasedCacheDirectiveRequestProto.newBuilder().
setInfo(PBHelper.convert(directive)).build()).getId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void removePathBasedCacheDescriptor(Long id)
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
try {
rpcProxy.modifyPathBasedCacheDirective(null,
ModifyPathBasedCacheDirectiveRequestProto.newBuilder().
setInfo(PBHelper.convert(directive)).build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void removePathBasedCacheDirective(long id)
throws IOException {
try {
RemovePathBasedCacheDescriptorRequestProto.Builder builder =
RemovePathBasedCacheDescriptorRequestProto.newBuilder();
builder.setDescriptorId(id);
rpcProxy.removePathBasedCacheDescriptor(null, builder.build());
rpcProxy.removePathBasedCacheDirective(null,
RemovePathBasedCacheDirectiveRequestProto.newBuilder().
setId(id).build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
private static class BatchedPathBasedCacheEntries
implements BatchedEntries<PathBasedCacheDescriptor> {
private ListPathBasedCacheDescriptorsResponseProto response;
implements BatchedEntries<PathBasedCacheDirective> {
private ListPathBasedCacheDirectivesResponseProto response;
BatchedPathBasedCacheEntries(ListPathBasedCacheDescriptorsResponseProto response) {
BatchedPathBasedCacheEntries(
ListPathBasedCacheDirectivesResponseProto response) {
this.response = response;
}
@Override
public PathBasedCacheDescriptor get(int i) {
ListPathBasedCacheDescriptorsElementProto elementProto =
response.getElements(i);
return new PathBasedCacheDescriptor(elementProto.getId(),
new Path(elementProto.getPath()),
Shorts.checkedCast(elementProto.getReplication()),
elementProto.getPool());
public PathBasedCacheDirective get(int i) {
return PBHelper.convert(response.getElements(i).getInfo());
}
@Override
@ -1068,31 +1064,25 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
private class PathBasedCacheEntriesIterator
extends BatchedRemoteIterator<Long, PathBasedCacheDescriptor> {
private final String pool;
private final String path;
extends BatchedRemoteIterator<Long, PathBasedCacheDirective> {
private final PathBasedCacheDirective filter;
public PathBasedCacheEntriesIterator(long prevKey, String pool, String path) {
public PathBasedCacheEntriesIterator(long prevKey,
PathBasedCacheDirective filter) {
super(prevKey);
this.pool = pool;
this.path = path;
this.filter = filter;
}
@Override
public BatchedEntries<PathBasedCacheDescriptor> makeRequest(
public BatchedEntries<PathBasedCacheDirective> makeRequest(
Long nextKey) throws IOException {
ListPathBasedCacheDescriptorsResponseProto response;
ListPathBasedCacheDirectivesResponseProto response;
try {
ListPathBasedCacheDescriptorsRequestProto.Builder builder =
ListPathBasedCacheDescriptorsRequestProto.newBuilder().setPrevId(nextKey);
if (pool != null) {
builder.setPool(pool);
}
if (path != null) {
builder.setPath(path);
}
ListPathBasedCacheDescriptorsRequestProto req = builder.build();
response = rpcProxy.listPathBasedCacheDescriptors(null, req);
response = rpcProxy.listPathBasedCacheDirectives(null,
ListPathBasedCacheDirectivesRequestProto.newBuilder().
setPrevId(nextKey).
setFilter(PBHelper.convert(filter)).
build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -1100,15 +1090,19 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public Long elementToPrevKey(PathBasedCacheDescriptor element) {
return element.getEntryId();
public Long elementToPrevKey(PathBasedCacheDirective element) {
return element.getId();
}
}
@Override
public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(long prevId,
String pool, String path) throws IOException {
return new PathBasedCacheEntriesIterator(prevId, pool, path);
public RemoteIterator<PathBasedCacheDirective>
listPathBasedCacheDirectives(long prevId,
PathBasedCacheDirective filter) throws IOException {
if (filter == null) {
filter = new PathBasedCacheDirective.Builder().build();
}
return new PathBasedCacheEntriesIterator(prevId, filter);
}
@Override

View File

@ -28,6 +28,7 @@ import java.util.List;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -55,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Create
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@ -152,6 +155,7 @@ import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.primitives.Shorts;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
@ -1559,6 +1563,45 @@ public class PBHelper {
return DataChecksum.Type.valueOf(type.getNumber());
}
public static PathBasedCacheDirectiveInfoProto convert
(PathBasedCacheDirective directive) {
PathBasedCacheDirectiveInfoProto.Builder builder =
PathBasedCacheDirectiveInfoProto.newBuilder();
if (directive.getId() != null) {
builder.setId(directive.getId());
}
if (directive.getPath() != null) {
builder.setPath(directive.getPath().toUri().getPath());
}
if (directive.getReplication() != null) {
builder.setReplication(directive.getReplication());
}
if (directive.getPool() != null) {
builder.setPool(directive.getPool());
}
return builder.build();
}
public static PathBasedCacheDirective convert
(PathBasedCacheDirectiveInfoProto proto) {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
if (proto.hasId()) {
builder.setId(proto.getId());
}
if (proto.hasPath()) {
builder.setPath(new Path(proto.getPath()));
}
if (proto.hasReplication()) {
builder.setReplication(Shorts.checkedCast(
proto.getReplication()));
}
if (proto.hasPool()) {
builder.setPool(proto.getPool());
}
return builder.build();
}
public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}

View File

@ -19,14 +19,13 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@ -44,33 +43,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.IdNotFoundException;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.UnexpectedAddPathBasedCacheDirectiveException;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
@ -78,6 +66,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
@ -111,7 +100,7 @@ public final class CacheManager {
/**
* Cache entries, sorted by ID.
*
* listPathBasedCacheDescriptors relies on the ordering of elements in this map
* listPathBasedCacheDirectives relies on the ordering of elements in this map
* to track what has already been listed by the client.
*/
private final TreeMap<Long, PathBasedCacheEntry> entriesById =
@ -143,7 +132,7 @@ public final class CacheManager {
/**
* Maximum number of cache pool directives to list in one operation.
*/
private final int maxListCacheDescriptorsResponses;
private final int maxListCacheDirectivesNumResponses;
/**
* Interval between scans in milliseconds.
@ -191,9 +180,9 @@ public final class CacheManager {
this.maxListCachePoolsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
this.maxListCacheDescriptorsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT);
this.maxListCacheDirectivesNumResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT);
scanIntervalMs = conf.getLong(
DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
@ -266,132 +255,239 @@ public final class CacheManager {
return nextEntryId++;
}
public PathBasedCacheDescriptor addDirective(
PathBasedCacheDirective directive, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
CachePool pool = cachePools.get(directive.getPool());
if (pool == null) {
LOG.info("addDirective " + directive + ": pool not found.");
throw new InvalidPoolNameError(directive);
}
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
LOG.info("addDirective " + directive + ": write permission denied.");
throw new PoolWritePermissionDeniedError(directive);
}
try {
directive.validate();
} catch (IOException ioe) {
LOG.info("addDirective " + directive + ": validation failed: "
+ ioe.getClass().getName() + ": " + ioe.getMessage());
throw ioe;
}
// Add a new entry with the next available ID.
PathBasedCacheEntry entry;
try {
entry = new PathBasedCacheEntry(getNextEntryId(),
directive.getPath().toUri().getPath(),
directive.getReplication(), pool);
} catch (IOException ioe) {
throw new UnexpectedAddPathBasedCacheDirectiveException(directive);
}
LOG.info("addDirective " + directive + ": added cache directive "
+ directive);
// Success!
// First, add it to the various maps
private void addInternal(PathBasedCacheEntry entry) {
entriesById.put(entry.getEntryId(), entry);
String path = directive.getPath().toUri().getPath();
String path = entry.getPath();
List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
if (entryList == null) {
entryList = new ArrayList<PathBasedCacheEntry>(1);
entriesByPath.put(path, entryList);
}
entryList.add(entry);
}
public PathBasedCacheDirective addDirective(
PathBasedCacheDirective directive, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
PathBasedCacheEntry entry;
try {
if (directive.getPool() == null) {
throw new IdNotFoundException("addDirective: no pool was specified.");
}
if (directive.getPool().isEmpty()) {
throw new IdNotFoundException("addDirective: pool name was empty.");
}
CachePool pool = cachePools.get(directive.getPool());
if (pool == null) {
throw new IdNotFoundException("addDirective: no such pool as " +
directive.getPool());
}
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
throw new AccessControlException("addDirective: write " +
"permission denied for pool " + directive.getPool());
}
if (directive.getPath() == null) {
throw new IOException("addDirective: no path was specified.");
}
String path = directive.getPath().toUri().getPath();
if (!DFSUtil.isValidName(path)) {
throw new IOException("addDirective: path '" + path + "' is invalid.");
}
short replication = directive.getReplication() == null ?
(short)1 : directive.getReplication();
if (replication <= 0) {
throw new IOException("addDirective: replication " + replication +
" is invalid.");
}
long id;
if (directive.getId() != null) {
// We are loading an entry from the edit log.
// Use the ID from the edit log.
id = directive.getId();
} else {
// Add a new entry with the next available ID.
id = getNextEntryId();
}
entry = new PathBasedCacheEntry(id, path, replication, pool);
addInternal(entry);
} catch (IOException e) {
LOG.warn("addDirective " + directive + ": failed.", e);
throw e;
}
LOG.info("addDirective " + directive + ": succeeded.");
if (monitor != null) {
monitor.kick();
}
return entry.getDescriptor();
return entry.toDirective();
}
public void removeDescriptor(long id, FSPermissionChecker pc)
public void modifyDirective(PathBasedCacheDirective directive,
FSPermissionChecker pc) throws IOException {
assert namesystem.hasWriteLock();
String idString =
(directive.getId() == null) ?
"(null)" : directive.getId().toString();
try {
// Check for invalid IDs.
Long id = directive.getId();
if (id == null) {
throw new IdNotFoundException("modifyDirective: " +
"no ID to modify was supplied.");
}
if (id <= 0) {
throw new IdNotFoundException("modifyDirective " + id +
": invalid non-positive directive ID.");
}
// Find the entry.
PathBasedCacheEntry prevEntry = entriesById.get(id);
if (prevEntry == null) {
throw new IdNotFoundException("modifyDirective " + id +
": id not found.");
}
if ((pc != null) &&
(!pc.checkPermission(prevEntry.getPool(), FsAction.WRITE))) {
throw new AccessControlException("modifyDirective " + id +
": permission denied for initial pool " + prevEntry.getPool());
}
String path = prevEntry.getPath();
if (directive.getPath() != null) {
path = directive.getPath().toUri().getPath();
if (!DFSUtil.isValidName(path)) {
throw new IOException("modifyDirective " + id + ": new path " +
path + " is not valid.");
}
}
short replication = (directive.getReplication() != null) ?
directive.getReplication() : prevEntry.getReplication();
if (replication <= 0) {
throw new IOException("modifyDirective: replication " + replication +
" is invalid.");
}
CachePool pool = prevEntry.getPool();
if (directive.getPool() != null) {
pool = cachePools.get(directive.getPool());
if (pool == null) {
throw new IdNotFoundException("modifyDirective " + id +
": pool " + directive.getPool() + " not found.");
}
if (directive.getPool().isEmpty()) {
throw new IdNotFoundException("modifyDirective: pool name was " +
"empty.");
}
if ((pc != null) &&
(!pc.checkPermission(pool, FsAction.WRITE))) {
throw new AccessControlException("modifyDirective " + id +
": permission denied for target pool " + pool);
}
}
removeInternal(prevEntry);
PathBasedCacheEntry newEntry =
new PathBasedCacheEntry(id, path, replication, pool);
addInternal(newEntry);
} catch (IOException e) {
LOG.warn("modifyDirective " + idString + ": failed.", e);
throw e;
}
LOG.info("modifyDirective " + idString + ": successfully applied " +
directive);
}
public void removeInternal(PathBasedCacheEntry existing)
throws IOException {
assert namesystem.hasWriteLock();
// Check for invalid IDs.
if (id <= 0) {
LOG.info("removeDescriptor " + id + ": invalid non-positive " +
"descriptor ID.");
throw new InvalidIdException(id);
}
// Find the entry.
PathBasedCacheEntry existing = entriesById.get(id);
if (existing == null) {
LOG.info("removeDescriptor " + id + ": entry not found.");
throw new NoSuchIdException(id);
}
CachePool pool = cachePools.get(existing.getDescriptor().getPool());
if (pool == null) {
LOG.info("removeDescriptor " + id + ": pool not found for directive " +
existing.getDescriptor());
throw new UnexpectedRemovePathBasedCacheDescriptorException(id);
}
if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
LOG.info("removeDescriptor " + id + ": write permission denied to " +
"pool " + pool + " for entry " + existing);
throw new RemovePermissionDeniedException(id);
}
// Remove the corresponding entry in entriesByPath.
String path = existing.getDescriptor().getPath().toUri().getPath();
String path = existing.getPath();
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
if (entries == null || !entries.remove(existing)) {
throw new UnexpectedRemovePathBasedCacheDescriptorException(id);
throw new IdNotFoundException("removeInternal: failed to locate entry " +
existing.getEntryId() + " by path " + existing.getPath());
}
if (entries.size() == 0) {
entriesByPath.remove(path);
}
entriesById.remove(id);
entriesById.remove(existing.getEntryId());
}
public void removeDirective(long id, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
try {
// Check for invalid IDs.
if (id <= 0) {
throw new IdNotFoundException("removeDirective " + id + ": invalid " +
"non-positive directive ID.");
}
// Find the entry.
PathBasedCacheEntry existing = entriesById.get(id);
if (existing == null) {
throw new IdNotFoundException("removeDirective " + id +
": id not found.");
}
if ((pc != null) &&
(!pc.checkPermission(existing.getPool(), FsAction.WRITE))) {
throw new AccessControlException("removeDirective " + id +
": write permission denied on pool " +
existing.getPool().getPoolName());
}
removeInternal(existing);
} catch (IOException e) {
LOG.warn("removeDirective " + id + " failed.", e);
throw e;
}
if (monitor != null) {
monitor.kick();
}
LOG.info("removeDescriptor successful for PathCacheEntry id " + id);
LOG.info("removeDirective " + id + ": succeeded.");
}
public BatchedListEntries<PathBasedCacheDescriptor>
listPathBasedCacheDescriptors(long prevId, String filterPool,
String filterPath, FSPermissionChecker pc) throws IOException {
public BatchedListEntries<PathBasedCacheDirective>
listPathBasedCacheDirectives(long prevId,
PathBasedCacheDirective filter,
FSPermissionChecker pc) throws IOException {
assert namesystem.hasReadOrWriteLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
if (filterPath != null) {
String filterPath = null;
if (filter.getId() != null) {
throw new IOException("we currently don't support filtering by ID");
}
if (filter.getPath() != null) {
filterPath = filter.getPath().toUri().getPath();
if (!DFSUtil.isValidName(filterPath)) {
throw new IOException("invalid path name '" + filterPath + "'");
throw new IOException("listPathBasedCacheDirectives: invalid " +
"path name '" + filterPath + "'");
}
}
ArrayList<PathBasedCacheDescriptor> replies =
new ArrayList<PathBasedCacheDescriptor>(NUM_PRE_ALLOCATED_ENTRIES);
if (filter.getReplication() != null) {
throw new IOException("we currently don't support filtering " +
"by replication");
}
ArrayList<PathBasedCacheDirective> replies =
new ArrayList<PathBasedCacheDirective>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0;
SortedMap<Long, PathBasedCacheEntry> tailMap = entriesById.tailMap(prevId + 1);
SortedMap<Long, PathBasedCacheEntry> tailMap =
entriesById.tailMap(prevId + 1);
for (Entry<Long, PathBasedCacheEntry> cur : tailMap.entrySet()) {
if (numReplies >= maxListCacheDescriptorsResponses) {
return new BatchedListEntries<PathBasedCacheDescriptor>(replies, true);
if (numReplies >= maxListCacheDirectivesNumResponses) {
return new BatchedListEntries<PathBasedCacheDirective>(replies, true);
}
PathBasedCacheEntry curEntry = cur.getValue();
PathBasedCacheDirective directive = cur.getValue().getDescriptor();
if (filterPool != null &&
!directive.getPool().equals(filterPool)) {
PathBasedCacheDirective directive = cur.getValue().toDirective();
if (filter.getPool() != null &&
!directive.getPool().equals(filter.getPool())) {
continue;
}
if (filterPath != null &&
!directive.getPath().toUri().getPath().equals(filterPath)) {
continue;
}
if (pc.checkPermission(curEntry.getPool(), FsAction.READ)) {
replies.add(cur.getValue().getDescriptor());
if ((pc == null) ||
(pc.checkPermission(curEntry.getPool(), FsAction.READ))) {
replies.add(cur.getValue().toDirective());
numReplies++;
}
}
return new BatchedListEntries<PathBasedCacheDescriptor>(replies, false);
return new BatchedListEntries<PathBasedCacheDirective>(replies, false);
}
/**
@ -553,7 +649,8 @@ public final class CacheManager {
blockManager.getDatanodeManager().getDatanode(datanodeID);
if (datanode == null || !datanode.isAlive) {
throw new IOException(
"processCacheReport from dead or unregistered datanode: " + datanode);
"processCacheReport from dead or unregistered datanode: " +
datanode);
}
processCacheReportImpl(datanode, blockIds);
} finally {

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
@ -47,6 +46,7 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyPathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDescriptorOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@ -956,18 +956,25 @@ public class FSEditLog implements LogsPurgeable {
void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
boolean toLogRpcIds) {
AddPathBasedCacheDirectiveOp op = AddPathBasedCacheDirectiveOp.getInstance(
cache.get())
.setPath(directive.getPath().toUri().getPath())
.setReplication(directive.getReplication())
.setPool(directive.getPool());
AddPathBasedCacheDirectiveOp op =
AddPathBasedCacheDirectiveOp.getInstance(cache.get())
.setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRemovePathBasedCacheDescriptor(Long id, boolean toLogRpcIds) {
RemovePathBasedCacheDescriptorOp op =
RemovePathBasedCacheDescriptorOp.getInstance(cache.get()).setId(id);
void logModifyPathBasedCacheDirective(
PathBasedCacheDirective directive, boolean toLogRpcIds) {
ModifyPathBasedCacheDirectiveOp op =
ModifyPathBasedCacheDirectiveOp.getInstance(
cache.get()).setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRemovePathBasedCacheDirective(Long id, boolean toLogRpcIds) {
RemovePathBasedCacheDirectiveOp op =
RemovePathBasedCacheDirectiveOp.getInstance(cache.get()).setId(id);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@ -58,9 +57,10 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyPathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDescriptorOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@ -642,23 +642,28 @@ public class FSEditLogLoader {
}
case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
PathBasedCacheDirective d = new PathBasedCacheDirective.Builder().
setPath(new Path(addOp.path)).
setReplication(addOp.replication).
setPool(addOp.pool).
build();
PathBasedCacheDescriptor descriptor =
fsNamesys.getCacheManager().addDirective(d, null);
PathBasedCacheDirective result = fsNamesys.
getCacheManager().addDirective(addOp.directive, null);
if (toAddRetryCache) {
fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId,
descriptor);
Long id = result.getId();
fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id);
}
break;
}
case OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR: {
RemovePathBasedCacheDescriptorOp removeOp =
(RemovePathBasedCacheDescriptorOp) op;
fsNamesys.getCacheManager().removeDescriptor(removeOp.id, null);
case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: {
ModifyPathBasedCacheDirectiveOp modifyOp =
(ModifyPathBasedCacheDirectiveOp) op;
fsNamesys.getCacheManager().modifyDirective(
modifyOp.directive, null);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
break;
}
case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: {
RemovePathBasedCacheDirectiveOp removeOp =
(RemovePathBasedCacheDirectiveOp) op;
fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
@ -37,7 +38,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@ -74,6 +75,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -84,6 +86,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@ -164,8 +167,10 @@ public abstract class FSEditLogOp {
inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
new AddPathBasedCacheDirectiveOp());
inst.put(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR,
new RemovePathBasedCacheDescriptorOp());
inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE,
new ModifyPathBasedCacheDirectiveOp());
inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE,
new RemovePathBasedCacheDirectiveOp());
inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
@ -2866,9 +2871,7 @@ public abstract class FSEditLogOp {
* {@link ClientProtocol#addPathBasedCacheDirective}
*/
static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
String path;
short replication;
String pool;
PathBasedCacheDirective directive;
public AddPathBasedCacheDirectiveOp() {
super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
@ -2879,51 +2882,60 @@ public abstract class FSEditLogOp {
.get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
}
public AddPathBasedCacheDirectiveOp setPath(String path) {
this.path = path;
return this;
}
public AddPathBasedCacheDirectiveOp setReplication(short replication) {
this.replication = replication;
return this;
}
public AddPathBasedCacheDirectiveOp setPool(String pool) {
this.pool = pool;
public AddPathBasedCacheDirectiveOp setDirective(
PathBasedCacheDirective directive) {
this.directive = directive;
assert(directive.getId() != null);
assert(directive.getPath() != null);
assert(directive.getReplication() != null);
assert(directive.getPool() != null);
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
this.path = FSImageSerialization.readString(in);
this.replication = FSImageSerialization.readShort(in);
this.pool = FSImageSerialization.readString(in);
long id = FSImageSerialization.readLong(in);
String path = FSImageSerialization.readString(in);
short replication = FSImageSerialization.readShort(in);
String pool = FSImageSerialization.readString(in);
directive = new PathBasedCacheDirective.Builder().
setId(id).
setPath(new Path(path)).
setReplication(replication).
setPool(pool).
build();
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeShort(replication, out);
FSImageSerialization.writeString(pool, out);
FSImageSerialization.writeLong(directive.getId(), out);
FSImageSerialization.writeString(directive.getPath().toUri().getPath(), out);
FSImageSerialization.writeShort(directive.getReplication(), out);
FSImageSerialization.writeString(directive.getPool(), out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "ID",
directive.getId().toString());
XMLUtils.addSaxString(contentHandler, "PATH",
directive.getPath().toUri().getPath());
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(replication));
XMLUtils.addSaxString(contentHandler, "POOL", pool);
Short.toString(directive.getReplication()));
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
path = st.getValue("PATH");
replication = Short.parseShort(st.getValue("REPLICATION"));
pool = st.getValue("POOL");
directive = new PathBasedCacheDirective.Builder().
setId(Long.parseLong(st.getValue("ID"))).
setPath(new Path(st.getValue("PATH"))).
setReplication(Short.parseShort(st.getValue("REPLICATION"))).
setPool(st.getValue("POOL")).
build();
readRpcIdsFromXml(st);
}
@ -2931,9 +2943,10 @@ public abstract class FSEditLogOp {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AddPathBasedCacheDirective [");
builder.append("path=" + path + ",");
builder.append("replication=" + replication + ",");
builder.append("pool=" + pool);
builder.append("id=" + directive.getId() + ",");
builder.append("path=" + directive.getPath().toUri().getPath() + ",");
builder.append("replication=" + directive.getReplication() + ",");
builder.append("pool=" + directive.getPool());
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
@ -2942,21 +2955,149 @@ public abstract class FSEditLogOp {
/**
* {@literal @AtMostOnce} for
* {@link ClientProtocol#removePathBasedCacheDescriptor}
* {@link ClientProtocol#modifyPathBasedCacheDirective}
*/
static class RemovePathBasedCacheDescriptorOp extends FSEditLogOp {
static class ModifyPathBasedCacheDirectiveOp extends FSEditLogOp {
PathBasedCacheDirective directive;
public ModifyPathBasedCacheDirectiveOp() {
super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
}
static ModifyPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
return (ModifyPathBasedCacheDirectiveOp) cache
.get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
}
public ModifyPathBasedCacheDirectiveOp setDirective(
PathBasedCacheDirective directive) {
this.directive = directive;
assert(directive.getId() != null);
return this;
}
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
builder.setId(FSImageSerialization.readLong(in));
byte flags = in.readByte();
if ((flags & 0x1) != 0) {
builder.setPath(new Path(FSImageSerialization.readString(in)));
}
if ((flags & 0x2) != 0) {
builder.setReplication(FSImageSerialization.readShort(in));
}
if ((flags & 0x4) != 0) {
builder.setPool(FSImageSerialization.readString(in));
}
if ((flags & ~0x7) != 0) {
throw new IOException("unknown flags set in " +
"ModifyPathBasedCacheDirectiveOp: " + flags);
}
this.directive = builder.build();
readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(directive.getId(), out);
byte flags = (byte)(
((directive.getPath() != null) ? 0x1 : 0) |
((directive.getReplication() != null) ? 0x2 : 0) |
((directive.getPool() != null) ? 0x4 : 0)
);
out.writeByte(flags);
if (directive.getPath() != null) {
FSImageSerialization.writeString(
directive.getPath().toUri().getPath(), out);
}
if (directive.getReplication() != null) {
FSImageSerialization.writeShort(directive.getReplication(), out);
}
if (directive.getPool() != null) {
FSImageSerialization.writeString(directive.getPool(), out);
}
writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "ID",
Long.toString(directive.getId()));
if (directive.getPath() != null) {
XMLUtils.addSaxString(contentHandler, "PATH",
directive.getPath().toUri().getPath());
}
if (directive.getReplication() != null) {
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(directive.getReplication()));
}
if (directive.getPool() != null) {
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
}
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
builder.setId(Long.parseLong(st.getValue("ID")));
String path = st.getValueOrNull("PATH");
if (path != null) {
builder.setPath(new Path(path));
}
String replicationString = st.getValueOrNull("REPLICATION");
if (replicationString != null) {
builder.setReplication(Short.parseShort(replicationString));
}
String pool = st.getValueOrNull("POOL");
if (pool != null) {
builder.setPool(pool);
}
this.directive = builder.build();
readRpcIdsFromXml(st);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ModifyPathBasedCacheDirectiveOp[");
builder.append("id=").append(directive.getId());
if (directive.getPath() != null) {
builder.append(",").append("path=").append(directive.getPath());
}
if (directive.getReplication() != null) {
builder.append(",").append("replication=").
append(directive.getReplication());
}
if (directive.getPool() != null) {
builder.append(",").append("pool=").append(directive.getPool());
}
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
* {@literal @AtMostOnce} for
* {@link ClientProtocol#removePathBasedCacheDirective}
*/
static class RemovePathBasedCacheDirectiveOp extends FSEditLogOp {
long id;
public RemovePathBasedCacheDescriptorOp() {
super(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR);
public RemovePathBasedCacheDirectiveOp() {
super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
}
static RemovePathBasedCacheDescriptorOp getInstance(OpInstanceCache cache) {
return (RemovePathBasedCacheDescriptorOp) cache
.get(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR);
static RemovePathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
return (RemovePathBasedCacheDirectiveOp) cache
.get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
}
public RemovePathBasedCacheDescriptorOp setId(long id) {
public RemovePathBasedCacheDirectiveOp setId(long id) {
this.id = id;
return this;
}
@ -2988,7 +3129,7 @@ public abstract class FSEditLogOp {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("RemovePathBasedCacheDescriptor [");
builder.append("RemovePathBasedCacheDirective [");
builder.append("id=" + Long.toString(id));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");

View File

@ -64,11 +64,12 @@ public enum FSEditLogOpCodes {
OP_DISALLOW_SNAPSHOT ((byte) 30),
OP_SET_GENSTAMP_V2 ((byte) 31),
OP_ALLOCATE_BLOCK_ID ((byte) 32),
OP_ADD_PATH_BASED_CACHE_DIRECTIVE ((byte) 33),
OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR ((byte) 34),
OP_ADD_CACHE_POOL ((byte) 35),
OP_MODIFY_CACHE_POOL ((byte) 36),
OP_REMOVE_CACHE_POOL ((byte) 37);
OP_ADD_PATH_BASED_CACHE_DIRECTIVE ((byte) 33),
OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE ((byte) 34),
OP_ADD_CACHE_POOL ((byte) 35),
OP_MODIFY_CACHE_POOL ((byte) 36),
OP_REMOVE_CACHE_POOL ((byte) 37),
OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE ((byte) 38);
private byte opCode;

View File

@ -152,7 +152,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -7008,7 +7007,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
PathBasedCacheDescriptor addPathBasedCacheDirective(
long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
@ -7016,20 +7015,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
CacheEntryWithPayload cacheEntry =
RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (PathBasedCacheDescriptor) cacheEntry.getPayload();
return (Long) cacheEntry.getPayload();
}
boolean success = false;
PathBasedCacheDescriptor result = null;
writeLock();
Long result = null;
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot add PathBasedCache directive", safeMode);
}
result = cacheManager.addDirective(directive, pc);
getEditLog().logAddPathBasedCacheDirective(directive,
if (directive.getId() != null) {
throw new IOException("addDirective: you cannot specify an ID " +
"for this operation.");
}
PathBasedCacheDirective effectiveDirective =
cacheManager.addDirective(directive, pc);
getEditLog().logAddPathBasedCacheDirective(effectiveDirective,
cacheEntry != null);
result = effectiveDirective.getId();
success = true;
} finally {
writeUnlock();
@ -7044,7 +7049,40 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return result;
}
void removePathBasedCacheDescriptor(Long id) throws IOException {
void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
boolean success = false;
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot add PathBasedCache directive", safeMode);
}
cacheManager.modifyDirective(directive, pc);
getEditLog().logModifyPathBasedCacheDirective(directive,
cacheEntry != null);
success = true;
} finally {
writeUnlock();
if (success) {
getEditLog().logSync();
}
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
}
RetryCache.setState(cacheEntry, success);
}
}
void removePathBasedCacheDirective(Long id) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@ -7060,13 +7098,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException(
"Cannot remove PathBasedCache directives", safeMode);
}
cacheManager.removeDescriptor(id, pc);
getEditLog().logRemovePathBasedCacheDescriptor(id, cacheEntry != null);
cacheManager.removeDirective(id, pc);
getEditLog().logRemovePathBasedCacheDirective(id, cacheEntry != null);
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "removePathBasedCacheDescriptor", null, null,
logAuditEvent(success, "removePathBasedCacheDirective", null, null,
null);
}
RetryCache.setState(cacheEntry, success);
@ -7074,23 +7112,23 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
getEditLog().logSync();
}
BatchedListEntries<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
long startId, String pool, String path) throws IOException {
BatchedListEntries<PathBasedCacheDirective> listPathBasedCacheDirectives(
long startId, PathBasedCacheDirective filter) throws IOException {
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
BatchedListEntries<PathBasedCacheDescriptor> results;
BatchedListEntries<PathBasedCacheDirective> results;
readLock();
boolean success = false;
try {
checkOperation(OperationCategory.READ);
results =
cacheManager.listPathBasedCacheDescriptors(startId, pool, path, pc);
cacheManager.listPathBasedCacheDirectives(startId, filter, pc);
success = true;
} finally {
readUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "listPathBasedCacheDescriptors", null, null,
logAuditEvent(success, "listPathBasedCacheDirectives", null, null,
null);
}
}

View File

@ -62,7 +62,6 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -1234,46 +1233,52 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override
public PathBasedCacheDescriptor addPathBasedCacheDirective(
public long addPathBasedCacheDirective(
PathBasedCacheDirective path) throws IOException {
return namesystem.addPathBasedCacheDirective(path);
}
@Override
public void removePathBasedCacheDescriptor(Long id) throws IOException {
namesystem.removePathBasedCacheDescriptor(id);
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
namesystem.modifyPathBasedCacheDirective(directive);
}
@Override
public void removePathBasedCacheDirective(long id) throws IOException {
namesystem.removePathBasedCacheDirective(id);
}
private class ServerSidePathBasedCacheEntriesIterator
extends BatchedRemoteIterator<Long, PathBasedCacheDescriptor> {
extends BatchedRemoteIterator<Long, PathBasedCacheDirective> {
private final String pool;
private final String path;
public ServerSidePathBasedCacheEntriesIterator(Long firstKey, String pool,
String path) {
private final PathBasedCacheDirective filter;
public ServerSidePathBasedCacheEntriesIterator(Long firstKey,
PathBasedCacheDirective filter) {
super(firstKey);
this.pool = pool;
this.path = path;
this.filter = filter;
}
@Override
public BatchedEntries<PathBasedCacheDescriptor> makeRequest(
public BatchedEntries<PathBasedCacheDirective> makeRequest(
Long nextKey) throws IOException {
return namesystem.listPathBasedCacheDescriptors(nextKey, pool, path);
return namesystem.listPathBasedCacheDirectives(nextKey, filter);
}
@Override
public Long elementToPrevKey(PathBasedCacheDescriptor entry) {
return entry.getEntryId();
public Long elementToPrevKey(PathBasedCacheDirective entry) {
return entry.getId();
}
}
@Override
public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(long prevId,
String pool, String path) throws IOException {
return new ServerSidePathBasedCacheEntriesIterator(prevId, pool, path);
public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(long prevId,
PathBasedCacheDirective filter) throws IOException {
if (filter == null) {
filter = new PathBasedCacheDirective.Builder().build();
}
return new ServerSidePathBasedCacheEntriesIterator(prevId, filter);
}
@Override

View File

@ -30,11 +30,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.tools.TableListing.Justification;
import org.apache.hadoop.ipc.RemoteException;
@ -180,11 +177,9 @@ public class CacheAdmin extends Configured implements Tool {
setPool(poolName).
build();
try {
PathBasedCacheDescriptor descriptor =
dfs.addPathBasedCacheDirective(directive);
System.out.println("Added PathBasedCache entry "
+ descriptor.getEntryId());
} catch (AddPathBasedCacheDirectiveException e) {
long id = dfs.addPathBasedCacheDirective(directive);
System.out.println("Added PathBasedCache entry " + id);
} catch (IOException e) {
System.err.println(prettifyException(e));
return 2;
}
@ -243,9 +238,9 @@ public class CacheAdmin extends Configured implements Tool {
}
DistributedFileSystem dfs = getDFS(conf);
try {
dfs.getClient().removePathBasedCacheDescriptor(id);
dfs.getClient().removePathBasedCacheDirective(id);
System.out.println("Removed PathBasedCache directive " + id);
} catch (RemovePathBasedCacheDescriptorException e) {
} catch (IOException e) {
System.err.println(prettifyException(e));
return 2;
}
@ -289,16 +284,18 @@ public class CacheAdmin extends Configured implements Tool {
return 1;
}
DistributedFileSystem dfs = getDFS(conf);
RemoteIterator<PathBasedCacheDescriptor> iter =
dfs.listPathBasedCacheDescriptors(null, new Path(path));
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
setPath(new Path(path)).build());
int exitCode = 0;
while (iter.hasNext()) {
PathBasedCacheDescriptor entry = iter.next();
PathBasedCacheDirective directive = iter.next();
try {
dfs.removePathBasedCacheDescriptor(entry);
dfs.removePathBasedCacheDirective(directive.getId());
System.out.println("Removed PathBasedCache directive " +
entry.getEntryId());
} catch (RemovePathBasedCacheDescriptorException e) {
directive.getId());
} catch (IOException e) {
System.err.println(prettifyException(e));
exitCode = 2;
}
@ -338,8 +335,16 @@ public class CacheAdmin extends Configured implements Tool {
@Override
public int run(Configuration conf, List<String> args) throws IOException {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
String pathFilter = StringUtils.popOptionWithArgument("-path", args);
if (pathFilter != null) {
builder.setPath(new Path(pathFilter));
}
String poolFilter = StringUtils.popOptionWithArgument("-pool", args);
if (poolFilter != null) {
builder.setPool(poolFilter);
}
if (!args.isEmpty()) {
System.err.println("Can't understand argument: " + args.get(0));
return 1;
@ -350,15 +355,14 @@ public class CacheAdmin extends Configured implements Tool {
addField("PATH", Justification.LEFT).
build();
DistributedFileSystem dfs = getDFS(conf);
RemoteIterator<PathBasedCacheDescriptor> iter =
dfs.listPathBasedCacheDescriptors(poolFilter, pathFilter != null ?
new Path(pathFilter) : null);
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(builder.build());
int numEntries = 0;
while (iter.hasNext()) {
PathBasedCacheDescriptor entry = iter.next();
PathBasedCacheDirective directive = iter.next();
String row[] = new String[] {
"" + entry.getEntryId(), entry.getPool(),
entry.getPath().toUri().getPath(),
"" + directive.getId(), directive.getPool(),
directive.getPath().toUri().getPath(),
};
tableListing.addRow(row);
numEntries++;

View File

@ -252,9 +252,24 @@ public class XMLUtils {
* @return the entry
*/
public String getValue(String name) throws InvalidXmlException {
if (!subtrees.containsKey(name)) {
String ret = getValueOrNull(name);
if (ret == null) {
throw new InvalidXmlException("no entry found for " + name);
}
return ret;
}
/**
* Pull a string entry from a stanza, or null.
*
* @param name entry to look for
*
* @return the entry, or null if it was not found.
*/
public String getValueOrNull(String name) throws InvalidXmlException {
if (!subtrees.containsKey(name)) {
return null;
}
LinkedList <Stanza> l = subtrees.get(name);
if (l.size() != 1) {
throw new InvalidXmlException("More than one value found for " + name);

View File

@ -363,42 +363,46 @@ message IsFileClosedResponseProto {
required bool result = 1;
}
message PathBasedCacheDirectiveProto {
required string path = 1;
required uint32 replication = 2;
required string pool = 3;
message PathBasedCacheDirectiveInfoProto {
optional int64 id = 1;
optional string path = 2;
optional uint32 replication = 3;
optional string pool = 4;
}
message AddPathBasedCacheDirectiveRequestProto {
required PathBasedCacheDirectiveProto directive = 1;
required PathBasedCacheDirectiveInfoProto info = 1;
}
message AddPathBasedCacheDirectiveResponseProto {
required int64 descriptorId = 1;
}
message RemovePathBasedCacheDescriptorRequestProto {
required int64 descriptorId = 1;
}
message RemovePathBasedCacheDescriptorResponseProto {
}
message ListPathBasedCacheDescriptorsRequestProto {
required int64 prevId = 1;
optional string pool = 2;
optional string path = 3;
}
message ListPathBasedCacheDescriptorsElementProto {
required int64 id = 1;
required string pool = 2;
required uint32 replication = 3;
required string path = 4;
}
message ListPathBasedCacheDescriptorsResponseProto {
repeated ListPathBasedCacheDescriptorsElementProto elements = 1;
message ModifyPathBasedCacheDirectiveRequestProto {
required PathBasedCacheDirectiveInfoProto info = 1;
}
message ModifyPathBasedCacheDirectiveResponseProto {
}
message RemovePathBasedCacheDirectiveRequestProto {
required int64 id = 1;
}
message RemovePathBasedCacheDirectiveResponseProto {
}
message ListPathBasedCacheDirectivesRequestProto {
required int64 prevId = 1;
required PathBasedCacheDirectiveInfoProto filter = 2;
}
message ListPathBasedCacheDirectivesElementProto {
required PathBasedCacheDirectiveInfoProto info = 1;
}
message ListPathBasedCacheDirectivesResponseProto {
repeated ListPathBasedCacheDirectivesElementProto elements = 1;
required bool hasMore = 2;
}
@ -631,10 +635,12 @@ service ClientNamenodeProtocol {
rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto);
rpc addPathBasedCacheDirective(AddPathBasedCacheDirectiveRequestProto)
returns (AddPathBasedCacheDirectiveResponseProto);
rpc removePathBasedCacheDescriptor(RemovePathBasedCacheDescriptorRequestProto)
returns (RemovePathBasedCacheDescriptorResponseProto);
rpc listPathBasedCacheDescriptors(ListPathBasedCacheDescriptorsRequestProto)
returns (ListPathBasedCacheDescriptorsResponseProto);
rpc modifyPathBasedCacheDirective(ModifyPathBasedCacheDirectiveRequestProto)
returns (ModifyPathBasedCacheDirectiveResponseProto);
rpc removePathBasedCacheDirective(RemovePathBasedCacheDirectiveRequestProto)
returns (RemovePathBasedCacheDirectiveResponseProto);
rpc listPathBasedCacheDirectives(ListPathBasedCacheDirectivesRequestProto)
returns (ListPathBasedCacheDirectivesResponseProto);
rpc addCachePool(AddCachePoolRequestProto)
returns(AddCachePoolResponseProto);
rpc modifyCachePool(ModifyCachePoolRequestProto)

View File

@ -1487,10 +1487,10 @@
</property>
<property>
<name>dfs.namenode.list.cache.descriptors.num.responses</name>
<name>dfs.namenode.list.cache.directives.num.responses</name>
<value>100</value>
<description>
This value controls the number of cache descriptors that the NameNode will
This value controls the number of cache directives that the NameNode will
send over the wire in response to a listDirectives RPC.
</description>
</property>

View File

@ -993,19 +993,26 @@ public class DFSTestUtil {
locatedBlocks = DFSClientAdapter.callGetBlockLocations(
cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
} while (locatedBlocks.isUnderConstruction());
// OP_ADD_CACHE_POOL 35
// OP_ADD_CACHE_POOL
filesystem.addCachePool(new CachePoolInfo("pool1"));
// OP_MODIFY_CACHE_POOL 36
// OP_MODIFY_CACHE_POOL
filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
PathBasedCacheDescriptor pbcd = filesystem.addPathBasedCacheDirective(
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE
long id = filesystem.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setPath(new Path("/path")).
setReplication((short)1).
setPool("pool1").
build());
// OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
filesystem.removePathBasedCacheDescriptor(pbcd);
// OP_REMOVE_CACHE_POOL 37
// OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE
filesystem.modifyPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setId(id).
setReplication((short)2).
build());
// OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE
filesystem.removePathBasedCacheDirective(id);
// OP_REMOVE_CACHE_POOL
filesystem.removeCachePool("pool1");
}

View File

@ -1,57 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.junit.Test;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.EmptyPathError;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveProto;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public class TestClientNamenodeProtocolServerSideTranslatorPB {
@Test
public void testAddPathBasedCacheDirectiveEmptyPathError() throws Exception {
ClientProtocol server = mock(ClientProtocol.class);
RpcController controller = mock(RpcController.class);
AddPathBasedCacheDirectiveRequestProto request =
AddPathBasedCacheDirectiveRequestProto.newBuilder().
setDirective(PathBasedCacheDirectiveProto.newBuilder().
setPath("").
setPool("pool").
setReplication(1).
build()).
build();
ClientNamenodeProtocolServerSideTranslatorPB translator =
new ClientNamenodeProtocolServerSideTranslatorPB(server);
try {
translator.addPathBasedCacheDirective(controller, request);
fail("Expected ServiceException");
} catch (ServiceException e) {
assertNotNull(e.getCause());
assertTrue(e.getCause() instanceof EmptyPathError);
}
}
}

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Util;
@ -242,14 +241,20 @@ public class OfflineEditsViewerHelper {
.setMode(new FsPermission((short)0700))
.setWeight(1989));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
PathBasedCacheDescriptor descriptor =
dfs.addPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
long id = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setPath(new Path("/bar")).
setReplication((short)1).
setPool(pool).
build());
// OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
dfs.removePathBasedCacheDescriptor(descriptor);
// OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE 38
dfs.modifyPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setId(id).
setPath(new Path("/bar2")).
build());
// OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE 34
dfs.removePathBasedCacheDirective(id);
// OP_REMOVE_CACHE_POOL 37
dfs.removeCachePool(pool);
// sync to disk, otherwise we parse partial edits

View File

@ -413,7 +413,7 @@ public class TestNamenodeRetryCache {
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
assertEquals(19, cacheSet.size());
assertEquals(20, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@ -432,7 +432,7 @@ public class TestNamenodeRetryCache {
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
assertEquals(19, cacheSet.size());
assertEquals(20, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();

View File

@ -42,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.IdNotFoundException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
@ -49,17 +50,12 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.GSet;
@ -86,7 +82,7 @@ public class TestPathBasedCacheRequests {
conf = new HdfsConfiguration();
// set low limits here for testing purposes
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
dfs = cluster.getFileSystem();
@ -296,21 +292,21 @@ public class TestPathBasedCacheRequests {
}
private static void validateListAll(
RemoteIterator<PathBasedCacheDescriptor> iter,
PathBasedCacheDescriptor... descriptors) throws Exception {
for (PathBasedCacheDescriptor descriptor: descriptors) {
RemoteIterator<PathBasedCacheDirective> iter,
Long... ids) throws Exception {
for (Long id: ids) {
assertTrue("Unexpectedly few elements", iter.hasNext());
assertEquals("Unexpected descriptor", descriptor, iter.next());
assertEquals("Unexpected directive ID", id, iter.next().getId());
}
assertFalse("Unexpectedly many list elements", iter.hasNext());
}
private static PathBasedCacheDescriptor addAsUnprivileged(
private static long addAsUnprivileged(
final PathBasedCacheDirective directive) throws Exception {
return unprivilegedUser
.doAs(new PrivilegedExceptionAction<PathBasedCacheDescriptor>() {
.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public PathBasedCacheDescriptor run() throws IOException {
public Long run() throws IOException {
DistributedFileSystem myDfs =
(DistributedFileSystem) FileSystem.get(conf);
return myDfs.addPathBasedCacheDirective(directive);
@ -342,12 +338,12 @@ public class TestPathBasedCacheRequests {
setPool("pool1").
build();
PathBasedCacheDescriptor alphaD = addAsUnprivileged(alpha);
PathBasedCacheDescriptor alphaD2 = addAsUnprivileged(alpha);
assertFalse("Expected to get unique descriptors when re-adding an "
long alphaId = addAsUnprivileged(alpha);
long alphaId2 = addAsUnprivileged(alpha);
assertFalse("Expected to get unique directives when re-adding an "
+ "existing PathBasedCacheDirective",
alphaD.getEntryId() == alphaD2.getEntryId());
PathBasedCacheDescriptor betaD = addAsUnprivileged(beta);
alphaId == alphaId2);
long betaId = addAsUnprivileged(beta);
try {
addAsUnprivileged(new PathBasedCacheDirective.Builder().
@ -355,8 +351,8 @@ public class TestPathBasedCacheRequests {
setPool("no_such_pool").
build());
fail("expected an error when adding to a non-existent pool.");
} catch (IOException ioe) {
assertTrue(ioe instanceof InvalidPoolNameError);
} catch (IdNotFoundException ioe) {
GenericTestUtils.assertExceptionContains("no such pool as", ioe);
}
try {
@ -366,8 +362,9 @@ public class TestPathBasedCacheRequests {
build());
fail("expected an error when adding to a pool with " +
"mode 0 (no permissions for anyone).");
} catch (IOException ioe) {
assertTrue(ioe instanceof PoolWritePermissionDeniedError);
} catch (AccessControlException e) {
GenericTestUtils.
assertExceptionContains("permission denied for pool", e);
}
try {
@ -378,7 +375,7 @@ public class TestPathBasedCacheRequests {
fail("expected an error when adding a malformed path " +
"to the cache directives.");
} catch (IllegalArgumentException e) {
// expected
GenericTestUtils.assertExceptionContains("is not a valid DFS filename", e);
}
try {
@ -389,59 +386,74 @@ public class TestPathBasedCacheRequests {
build());
Assert.fail("expected an error when adding a PathBasedCache " +
"directive with an empty pool name.");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof InvalidPoolNameError);
} catch (IdNotFoundException e) {
GenericTestUtils.assertExceptionContains("pool name was empty", e);
}
PathBasedCacheDescriptor deltaD = addAsUnprivileged(delta);
long deltaId = addAsUnprivileged(delta);
// We expect the following to succeed, because DistributedFileSystem
// qualifies the path.
PathBasedCacheDescriptor relativeD = addAsUnprivileged(
long relativeId = addAsUnprivileged(
new PathBasedCacheDirective.Builder().
setPath(new Path("relative")).
setPool("pool1").
build());
RemoteIterator<PathBasedCacheDescriptor> iter;
iter = dfs.listPathBasedCacheDescriptors(null, null);
validateListAll(iter, alphaD, alphaD2, betaD, deltaD, relativeD);
iter = dfs.listPathBasedCacheDescriptors("pool3", null);
RemoteIterator<PathBasedCacheDirective> iter;
iter = dfs.listPathBasedCacheDirectives(null);
validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId );
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().setPool("pool3").build());
Assert.assertFalse(iter.hasNext());
iter = dfs.listPathBasedCacheDescriptors("pool1", null);
validateListAll(iter, alphaD, alphaD2, deltaD, relativeD);
iter = dfs.listPathBasedCacheDescriptors("pool2", null);
validateListAll(iter, betaD);
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().setPool("pool1").build());
validateListAll(iter, alphaId, alphaId2, deltaId, relativeId );
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().setPool("pool2").build());
validateListAll(iter, betaId);
dfs.removePathBasedCacheDescriptor(betaD);
iter = dfs.listPathBasedCacheDescriptors("pool2", null);
dfs.removePathBasedCacheDirective(betaId);
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().setPool("pool2").build());
Assert.assertFalse(iter.hasNext());
try {
dfs.removePathBasedCacheDescriptor(betaD);
dfs.removePathBasedCacheDirective(betaId);
Assert.fail("expected an error when removing a non-existent ID");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof NoSuchIdException);
} catch (IdNotFoundException e) {
GenericTestUtils.assertExceptionContains("id not found", e);
}
try {
proto.removePathBasedCacheDescriptor(-42l);
proto.removePathBasedCacheDirective(-42l);
Assert.fail("expected an error when removing a negative ID");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof InvalidIdException);
} catch (IdNotFoundException e) {
GenericTestUtils.assertExceptionContains(
"invalid non-positive directive ID", e);
}
try {
proto.removePathBasedCacheDescriptor(43l);
proto.removePathBasedCacheDirective(43l);
Assert.fail("expected an error when removing a non-existent ID");
} catch (IOException ioe) {
Assert.assertTrue(ioe instanceof NoSuchIdException);
} catch (IdNotFoundException e) {
GenericTestUtils.assertExceptionContains("id not found", e);
}
dfs.removePathBasedCacheDescriptor(alphaD);
dfs.removePathBasedCacheDescriptor(alphaD2);
dfs.removePathBasedCacheDescriptor(deltaD);
dfs.removePathBasedCacheDescriptor(relativeD);
iter = dfs.listPathBasedCacheDescriptors(null, null);
dfs.removePathBasedCacheDirective(alphaId);
dfs.removePathBasedCacheDirective(alphaId2);
dfs.removePathBasedCacheDirective(deltaId);
dfs.modifyPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
setId(relativeId).
setReplication((short)555).
build());
iter = dfs.listPathBasedCacheDirectives(null);
assertTrue(iter.hasNext());
PathBasedCacheDirective modified = iter.next();
assertEquals(relativeId, modified.getId().longValue());
assertEquals((short)555, modified.getReplication().shortValue());
dfs.removePathBasedCacheDirective(relativeId);
iter = dfs.listPathBasedCacheDirectives(null);
assertFalse(iter.hasNext());
}
@ -481,16 +493,16 @@ public class TestPathBasedCacheRequests {
new PathBasedCacheDirective.Builder().
setPath(new Path(entryPrefix + i)).setPool(pool).build());
}
RemoteIterator<PathBasedCacheDescriptor> dit
= dfs.listPathBasedCacheDescriptors(null, null);
RemoteIterator<PathBasedCacheDirective> dit
= dfs.listPathBasedCacheDirectives(null);
for (int i=0; i<numEntries; i++) {
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
PathBasedCacheDescriptor cd = dit.next();
assertEquals(i+1, cd.getEntryId());
PathBasedCacheDirective cd = dit.next();
assertEquals(i+1, cd.getId().longValue());
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
assertEquals(pool, cd.getPool());
}
assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
assertFalse("Unexpected # of cache directives found", dit.hasNext());
// Restart namenode
cluster.restartNameNode();
@ -506,15 +518,15 @@ public class TestPathBasedCacheRequests {
assertEquals(weight, (int)info.getWeight());
assertFalse("Unexpected # of cache pools found", pit.hasNext());
dit = dfs.listPathBasedCacheDescriptors(null, null);
dit = dfs.listPathBasedCacheDirectives(null);
for (int i=0; i<numEntries; i++) {
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
PathBasedCacheDescriptor cd = dit.next();
assertEquals(i+1, cd.getEntryId());
PathBasedCacheDirective cd = dit.next();
assertEquals(i+1, cd.getId().longValue());
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
assertEquals(pool, cd.getPool());
}
assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
assertFalse("Unexpected # of cache directives found", dit.hasNext());
}
private static void waitForCachedBlocks(NameNode nn,
@ -625,21 +637,16 @@ public class TestPathBasedCacheRequests {
setPath(new Path(paths.get(i))).
setPool(pool).
build();
PathBasedCacheDescriptor descriptor =
nnRpc.addPathBasedCacheDirective(directive);
assertEquals("Descriptor does not match requested path",
new Path(paths.get(i)), descriptor.getPath());
assertEquals("Descriptor does not match requested pool", pool,
descriptor.getPool());
nnRpc.addPathBasedCacheDirective(directive);
expected += numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected);
}
// Uncache and check each path in sequence
RemoteIterator<PathBasedCacheDescriptor> entries =
nnRpc.listPathBasedCacheDescriptors(0, null, null);
RemoteIterator<PathBasedCacheDirective> entries =
nnRpc.listPathBasedCacheDirectives(0, null);
for (int i=0; i<numFiles; i++) {
PathBasedCacheDescriptor descriptor = entries.next();
nnRpc.removePathBasedCacheDescriptor(descriptor.getEntryId());
PathBasedCacheDirective directive = entries.next();
nnRpc.removePathBasedCacheDirective(directive.getId());
expected -= numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected);
}
@ -723,17 +730,15 @@ public class TestPathBasedCacheRequests {
}
waitForCachedBlocks(namenode, 0, 0);
// cache entire directory
PathBasedCacheDescriptor descriptor = dfs.addPathBasedCacheDirective(
long id = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setPath(new Path("/foo")).
setReplication((short)2).
setPool(pool).
build());
assertEquals("Descriptor does not match requested pool", pool,
descriptor.getPool());
waitForCachedBlocks(namenode, 4, 8);
// remove and watch numCached go to 0
dfs.removePathBasedCacheDescriptor(descriptor);
dfs.removePathBasedCacheDirective(id);
waitForCachedBlocks(namenode, 0, 0);
} finally {
cluster.shutdown();

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -151,7 +150,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
assertEquals(19, cacheSet.size());
assertEquals(20, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@ -172,7 +171,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
assertEquals(19, cacheSet.size());
assertEquals(20, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
@ -739,35 +738,34 @@ public class TestRetryCacheWithHA {
/** addPathBasedCacheDirective */
class AddPathBasedCacheDirectiveOp extends AtMostOnceOp {
private String pool;
private String path;
private PathBasedCacheDescriptor descriptor;
private PathBasedCacheDirective directive;
private Long result;
AddPathBasedCacheDirectiveOp(DFSClient client, String pool, String path) {
AddPathBasedCacheDirectiveOp(DFSClient client,
PathBasedCacheDirective directive) {
super("addPathBasedCacheDirective", client);
this.pool = pool;
this.path = path;
this.directive = directive;
}
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(pool));
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
}
@Override
void invoke() throws Exception {
descriptor = client.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setPath(new Path(path)).
setPool(pool).
build());
result = client.addPathBasedCacheDirective(directive);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<PathBasedCacheDescriptor> iter =
dfs.listPathBasedCacheDescriptors(pool, new Path(path));
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
if (iter.hasNext()) {
return true;
}
@ -778,43 +776,99 @@ public class TestRetryCacheWithHA {
@Override
Object getResult() {
return descriptor;
return result;
}
}
/** removePathBasedCacheDescriptor */
class RemovePathBasedCacheDescriptorOp extends AtMostOnceOp {
private String pool;
private String path;
private PathBasedCacheDescriptor descriptor;
/** modifyPathBasedCacheDirective */
class ModifyPathBasedCacheDirectiveOp extends AtMostOnceOp {
private final PathBasedCacheDirective directive;
private final short newReplication;
private long id;
RemovePathBasedCacheDescriptorOp(DFSClient client, String pool,
String path) {
super("removePathBasedCacheDescriptor", client);
this.pool = pool;
this.path = path;
ModifyPathBasedCacheDirectiveOp(DFSClient client,
PathBasedCacheDirective directive, short newReplication) {
super("modifyPathBasedCacheDirective", client);
this.directive = directive;
this.newReplication = newReplication;
}
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(pool));
descriptor = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setPath(new Path(path)).
setPool(pool).
build());
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
id = client.addPathBasedCacheDirective(directive);
}
@Override
void invoke() throws Exception {
client.removePathBasedCacheDescriptor(descriptor.getEntryId());
client.modifyPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
setId(id).
setReplication(newReplication).
build());
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<PathBasedCacheDescriptor> iter =
dfs.listPathBasedCacheDescriptors(pool, new Path(path));
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
while (iter.hasNext()) {
PathBasedCacheDirective result = iter.next();
if ((result.getId() == id) &&
(result.getReplication().shortValue() == newReplication)) {
return true;
}
}
Thread.sleep(1000);
}
return false;
}
@Override
Object getResult() {
return null;
}
}
/** removePathBasedCacheDirective */
class RemovePathBasedCacheDirectiveOp extends AtMostOnceOp {
private PathBasedCacheDirective directive;
private long id;
RemovePathBasedCacheDirectiveOp(DFSClient client, String pool,
String path) {
super("removePathBasedCacheDirective", client);
this.directive = new PathBasedCacheDirective.Builder().
setPool(pool).
setPath(new Path(path)).
build();
}
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
id = dfs.addPathBasedCacheDirective(directive);
}
@Override
void invoke() throws Exception {
client.removePathBasedCacheDirective(id);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
if (!iter.hasNext()) {
return true;
}
@ -1019,14 +1073,30 @@ public class TestRetryCacheWithHA {
@Test (timeout=60000)
public void testAddPathBasedCacheDirective() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new AddPathBasedCacheDirectiveOp(client, "pool", "/path");
AtMostOnceOp op = new AddPathBasedCacheDirectiveOp(client,
new PathBasedCacheDirective.Builder().
setPool("pool").
setPath(new Path("/path")).
build());
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testModifyPathBasedCacheDirective() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new ModifyPathBasedCacheDirectiveOp(client,
new PathBasedCacheDirective.Builder().
setPool("pool").
setPath(new Path("/path")).
setReplication((short)1).build(),
(short)555);
testClientRetryWithFailover(op);
}
@Test (timeout=60000)
public void testRemovePathBasedCacheDescriptor() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RemovePathBasedCacheDescriptorOp(client, "pool",
AtMostOnceOp op = new RemovePathBasedCacheDirectiveOp(client, "pool",
"/path");
testClientRetryWithFailover(op);
}

View File

@ -843,6 +843,7 @@
<OPCODE>OP_ADD_PATH_BASED_CACHE_DIRECTIVE</OPCODE>
<DATA>
<TXID>63</TXID>
<ID>1</ID>
<PATH>/bar</PATH>
<REPLICATION>1</REPLICATION>
<POOL>poolparty</POOL>
@ -851,10 +852,20 @@
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR</OPCODE>
<OPCODE>OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE</OPCODE>
<DATA>
<TXID>64</TXID>
<ID>1</ID>
<REPLICATION>2</REPLICATION>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE</OPCODE>
<DATA>
<TXID>65</TXID>
<ID>1</ID>
<RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
<RPC_CALLID>78</RPC_CALLID>
</DATA>
@ -862,7 +873,7 @@
<RECORD>
<OPCODE>OP_REMOVE_CACHE_POOL</OPCODE>
<DATA>
<TXID>65</TXID>
<TXID>66</TXID>
<POOLNAME>poolparty</POOLNAME>
<RPC_CLIENTID>27ac79f0-d378-4933-824b-c2a188968d97</RPC_CLIENTID>
<RPC_CALLID>79</RPC_CALLID>
@ -871,7 +882,7 @@
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
<TXID>66</TXID>
<TXID>67</TXID>
<LENGTH>0</LENGTH>
<INODEID>16393</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
@ -893,29 +904,15 @@
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
<TXID>67</TXID>
<TXID>68</TXID>
<BLOCK_ID>1073741834</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>68</TXID>
<GENSTAMPV2>1010</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>69</TXID>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1010</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
<GENSTAMPV2>1010</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
@ -933,16 +930,30 @@
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
<TXID>71</TXID>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
<NUM_BYTES>0</NUM_BYTES>
<GENSTAMP>1010</GENSTAMP>
</BLOCK>
<RPC_CLIENTID></RPC_CLIENTID>
<RPC_CALLID>-2</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>72</TXID>
<GENSTAMPV2>1011</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
<TXID>72</TXID>
<TXID>73</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-134124999_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
@ -951,14 +962,14 @@
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
<TXID>73</TXID>
<TXID>74</TXID>
<GENSTAMPV2>1012</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
<TXID>74</TXID>
<TXID>75</TXID>
<LEASEHOLDER>HDFS_NameNode</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
@ -967,7 +978,7 @@
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
<TXID>75</TXID>
<TXID>76</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
@ -992,7 +1003,7 @@
<RECORD>
<OPCODE>OP_END_LOG_SEGMENT</OPCODE>
<DATA>
<TXID>76</TXID>
<TXID>77</TXID>
</DATA>
</RECORD>
</EDITS>