HDFS-5473. Consistent naming of user-visible caching classes and methods (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1544252 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-11-21 17:12:58 +00:00
parent 18b53b780d
commit f91a45a96c
27 changed files with 909 additions and 665 deletions

View File

@ -210,6 +210,9 @@ Trunk (Unreleased)
HDFS-5531. Combine the getNsQuota() and getDsQuota() methods in INode.
(szetszwo)
HDFS-5473. Consistent naming of user-visible caching classes and methods
(cmccabe)
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@ -108,6 +108,7 @@
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@ -117,7 +118,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -2290,41 +2291,41 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir,
}
}
public long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
public long addCacheDirective(
CacheDirectiveInfo info) throws IOException {
checkOpen();
try {
return namenode.addPathBasedCacheDirective(directive);
return namenode.addCacheDirective(info);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
public void modifyCacheDirective(
CacheDirectiveInfo info) throws IOException {
checkOpen();
try {
namenode.modifyPathBasedCacheDirective(directive);
namenode.modifyCacheDirective(info);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public void removePathBasedCacheDirective(long id)
public void removeCacheDirective(long id)
throws IOException {
checkOpen();
try {
namenode.removePathBasedCacheDirective(id);
namenode.removeCacheDirective(id);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}
}
public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(
PathBasedCacheDirective filter) throws IOException {
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
CacheDirectiveInfo filter) throws IOException {
checkOpen();
try {
return namenode.listPathBasedCacheDirectives(0, filter);
return namenode.listCacheDirectives(0, filter);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
}

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -67,7 +68,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@ -1584,78 +1585,88 @@ public Boolean next(final FileSystem fs, final Path p)
}
/**
* Add a new PathBasedCacheDirective.
* Add a new CacheDirective.
*
* @param directive A directive to add.
* @param info Information about a directive to add.
* @return the ID of the directive that was created.
* @throws IOException if the directive could not be added
*/
public long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
Preconditions.checkNotNull(directive.getPath());
Path path = new Path(getPathName(fixRelativePart(directive.getPath()))).
public long addCacheDirective(
CacheDirectiveInfo info) throws IOException {
Preconditions.checkNotNull(info.getPath());
Path path = new Path(getPathName(fixRelativePart(info.getPath()))).
makeQualified(getUri(), getWorkingDirectory());
return dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder(directive).
return dfs.addCacheDirective(
new CacheDirectiveInfo.Builder(info).
setPath(path).
build());
}
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
if (directive.getPath() != null) {
directive = new PathBasedCacheDirective.Builder(directive).
setPath(new Path(getPathName(fixRelativePart(directive.getPath()))).
/**
* Modify a CacheDirective.
*
* @param info Information about the directive to modify.
* You must set the ID to indicate which CacheDirective you want
* to modify.
* @throws IOException if the directive could not be modified
*/
public void modifyCacheDirective(
CacheDirectiveInfo info) throws IOException {
if (info.getPath() != null) {
info = new CacheDirectiveInfo.Builder(info).
setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
makeQualified(getUri(), getWorkingDirectory())).build();
}
dfs.modifyPathBasedCacheDirective(directive);
dfs.modifyCacheDirective(info);
}
/**
* Remove a PathBasedCacheDirective.
* Remove a CacheDirectiveInfo.
*
* @param id identifier of the PathBasedCacheDirective to remove
* @param id identifier of the CacheDirectiveInfo to remove
* @throws IOException if the directive could not be removed
*/
public void removePathBasedCacheDirective(long id)
public void removeCacheDirective(long id)
throws IOException {
dfs.removePathBasedCacheDirective(id);
dfs.removeCacheDirective(id);
}
/**
* List the set of cached paths of a cache pool. Incrementally fetches results
* from the server.
* List cache directives. Incrementally fetches results from the server.
*
* @param filter Filter parameters to use when listing the directives, null to
* list all directives visible to us.
* @return A RemoteIterator which returns PathBasedCacheDirective objects.
* @return A RemoteIterator which returns CacheDirectiveInfo objects.
*/
public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(
PathBasedCacheDirective filter) throws IOException {
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
CacheDirectiveInfo filter) throws IOException {
if (filter == null) {
filter = new PathBasedCacheDirective.Builder().build();
filter = new CacheDirectiveInfo.Builder().build();
}
if (filter.getPath() != null) {
filter = new PathBasedCacheDirective.Builder(filter).
filter = new CacheDirectiveInfo.Builder(filter).
setPath(new Path(getPathName(fixRelativePart(filter.getPath())))).
build();
}
final RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(filter);
return new RemoteIterator<PathBasedCacheDirective>() {
final RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(filter);
return new RemoteIterator<CacheDirectiveEntry>() {
@Override
public boolean hasNext() throws IOException {
return iter.hasNext();
}
@Override
public PathBasedCacheDirective next() throws IOException {
public CacheDirectiveEntry next() throws IOException {
// Although the paths we get back from the NameNode should always be
// absolute, we call makeQualified to add the scheme and authority of
// this DistributedFilesystem.
PathBasedCacheDirective desc = iter.next();
Path p = desc.getPath().makeQualified(getUri(), getWorkingDirectory());
return new PathBasedCacheDirective.Builder(desc).setPath(p).build();
CacheDirectiveEntry desc = iter.next();
CacheDirectiveInfo info = desc.getInfo();
Path p = info.getPath().makeQualified(getUri(), getWorkingDirectory());
return new CacheDirectiveEntry(
new CacheDirectiveInfo.Builder(info).setPath(p).build(),
desc.getStats());
}
};
}

View File

@ -25,7 +25,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
@ -121,4 +125,100 @@ public void allowSnapshot(Path path) throws IOException {
public void disallowSnapshot(Path path) throws IOException {
dfs.disallowSnapshot(path);
}
/**
* Add a new CacheDirectiveInfo.
*
* @param info Information about a directive to add.
* @return the ID of the directive that was created.
* @throws IOException if the directive could not be added
*/
public long addCacheDirective(CacheDirectiveInfo info)
throws IOException {
return dfs.addCacheDirective(info);
}
/**
* Modify a CacheDirective.
*
* @param info Information about the directive to modify.
* You must set the ID to indicate which CacheDirective you want
* to modify.
* @throws IOException if the directive could not be modified
*/
public void modifyCacheDirective(CacheDirectiveInfo info)
throws IOException {
dfs.modifyCacheDirective(info);
}
/**
* Remove a CacheDirective.
*
* @param id identifier of the CacheDirectiveInfo to remove
* @throws IOException if the directive could not be removed
*/
public void removeCacheDirective(long id)
throws IOException {
dfs.removeCacheDirective(id);
}
/**
* List cache directives. Incrementally fetches results from the server.
*
* @param filter Filter parameters to use when listing the directives, null to
* list all directives visible to us.
* @return A RemoteIterator which returns CacheDirectiveInfo objects.
*/
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
CacheDirectiveInfo filter) throws IOException {
return dfs.listCacheDirectives(filter);
}
/**
* Add a cache pool.
*
* @param info
* The request to add a cache pool.
* @throws IOException
* If the request could not be completed.
*/
public void addCachePool(CachePoolInfo info) throws IOException {
dfs.addCachePool(info);
}
/**
* Modify an existing cache pool.
*
* @param info
* The request to modify a cache pool.
* @throws IOException
* If the request could not be completed.
*/
public void modifyCachePool(CachePoolInfo info) throws IOException {
dfs.modifyCachePool(info);
}
/**
* Remove a cache pool.
*
* @param poolName
* Name of the cache pool to remove.
* @throws IOException
* if the cache pool did not exist, or could not be removed.
*/
public void removeCachePool(String poolName) throws IOException {
dfs.removeCachePool(poolName);
}
/**
* List all cache pools.
*
* @return A remote iterator from which you can get CachePoolInfo objects.
* Requests will be made as needed.
* @throws IOException
* If there was an error listing cache pools.
*/
public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
return dfs.listCachePools();
}
}

View File

@ -30,7 +30,7 @@
* This is an implementation class, not part of the public API.
*/
@InterfaceAudience.Private
public final class PathBasedCacheEntry {
public final class CacheDirective {
private final long entryId;
private final String path;
private final short replication;
@ -39,7 +39,7 @@ public final class PathBasedCacheEntry {
private long bytesCached;
private long filesAffected;
public PathBasedCacheEntry(long entryId, String path,
public CacheDirective(long entryId, String path,
short replication, CachePool pool) {
Preconditions.checkArgument(entryId > 0);
this.entryId = entryId;
@ -70,17 +70,26 @@ public short getReplication() {
return replication;
}
public PathBasedCacheDirective toDirective() {
return new PathBasedCacheDirective.Builder().
public CacheDirectiveInfo toDirective() {
return new CacheDirectiveInfo.Builder().
setId(entryId).
setPath(new Path(path)).
setReplication(replication).
setPool(pool.getPoolName()).
build();
}
public CacheDirectiveStats toStats() {
return new CacheDirectiveStats.Builder().
setBytesNeeded(bytesNeeded).
setBytesCached(bytesCached).
setFilesAffected(filesAffected).
build();
}
public CacheDirectiveEntry toEntry() {
return new CacheDirectiveEntry(toDirective(), toStats());
}
@Override
public String toString() {
@ -103,7 +112,7 @@ public boolean equals(Object o) {
if (o.getClass() != this.getClass()) {
return false;
}
PathBasedCacheEntry other = (PathBasedCacheEntry)o;
CacheDirective other = (CacheDirective)o;
return entryId == other.entryId;
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Describes a path-based cache directive entry.
*/
@InterfaceStability.Evolving
@InterfaceAudience.Public
public class CacheDirectiveEntry {
private final CacheDirectiveInfo info;
private final CacheDirectiveStats stats;
public CacheDirectiveEntry(CacheDirectiveInfo info,
CacheDirectiveStats stats) {
this.info = info;
this.stats = stats;
}
public CacheDirectiveInfo getInfo() {
return info;
}
public CacheDirectiveStats getStats() {
return stats;
}
};

View File

@ -28,27 +28,23 @@
*/
@InterfaceStability.Evolving
@InterfaceAudience.Public
public class PathBasedCacheDirective {
public class CacheDirectiveInfo {
/**
* A builder for creating new PathBasedCacheDirective instances.
* A builder for creating new CacheDirectiveInfo instances.
*/
public static class Builder {
private Long id;
private Path path;
private Short replication;
private String pool;
private Long bytesNeeded;
private Long bytesCached;
private Long filesAffected;
/**
* Builds a new PathBasedCacheDirective populated with the set properties.
* Builds a new CacheDirectiveInfo populated with the set properties.
*
* @return New PathBasedCacheDirective.
* @return New CacheDirectiveInfo.
*/
public PathBasedCacheDirective build() {
return new PathBasedCacheDirective(id, path, replication, pool,
bytesNeeded, bytesCached, filesAffected);
public CacheDirectiveInfo build() {
return new CacheDirectiveInfo(id, path, replication, pool);
}
/**
@ -59,16 +55,13 @@ public Builder() {
/**
* Creates a builder with all elements set to the same values as the
* given PathBasedCacheDirective.
* given CacheDirectiveInfo.
*/
public Builder(PathBasedCacheDirective directive) {
public Builder(CacheDirectiveInfo directive) {
this.id = directive.getId();
this.path = directive.getPath();
this.replication = directive.getReplication();
this.pool = directive.getPool();
this.bytesNeeded = directive.bytesNeeded;
this.bytesCached = directive.bytesCached;
this.filesAffected = directive.filesAffected;
}
/**
@ -104,39 +97,6 @@ public Builder setReplication(Short replication) {
return this;
}
/**
* Sets the bytes needed by this directive.
*
* @param bytesNeeded The bytes needed.
* @return This builder, for call chaining.
*/
public Builder setBytesNeeded(Long bytesNeeded) {
this.bytesNeeded = bytesNeeded;
return this;
}
/**
* Sets the bytes cached by this directive.
*
* @param bytesCached The bytes cached.
* @return This builder, for call chaining.
*/
public Builder setBytesCached(Long bytesCached) {
this.bytesCached = bytesCached;
return this;
}
/**
* Sets the files affected by this directive.
*
* @param filesAffected The files affected.
* @return This builder, for call chaining.
*/
public Builder setFilesAffected(Long filesAffected) {
this.filesAffected = filesAffected;
return this;
}
/**
* Sets the pool used in this request.
*
@ -153,19 +113,12 @@ public Builder setPool(String pool) {
private final Path path;
private final Short replication;
private final String pool;
private final Long bytesNeeded;
private final Long bytesCached;
private final Long filesAffected;
PathBasedCacheDirective(Long id, Path path, Short replication, String pool,
Long bytesNeeded, Long bytesCached, Long filesAffected) {
CacheDirectiveInfo(Long id, Path path, Short replication, String pool) {
this.id = id;
this.path = path;
this.replication = replication;
this.pool = pool;
this.bytesNeeded = bytesNeeded;
this.bytesCached = bytesCached;
this.filesAffected = filesAffected;
}
/**
@ -196,27 +149,6 @@ public String getPool() {
return pool;
}
/**
* @return The bytes needed.
*/
public Long getBytesNeeded() {
return bytesNeeded;
}
/**
* @return The bytes cached.
*/
public Long getBytesCached() {
return bytesCached;
}
/**
* @return The files affected.
*/
public Long getFilesAffected() {
return filesAffected;
}
@Override
public boolean equals(Object o) {
if (o == null) {
@ -225,7 +157,7 @@ public boolean equals(Object o) {
if (getClass() != o.getClass()) {
return false;
}
PathBasedCacheDirective other = (PathBasedCacheDirective)o;
CacheDirectiveInfo other = (CacheDirectiveInfo)o;
return new EqualsBuilder().append(getId(), other.getId()).
append(getPath(), other.getPath()).
append(getReplication(), other.getReplication()).
@ -263,18 +195,6 @@ public String toString() {
builder.append(prefix).append("pool: ").append(pool);
prefix = ",";
}
if (bytesNeeded != null) {
builder.append(prefix).append("bytesNeeded: ").append(bytesNeeded);
prefix = ",";
}
if (bytesCached != null) {
builder.append(prefix).append("bytesCached: ").append(bytesCached);
prefix = ",";
}
if (filesAffected != null) {
builder.append(prefix).append("filesAffected: ").append(filesAffected);
prefix = ",";
}
builder.append("}");
return builder.toString();
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Describes a path-based cache directive.
*/
@InterfaceStability.Evolving
@InterfaceAudience.Public
public class CacheDirectiveStats {
public static class Builder {
private long bytesNeeded;
private long bytesCached;
private long filesAffected;
/**
* Builds a new CacheDirectiveStats populated with the set properties.
*
* @return New CacheDirectiveStats.
*/
public CacheDirectiveStats build() {
return new CacheDirectiveStats(bytesNeeded, bytesCached, filesAffected);
}
/**
* Creates an empty builder.
*/
public Builder() {
}
/**
* Sets the bytes needed by this directive.
*
* @param bytesNeeded The bytes needed.
* @return This builder, for call chaining.
*/
public Builder setBytesNeeded(Long bytesNeeded) {
this.bytesNeeded = bytesNeeded;
return this;
}
/**
* Sets the bytes cached by this directive.
*
* @param bytesCached The bytes cached.
* @return This builder, for call chaining.
*/
public Builder setBytesCached(Long bytesCached) {
this.bytesCached = bytesCached;
return this;
}
/**
* Sets the files affected by this directive.
*
* @param filesAffected The files affected.
* @return This builder, for call chaining.
*/
public Builder setFilesAffected(Long filesAffected) {
this.filesAffected = filesAffected;
return this;
}
}
private final long bytesNeeded;
private final long bytesCached;
private final long filesAffected;
private CacheDirectiveStats(long bytesNeeded, long bytesCached,
long filesAffected) {
this.bytesNeeded = bytesNeeded;
this.bytesCached = bytesCached;
this.filesAffected = filesAffected;
}
/**
* @return The bytes needed.
*/
public Long getBytesNeeded() {
return bytesNeeded;
}
/**
* @return The bytes cached.
*/
public Long getBytesCached() {
return bytesCached;
}
/**
* @return The files affected.
*/
public Long getFilesAffected() {
return filesAffected;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("{");
builder.append("bytesNeeded: ").append(bytesNeeded);
builder.append(", ").append("bytesCached: ").append(bytesCached);
builder.append(", ").append("filesAffected: ").append(filesAffected);
builder.append("}");
return builder.toString();
}
};

View File

@ -46,7 +46,7 @@
* This class is used in RPCs to create and modify cache pools.
* It is serializable and can be stored in the edit log.
*/
@InterfaceAudience.Private
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CachePoolInfo {
public static final Log LOG = LogFactory.getLog(CachePoolInfo.class);
@ -225,4 +225,4 @@ public static CachePoolInfo readXmlFrom(Stanza st) throws InvalidXmlException {
setMode(perm.getPermission()).
setWeight(weight);
}
}
}

View File

@ -1096,49 +1096,49 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
String fromSnapshot, String toSnapshot) throws IOException;
/**
* Add a PathBasedCache entry to the CacheManager.
* Add a CacheDirective to the CacheManager.
*
* @param directive A PathBasedCacheDirective to be added
* @return A PathBasedCacheDirective associated with the added directive
* @param directive A CacheDirectiveInfo to be added
* @return A CacheDirectiveInfo associated with the added directive
* @throws IOException if the directive could not be added
*/
@AtMostOnce
public long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException;
public long addCacheDirective(
CacheDirectiveInfo directive) throws IOException;
/**
* Modify a PathBasedCache entry in the CacheManager.
* Modify a CacheDirective in the CacheManager.
*
* @return directive The directive to modify. Must contain
* a directive ID.
* @throws IOException if the directive could not be modified
*/
@AtMostOnce
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException;
public void modifyCacheDirective(
CacheDirectiveInfo directive) throws IOException;
/**
* Remove a PathBasedCacheDirective from the CacheManager.
* Remove a CacheDirectiveInfo from the CacheManager.
*
* @param id of a PathBasedCacheDirective
* @param id of a CacheDirectiveInfo
* @throws IOException if the cache directive could not be removed
*/
@AtMostOnce
public void removePathBasedCacheDirective(long id) throws IOException;
public void removeCacheDirective(long id) throws IOException;
/**
* List the set of cached paths of a cache pool. Incrementally fetches results
* from the server.
*
* @param prevId The last listed entry ID, or -1 if this is the first call to
* listPathBasedCacheDirectives.
* listCacheDirectives.
* @param filter Parameters to use to filter the list results,
* or null to display all directives visible to us.
* @return A RemoteIterator which returns PathBasedCacheDirective objects.
* @return A RemoteIterator which returns CacheDirectiveInfo objects.
*/
@Idempotent
public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(
long prevId, PathBasedCacheDirective filter) throws IOException;
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
long prevId, CacheDirectiveInfo filter) throws IOException;
/**
* Add a new cache pool.

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@ -35,7 +36,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
@ -44,8 +45,8 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
@ -106,25 +107,25 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyPathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyPathBasedCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
@ -1030,12 +1031,12 @@ public IsFileClosedResponseProto isFileClosed(
}
@Override
public AddPathBasedCacheDirectiveResponseProto addPathBasedCacheDirective(
RpcController controller, AddPathBasedCacheDirectiveRequestProto request)
public AddCacheDirectiveResponseProto addCacheDirective(
RpcController controller, AddCacheDirectiveRequestProto request)
throws ServiceException {
try {
return AddPathBasedCacheDirectiveResponseProto.newBuilder().
setId(server.addPathBasedCacheDirective(
return AddCacheDirectiveResponseProto.newBuilder().
setId(server.addCacheDirective(
PBHelper.convert(request.getInfo()))).build();
} catch (IOException e) {
throw new ServiceException(e);
@ -1043,26 +1044,26 @@ public AddPathBasedCacheDirectiveResponseProto addPathBasedCacheDirective(
}
@Override
public ModifyPathBasedCacheDirectiveResponseProto modifyPathBasedCacheDirective(
RpcController controller, ModifyPathBasedCacheDirectiveRequestProto request)
public ModifyCacheDirectiveResponseProto modifyCacheDirective(
RpcController controller, ModifyCacheDirectiveRequestProto request)
throws ServiceException {
try {
server.modifyPathBasedCacheDirective(
server.modifyCacheDirective(
PBHelper.convert(request.getInfo()));
return ModifyPathBasedCacheDirectiveResponseProto.newBuilder().build();
return ModifyCacheDirectiveResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public RemovePathBasedCacheDirectiveResponseProto
removePathBasedCacheDirective(RpcController controller,
RemovePathBasedCacheDirectiveRequestProto request)
public RemoveCacheDirectiveResponseProto
removeCacheDirective(RpcController controller,
RemoveCacheDirectiveRequestProto request)
throws ServiceException {
try {
server.removePathBasedCacheDirective(request.getId());
return RemovePathBasedCacheDirectiveResponseProto.
server.removeCacheDirective(request.getId());
return RemoveCacheDirectiveResponseProto.
newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
@ -1070,28 +1071,26 @@ public ModifyPathBasedCacheDirectiveResponseProto modifyPathBasedCacheDirective(
}
@Override
public ListPathBasedCacheDirectivesResponseProto listPathBasedCacheDirectives(
RpcController controller, ListPathBasedCacheDirectivesRequestProto request)
public ListCacheDirectivesResponseProto listCacheDirectives(
RpcController controller, ListCacheDirectivesRequestProto request)
throws ServiceException {
try {
PathBasedCacheDirective filter =
CacheDirectiveInfo filter =
PBHelper.convert(request.getFilter());
RemoteIterator<PathBasedCacheDirective> iter =
server.listPathBasedCacheDirectives(request.getPrevId(), filter);
ListPathBasedCacheDirectivesResponseProto.Builder builder =
ListPathBasedCacheDirectivesResponseProto.newBuilder();
RemoteIterator<CacheDirectiveEntry> iter =
server.listCacheDirectives(request.getPrevId(), filter);
ListCacheDirectivesResponseProto.Builder builder =
ListCacheDirectivesResponseProto.newBuilder();
long prevId = 0;
while (iter.hasNext()) {
PathBasedCacheDirective directive = iter.next();
builder.addElements(
ListPathBasedCacheDirectivesElementProto.newBuilder().
setInfo(PBHelper.convert(directive)));
prevId = directive.getId();
CacheDirectiveEntry entry = iter.next();
builder.addElements(PBHelper.convert(entry));
prevId = entry.getInfo().getId();
}
if (prevId == 0) {
builder.setHasMore(false);
} else {
iter = server.listPathBasedCacheDirectives(prevId, filter);
iter = server.listCacheDirectives(prevId, filter);
builder.setHasMore(iter.hasNext());
}
return builder.build();

View File

@ -32,11 +32,11 @@
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@ -51,14 +51,13 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddPathBasedCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
@ -100,16 +99,16 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListPathBasedCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyPathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemovePathBasedCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@ -146,7 +145,6 @@
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.token.Token;
import com.google.common.primitives.Shorts;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
@ -1003,11 +1001,11 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
}
@Override
public long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
public long addCacheDirective(
CacheDirectiveInfo directive) throws IOException {
try {
return rpcProxy.addPathBasedCacheDirective(null,
AddPathBasedCacheDirectiveRequestProto.newBuilder().
return rpcProxy.addCacheDirective(null,
AddCacheDirectiveRequestProto.newBuilder().
setInfo(PBHelper.convert(directive)).build()).getId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
@ -1015,11 +1013,11 @@ public long addPathBasedCacheDirective(
}
@Override
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
public void modifyCacheDirective(
CacheDirectiveInfo directive) throws IOException {
try {
rpcProxy.modifyPathBasedCacheDirective(null,
ModifyPathBasedCacheDirectiveRequestProto.newBuilder().
rpcProxy.modifyCacheDirective(null,
ModifyCacheDirectiveRequestProto.newBuilder().
setInfo(PBHelper.convert(directive)).build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
@ -1027,29 +1025,29 @@ public void modifyPathBasedCacheDirective(
}
@Override
public void removePathBasedCacheDirective(long id)
public void removeCacheDirective(long id)
throws IOException {
try {
rpcProxy.removePathBasedCacheDirective(null,
RemovePathBasedCacheDirectiveRequestProto.newBuilder().
rpcProxy.removeCacheDirective(null,
RemoveCacheDirectiveRequestProto.newBuilder().
setId(id).build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
private static class BatchedPathBasedCacheEntries
implements BatchedEntries<PathBasedCacheDirective> {
private ListPathBasedCacheDirectivesResponseProto response;
private static class BatchedCacheEntries
implements BatchedEntries<CacheDirectiveEntry> {
private ListCacheDirectivesResponseProto response;
BatchedPathBasedCacheEntries(
ListPathBasedCacheDirectivesResponseProto response) {
BatchedCacheEntries(
ListCacheDirectivesResponseProto response) {
this.response = response;
}
@Override
public PathBasedCacheDirective get(int i) {
return PBHelper.convert(response.getElements(i).getInfo());
public CacheDirectiveEntry get(int i) {
return PBHelper.convert(response.getElements(i));
}
@Override
@ -1063,46 +1061,46 @@ public boolean hasMore() {
}
}
private class PathBasedCacheEntriesIterator
extends BatchedRemoteIterator<Long, PathBasedCacheDirective> {
private final PathBasedCacheDirective filter;
private class CacheEntriesIterator
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
private final CacheDirectiveInfo filter;
public PathBasedCacheEntriesIterator(long prevKey,
PathBasedCacheDirective filter) {
public CacheEntriesIterator(long prevKey,
CacheDirectiveInfo filter) {
super(prevKey);
this.filter = filter;
}
@Override
public BatchedEntries<PathBasedCacheDirective> makeRequest(
public BatchedEntries<CacheDirectiveEntry> makeRequest(
Long nextKey) throws IOException {
ListPathBasedCacheDirectivesResponseProto response;
ListCacheDirectivesResponseProto response;
try {
response = rpcProxy.listPathBasedCacheDirectives(null,
ListPathBasedCacheDirectivesRequestProto.newBuilder().
response = rpcProxy.listCacheDirectives(null,
ListCacheDirectivesRequestProto.newBuilder().
setPrevId(nextKey).
setFilter(PBHelper.convert(filter)).
build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return new BatchedPathBasedCacheEntries(response);
return new BatchedCacheEntries(response);
}
@Override
public Long elementToPrevKey(PathBasedCacheDirective element) {
return element.getId();
public Long elementToPrevKey(CacheDirectiveEntry element) {
return element.getInfo().getId();
}
}
@Override
public RemoteIterator<PathBasedCacheDirective>
listPathBasedCacheDirectives(long prevId,
PathBasedCacheDirective filter) throws IOException {
public RemoteIterator<CacheDirectiveEntry>
listCacheDirectives(long prevId,
CacheDirectiveInfo filter) throws IOException {
if (filter == null) {
filter = new PathBasedCacheDirective.Builder().build();
filter = new CacheDirectiveInfo.Builder().build();
}
return new PathBasedCacheEntriesIterator(prevId, filter);
return new CacheEntriesIterator(prevId, filter);
}
@Override
@ -1140,11 +1138,11 @@ public void removeCachePool(String cachePoolName) throws IOException {
}
}
private static class BatchedPathDirectiveEntries
implements BatchedEntries<CachePoolInfo> {
private final ListCachePoolsResponseProto proto;
private static class BatchedCachePoolInfo
implements BatchedEntries<CachePoolInfo> {
private final ListCachePoolsResponseProto proto;
public BatchedPathDirectiveEntries(ListCachePoolsResponseProto proto) {
public BatchedCachePoolInfo(ListCachePoolsResponseProto proto) {
this.proto = proto;
}
@ -1176,7 +1174,7 @@ public CachePoolIterator(String prevKey) {
public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
throws IOException {
try {
return new BatchedPathDirectiveEntries(
return new BatchedCachePoolInfo(
rpcProxy.listCachePools(null,
ListCachePoolsRequestProto.newBuilder().
setPrevPoolName(prevKey).build()));

View File

@ -36,12 +36,14 @@
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -56,12 +58,14 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@ -1567,38 +1571,29 @@ public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
return DataChecksum.Type.valueOf(type.getNumber());
}
public static PathBasedCacheDirectiveInfoProto convert
(PathBasedCacheDirective directive) {
PathBasedCacheDirectiveInfoProto.Builder builder =
PathBasedCacheDirectiveInfoProto.newBuilder();
if (directive.getId() != null) {
builder.setId(directive.getId());
public static CacheDirectiveInfoProto convert
(CacheDirectiveInfo info) {
CacheDirectiveInfoProto.Builder builder =
CacheDirectiveInfoProto.newBuilder();
if (info.getId() != null) {
builder.setId(info.getId());
}
if (directive.getPath() != null) {
builder.setPath(directive.getPath().toUri().getPath());
if (info.getPath() != null) {
builder.setPath(info.getPath().toUri().getPath());
}
if (directive.getReplication() != null) {
builder.setReplication(directive.getReplication());
if (info.getReplication() != null) {
builder.setReplication(info.getReplication());
}
if (directive.getPool() != null) {
builder.setPool(directive.getPool());
}
if (directive.getBytesNeeded() != null) {
builder.setBytesNeeded(directive.getBytesNeeded());
}
if (directive.getBytesCached() != null) {
builder.setBytesCached(directive.getBytesCached());
}
if (directive.getFilesAffected() != null) {
builder.setFilesAffected(directive.getFilesAffected());
if (info.getPool() != null) {
builder.setPool(info.getPool());
}
return builder.build();
}
public static PathBasedCacheDirective convert
(PathBasedCacheDirectiveInfoProto proto) {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
public static CacheDirectiveInfo convert
(CacheDirectiveInfoProto proto) {
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
if (proto.hasId()) {
builder.setId(proto.getId());
}
@ -1612,18 +1607,40 @@ public static DataChecksum.Type convert(HdfsProtos.ChecksumTypeProto type) {
if (proto.hasPool()) {
builder.setPool(proto.getPool());
}
if (proto.hasBytesNeeded()) {
builder.setBytesNeeded(proto.getBytesNeeded());
}
if (proto.hasBytesCached()) {
builder.setBytesCached(proto.getBytesCached());
}
if (proto.hasFilesAffected()) {
builder.setFilesAffected(proto.getFilesAffected());
}
return builder.build();
}
public static CacheDirectiveStatsProto convert(CacheDirectiveStats stats) {
CacheDirectiveStatsProto.Builder builder =
CacheDirectiveStatsProto.newBuilder();
builder.setBytesNeeded(stats.getBytesNeeded());
builder.setBytesCached(stats.getBytesCached());
builder.setFilesAffected(stats.getFilesAffected());
return builder.build();
}
public static CacheDirectiveStats convert(CacheDirectiveStatsProto proto) {
CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
builder.setBytesNeeded(proto.getBytesNeeded());
builder.setBytesCached(proto.getBytesCached());
builder.setFilesAffected(proto.getFilesAffected());
return builder.build();
}
public static CacheDirectiveEntryProto convert(CacheDirectiveEntry entry) {
CacheDirectiveEntryProto.Builder builder =
CacheDirectiveEntryProto.newBuilder();
builder.setInfo(PBHelper.convert(entry.getInfo()));
builder.setStats(PBHelper.convert(entry.getStats()));
return builder.build();
}
public static CacheDirectiveEntry convert(CacheDirectiveEntryProto proto) {
CacheDirectiveInfo info = PBHelper.convert(proto.getInfo());
CacheDirectiveStats stats = PBHelper.convert(proto.getStats());
return new CacheDirectiveEntry(info, stats);
}
public static CachePoolInfoProto convert(CachePoolInfo info) {
CachePoolInfoProto.Builder builder = CachePoolInfoProto.newBuilder();
builder.setPoolName(info.getPoolName());

View File

@ -32,7 +32,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
@ -197,7 +197,7 @@ private void rescan() {
scannedBlocks = 0;
namesystem.writeLock();
try {
rescanPathBasedCacheEntries();
rescanCacheDirectives();
rescanCachedBlockMap();
blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
} finally {
@ -206,14 +206,14 @@ private void rescan() {
}
/**
* Scan all PathBasedCacheEntries. Use the information to figure out
* Scan all CacheDirectives. Use the information to figure out
* what cache replication factor each block should have.
*
* @param mark Whether the current scan is setting or clearing the mark
*/
private void rescanPathBasedCacheEntries() {
private void rescanCacheDirectives() {
FSDirectory fsDir = namesystem.getFSDirectory();
for (PathBasedCacheEntry pce : cacheManager.getEntriesById().values()) {
for (CacheDirective pce : cacheManager.getEntriesById().values()) {
scannedDirectives++;
pce.clearBytesNeeded();
pce.clearBytesCached();
@ -250,12 +250,12 @@ private void rescanPathBasedCacheEntries() {
}
/**
* Apply a PathBasedCacheEntry to a file.
* Apply a CacheDirective to a file.
*
* @param pce The PathBasedCacheEntry to apply.
* @param pce The CacheDirective to apply.
* @param file The file.
*/
private void rescanFile(PathBasedCacheEntry pce, INodeFile file) {
private void rescanFile(CacheDirective pce, INodeFile file) {
pce.incrementFilesAffected();
BlockInfo[] blockInfos = file.getBlocks();
long cachedTotal = 0;
@ -292,7 +292,7 @@ private void rescanFile(PathBasedCacheEntry pce, INodeFile file) {
ocblock.setReplicationAndMark(pce.getReplication(), mark);
} else {
// Mark already set in this scan. Set replication to highest value in
// any PathBasedCacheEntry that covers this file.
// any CacheDirective that covers this file.
ocblock.setReplicationAndMark((short)Math.max(
pce.getReplication(), ocblock.getReplication()), mark);
}

View File

@ -48,11 +48,12 @@
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
@ -100,11 +101,11 @@ public final class CacheManager {
/**
* Cache entries, sorted by ID.
*
* listPathBasedCacheDirectives relies on the ordering of elements in this map
* listCacheDirectives relies on the ordering of elements in this map
* to track what has already been listed by the client.
*/
private final TreeMap<Long, PathBasedCacheEntry> entriesById =
new TreeMap<Long, PathBasedCacheEntry>();
private final TreeMap<Long, CacheDirective> entriesById =
new TreeMap<Long, CacheDirective>();
/**
* The entry ID to use for a new entry. Entry IDs always increase, and are
@ -115,8 +116,8 @@ public final class CacheManager {
/**
* Cache entries, sorted by path
*/
private final TreeMap<String, List<PathBasedCacheEntry>> entriesByPath =
new TreeMap<String, List<PathBasedCacheEntry>>();
private final TreeMap<String, List<CacheDirective>> entriesByPath =
new TreeMap<String, List<CacheDirective>>();
/**
* Cache pools, sorted by name.
@ -236,7 +237,7 @@ public boolean isActive() {
return active;
}
public TreeMap<Long, PathBasedCacheEntry> getEntriesById() {
public TreeMap<Long, CacheDirective> getEntriesById() {
assert namesystem.hasReadLock();
return entriesById;
}
@ -264,7 +265,7 @@ private static void checkWritePermission(FSPermissionChecker pc,
}
}
private static String validatePoolName(PathBasedCacheDirective directive)
private static String validatePoolName(CacheDirectiveInfo directive)
throws InvalidRequestException {
String pool = directive.getPool();
if (pool == null) {
@ -276,7 +277,7 @@ private static String validatePoolName(PathBasedCacheDirective directive)
return pool;
}
private static String validatePath(PathBasedCacheDirective directive)
private static String validatePath(CacheDirectiveInfo directive)
throws InvalidRequestException {
if (directive.getPath() == null) {
throw new InvalidRequestException("No path specified.");
@ -288,7 +289,7 @@ private static String validatePath(PathBasedCacheDirective directive)
return path;
}
private static short validateReplication(PathBasedCacheDirective directive,
private static short validateReplication(CacheDirectiveInfo directive,
short defaultValue) throws InvalidRequestException {
short repl = (directive.getReplication() != null)
? directive.getReplication() : defaultValue;
@ -300,16 +301,16 @@ private static short validateReplication(PathBasedCacheDirective directive,
}
/**
* Get a PathBasedCacheEntry by ID, validating the ID and that the entry
* Get a CacheDirective by ID, validating the ID and that the entry
* exists.
*/
private PathBasedCacheEntry getById(long id) throws InvalidRequestException {
private CacheDirective getById(long id) throws InvalidRequestException {
// Check for invalid IDs.
if (id <= 0) {
throw new InvalidRequestException("Invalid negative ID.");
}
// Find the entry.
PathBasedCacheEntry entry = entriesById.get(id);
CacheDirective entry = entriesById.get(id);
if (entry == null) {
throw new InvalidRequestException("No directive with ID " + id
+ " found.");
@ -331,22 +332,22 @@ private CachePool getCachePool(String poolName)
// RPC handlers
private void addInternal(PathBasedCacheEntry entry) {
private void addInternal(CacheDirective entry) {
entriesById.put(entry.getEntryId(), entry);
String path = entry.getPath();
List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
List<CacheDirective> entryList = entriesByPath.get(path);
if (entryList == null) {
entryList = new ArrayList<PathBasedCacheEntry>(1);
entryList = new ArrayList<CacheDirective>(1);
entriesByPath.put(path, entryList);
}
entryList.add(entry);
}
public PathBasedCacheDirective addDirective(
PathBasedCacheDirective directive, FSPermissionChecker pc)
public CacheDirectiveInfo addDirective(
CacheDirectiveInfo directive, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
PathBasedCacheEntry entry;
CacheDirective entry;
try {
CachePool pool = getCachePool(validatePoolName(directive));
checkWritePermission(pc, pool);
@ -372,7 +373,7 @@ public PathBasedCacheDirective addDirective(
// Add a new entry with the next available ID.
id = getNextEntryId();
}
entry = new PathBasedCacheEntry(id, path, replication, pool);
entry = new CacheDirective(id, path, replication, pool);
addInternal(entry);
} catch (IOException e) {
LOG.warn("addDirective of " + directive + " failed: ", e);
@ -385,7 +386,7 @@ public PathBasedCacheDirective addDirective(
return entry.toDirective();
}
public void modifyDirective(PathBasedCacheDirective directive,
public void modifyDirective(CacheDirectiveInfo directive,
FSPermissionChecker pc) throws IOException {
assert namesystem.hasWriteLock();
String idString =
@ -397,7 +398,7 @@ public void modifyDirective(PathBasedCacheDirective directive,
if (id == null) {
throw new InvalidRequestException("Must supply an ID.");
}
PathBasedCacheEntry prevEntry = getById(id);
CacheDirective prevEntry = getById(id);
checkWritePermission(pc, prevEntry.getPool());
String path = prevEntry.getPath();
if (directive.getPath() != null) {
@ -413,8 +414,8 @@ public void modifyDirective(PathBasedCacheDirective directive,
checkWritePermission(pc, pool);
}
removeInternal(prevEntry);
PathBasedCacheEntry newEntry =
new PathBasedCacheEntry(id, path, replication, pool);
CacheDirective newEntry =
new CacheDirective(id, path, replication, pool);
addInternal(newEntry);
} catch (IOException e) {
LOG.warn("modifyDirective of " + idString + " failed: ", e);
@ -424,12 +425,12 @@ public void modifyDirective(PathBasedCacheDirective directive,
directive + ".");
}
public void removeInternal(PathBasedCacheEntry existing)
public void removeInternal(CacheDirective existing)
throws InvalidRequestException {
assert namesystem.hasWriteLock();
// Remove the corresponding entry in entriesByPath.
String path = existing.getPath();
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
List<CacheDirective> entries = entriesByPath.get(path);
if (entries == null || !entries.remove(existing)) {
throw new InvalidRequestException("Failed to locate entry " +
existing.getEntryId() + " by path " + existing.getPath());
@ -444,7 +445,7 @@ public void removeDirective(long id, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
try {
PathBasedCacheEntry existing = getById(id);
CacheDirective existing = getById(id);
checkWritePermission(pc, existing.getPool());
removeInternal(existing);
} catch (IOException e) {
@ -457,9 +458,9 @@ public void removeDirective(long id, FSPermissionChecker pc)
LOG.info("removeDirective of " + id + " successful.");
}
public BatchedListEntries<PathBasedCacheDirective>
listPathBasedCacheDirectives(long prevId,
PathBasedCacheDirective filter,
public BatchedListEntries<CacheDirectiveEntry>
listCacheDirectives(long prevId,
CacheDirectiveInfo filter,
FSPermissionChecker pc) throws IOException {
assert namesystem.hasReadLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
@ -473,23 +474,23 @@ public void removeDirective(long id, FSPermissionChecker pc)
if (filter.getReplication() != null) {
throw new IOException("Filtering by replication is unsupported.");
}
ArrayList<PathBasedCacheDirective> replies =
new ArrayList<PathBasedCacheDirective>(NUM_PRE_ALLOCATED_ENTRIES);
ArrayList<CacheDirectiveEntry> replies =
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0;
SortedMap<Long, PathBasedCacheEntry> tailMap =
SortedMap<Long, CacheDirective> tailMap =
entriesById.tailMap(prevId + 1);
for (Entry<Long, PathBasedCacheEntry> cur : tailMap.entrySet()) {
for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) {
if (numReplies >= maxListCacheDirectivesNumResponses) {
return new BatchedListEntries<PathBasedCacheDirective>(replies, true);
return new BatchedListEntries<CacheDirectiveEntry>(replies, true);
}
PathBasedCacheEntry curEntry = cur.getValue();
PathBasedCacheDirective directive = cur.getValue().toDirective();
CacheDirective curEntry = cur.getValue();
CacheDirectiveInfo info = cur.getValue().toDirective();
if (filter.getPool() != null &&
!directive.getPool().equals(filter.getPool())) {
!info.getPool().equals(filter.getPool())) {
continue;
}
if (filterPath != null &&
!directive.getPath().toUri().getPath().equals(filterPath)) {
!info.getPath().toUri().getPath().equals(filterPath)) {
continue;
}
boolean hasPermission = true;
@ -501,11 +502,11 @@ public void removeDirective(long id, FSPermissionChecker pc)
}
}
if (hasPermission) {
replies.add(cur.getValue().toDirective());
replies.add(new CacheDirectiveEntry(info, cur.getValue().toStats()));
numReplies++;
}
}
return new BatchedListEntries<PathBasedCacheDirective>(replies, false);
return new BatchedListEntries<CacheDirectiveEntry>(replies, false);
}
/**
@ -602,10 +603,10 @@ public void removeCachePool(String poolName)
// Remove entries using this pool
// TODO: could optimize this somewhat to avoid the need to iterate
// over all entries in entriesById
Iterator<Entry<Long, PathBasedCacheEntry>> iter =
Iterator<Entry<Long, CacheDirective>> iter =
entriesById.entrySet().iterator();
while (iter.hasNext()) {
Entry<Long, PathBasedCacheEntry> entry = iter.next();
Entry<Long, CacheDirective> entry = iter.next();
if (entry.getValue().getPool() == pool) {
entriesByPath.remove(entry.getValue().getPath());
iter.remove();
@ -789,7 +790,7 @@ private void saveEntries(DataOutput out, String sdPath)
prog.setTotal(Phase.SAVING_CHECKPOINT, step, entriesById.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(entriesById.size());
for (PathBasedCacheEntry entry: entriesById.values()) {
for (CacheDirective entry: entriesById.values()) {
out.writeLong(entry.getEntryId());
Text.writeString(out, entry.getPath());
out.writeShort(entry.getReplication());
@ -838,15 +839,15 @@ private void loadEntries(DataInput in) throws IOException {
throw new IOException("Entry refers to pool " + poolName +
", which does not exist.");
}
PathBasedCacheEntry entry =
new PathBasedCacheEntry(entryId, path, replication, pool);
CacheDirective entry =
new CacheDirective(entryId, path, replication, pool);
if (entriesById.put(entry.getEntryId(), entry) != null) {
throw new IOException("An entry with ID " + entry.getEntryId() +
" already exists");
}
List<PathBasedCacheEntry> entries = entriesByPath.get(entry.getPath());
List<CacheDirective> entries = entriesByPath.get(entry.getPath());
if (entries == null) {
entries = new LinkedList<PathBasedCacheEntry>();
entries = new LinkedList<CacheDirective>();
entriesByPath.put(entry.getPath(), entries);
}
entries.add(entry);

View File

@ -38,15 +38,15 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyPathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@ -63,7 +63,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@ -954,27 +954,27 @@ void logDisallowSnapshot(String path) {
logEdit(op);
}
void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
boolean toLogRpcIds) {
AddPathBasedCacheDirectiveOp op =
AddPathBasedCacheDirectiveOp.getInstance(cache.get())
AddCacheDirectiveInfoOp op =
AddCacheDirectiveInfoOp.getInstance(cache.get())
.setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logModifyPathBasedCacheDirective(
PathBasedCacheDirective directive, boolean toLogRpcIds) {
ModifyPathBasedCacheDirectiveOp op =
ModifyPathBasedCacheDirectiveOp.getInstance(
void logModifyCacheDirectiveInfo(
CacheDirectiveInfo directive, boolean toLogRpcIds) {
ModifyCacheDirectiveInfoOp op =
ModifyCacheDirectiveInfoOp.getInstance(
cache.get()).setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
void logRemovePathBasedCacheDirective(Long id, boolean toLogRpcIds) {
RemovePathBasedCacheDirectiveOp op =
RemovePathBasedCacheDirectiveOp.getInstance(cache.get()).setId(id);
void logRemoveCacheDirectiveInfo(Long id, boolean toLogRpcIds) {
RemoveCacheDirectiveInfoOp op =
RemoveCacheDirectiveInfoOp.getInstance(cache.get()).setId(id);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}

View File

@ -36,13 +36,13 @@
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
@ -56,10 +56,10 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyPathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@ -639,8 +639,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
break;
}
case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
PathBasedCacheDirective result = fsNamesys.
AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
CacheDirectiveInfo result = fsNamesys.
getCacheManager().addDirective(addOp.directive, null);
if (toAddRetryCache) {
Long id = result.getId();
@ -649,8 +649,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
break;
}
case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: {
ModifyPathBasedCacheDirectiveOp modifyOp =
(ModifyPathBasedCacheDirectiveOp) op;
ModifyCacheDirectiveInfoOp modifyOp =
(ModifyCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().modifyDirective(
modifyOp.directive, null);
if (toAddRetryCache) {
@ -659,8 +659,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
break;
}
case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: {
RemovePathBasedCacheDirectiveOp removeOp =
(RemovePathBasedCacheDirectiveOp) op;
RemoveCacheDirectiveInfoOp removeOp =
(RemoveCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);

View File

@ -86,7 +86,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@ -166,11 +166,11 @@ public OpInstanceCache() {
inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
new AddPathBasedCacheDirectiveOp());
new AddCacheDirectiveInfoOp());
inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE,
new ModifyPathBasedCacheDirectiveOp());
new ModifyCacheDirectiveInfoOp());
inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE,
new RemovePathBasedCacheDirectiveOp());
new RemoveCacheDirectiveInfoOp());
inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
@ -2868,22 +2868,22 @@ public String toString() {
/**
* {@literal @AtMostOnce} for
* {@link ClientProtocol#addPathBasedCacheDirective}
* {@link ClientProtocol#addCacheDirective}
*/
static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
PathBasedCacheDirective directive;
static class AddCacheDirectiveInfoOp extends FSEditLogOp {
CacheDirectiveInfo directive;
public AddPathBasedCacheDirectiveOp() {
public AddCacheDirectiveInfoOp() {
super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
}
static AddPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
return (AddPathBasedCacheDirectiveOp) cache
static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (AddCacheDirectiveInfoOp) cache
.get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
}
public AddPathBasedCacheDirectiveOp setDirective(
PathBasedCacheDirective directive) {
public AddCacheDirectiveInfoOp setDirective(
CacheDirectiveInfo directive) {
this.directive = directive;
assert(directive.getId() != null);
assert(directive.getPath() != null);
@ -2898,7 +2898,7 @@ void readFields(DataInputStream in, int logVersion) throws IOException {
String path = FSImageSerialization.readString(in);
short replication = FSImageSerialization.readShort(in);
String pool = FSImageSerialization.readString(in);
directive = new PathBasedCacheDirective.Builder().
directive = new CacheDirectiveInfo.Builder().
setId(id).
setPath(new Path(path)).
setReplication(replication).
@ -2930,7 +2930,7 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
directive = new PathBasedCacheDirective.Builder().
directive = new CacheDirectiveInfo.Builder().
setId(Long.parseLong(st.getValue("ID"))).
setPath(new Path(st.getValue("PATH"))).
setReplication(Short.parseShort(st.getValue("REPLICATION"))).
@ -2942,7 +2942,7 @@ void fromXml(Stanza st) throws InvalidXmlException {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("AddPathBasedCacheDirective [");
builder.append("AddCacheDirectiveInfo [");
builder.append("id=" + directive.getId() + ",");
builder.append("path=" + directive.getPath().toUri().getPath() + ",");
builder.append("replication=" + directive.getReplication() + ",");
@ -2955,22 +2955,22 @@ public String toString() {
/**
* {@literal @AtMostOnce} for
* {@link ClientProtocol#modifyPathBasedCacheDirective}
* {@link ClientProtocol#modifyCacheDirective}
*/
static class ModifyPathBasedCacheDirectiveOp extends FSEditLogOp {
PathBasedCacheDirective directive;
static class ModifyCacheDirectiveInfoOp extends FSEditLogOp {
CacheDirectiveInfo directive;
public ModifyPathBasedCacheDirectiveOp() {
public ModifyCacheDirectiveInfoOp() {
super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
}
static ModifyPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
return (ModifyPathBasedCacheDirectiveOp) cache
static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (ModifyCacheDirectiveInfoOp) cache
.get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
}
public ModifyPathBasedCacheDirectiveOp setDirective(
PathBasedCacheDirective directive) {
public ModifyCacheDirectiveInfoOp setDirective(
CacheDirectiveInfo directive) {
this.directive = directive;
assert(directive.getId() != null);
return this;
@ -2978,8 +2978,8 @@ public ModifyPathBasedCacheDirectiveOp setDirective(
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(FSImageSerialization.readLong(in));
byte flags = in.readByte();
if ((flags & 0x1) != 0) {
@ -2993,7 +2993,7 @@ void readFields(DataInputStream in, int logVersion) throws IOException {
}
if ((flags & ~0x7) != 0) {
throw new IOException("unknown flags set in " +
"ModifyPathBasedCacheDirectiveOp: " + flags);
"ModifyCacheDirectiveInfoOp: " + flags);
}
this.directive = builder.build();
readRpcIds(in, logVersion);
@ -3041,8 +3041,8 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
builder.setId(Long.parseLong(st.getValue("ID")));
String path = st.getValueOrNull("PATH");
if (path != null) {
@ -3063,7 +3063,7 @@ void fromXml(Stanza st) throws InvalidXmlException {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("ModifyPathBasedCacheDirectiveOp[");
builder.append("ModifyCacheDirectiveInfoOp[");
builder.append("id=").append(directive.getId());
if (directive.getPath() != null) {
builder.append(",").append("path=").append(directive.getPath());
@ -3083,21 +3083,21 @@ public String toString() {
/**
* {@literal @AtMostOnce} for
* {@link ClientProtocol#removePathBasedCacheDirective}
* {@link ClientProtocol#removeCacheDirective}
*/
static class RemovePathBasedCacheDirectiveOp extends FSEditLogOp {
static class RemoveCacheDirectiveInfoOp extends FSEditLogOp {
long id;
public RemovePathBasedCacheDirectiveOp() {
public RemoveCacheDirectiveInfoOp() {
super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
}
static RemovePathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
return (RemovePathBasedCacheDirectiveOp) cache
static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (RemoveCacheDirectiveInfoOp) cache
.get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
}
public RemovePathBasedCacheDirectiveOp setId(long id) {
public RemoveCacheDirectiveInfoOp setId(long id) {
this.id = id;
return this;
}
@ -3129,7 +3129,7 @@ void fromXml(Stanza st) throws InvalidXmlException {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("RemovePathBasedCacheDirective [");
builder.append("RemoveCacheDirectiveInfo [");
builder.append("id=" + Long.toString(id));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");

View File

@ -151,7 +151,8 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -7064,8 +7065,8 @@ void removeSnapshottableDirs(List<INodeDirectorySnapshottable> toRemove) {
}
}
long addPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
long addCacheDirective(
CacheDirectiveInfo directive) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@ -7081,15 +7082,15 @@ long addPathBasedCacheDirective(
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot add PathBasedCache directive", safeMode);
"Cannot add cache directive", safeMode);
}
if (directive.getId() != null) {
throw new IOException("addDirective: you cannot specify an ID " +
"for this operation.");
}
PathBasedCacheDirective effectiveDirective =
CacheDirectiveInfo effectiveDirective =
cacheManager.addDirective(directive, pc);
getEditLog().logAddPathBasedCacheDirective(effectiveDirective,
getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
cacheEntry != null);
result = effectiveDirective.getId();
success = true;
@ -7099,15 +7100,15 @@ long addPathBasedCacheDirective(
getEditLog().logSync();
}
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
logAuditEvent(success, "addCacheDirective", null, null, null);
}
RetryCache.setState(cacheEntry, success, result);
}
return result;
}
void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
void modifyCacheDirective(
CacheDirectiveInfo directive) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@ -7121,10 +7122,10 @@ void modifyPathBasedCacheDirective(
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot add PathBasedCache directive", safeMode);
"Cannot add cache directive", safeMode);
}
cacheManager.modifyDirective(directive, pc);
getEditLog().logModifyPathBasedCacheDirective(directive,
getEditLog().logModifyCacheDirectiveInfo(directive,
cacheEntry != null);
success = true;
} finally {
@ -7133,13 +7134,13 @@ void modifyPathBasedCacheDirective(
getEditLog().logSync();
}
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
logAuditEvent(success, "addCacheDirective", null, null, null);
}
RetryCache.setState(cacheEntry, success);
}
}
void removePathBasedCacheDirective(Long id) throws IOException {
void removeCacheDirective(Long id) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@ -7153,15 +7154,15 @@ void removePathBasedCacheDirective(Long id) throws IOException {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot remove PathBasedCache directives", safeMode);
"Cannot remove cache directives", safeMode);
}
cacheManager.removeDirective(id, pc);
getEditLog().logRemovePathBasedCacheDirective(id, cacheEntry != null);
getEditLog().logRemoveCacheDirectiveInfo(id, cacheEntry != null);
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "removePathBasedCacheDirective", null, null,
logAuditEvent(success, "removeCacheDirective", null, null,
null);
}
RetryCache.setState(cacheEntry, success);
@ -7169,23 +7170,23 @@ void removePathBasedCacheDirective(Long id) throws IOException {
getEditLog().logSync();
}
BatchedListEntries<PathBasedCacheDirective> listPathBasedCacheDirectives(
long startId, PathBasedCacheDirective filter) throws IOException {
BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
long startId, CacheDirectiveInfo filter) throws IOException {
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
BatchedListEntries<PathBasedCacheDirective> results;
BatchedListEntries<CacheDirectiveEntry> results;
readLock();
boolean success = false;
try {
checkOperation(OperationCategory.READ);
results =
cacheManager.listPathBasedCacheDirectives(startId, filter, pc);
cacheManager.listCacheDirectives(startId, filter, pc);
success = true;
} finally {
readUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "listPathBasedCacheDirectives", null, null,
logAuditEvent(success, "listCacheDirectives", null, null,
null);
}
}

View File

@ -61,7 +61,8 @@
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -1233,52 +1234,52 @@ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
}
@Override
public long addPathBasedCacheDirective(
PathBasedCacheDirective path) throws IOException {
return namesystem.addPathBasedCacheDirective(path);
public long addCacheDirective(
CacheDirectiveInfo path) throws IOException {
return namesystem.addCacheDirective(path);
}
@Override
public void modifyPathBasedCacheDirective(
PathBasedCacheDirective directive) throws IOException {
namesystem.modifyPathBasedCacheDirective(directive);
public void modifyCacheDirective(
CacheDirectiveInfo directive) throws IOException {
namesystem.modifyCacheDirective(directive);
}
@Override
public void removePathBasedCacheDirective(long id) throws IOException {
namesystem.removePathBasedCacheDirective(id);
public void removeCacheDirective(long id) throws IOException {
namesystem.removeCacheDirective(id);
}
private class ServerSidePathBasedCacheEntriesIterator
extends BatchedRemoteIterator<Long, PathBasedCacheDirective> {
private class ServerSideCacheEntriesIterator
extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
private final PathBasedCacheDirective filter;
private final CacheDirectiveInfo filter;
public ServerSidePathBasedCacheEntriesIterator(Long firstKey,
PathBasedCacheDirective filter) {
public ServerSideCacheEntriesIterator (Long firstKey,
CacheDirectiveInfo filter) {
super(firstKey);
this.filter = filter;
}
@Override
public BatchedEntries<PathBasedCacheDirective> makeRequest(
public BatchedEntries<CacheDirectiveEntry> makeRequest(
Long nextKey) throws IOException {
return namesystem.listPathBasedCacheDirectives(nextKey, filter);
return namesystem.listCacheDirectives(nextKey, filter);
}
@Override
public Long elementToPrevKey(PathBasedCacheDirective entry) {
return entry.getId();
public Long elementToPrevKey(CacheDirectiveEntry entry) {
return entry.getInfo().getId();
}
}
@Override
public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(long prevId,
PathBasedCacheDirective filter) throws IOException {
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(long prevId,
CacheDirectiveInfo filter) throws IOException {
if (filter == null) {
filter = new PathBasedCacheDirective.Builder().build();
filter = new CacheDirectiveInfo.Builder().build();
}
return new ServerSidePathBasedCacheEntriesIterator(prevId, filter);
return new ServerSideCacheEntriesIterator(prevId, filter);
}
@Override

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.tools;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@ -31,8 +30,10 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.tools.TableListing.Justification;
import org.apache.hadoop.ipc.RemoteException;
@ -121,7 +122,7 @@ interface Command {
int run(Configuration conf, List<String> args) throws IOException;
}
private static class AddPathBasedCacheDirectiveCommand implements Command {
private static class AddCacheDirectiveInfoCommand implements Command {
@Override
public String getName() {
return "-addDirective";
@ -144,7 +145,7 @@ public String getLongUsage() {
"added. You must have write permission on the cache pool "
+ "in order to add new directives.");
return getShortUsage() + "\n" +
"Add a new PathBasedCache directive.\n\n" +
"Add a new cache directive.\n\n" +
listing.toString();
}
@ -172,14 +173,14 @@ public int run(Configuration conf, List<String> args) throws IOException {
}
DistributedFileSystem dfs = getDFS(conf);
PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder().
setPath(new Path(path)).
setReplication(replication).
setPool(poolName).
build();
try {
long id = dfs.addPathBasedCacheDirective(directive);
System.out.println("Added PathBasedCache entry " + id);
long id = dfs.addCacheDirective(directive);
System.out.println("Added cache directive " + id);
} catch (IOException e) {
System.err.println(prettifyException(e));
return 2;
@ -189,7 +190,7 @@ public int run(Configuration conf, List<String> args) throws IOException {
}
}
private static class RemovePathBasedCacheDirectiveCommand implements Command {
private static class RemoveCacheDirectiveInfoCommand implements Command {
@Override
public String getName() {
return "-removeDirective";
@ -206,7 +207,7 @@ public String getLongUsage() {
listing.addRow("<id>", "The id of the cache directive to remove. " +
"You must have write permission on the pool of the " +
"directive in order to remove it. To see a list " +
"of PathBasedCache directive IDs, use the -listDirectives command.");
"of cache directive IDs, use the -listDirectives command.");
return getShortUsage() + "\n" +
"Remove a cache directive.\n\n" +
listing.toString();
@ -239,8 +240,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
}
DistributedFileSystem dfs = getDFS(conf);
try {
dfs.getClient().removePathBasedCacheDirective(id);
System.out.println("Removed PathBasedCache directive " + id);
dfs.getClient().removeCacheDirective(id);
System.out.println("Removed cached directive " + id);
} catch (IOException e) {
System.err.println(prettifyException(e));
return 2;
@ -249,7 +250,7 @@ public int run(Configuration conf, List<String> args) throws IOException {
}
}
private static class ModifyPathBasedCacheDirectiveCommand implements Command {
private static class ModifyCacheDirectiveInfoCommand implements Command {
@Override
public String getName() {
return "-modifyDirective";
@ -274,14 +275,14 @@ public String getLongUsage() {
"added. You must have write permission on the cache pool "
+ "in order to move a directive into it. (optional)");
return getShortUsage() + "\n" +
"Modify a PathBasedCache directive.\n\n" +
"Modify a cache directive.\n\n" +
listing.toString();
}
@Override
public int run(Configuration conf, List<String> args) throws IOException {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
boolean modified = false;
String idString = StringUtils.popOptionWithArgument("-id", args);
if (idString == null) {
@ -317,8 +318,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
}
DistributedFileSystem dfs = getDFS(conf);
try {
dfs.modifyPathBasedCacheDirective(builder.build());
System.out.println("Modified PathBasedCache entry " + idString);
dfs.modifyCacheDirective(builder.build());
System.out.println("Modified cache directive " + idString);
} catch (IOException e) {
System.err.println(prettifyException(e));
return 2;
@ -327,7 +328,7 @@ public int run(Configuration conf, List<String> args) throws IOException {
}
}
private static class RemovePathBasedCacheDirectivesCommand implements Command {
private static class RemoveCacheDirectiveInfosCommand implements Command {
@Override
public String getName() {
return "-removeDirectives";
@ -363,31 +364,31 @@ public int run(Configuration conf, List<String> args) throws IOException {
return 1;
}
DistributedFileSystem dfs = getDFS(conf);
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPath(new Path(path)).build());
int exitCode = 0;
while (iter.hasNext()) {
PathBasedCacheDirective directive = iter.next();
CacheDirectiveEntry entry = iter.next();
try {
dfs.removePathBasedCacheDirective(directive.getId());
System.out.println("Removed PathBasedCache directive " +
directive.getId());
dfs.removeCacheDirective(entry.getInfo().getId());
System.out.println("Removed cache directive " +
entry.getInfo().getId());
} catch (IOException e) {
System.err.println(prettifyException(e));
exitCode = 2;
}
}
if (exitCode == 0) {
System.out.println("Removed every PathBasedCache directive with path " +
System.out.println("Removed every cache directive with path " +
path);
}
return exitCode;
}
}
private static class ListPathBasedCacheDirectiveCommand implements Command {
private static class ListCacheDirectiveInfoCommand implements Command {
@Override
public String getName() {
return "-listDirectives";
@ -402,21 +403,21 @@ public String getShortUsage() {
public String getLongUsage() {
TableListing listing = getOptionDescriptionListing();
listing.addRow("<path>", "List only " +
"PathBasedCache directives with this path. " +
"Note that if there is a PathBasedCache directive for <path> " +
"cache directives with this path. " +
"Note that if there is a cache directive for <path> " +
"in a cache pool that we don't have read access for, it " +
"will not be listed.");
listing.addRow("<pool>", "List only path cache directives in that pool.");
listing.addRow("-stats", "List path-based cache directive statistics.");
return getShortUsage() + "\n" +
"List PathBasedCache directives.\n\n" +
"List cache directives.\n\n" +
listing.toString();
}
@Override
public int run(Configuration conf, List<String> args) throws IOException {
PathBasedCacheDirective.Builder builder =
new PathBasedCacheDirective.Builder();
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder();
String pathFilter = StringUtils.popOptionWithArgument("-path", args);
if (pathFilter != null) {
builder.setPath(new Path(pathFilter));
@ -443,20 +444,22 @@ public int run(Configuration conf, List<String> args) throws IOException {
TableListing tableListing = tableBuilder.build();
DistributedFileSystem dfs = getDFS(conf);
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(builder.build());
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(builder.build());
int numEntries = 0;
while (iter.hasNext()) {
PathBasedCacheDirective directive = iter.next();
CacheDirectiveEntry entry = iter.next();
CacheDirectiveInfo directive = entry.getInfo();
CacheDirectiveStats stats = entry.getStats();
List<String> row = new LinkedList<String>();
row.add("" + directive.getId());
row.add(directive.getPool());
row.add("" + directive.getReplication());
row.add(directive.getPath().toUri().getPath());
if (printStats) {
row.add("" + directive.getBytesNeeded());
row.add("" + directive.getBytesCached());
row.add("" + directive.getFilesAffected());
row.add("" + stats.getBytesNeeded());
row.add("" + stats.getBytesCached());
row.add("" + stats.getFilesAffected());
}
tableListing.addRow(row.toArray(new String[0]));
numEntries++;
@ -838,11 +841,11 @@ public int run(Configuration conf, List<String> args) throws IOException {
}
private static Command[] COMMANDS = {
new AddPathBasedCacheDirectiveCommand(),
new ModifyPathBasedCacheDirectiveCommand(),
new ListPathBasedCacheDirectiveCommand(),
new RemovePathBasedCacheDirectiveCommand(),
new RemovePathBasedCacheDirectivesCommand(),
new AddCacheDirectiveInfoCommand(),
new ModifyCacheDirectiveInfoCommand(),
new ListCacheDirectiveInfoCommand(),
new RemoveCacheDirectiveInfoCommand(),
new RemoveCacheDirectiveInfosCommand(),
new AddCachePoolCommand(),
new ModifyCachePoolCommand(),
new RemoveCachePoolCommand(),

View File

@ -363,49 +363,53 @@ message IsFileClosedResponseProto {
required bool result = 1;
}
message PathBasedCacheDirectiveInfoProto {
message CacheDirectiveInfoProto {
optional int64 id = 1;
optional string path = 2;
optional uint32 replication = 3;
optional string pool = 4;
optional int64 bytesNeeded = 5;
optional int64 bytesCached = 6;
optional int64 filesAffected = 7;
}
message AddPathBasedCacheDirectiveRequestProto {
required PathBasedCacheDirectiveInfoProto info = 1;
message CacheDirectiveStatsProto {
required int64 bytesNeeded = 1;
required int64 bytesCached = 2;
required int64 filesAffected = 3;
}
message AddPathBasedCacheDirectiveResponseProto {
message AddCacheDirectiveRequestProto {
required CacheDirectiveInfoProto info = 1;
}
message AddCacheDirectiveResponseProto {
required int64 id = 1;
}
message ModifyPathBasedCacheDirectiveRequestProto {
required PathBasedCacheDirectiveInfoProto info = 1;
message ModifyCacheDirectiveRequestProto {
required CacheDirectiveInfoProto info = 1;
}
message ModifyPathBasedCacheDirectiveResponseProto {
message ModifyCacheDirectiveResponseProto {
}
message RemovePathBasedCacheDirectiveRequestProto {
message RemoveCacheDirectiveRequestProto {
required int64 id = 1;
}
message RemovePathBasedCacheDirectiveResponseProto {
message RemoveCacheDirectiveResponseProto {
}
message ListPathBasedCacheDirectivesRequestProto {
message ListCacheDirectivesRequestProto {
required int64 prevId = 1;
required PathBasedCacheDirectiveInfoProto filter = 2;
required CacheDirectiveInfoProto filter = 2;
}
message ListPathBasedCacheDirectivesElementProto {
required PathBasedCacheDirectiveInfoProto info = 1;
message CacheDirectiveEntryProto {
required CacheDirectiveInfoProto info = 1;
required CacheDirectiveStatsProto stats = 2;
}
message ListPathBasedCacheDirectivesResponseProto {
repeated ListPathBasedCacheDirectivesElementProto elements = 1;
message ListCacheDirectivesResponseProto {
repeated CacheDirectiveEntryProto elements = 1;
required bool hasMore = 2;
}
@ -632,14 +636,14 @@ service ClientNamenodeProtocol {
returns(ListCorruptFileBlocksResponseProto);
rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto);
rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto);
rpc addPathBasedCacheDirective(AddPathBasedCacheDirectiveRequestProto)
returns (AddPathBasedCacheDirectiveResponseProto);
rpc modifyPathBasedCacheDirective(ModifyPathBasedCacheDirectiveRequestProto)
returns (ModifyPathBasedCacheDirectiveResponseProto);
rpc removePathBasedCacheDirective(RemovePathBasedCacheDirectiveRequestProto)
returns (RemovePathBasedCacheDirectiveResponseProto);
rpc listPathBasedCacheDirectives(ListPathBasedCacheDirectivesRequestProto)
returns (ListPathBasedCacheDirectivesResponseProto);
rpc addCacheDirective(AddCacheDirectiveRequestProto)
returns (AddCacheDirectiveResponseProto);
rpc modifyCacheDirective(ModifyCacheDirectiveRequestProto)
returns (ModifyCacheDirectiveResponseProto);
rpc removeCacheDirective(RemoveCacheDirectiveRequestProto)
returns (RemoveCacheDirectiveResponseProto);
rpc listCacheDirectives(ListCacheDirectivesRequestProto)
returns (ListCacheDirectivesResponseProto);
rpc addCachePool(AddCachePoolRequestProto)
returns(AddCachePoolResponseProto);
rpc modifyCachePool(ModifyCachePoolRequestProto)

View File

@ -118,7 +118,7 @@ Centralized Cache Management in HDFS
Usage: <<<hdfs cacheadmin -addDirective -path <path> -replication <replication> -pool <pool-name> >>>
Add a new PathBasedCache directive.
Add a new cache directive.
*--+--+
\<path\> | A path to cache. The path can be a directory or a file.
@ -135,7 +135,7 @@ Centralized Cache Management in HDFS
Remove a cache directive.
*--+--+
\<id\> | The id of the cache directive to remove. You must have write permission on the pool of the directive in order to remove it. To see a list of PathBasedCache directive IDs, use the -listDirectives command.
\<id\> | The id of the cache directive to remove. You must have write permission on the pool of the directive in order to remove it. To see a list of cachedirective IDs, use the -listDirectives command.
*--+--+
*** {removeDirectives}
@ -152,10 +152,10 @@ Centralized Cache Management in HDFS
Usage: <<<hdfs cacheadmin -listDirectives [-path <path>] [-pool <pool>] >>>
List PathBasedCache directives.
List cache directives.
*--+--+
\<path\> | List only PathBasedCache directives with this path. Note that if there is a PathBasedCache directive for <path> in a cache pool that we don't have read access for, it will not be listed.
\<path\> | List only cache directives with this path. Note that if there is a cache directive for <path> in a cache pool that we don't have read access for, it will not be listed.
*--+--+
\<pool\> | List only path cache directives in that pool.
*--+--+

View File

@ -998,20 +998,20 @@ public static void runOperations(MiniDFSCluster cluster,
// OP_MODIFY_CACHE_POOL
filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE
long id = filesystem.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
long id = filesystem.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/path")).
setReplication((short)1).
setPool("pool1").
build());
// OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE
filesystem.modifyPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
filesystem.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)2).
build());
// OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE
filesystem.removePathBasedCacheDirective(id);
filesystem.removeCacheDirective(id);
// OP_REMOVE_CACHE_POOL
filesystem.removeCachePool("pool1");
}

View File

@ -42,7 +42,7 @@
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@ -241,20 +241,20 @@ public Object run() throws IOException, InterruptedException {
.setMode(new FsPermission((short)0700))
.setWeight(1989));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
long id = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/bar")).
setReplication((short)1).
setPool(pool).
build());
// OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE 38
dfs.modifyPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
dfs.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setPath(new Path("/bar2")).
build());
// OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE 34
dfs.removePathBasedCacheDirective(id);
dfs.removeCacheDirective(id);
// OP_REMOVE_CACHE_POOL 37
dfs.removeCachePool(pool);
// sync to disk, otherwise we parse partial edits

View File

@ -31,7 +31,6 @@
import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
@ -54,8 +53,10 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
@ -76,8 +77,8 @@
import com.google.common.base.Supplier;
public class TestPathBasedCacheRequests {
static final Log LOG = LogFactory.getLog(TestPathBasedCacheRequests.class);
public class TestCacheDirectives {
static final Log LOG = LogFactory.getLog(TestCacheDirectives.class);
private static final UserGroupInformation unprivilegedUser =
UserGroupInformation.createRemoteUser("unprivilegedUser");
@ -312,24 +313,25 @@ public void testCreateAndModifyPools() throws Exception {
}
private static void validateListAll(
RemoteIterator<PathBasedCacheDirective> iter,
RemoteIterator<CacheDirectiveEntry> iter,
Long... ids) throws Exception {
for (Long id: ids) {
assertTrue("Unexpectedly few elements", iter.hasNext());
assertEquals("Unexpected directive ID", id, iter.next().getId());
assertEquals("Unexpected directive ID", id,
iter.next().getInfo().getId());
}
assertFalse("Unexpectedly many list elements", iter.hasNext());
}
private static long addAsUnprivileged(
final PathBasedCacheDirective directive) throws Exception {
final CacheDirectiveInfo directive) throws Exception {
return unprivilegedUser
.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws IOException {
DistributedFileSystem myDfs =
(DistributedFileSystem) FileSystem.get(conf);
return myDfs.addPathBasedCacheDirective(directive);
return myDfs.addCacheDirective(directive);
}
});
}
@ -345,15 +347,15 @@ public void testAddRemoveDirectives() throws Exception {
proto.addCachePool(new CachePoolInfo("pool4").
setMode(new FsPermission((short)0)));
PathBasedCacheDirective alpha = new PathBasedCacheDirective.Builder().
CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
setPath(new Path("/alpha")).
setPool("pool1").
build();
PathBasedCacheDirective beta = new PathBasedCacheDirective.Builder().
CacheDirectiveInfo beta = new CacheDirectiveInfo.Builder().
setPath(new Path("/beta")).
setPool("pool2").
build();
PathBasedCacheDirective delta = new PathBasedCacheDirective.Builder().
CacheDirectiveInfo delta = new CacheDirectiveInfo.Builder().
setPath(new Path("/delta")).
setPool("pool1").
build();
@ -361,12 +363,12 @@ public void testAddRemoveDirectives() throws Exception {
long alphaId = addAsUnprivileged(alpha);
long alphaId2 = addAsUnprivileged(alpha);
assertFalse("Expected to get unique directives when re-adding an "
+ "existing PathBasedCacheDirective",
+ "existing CacheDirectiveInfo",
alphaId == alphaId2);
long betaId = addAsUnprivileged(beta);
try {
addAsUnprivileged(new PathBasedCacheDirective.Builder().
addAsUnprivileged(new CacheDirectiveInfo.Builder().
setPath(new Path("/unicorn")).
setPool("no_such_pool").
build());
@ -376,7 +378,7 @@ public void testAddRemoveDirectives() throws Exception {
}
try {
addAsUnprivileged(new PathBasedCacheDirective.Builder().
addAsUnprivileged(new CacheDirectiveInfo.Builder().
setPath(new Path("/blackhole")).
setPool("pool4").
build());
@ -388,7 +390,7 @@ public void testAddRemoveDirectives() throws Exception {
}
try {
addAsUnprivileged(new PathBasedCacheDirective.Builder().
addAsUnprivileged(new CacheDirectiveInfo.Builder().
setPath(new Path("/illegal:path/")).
setPool("pool1").
build());
@ -399,12 +401,12 @@ public void testAddRemoveDirectives() throws Exception {
}
try {
addAsUnprivileged(new PathBasedCacheDirective.Builder().
addAsUnprivileged(new CacheDirectiveInfo.Builder().
setPath(new Path("/emptypoolname")).
setReplication((short)1).
setPool("").
build());
fail("expected an error when adding a PathBasedCache " +
fail("expected an error when adding a cache " +
"directive with an empty pool name.");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("Invalid empty pool name", e);
@ -415,75 +417,75 @@ public void testAddRemoveDirectives() throws Exception {
// We expect the following to succeed, because DistributedFileSystem
// qualifies the path.
long relativeId = addAsUnprivileged(
new PathBasedCacheDirective.Builder().
new CacheDirectiveInfo.Builder().
setPath(new Path("relative")).
setPool("pool1").
build());
RemoteIterator<PathBasedCacheDirective> iter;
iter = dfs.listPathBasedCacheDirectives(null);
RemoteIterator<CacheDirectiveEntry> iter;
iter = dfs.listCacheDirectives(null);
validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId );
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().setPool("pool3").build());
iter = dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().setPool("pool3").build());
assertFalse(iter.hasNext());
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().setPool("pool1").build());
iter = dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().setPool("pool1").build());
validateListAll(iter, alphaId, alphaId2, deltaId, relativeId );
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().setPool("pool2").build());
iter = dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().setPool("pool2").build());
validateListAll(iter, betaId);
dfs.removePathBasedCacheDirective(betaId);
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().setPool("pool2").build());
dfs.removeCacheDirective(betaId);
iter = dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().setPool("pool2").build());
assertFalse(iter.hasNext());
try {
dfs.removePathBasedCacheDirective(betaId);
dfs.removeCacheDirective(betaId);
fail("expected an error when removing a non-existent ID");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("No directive with ID", e);
}
try {
proto.removePathBasedCacheDirective(-42l);
proto.removeCacheDirective(-42l);
fail("expected an error when removing a negative ID");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains(
"Invalid negative ID", e);
}
try {
proto.removePathBasedCacheDirective(43l);
proto.removeCacheDirective(43l);
fail("expected an error when removing a non-existent ID");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("No directive with ID", e);
}
dfs.removePathBasedCacheDirective(alphaId);
dfs.removePathBasedCacheDirective(alphaId2);
dfs.removePathBasedCacheDirective(deltaId);
dfs.removeCacheDirective(alphaId);
dfs.removeCacheDirective(alphaId2);
dfs.removeCacheDirective(deltaId);
dfs.modifyPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().
setId(relativeId).
setReplication((short)555).
build());
iter = dfs.listPathBasedCacheDirectives(null);
iter = dfs.listCacheDirectives(null);
assertTrue(iter.hasNext());
PathBasedCacheDirective modified = iter.next();
CacheDirectiveInfo modified = iter.next().getInfo();
assertEquals(relativeId, modified.getId().longValue());
assertEquals((short)555, modified.getReplication().shortValue());
dfs.removePathBasedCacheDirective(relativeId);
iter = dfs.listPathBasedCacheDirectives(null);
dfs.removeCacheDirective(relativeId);
iter = dfs.listCacheDirectives(null);
assertFalse(iter.hasNext());
// Verify that PBCDs with path "." work correctly
PathBasedCacheDirective directive =
new PathBasedCacheDirective.Builder().setPath(new Path("."))
CacheDirectiveInfo directive =
new CacheDirectiveInfo.Builder().setPath(new Path("."))
.setPool("pool1").build();
long id = dfs.addPathBasedCacheDirective(directive);
dfs.modifyPathBasedCacheDirective(new PathBasedCacheDirective.Builder(
long id = dfs.addCacheDirective(directive);
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(
directive).setId(id).setReplication((short)2).build());
dfs.removePathBasedCacheDirective(id);
dfs.removeCacheDirective(id);
}
@Test(timeout=60000)
@ -519,15 +521,15 @@ public void testCacheManagerRestart() throws Exception {
String entryPrefix = "/party-";
long prevId = -1;
for (int i=0; i<numEntries; i++) {
prevId = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
prevId = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path(entryPrefix + i)).setPool(pool).build());
}
RemoteIterator<PathBasedCacheDirective> dit
= dfs.listPathBasedCacheDirectives(null);
RemoteIterator<CacheDirectiveEntry> dit
= dfs.listCacheDirectives(null);
for (int i=0; i<numEntries; i++) {
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
PathBasedCacheDirective cd = dit.next();
CacheDirectiveInfo cd = dit.next().getInfo();
assertEquals(i+1, cd.getId().longValue());
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
assertEquals(pool, cd.getPool());
@ -548,18 +550,18 @@ public void testCacheManagerRestart() throws Exception {
assertEquals(weight, (int)info.getWeight());
assertFalse("Unexpected # of cache pools found", pit.hasNext());
dit = dfs.listPathBasedCacheDirectives(null);
dit = dfs.listCacheDirectives(null);
for (int i=0; i<numEntries; i++) {
assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
PathBasedCacheDirective cd = dit.next();
CacheDirectiveInfo cd = dit.next().getInfo();
assertEquals(i+1, cd.getId().longValue());
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
assertEquals(pool, cd.getPool());
}
assertFalse("Unexpected # of cache directives found", dit.hasNext());
long nextId = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
long nextId = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foobar")).setPool(pool).build());
assertEquals(prevId + 1, nextId);
}
@ -691,22 +693,22 @@ public void testWaitForCachedReplicas() throws Exception {
// Cache and check each path in sequence
int expected = 0;
for (int i=0; i<numFiles; i++) {
PathBasedCacheDirective directive =
new PathBasedCacheDirective.Builder().
CacheDirectiveInfo directive =
new CacheDirectiveInfo.Builder().
setPath(new Path(paths.get(i))).
setPool(pool).
build();
nnRpc.addPathBasedCacheDirective(directive);
nnRpc.addCacheDirective(directive);
expected += numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected,
"testWaitForCachedReplicas:1");
}
// Uncache and check each path in sequence
RemoteIterator<PathBasedCacheDirective> entries =
nnRpc.listPathBasedCacheDirectives(0, null);
RemoteIterator<CacheDirectiveEntry> entries =
nnRpc.listCacheDirectives(0, null);
for (int i=0; i<numFiles; i++) {
PathBasedCacheDirective directive = entries.next();
nnRpc.removePathBasedCacheDirective(directive.getId());
CacheDirectiveEntry entry = entries.next();
nnRpc.removeCacheDirective(entry.getInfo().getId());
expected -= numBlocksPerFile;
waitForCachedBlocks(namenode, expected, expected,
"testWaitForCachedReplicas:2");
@ -717,7 +719,7 @@ public void testWaitForCachedReplicas() throws Exception {
}
@Test(timeout=120000)
public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
throws Exception {
HdfsConfiguration conf = createCachingConf();
conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
@ -743,22 +745,22 @@ public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
}
// Check the initial statistics at the namenode
waitForCachedBlocks(namenode, 0, 0,
"testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:0");
"testAddingCacheDirectiveInfosWhenCachingIsDisabled:0");
// Cache and check each path in sequence
int expected = 0;
for (int i=0; i<numFiles; i++) {
PathBasedCacheDirective directive =
new PathBasedCacheDirective.Builder().
CacheDirectiveInfo directive =
new CacheDirectiveInfo.Builder().
setPath(new Path(paths.get(i))).
setPool(pool).
build();
dfs.addPathBasedCacheDirective(directive);
dfs.addCacheDirective(directive);
waitForCachedBlocks(namenode, expected, 0,
"testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:1");
"testAddingCacheDirectiveInfosWhenCachingIsDisabled:1");
}
Thread.sleep(20000);
waitForCachedBlocks(namenode, expected, 0,
"testAddingPathBasedCacheDirectivesWhenCachingIsDisabled:2");
"testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
} finally {
cluster.shutdown();
}
@ -793,8 +795,8 @@ public void testWaitForCachedReplicasInDirectory() throws Exception {
waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:0");
// cache entire directory
long id = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
setReplication((short)2).
setPool(pool).
@ -802,22 +804,23 @@ public void testWaitForCachedReplicasInDirectory() throws Exception {
waitForCachedBlocks(namenode, 4, 8,
"testWaitForCachedReplicasInDirectory:1");
// Verify that listDirectives gives the stats we want.
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(new PathBasedCacheDirective.Builder().
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build());
PathBasedCacheDirective directive = iter.next();
CacheDirectiveEntry entry = iter.next();
CacheDirectiveStats stats = entry.getStats();
Assert.assertEquals(Long.valueOf(2),
directive.getFilesAffected());
stats.getFilesAffected());
Assert.assertEquals(Long.valueOf(
2 * numBlocksPerFile * BLOCK_SIZE * 2),
directive.getBytesNeeded());
stats.getBytesNeeded());
Assert.assertEquals(Long.valueOf(
2 * numBlocksPerFile * BLOCK_SIZE * 2),
directive.getBytesCached());
stats.getBytesCached());
long id2 = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
long id2 = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo/bar")).
setReplication((short)4).
setPool(pool).
@ -826,38 +829,40 @@ public void testWaitForCachedReplicasInDirectory() throws Exception {
waitForCachedBlocks(namenode, 4, 10,
"testWaitForCachedReplicasInDirectory:2");
// the directory directive's stats are unchanged
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
iter = dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build());
directive = iter.next();
entry = iter.next();
stats = entry.getStats();
Assert.assertEquals(Long.valueOf(2),
directive.getFilesAffected());
stats.getFilesAffected());
Assert.assertEquals(Long.valueOf(
2 * numBlocksPerFile * BLOCK_SIZE * 2),
directive.getBytesNeeded());
stats.getBytesNeeded());
Assert.assertEquals(Long.valueOf(
2 * numBlocksPerFile * BLOCK_SIZE * 2),
directive.getBytesCached());
stats.getBytesCached());
// verify /foo/bar's stats
iter = dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
iter = dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo/bar")).
build());
directive = iter.next();
entry = iter.next();
stats = entry.getStats();
Assert.assertEquals(Long.valueOf(1),
directive.getFilesAffected());
stats.getFilesAffected());
Assert.assertEquals(Long.valueOf(
4 * numBlocksPerFile * BLOCK_SIZE),
directive.getBytesNeeded());
stats.getBytesNeeded());
// only 3 because the file only has 3 replicas, not 4 as requested.
Assert.assertEquals(Long.valueOf(
3 * numBlocksPerFile * BLOCK_SIZE),
directive.getBytesCached());
stats.getBytesCached());
// remove and watch numCached go to 0
dfs.removePathBasedCacheDirective(id);
dfs.removePathBasedCacheDirective(id2);
dfs.removeCacheDirective(id);
dfs.removeCacheDirective(id2);
waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:3");
} finally {
@ -899,8 +904,8 @@ public void testReplicationFactor() throws Exception {
waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
checkNumCachedReplicas(dfs, paths, 0, 0);
// cache directory
long id = dfs.addPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
long id = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
setReplication((short)1).
setPool(pool).
@ -909,8 +914,8 @@ public void testReplicationFactor() throws Exception {
checkNumCachedReplicas(dfs, paths, 4, 4);
// step up the replication factor
for (int i=2; i<=3; i++) {
dfs.modifyPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
dfs.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)i).
build());
@ -919,8 +924,8 @@ public void testReplicationFactor() throws Exception {
}
// step it down
for (int i=2; i>=1; i--) {
dfs.modifyPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
dfs.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)i).
build());
@ -928,7 +933,7 @@ public void testReplicationFactor() throws Exception {
checkNumCachedReplicas(dfs, paths, 4, 4*i);
}
// remove and watch numCached go to 0
dfs.removePathBasedCacheDirective(id);
dfs.removeCacheDirective(id);
waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
checkNumCachedReplicas(dfs, paths, 0, 0);
} finally {

View File

@ -61,7 +61,8 @@
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@ -736,14 +737,14 @@ Object getResult() {
}
}
/** addPathBasedCacheDirective */
class AddPathBasedCacheDirectiveOp extends AtMostOnceOp {
private PathBasedCacheDirective directive;
/** addCacheDirective */
class AddCacheDirectiveInfoOp extends AtMostOnceOp {
private CacheDirectiveInfo directive;
private Long result;
AddPathBasedCacheDirectiveOp(DFSClient client,
PathBasedCacheDirective directive) {
super("addPathBasedCacheDirective", client);
AddCacheDirectiveInfoOp(DFSClient client,
CacheDirectiveInfo directive) {
super("addCacheDirective", client);
this.directive = directive;
}
@ -754,15 +755,15 @@ void prepare() throws Exception {
@Override
void invoke() throws Exception {
result = client.addPathBasedCacheDirective(directive);
result = client.addCacheDirective(directive);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
@ -780,15 +781,15 @@ Object getResult() {
}
}
/** modifyPathBasedCacheDirective */
class ModifyPathBasedCacheDirectiveOp extends AtMostOnceOp {
private final PathBasedCacheDirective directive;
/** modifyCacheDirective */
class ModifyCacheDirectiveInfoOp extends AtMostOnceOp {
private final CacheDirectiveInfo directive;
private final short newReplication;
private long id;
ModifyPathBasedCacheDirectiveOp(DFSClient client,
PathBasedCacheDirective directive, short newReplication) {
super("modifyPathBasedCacheDirective", client);
ModifyCacheDirectiveInfoOp(DFSClient client,
CacheDirectiveInfo directive, short newReplication) {
super("modifyCacheDirective", client);
this.directive = directive;
this.newReplication = newReplication;
}
@ -796,13 +797,13 @@ class ModifyPathBasedCacheDirectiveOp extends AtMostOnceOp {
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
id = client.addPathBasedCacheDirective(directive);
id = client.addCacheDirective(directive);
}
@Override
void invoke() throws Exception {
client.modifyPathBasedCacheDirective(
new PathBasedCacheDirective.Builder().
client.modifyCacheDirective(
new CacheDirectiveInfo.Builder().
setId(id).
setReplication(newReplication).
build());
@ -811,14 +812,14 @@ void invoke() throws Exception {
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
while (iter.hasNext()) {
PathBasedCacheDirective result = iter.next();
CacheDirectiveInfo result = iter.next().getInfo();
if ((result.getId() == id) &&
(result.getReplication().shortValue() == newReplication)) {
return true;
@ -835,15 +836,15 @@ Object getResult() {
}
}
/** removePathBasedCacheDirective */
class RemovePathBasedCacheDirectiveOp extends AtMostOnceOp {
private PathBasedCacheDirective directive;
/** removeCacheDirective */
class RemoveCacheDirectiveInfoOp extends AtMostOnceOp {
private CacheDirectiveInfo directive;
private long id;
RemovePathBasedCacheDirectiveOp(DFSClient client, String pool,
RemoveCacheDirectiveInfoOp(DFSClient client, String pool,
String path) {
super("removePathBasedCacheDirective", client);
this.directive = new PathBasedCacheDirective.Builder().
super("removeCacheDirective", client);
this.directive = new CacheDirectiveInfo.Builder().
setPool(pool).
setPath(new Path(path)).
build();
@ -852,20 +853,20 @@ class RemovePathBasedCacheDirectiveOp extends AtMostOnceOp {
@Override
void prepare() throws Exception {
dfs.addCachePool(new CachePoolInfo(directive.getPool()));
id = dfs.addPathBasedCacheDirective(directive);
id = dfs.addCacheDirective(directive);
}
@Override
void invoke() throws Exception {
client.removePathBasedCacheDirective(id);
client.removeCacheDirective(id);
}
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
for (int i = 0; i < CHECKTIMES; i++) {
RemoteIterator<PathBasedCacheDirective> iter =
dfs.listPathBasedCacheDirectives(
new PathBasedCacheDirective.Builder().
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPool(directive.getPool()).
setPath(directive.getPath()).
build());
@ -1071,10 +1072,10 @@ public void testUpdatePipeline() throws Exception {
}
@Test (timeout=60000)
public void testAddPathBasedCacheDirective() throws Exception {
public void testAddCacheDirectiveInfo() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new AddPathBasedCacheDirectiveOp(client,
new PathBasedCacheDirective.Builder().
AtMostOnceOp op = new AddCacheDirectiveInfoOp(client,
new CacheDirectiveInfo.Builder().
setPool("pool").
setPath(new Path("/path")).
build());
@ -1082,10 +1083,10 @@ public void testAddPathBasedCacheDirective() throws Exception {
}
@Test (timeout=60000)
public void testModifyPathBasedCacheDirective() throws Exception {
public void testModifyCacheDirectiveInfo() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new ModifyPathBasedCacheDirectiveOp(client,
new PathBasedCacheDirective.Builder().
AtMostOnceOp op = new ModifyCacheDirectiveInfoOp(client,
new CacheDirectiveInfo.Builder().
setPool("pool").
setPath(new Path("/path")).
setReplication((short)1).build(),
@ -1094,9 +1095,9 @@ public void testModifyPathBasedCacheDirective() throws Exception {
}
@Test (timeout=60000)
public void testRemovePathBasedCacheDescriptor() throws Exception {
public void testRemoveCacheDescriptor() throws Exception {
DFSClient client = genClientWithDummyHandler();
AtMostOnceOp op = new RemovePathBasedCacheDirectiveOp(client, "pool",
AtMostOnceOp op = new RemoveCacheDirectiveInfoOp(client, "pool",
"/path");
testClientRetryWithFailover(op);
}