HDFS-5556. Add some more NameNode cache statistics, cache pool stats (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1546143 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 2013-11-27 17:55:52 +00:00
parent b6d483b122
commit 13edb391d0
32 changed files with 600 additions and 225 deletions


@ -218,6 +218,9 @@ Trunk (Unreleased)
     HDFS-5286. Flatten INodeDirectory hierarchy: Replace INodeDirectoryWithQuota
     with DirectoryWithQuotaFeature. (szetszwo)
+    HDFS-5556. Add some more NameNode cache statistics, cache pool stats
+    (cmccabe)
     OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)


@ -352,6 +352,11 @@
       <Method name="getReplication" />
       <Bug pattern="ICAST_QUESTIONABLE_UNSIGNED_RIGHT_SHIFT" />
     </Match>
+    <Match>
+      <Class name="org.apache.hadoop.hdfs.protocol.CacheDirective" />
+      <Method name="insertInternal" />
+      <Bug pattern="BC_UNCONFIRMED_CAST" />
+    </Match>
     <!-- These two are used for shutting down and kicking the CRMon, do not need strong sync -->
     <Match>
       <Class name="org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor" />


@ -109,6 +109,7 @@ import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@ -2358,7 +2359,7 @@ public class DFSClient implements java.io.Closeable {
     }
   }
-  public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
+  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
     checkOpen();
     try {
       return namenode.listCachePools("");


@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -1713,12 +1714,12 @@ public class DistributedFileSystem extends FileSystem {
   /**
    * List all cache pools.
    *
-   * @return A remote iterator from which you can get CachePoolInfo objects.
+   * @return A remote iterator from which you can get CachePoolEntry objects.
    *         Requests will be made as needed.
    * @throws IOException
    *         If there was an error listing cache pools.
    */
-  public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
+  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
     return dfs.listCachePools();
   }
 }


@ -29,6 +29,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@ -213,12 +214,12 @@ public class HdfsAdmin {
   /**
    * List all cache pools.
    *
-   * @return A remote iterator from which you can get CachePoolInfo objects.
+   * @return A remote iterator from which you can get CachePoolEntry objects.
    *         Requests will be made as needed.
    * @throws IOException
    *         If there was an error listing cache pools.
    */
-  public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
+  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
     return dfs.listCachePools();
   }
 }
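A minimal sketch of how an administrative client might consume the new per-pool statistics through this API. The NameNode URI, pool names, and the surrounding class are hypothetical; only listCachePools, CachePoolEntry, and CachePoolStats come from the change above, plus HdfsAdmin's existing (URI, Configuration) constructor.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolStats;

public class ListCachePoolStats {
  public static void main(String[] args) throws Exception {
    // Hypothetical NameNode address; replace with the cluster's fs.defaultFS.
    HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://localhost:8020"), new Configuration());
    RemoteIterator<CachePoolEntry> it = admin.listCachePools();
    while (it.hasNext()) {
      CachePoolEntry entry = it.next();
      CachePoolStats stats = entry.getStats();
      // Each entry now carries both the pool's configuration and its live statistics.
      System.out.println(entry.getInfo().getPoolName()
          + ": cached " + stats.getBytesCached() + "/" + stats.getBytesNeeded()
          + " bytes, " + stats.getFilesAffected() + " files affected");
    }
  }
}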


@ -21,6 +21,8 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
+import org.apache.hadoop.util.IntrusiveCollection;
+import org.apache.hadoop.util.IntrusiveCollection.Element;
 import com.google.common.base.Preconditions;
@ -30,32 +32,32 @@ import com.google.common.base.Preconditions;
  * This is an implementation class, not part of the public API.
  */
 @InterfaceAudience.Private
-public final class CacheDirective {
+public final class CacheDirective implements IntrusiveCollection.Element {
-  private final long entryId;
+  private final long id;
   private final String path;
   private final short replication;
-  private final CachePool pool;
+  private CachePool pool;
   private long bytesNeeded;
   private long bytesCached;
   private long filesAffected;
+  private Element prev;
+  private Element next;
-  public CacheDirective(long entryId, String path,
-      short replication, CachePool pool) {
-    Preconditions.checkArgument(entryId > 0);
-    this.entryId = entryId;
+  public CacheDirective(long id, String path,
+      short replication) {
+    Preconditions.checkArgument(id > 0);
+    this.id = id;
     Preconditions.checkArgument(replication > 0);
     this.path = path;
-    Preconditions.checkNotNull(pool);
     this.replication = replication;
     Preconditions.checkNotNull(path);
-    this.pool = pool;
     this.bytesNeeded = 0;
     this.bytesCached = 0;
     this.filesAffected = 0;
   }
-  public long getEntryId() {
-    return entryId;
+  public long getId() {
+    return id;
   }
   public String getPath() {
@ -70,9 +72,9 @@ public final class CacheDirective {
     return replication;
   }
-  public CacheDirectiveInfo toDirective() {
+  public CacheDirectiveInfo toInfo() {
     return new CacheDirectiveInfo.Builder().
-        setId(entryId).
+        setId(id).
         setPath(new Path(path)).
         setReplication(replication).
         setPool(pool.getPoolName()).
@ -88,13 +90,13 @@ public final class CacheDirective {
   }
   public CacheDirectiveEntry toEntry() {
-    return new CacheDirectiveEntry(toDirective(), toStats());
+    return new CacheDirectiveEntry(toInfo(), toStats());
   }
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
-    builder.append("{ entryId:").append(entryId).
+    builder.append("{ id:").append(id).
       append(", path:").append(path).
       append(", replication:").append(replication).
       append(", pool:").append(pool).
@ -113,12 +115,12 @@ public final class CacheDirective {
       return false;
     }
     CacheDirective other = (CacheDirective)o;
-    return entryId == other.entryId;
+    return id == other.id;
   }
   @Override
   public int hashCode() {
-    return new HashCodeBuilder().append(entryId).toHashCode();
+    return new HashCodeBuilder().append(id).toHashCode();
   }
   public long getBytesNeeded() {
@ -156,4 +158,55 @@ public final class CacheDirective {
   public void incrementFilesAffected() {
     this.filesAffected++;
   }
+  @SuppressWarnings("unchecked")
+  @Override // IntrusiveCollection.Element
+  public void insertInternal(IntrusiveCollection<? extends Element> list,
+      Element prev, Element next) {
+    assert this.pool == null;
+    this.pool = ((CachePool.DirectiveList)list).getCachePool();
+    this.prev = prev;
+    this.next = next;
+  }
+  @Override // IntrusiveCollection.Element
+  public void setPrev(IntrusiveCollection<? extends Element> list, Element prev) {
+    assert list == pool.getDirectiveList();
+    this.prev = prev;
+  }
+  @Override // IntrusiveCollection.Element
+  public void setNext(IntrusiveCollection<? extends Element> list, Element next) {
+    assert list == pool.getDirectiveList();
+    this.next = next;
+  }
+  @Override // IntrusiveCollection.Element
+  public void removeInternal(IntrusiveCollection<? extends Element> list) {
+    assert list == pool.getDirectiveList();
+    this.pool = null;
+    this.prev = null;
+    this.next = null;
+  }
+  @Override // IntrusiveCollection.Element
+  public Element getPrev(IntrusiveCollection<? extends Element> list) {
+    if (list != pool.getDirectiveList()) {
+      return null;
+    }
+    return this.prev;
+  }
+  @Override // IntrusiveCollection.Element
+  public Element getNext(IntrusiveCollection<? extends Element> list) {
+    if (list != pool.getDirectiveList()) {
+      return null;
+    }
+    return this.next;
+  }
+  @Override // IntrusiveCollection.Element
+  public boolean isInList(IntrusiveCollection<? extends Element> list) {
+    return pool == null ? false : list == pool.getDirectiveList();
+  }
 };
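CacheDirective now lives on an intrusive linked list owned by its CachePool: the element itself stores the prev/next links and learns which pool it belongs to when it is inserted, which is why insertInternal sets the pool field and removeInternal clears it. The stand-alone sketch below illustrates that general pattern with hypothetical names; it is not Hadoop's IntrusiveCollection API, only a simplified illustration.

// Illustration only: a minimal intrusive doubly linked list where each element
// carries its own links and a back-pointer to the list that owns it.
class IntrusiveNode {
  IntrusiveList owner;        // set on insert, cleared on remove
  IntrusiveNode prev, next;
}

class IntrusiveList {
  private IntrusiveNode head, tail;

  void add(IntrusiveNode node) {
    assert node.owner == null;   // an element may belong to at most one list
    node.owner = this;
    node.prev = tail;
    node.next = null;
    if (tail == null) {
      head = node;
    } else {
      tail.next = node;
    }
    tail = node;
  }

  void remove(IntrusiveNode node) {
    assert node.owner == this;
    if (node.prev == null) { head = node.next; } else { node.prev.next = node.next; }
    if (node.next == null) { tail = node.prev; } else { node.next.prev = node.prev; }
    node.owner = null;
    node.prev = null;
    node.next = null;
  }
}

Because the links live inside the element, membership changes need no extra allocation, and a directive can answer isInList and pool-lookup questions purely from its own fields.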


@ -94,21 +94,21 @@ public class CacheDirectiveStats {
   /**
    * @return The bytes needed.
    */
-  public Long getBytesNeeded() {
+  public long getBytesNeeded() {
     return bytesNeeded;
   }
   /**
    * @return The bytes cached.
    */
-  public Long getBytesCached() {
+  public long getBytesCached() {
     return bytesCached;
   }
   /**
    * @return The files affected.
    */
-  public Long getFilesAffected() {
+  public long getFilesAffected() {
     return filesAffected;
   }


@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Describes a Cache Pool entry.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CachePoolEntry {
private final CachePoolInfo info;
private final CachePoolStats stats;
public CachePoolEntry(CachePoolInfo info, CachePoolStats stats) {
this.info = info;
this.stats = stats;
}
public CachePoolInfo getInfo() {
return info;
}
public CachePoolStats getStats() {
return stats;
}
}


@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
@ -150,7 +151,10 @@ public class CachePoolInfo {
   public static void validate(CachePoolInfo info) throws IOException {
     if (info == null) {
-      throw new IOException("CachePoolInfo is null");
+      throw new InvalidRequestException("CachePoolInfo is null");
+    }
+    if ((info.getWeight() != null) && (info.getWeight() < 0)) {
+      throw new InvalidRequestException("CachePool weight is negative.");
     }
     validateName(info.poolName);
   }
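With this change a malformed request is rejected as an InvalidRequestException before it reaches the cache manager. A hedged illustration of the new negative-weight check; the chained setWeight call is an assumption about CachePoolInfo's setter style and is not shown in this diff.

import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

public class ValidateWeightExample {
  public static void main(String[] args) throws Exception {
    // Assumed builder-style setter; the pool name is arbitrary.
    CachePoolInfo info = new CachePoolInfo("pool1").setWeight(-1);
    try {
      CachePoolInfo.validate(info);
    } catch (InvalidRequestException e) {
      System.err.println("rejected: " + e.getMessage()); // "CachePool weight is negative."
    }
  }
}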


@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* CachePoolStats describes cache pool statistics.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CachePoolStats {
public static class Builder {
private long bytesNeeded;
private long bytesCached;
private long filesAffected;
public Builder() {
}
public Builder setBytesNeeded(long bytesNeeded) {
this.bytesNeeded = bytesNeeded;
return this;
}
public Builder setBytesCached(long bytesCached) {
this.bytesCached = bytesCached;
return this;
}
public Builder setFilesAffected(long filesAffected) {
this.filesAffected = filesAffected;
return this;
}
public CachePoolStats build() {
return new CachePoolStats(bytesNeeded, bytesCached, filesAffected);
}
};
private final long bytesNeeded;
private final long bytesCached;
private final long filesAffected;
private CachePoolStats(long bytesNeeded, long bytesCached, long filesAffected) {
this.bytesNeeded = bytesNeeded;
this.bytesCached = bytesCached;
this.filesAffected = filesAffected;
}
public long getBytesNeeded() {
return bytesNeeded;
}
public long getBytesCached() {
return bytesCached;
}
public long getFilesAffected() {
return filesAffected;
}
public String toString() {
return new StringBuilder().append("{").
append("bytesNeeded:").append(bytesNeeded).
append(", bytesCached:").append(bytesCached).
append(", filesAffected:").append(filesAffected).
append("}").toString();
}
}
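A short sketch of how the two new value classes fit together; the figures are arbitrary and the single-argument CachePoolInfo constructor is assumed from the existing protocol classes rather than shown in this diff.

import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolStats;

public class CachePoolStatsExample {
  public static void main(String[] args) {
    CachePoolStats stats = new CachePoolStats.Builder()
        .setBytesNeeded(1024L * 1024L)
        .setBytesCached(512L * 1024L)
        .setFilesAffected(3)
        .build();
    CachePoolEntry entry = new CachePoolEntry(new CachePoolInfo("pool1"), stats);
    // Prints something like: pool1 -> {bytesNeeded:1048576, bytesCached:524288, filesAffected:3}
    System.out.println(entry.getInfo().getPoolName() + " -> " + entry.getStats());
  }
}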


@ -1178,6 +1178,6 @@ public interface ClientProtocol {
    * @return A RemoteIterator which returns CachePool objects.
    */
   @Idempotent
-  public RemoteIterator<CachePoolInfo> listCachePools(String prevPool)
+  public RemoteIterator<CachePoolEntry> listCachePools(String prevPool)
       throws IOException;
 }


@ -30,6 +30,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@ -51,6 +52,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowS
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@ -103,7 +106,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
@ -1136,18 +1138,15 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public ListCachePoolsResponseProto listCachePools(RpcController controller,
       ListCachePoolsRequestProto request) throws ServiceException {
     try {
-      RemoteIterator<CachePoolInfo> iter =
+      RemoteIterator<CachePoolEntry> iter =
           server.listCachePools(request.getPrevPoolName());
       ListCachePoolsResponseProto.Builder responseBuilder =
           ListCachePoolsResponseProto.newBuilder();
       String prevPoolName = null;
       while (iter.hasNext()) {
-        CachePoolInfo pool = iter.next();
-        ListCachePoolsResponseElementProto.Builder elemBuilder =
-            ListCachePoolsResponseElementProto.newBuilder();
-        elemBuilder.setInfo(PBHelper.convert(pool));
-        responseBuilder.addElements(elemBuilder.build());
-        prevPoolName = pool.getPoolName();
+        CachePoolEntry entry = iter.next();
+        responseBuilder.addEntries(PBHelper.convert(entry));
+        prevPoolName = entry.getInfo().getPoolName();
       }
       // fill in hasNext
       if (prevPoolName == null) {


@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@ -61,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCac
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
@ -96,7 +98,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
@ -1138,23 +1139,23 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
   }
-  private static class BatchedCachePoolInfo
-      implements BatchedEntries<CachePoolInfo> {
+  private static class BatchedCachePoolEntries
+      implements BatchedEntries<CachePoolEntry> {
     private final ListCachePoolsResponseProto proto;
-    public BatchedCachePoolInfo(ListCachePoolsResponseProto proto) {
+    public BatchedCachePoolEntries(ListCachePoolsResponseProto proto) {
       this.proto = proto;
     }
     @Override
-    public CachePoolInfo get(int i) {
-      ListCachePoolsResponseElementProto elem = proto.getElements(i);
-      return PBHelper.convert(elem.getInfo());
+    public CachePoolEntry get(int i) {
+      CachePoolEntryProto elem = proto.getEntries(i);
+      return PBHelper.convert(elem);
     }
     @Override
     public int size() {
-      return proto.getElementsCount();
+      return proto.getEntriesCount();
     }
     @Override
@ -1162,19 +1163,19 @@ public class ClientNamenodeProtocolTranslatorPB implements
       return proto.getHasMore();
     }
   }
   private class CachePoolIterator
-      extends BatchedRemoteIterator<String, CachePoolInfo> {
+      extends BatchedRemoteIterator<String, CachePoolEntry> {
     public CachePoolIterator(String prevKey) {
       super(prevKey);
     }
     @Override
-    public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
+    public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
         throws IOException {
       try {
-        return new BatchedCachePoolInfo(
+        return new BatchedCachePoolEntries(
             rpcProxy.listCachePools(null,
               ListCachePoolsRequestProto.newBuilder().
                 setPrevPoolName(prevKey).build()));
@ -1184,13 +1185,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
     }
     @Override
-    public String elementToPrevKey(CachePoolInfo element) {
-      return element.getPoolName();
+    public String elementToPrevKey(CachePoolEntry entry) {
+      return entry.getInfo().getPoolName();
     }
   }
   @Override
-  public RemoteIterator<CachePoolInfo> listCachePools(String prevKey)
+  public RemoteIterator<CachePoolEntry> listCachePools(String prevKey)
       throws IOException {
     return new CachePoolIterator(prevKey);
   }
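CachePoolIterator pages through the pools by resending the name of the last pool it saw as the cursor for the next RPC. The stand-alone sketch below illustrates that cursor-plus-hasMore batching pattern with hypothetical names; it is not Hadoop's BatchedRemoteIterator, just the shape of the idea.

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustration only: lazily fetch pages keyed by the last element handed out.
abstract class CursorBatchedIterator<K, E> implements Iterator<E> {

  /** One page of results plus a flag saying whether the server has more. */
  public static final class Page<E> {
    final List<E> entries;
    final boolean hasMore;
    public Page(List<E> entries, boolean hasMore) {
      this.entries = entries;
      this.hasMore = hasMore;
    }
  }

  private K prevKey;
  private Page<E> page;
  private int pos;

  protected CursorBatchedIterator(K initialKey) {
    this.prevKey = initialKey;
  }

  /** Fetch the page that starts just after prevKey (one RPC in the real protocol). */
  protected abstract Page<E> makeRequest(K prevKey);

  /** Derive the cursor to resend from an element that was handed out. */
  protected abstract K elementToPrevKey(E element);

  @Override
  public boolean hasNext() {
    if (page == null) {
      page = makeRequest(prevKey);
      pos = 0;
    }
    // Assumes a well-behaved server that eventually returns entries or clears hasMore.
    while (pos >= page.entries.size() && page.hasMore) {
      page = makeRequest(prevKey);
      pos = 0;
    }
    return pos < page.entries.size();
  }

  @Override
  public E next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    E element = page.entries.get(pos++);
    prevKey = elementToPrevKey(element);   // becomes the cursor for the next page
    return element;
  }
}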


@ -38,7 +38,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -60,7 +62,9 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
@ -1678,6 +1682,35 @@ public class PBHelper {
     return info;
   }
+  public static CachePoolStatsProto convert(CachePoolStats stats) {
+    CachePoolStatsProto.Builder builder = CachePoolStatsProto.newBuilder();
+    builder.setBytesNeeded(stats.getBytesNeeded());
+    builder.setBytesCached(stats.getBytesCached());
+    builder.setFilesAffected(stats.getFilesAffected());
+    return builder.build();
+  }
+  public static CachePoolStats convert (CachePoolStatsProto proto) {
+    CachePoolStats.Builder builder = new CachePoolStats.Builder();
+    builder.setBytesNeeded(proto.getBytesNeeded());
+    builder.setBytesCached(proto.getBytesCached());
+    builder.setFilesAffected(proto.getFilesAffected());
+    return builder.build();
+  }
+  public static CachePoolEntryProto convert(CachePoolEntry entry) {
+    CachePoolEntryProto.Builder builder = CachePoolEntryProto.newBuilder();
+    builder.setInfo(PBHelper.convert(entry.getInfo()));
+    builder.setStats(PBHelper.convert(entry.getStats()));
+    return builder.build();
+  }
+  public static CachePoolEntry convert (CachePoolEntryProto proto) {
+    CachePoolInfo info = PBHelper.convert(proto.getInfo());
+    CachePoolStats stats = PBHelper.convert(proto.getStats());
+    return new CachePoolEntry(info, stats);
+  }
   public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
     return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
   }
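The new converters are symmetric, so a value should survive a round trip through the wire format. A small hedged sketch; the surrounding class is hypothetical, and the convert overloads used are the ones added above.

import org.apache.hadoop.hdfs.protocol.CachePoolStats;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

public class CachePoolStatsRoundTrip {
  public static void main(String[] args) {
    CachePoolStats stats = new CachePoolStats.Builder()
        .setBytesNeeded(100L).setBytesCached(40L).setFilesAffected(2L).build();
    CachePoolStatsProto proto = PBHelper.convert(stats);   // to the protobuf message
    CachePoolStats back = PBHelper.convert(proto);         // and back again
    System.out.println(back.getBytesNeeded() + " " + back.getBytesCached()
        + " " + back.getFilesAffected());                  // expect 100 40 2
  }
}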


@ -208,8 +208,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   /**
    * Scan all CacheDirectives. Use the information to figure out
    * what cache replication factor each block should have.
-   *
-   * @param mark Whether the current scan is setting or clearing the mark
    */
   private void rescanCacheDirectives() {
     FSDirectory fsDir = namesystem.getFSDirectory();
@ -301,7 +299,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       pce.addBytesNeeded(neededTotal);
       pce.addBytesCached(cachedTotal);
       if (LOG.isTraceEnabled()) {
-        LOG.debug("Directive " + pce.getEntryId() + " is caching " +
+        LOG.debug("Directive " + pce.getId() + " is caching " +
             file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal);
       }
     }


@ -42,6 +42,12 @@ public interface DatanodeStatistics {
   /** @return the percentage of the block pool used space over the total capacity. */
   public float getPercentBlockPoolUsed();
+  /** @return the total cache capacity of all DataNodes */
+  public long getCacheCapacity();
+  /** @return the total cache used by all DataNodes */
+  public long getCacheUsed();
   /** @return the xceiver count */
   public int getXceiverCount();


@ -148,6 +148,17 @@ class HeartbeatManager implements DatanodeStatistics {
   public synchronized int getXceiverCount() {
     return stats.xceiverCount;
   }
+  @Override
+  public synchronized long getCacheCapacity() {
+    return stats.cacheCapacity;
+  }
+  @Override
+  public synchronized long getCacheUsed() {
+    return stats.cacheUsed;
+  }
   @Override
   public synchronized long[] getStats() {
@ -308,6 +319,8 @@ class HeartbeatManager implements DatanodeStatistics {
     private long capacityRemaining = 0L;
     private long blockPoolUsed = 0L;
     private int xceiverCount = 0;
+    private long cacheCapacity = 0L;
+    private long cacheUsed = 0L;
     private int expiredHeartbeats = 0;
@ -321,6 +334,8 @@ class HeartbeatManager implements DatanodeStatistics {
       } else {
         capacityTotal += node.getDfsUsed();
       }
+      cacheCapacity += node.getCacheCapacity();
+      cacheUsed += node.getCacheUsed();
     }
     private void subtract(final DatanodeDescriptor node) {
@ -333,6 +348,8 @@ class HeartbeatManager implements DatanodeStatistics {
       } else {
         capacityTotal -= node.getDfsUsed();
       }
+      cacheCapacity -= node.getCacheCapacity();
+      cacheUsed -= node.getCacheUsed();
     }
     /** Increment expired heartbeat counter. */


@ -145,6 +145,8 @@ public class FsDatasetCache {
    */
   private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
+  private final AtomicLong numBlocksCached = new AtomicLong(0);
   private final FsDatasetImpl dataset;
   private final ThreadPoolExecutor uncachingExecutor;
@ -417,6 +419,7 @@ public class FsDatasetCache {
         LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
             ". We are now caching " + newUsedBytes + " bytes in total.");
       }
+      numBlocksCached.addAndGet(1);
       success = true;
     } finally {
       if (!success) {
@ -465,6 +468,7 @@ public class FsDatasetCache {
     }
     long newUsedBytes =
         usedBytesCount.release(value.mappableBlock.getLength());
+    numBlocksCached.addAndGet(-1);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
           " completed. usedBytes = " + newUsedBytes);
@ -477,14 +481,14 @@ public class FsDatasetCache {
   /**
    * Get the approximate amount of cache space used.
    */
-  public long getDnCacheUsed() {
+  public long getCacheUsed() {
     return usedBytesCount.get();
   }
   /**
    * Get the maximum amount of bytes we can cache. This is a constant.
    */
-  public long getDnCacheCapacity() {
+  public long getCacheCapacity() {
     return maxBytes;
   }
@ -496,4 +500,7 @@ public class FsDatasetCache {
     return numBlocksFailedToUncache.get();
   }
+  public long getNumBlocksCached() {
+    return numBlocksCached.get();
+  }
 }


@ -292,12 +292,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override // FSDatasetMBean
   public long getCacheUsed() {
-    return cacheManager.getDnCacheUsed();
+    return cacheManager.getCacheUsed();
   }
   @Override // FSDatasetMBean
   public long getCacheCapacity() {
-    return cacheManager.getDnCacheCapacity();
+    return cacheManager.getCacheCapacity();
   }
   @Override // FSDatasetMBean
@ -310,6 +310,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return cacheManager.getNumBlocksFailedToUncache();
   }
+  @Override // FSDatasetMBean
+  public long getNumBlocksCached() {
+    return cacheManager.getNumBlocksCached();
+  }
   /**
    * Find the block's on-disk length
    */


@ -88,6 +88,11 @@ public interface FSDatasetMBean {
    */
   public long getCacheCapacity();
+  /**
+   * Returns the number of blocks cached.
+   */
+  public long getNumBlocksCached();
   /**
    * Returns the number of blocks that the datanode was unable to cache
    */


@ -49,6 +49,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -99,24 +100,24 @@ public final class CacheManager {
private final BlockManager blockManager; private final BlockManager blockManager;
/** /**
* Cache entries, sorted by ID. * Cache directives, sorted by ID.
* *
* listCacheDirectives relies on the ordering of elements in this map * listCacheDirectives relies on the ordering of elements in this map
* to track what has already been listed by the client. * to track what has already been listed by the client.
*/ */
private final TreeMap<Long, CacheDirective> entriesById = private final TreeMap<Long, CacheDirective> directivesById =
new TreeMap<Long, CacheDirective>(); new TreeMap<Long, CacheDirective>();
/** /**
* The entry ID to use for a new entry. Entry IDs always increase, and are * The directive ID to use for a new directive. IDs always increase, and are
* never reused. * never reused.
*/ */
private long nextEntryId; private long nextDirectiveId;
/** /**
* Cache entries, sorted by path * Cache directives, sorted by path
*/ */
private final TreeMap<String, List<CacheDirective>> entriesByPath = private final TreeMap<String, List<CacheDirective>> directivesByPath =
new TreeMap<String, List<CacheDirective>>(); new TreeMap<String, List<CacheDirective>>();
/** /**
@ -177,7 +178,7 @@ public final class CacheManager {
BlockManager blockManager) { BlockManager blockManager) {
this.namesystem = namesystem; this.namesystem = namesystem;
this.blockManager = blockManager; this.blockManager = blockManager;
this.nextEntryId = 1; this.nextDirectiveId = 1;
this.maxListCachePoolsResponses = conf.getInt( this.maxListCachePoolsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT); DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
@ -239,7 +240,7 @@ public final class CacheManager {
public TreeMap<Long, CacheDirective> getEntriesById() { public TreeMap<Long, CacheDirective> getEntriesById() {
assert namesystem.hasReadLock(); assert namesystem.hasReadLock();
return entriesById; return directivesById;
} }
@VisibleForTesting @VisibleForTesting
@ -250,10 +251,10 @@ public final class CacheManager {
private long getNextEntryId() throws IOException { private long getNextEntryId() throws IOException {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
if (nextEntryId >= Long.MAX_VALUE - 1) { if (nextDirectiveId >= Long.MAX_VALUE - 1) {
throw new IOException("No more available IDs."); throw new IOException("No more available IDs.");
} }
return nextEntryId++; return nextDirectiveId++;
} }
// Helper getter / validation methods // Helper getter / validation methods
@ -301,7 +302,7 @@ public final class CacheManager {
} }
/** /**
* Get a CacheDirective by ID, validating the ID and that the entry * Get a CacheDirective by ID, validating the ID and that the directive
* exists. * exists.
*/ */
private CacheDirective getById(long id) throws InvalidRequestException { private CacheDirective getById(long id) throws InvalidRequestException {
@ -309,13 +310,13 @@ public final class CacheManager {
if (id <= 0) { if (id <= 0) {
throw new InvalidRequestException("Invalid negative ID."); throw new InvalidRequestException("Invalid negative ID.");
} }
// Find the entry. // Find the directive.
CacheDirective entry = entriesById.get(id); CacheDirective directive = directivesById.get(id);
if (entry == null) { if (directive == null) {
throw new InvalidRequestException("No directive with ID " + id throw new InvalidRequestException("No directive with ID " + id
+ " found."); + " found.");
} }
return entry; return directive;
} }
/** /**
@ -332,32 +333,34 @@ public final class CacheManager {
// RPC handlers // RPC handlers
private void addInternal(CacheDirective entry) { private void addInternal(CacheDirective directive, CachePool pool) {
entriesById.put(entry.getEntryId(), entry); boolean addedDirective = pool.getDirectiveList().add(directive);
String path = entry.getPath(); assert addedDirective;
List<CacheDirective> entryList = entriesByPath.get(path); directivesById.put(directive.getId(), directive);
if (entryList == null) { String path = directive.getPath();
entryList = new ArrayList<CacheDirective>(1); List<CacheDirective> directives = directivesByPath.get(path);
entriesByPath.put(path, entryList); if (directives == null) {
directives = new ArrayList<CacheDirective>(1);
directivesByPath.put(path, directives);
} }
entryList.add(entry); directives.add(directive);
} }
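// The add path above keeps two indexes over the same CacheDirective objects: a
// TreeMap keyed by id (so listings can resume in id order) and a TreeMap keyed by
// path holding a small list per path, in addition to the pool's intrusive list.
// A stand-alone sketch of that dual-index bookkeeping with hypothetical names
// (not the CacheManager API itself):
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class DirectiveIndex<D> {
  private final TreeMap<Long, D> byId = new TreeMap<Long, D>();
  private final TreeMap<String, List<D>> byPath = new TreeMap<String, List<D>>();

  void add(long id, String path, D directive) {
    byId.put(id, directive);
    List<D> list = byPath.get(path);
    if (list == null) {
      list = new ArrayList<D>(1);
      byPath.put(path, list);
    }
    list.add(directive);
  }

  void remove(long id, String path, D directive) {
    List<D> list = byPath.get(path);
    if (list != null && list.remove(directive) && list.isEmpty()) {
      byPath.remove(path);   // drop the per-path bucket once it is empty
    }
    byId.remove(id);
  }
}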
public CacheDirectiveInfo addDirective( public CacheDirectiveInfo addDirective(
CacheDirectiveInfo directive, FSPermissionChecker pc) CacheDirectiveInfo info, FSPermissionChecker pc)
throws IOException { throws IOException {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
CacheDirective entry; CacheDirective directive;
try { try {
CachePool pool = getCachePool(validatePoolName(directive)); CachePool pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool); checkWritePermission(pc, pool);
String path = validatePath(directive); String path = validatePath(info);
short replication = validateReplication(directive, (short)1); short replication = validateReplication(info, (short)1);
long id; long id;
if (directive.getId() != null) { if (info.getId() != null) {
// We are loading an entry from the edit log. // We are loading a directive from the edit log.
// Use the ID from the edit log. // Use the ID from the edit log.
id = directive.getId(); id = info.getId();
if (id <= 0) { if (id <= 0) {
throw new InvalidRequestException("can't add an ID " + throw new InvalidRequestException("can't add an ID " +
"of " + id + ": it is not positive."); "of " + id + ": it is not positive.");
@ -366,88 +369,90 @@ public final class CacheManager {
throw new InvalidRequestException("can't add an ID " + throw new InvalidRequestException("can't add an ID " +
"of " + id + ": it is too big."); "of " + id + ": it is too big.");
} }
if (nextEntryId <= id) { if (nextDirectiveId <= id) {
nextEntryId = id + 1; nextDirectiveId = id + 1;
} }
} else { } else {
// Add a new entry with the next available ID. // Add a new directive with the next available ID.
id = getNextEntryId(); id = getNextEntryId();
} }
entry = new CacheDirective(id, path, replication, pool); directive = new CacheDirective(id, path, replication);
addInternal(entry); addInternal(directive, pool);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("addDirective of " + directive + " failed: ", e); LOG.warn("addDirective of " + info + " failed: ", e);
throw e; throw e;
} }
LOG.info("addDirective of " + directive + " successful."); LOG.info("addDirective of " + info + " successful.");
if (monitor != null) { if (monitor != null) {
monitor.kick(); monitor.kick();
} }
return entry.toDirective(); return directive.toInfo();
} }
public void modifyDirective(CacheDirectiveInfo directive, public void modifyDirective(CacheDirectiveInfo info,
FSPermissionChecker pc) throws IOException { FSPermissionChecker pc) throws IOException {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
String idString = String idString =
(directive.getId() == null) ? (info.getId() == null) ?
"(null)" : directive.getId().toString(); "(null)" : info.getId().toString();
try { try {
// Check for invalid IDs. // Check for invalid IDs.
Long id = directive.getId(); Long id = info.getId();
if (id == null) { if (id == null) {
throw new InvalidRequestException("Must supply an ID."); throw new InvalidRequestException("Must supply an ID.");
} }
CacheDirective prevEntry = getById(id); CacheDirective prevEntry = getById(id);
checkWritePermission(pc, prevEntry.getPool()); checkWritePermission(pc, prevEntry.getPool());
String path = prevEntry.getPath(); String path = prevEntry.getPath();
if (directive.getPath() != null) { if (info.getPath() != null) {
path = validatePath(directive); path = validatePath(info);
} }
short replication = prevEntry.getReplication(); short replication = prevEntry.getReplication();
if (directive.getReplication() != null) { if (info.getReplication() != null) {
replication = validateReplication(directive, replication); replication = validateReplication(info, replication);
} }
CachePool pool = prevEntry.getPool(); CachePool pool = prevEntry.getPool();
if (directive.getPool() != null) { if (info.getPool() != null) {
pool = getCachePool(validatePoolName(directive)); pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool); checkWritePermission(pc, pool);
} }
removeInternal(prevEntry); removeInternal(prevEntry);
CacheDirective newEntry = CacheDirective newEntry =
new CacheDirective(id, path, replication, pool); new CacheDirective(id, path, replication);
addInternal(newEntry); addInternal(newEntry, pool);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("modifyDirective of " + idString + " failed: ", e); LOG.warn("modifyDirective of " + idString + " failed: ", e);
throw e; throw e;
} }
LOG.info("modifyDirective of " + idString + " successfully applied " + LOG.info("modifyDirective of " + idString + " successfully applied " +
directive + "."); info+ ".");
} }
public void removeInternal(CacheDirective existing) public void removeInternal(CacheDirective directive)
throws InvalidRequestException { throws InvalidRequestException {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
// Remove the corresponding entry in entriesByPath. // Remove the corresponding entry in directivesByPath.
String path = existing.getPath(); String path = directive.getPath();
List<CacheDirective> entries = entriesByPath.get(path); List<CacheDirective> directives = directivesByPath.get(path);
if (entries == null || !entries.remove(existing)) { if (directives == null || !directives.remove(directive)) {
throw new InvalidRequestException("Failed to locate entry " + throw new InvalidRequestException("Failed to locate entry " +
existing.getEntryId() + " by path " + existing.getPath()); directive.getId() + " by path " + directive.getPath());
} }
if (entries.size() == 0) { if (directives.size() == 0) {
entriesByPath.remove(path); directivesByPath.remove(path);
} }
entriesById.remove(existing.getEntryId()); directivesById.remove(directive.getId());
directive.getPool().getDirectiveList().remove(directive);
assert directive.getPool() == null;
} }
public void removeDirective(long id, FSPermissionChecker pc) public void removeDirective(long id, FSPermissionChecker pc)
throws IOException { throws IOException {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
try { try {
CacheDirective existing = getById(id); CacheDirective directive = getById(id);
checkWritePermission(pc, existing.getPool()); checkWritePermission(pc, directive.getPool());
removeInternal(existing); removeInternal(directive);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("removeDirective of " + id + " failed: ", e); LOG.warn("removeDirective of " + id + " failed: ", e);
throw e; throw e;
@ -478,13 +483,13 @@ public final class CacheManager {
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES); new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0; int numReplies = 0;
SortedMap<Long, CacheDirective> tailMap = SortedMap<Long, CacheDirective> tailMap =
entriesById.tailMap(prevId + 1); directivesById.tailMap(prevId + 1);
for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) { for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) {
if (numReplies >= maxListCacheDirectivesNumResponses) { if (numReplies >= maxListCacheDirectivesNumResponses) {
return new BatchedListEntries<CacheDirectiveEntry>(replies, true); return new BatchedListEntries<CacheDirectiveEntry>(replies, true);
} }
CacheDirective curEntry = cur.getValue(); CacheDirective curDirective = cur.getValue();
CacheDirectiveInfo info = cur.getValue().toDirective(); CacheDirectiveInfo info = cur.getValue().toInfo();
if (filter.getPool() != null && if (filter.getPool() != null &&
!info.getPool().equals(filter.getPool())) { !info.getPool().equals(filter.getPool())) {
continue; continue;
@ -496,7 +501,7 @@ public final class CacheManager {
boolean hasPermission = true; boolean hasPermission = true;
if (pc != null) { if (pc != null) {
try { try {
pc.checkPermission(curEntry.getPool(), FsAction.READ); pc.checkPermission(curDirective.getPool(), FsAction.READ);
} catch (AccessControlException e) { } catch (AccessControlException e) {
hasPermission = false; hasPermission = false;
} }
@ -530,7 +535,7 @@ public final class CacheManager {
pool = CachePool.createFromInfoAndDefaults(info); pool = CachePool.createFromInfoAndDefaults(info);
cachePools.put(pool.getPoolName(), pool); cachePools.put(pool.getPoolName(), pool);
LOG.info("Created new cache pool " + pool); LOG.info("Created new cache pool " + pool);
return pool.getInfo(null); return pool.getInfo(true);
} }
/** /**
@ -599,39 +604,34 @@ public final class CacheManager {
throw new InvalidRequestException( throw new InvalidRequestException(
"Cannot remove non-existent cache pool " + poolName); "Cannot remove non-existent cache pool " + poolName);
} }
// Remove all directives in this pool.
// Remove entries using this pool Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
// TODO: could optimize this somewhat to avoid the need to iterate
// over all entries in entriesById
Iterator<Entry<Long, CacheDirective>> iter =
entriesById.entrySet().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
Entry<Long, CacheDirective> entry = iter.next(); CacheDirective directive = iter.next();
if (entry.getValue().getPool() == pool) { directivesByPath.remove(directive.getPath());
entriesByPath.remove(entry.getValue().getPath()); directivesById.remove(directive.getId());
iter.remove(); iter.remove();
}
} }
if (monitor != null) { if (monitor != null) {
monitor.kick(); monitor.kick();
} }
} }
public BatchedListEntries<CachePoolInfo> public BatchedListEntries<CachePoolEntry>
listCachePools(FSPermissionChecker pc, String prevKey) { listCachePools(FSPermissionChecker pc, String prevKey) {
assert namesystem.hasReadLock(); assert namesystem.hasReadLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16; final int NUM_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<CachePoolInfo> results = ArrayList<CachePoolEntry> results =
new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES); new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false); SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
int numListed = 0; int numListed = 0;
for (Entry<String, CachePool> cur : tailMap.entrySet()) { for (Entry<String, CachePool> cur : tailMap.entrySet()) {
if (numListed++ >= maxListCachePoolsResponses) { if (numListed++ >= maxListCachePoolsResponses) {
return new BatchedListEntries<CachePoolInfo>(results, true); return new BatchedListEntries<CachePoolEntry>(results, true);
} }
results.add(cur.getValue().getInfo(pc)); results.add(cur.getValue().getEntry(pc));
} }
return new BatchedListEntries<CachePoolInfo>(results, false); return new BatchedListEntries<CachePoolEntry>(results, false);
} }
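// listCachePools above implements cursor paging over the pool map: tailMap(prevKey, false)
// resumes strictly after the last name the client saw, the batch is capped at
// maxListCachePoolsResponses, and a hasMore flag tells the client to keep going.
// A stand-alone sketch of the same pattern over a plain TreeMap (hypothetical names):
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class PagedPoolListing {
  /** Returns up to max pool names strictly after prevKey; more[0] is set if truncated. */
  static List<String> listAfter(TreeMap<String, ?> pools, String prevKey,
      int max, boolean[] more) {
    List<String> results = new ArrayList<String>(max);
    more[0] = false;
    for (String name : pools.tailMap(prevKey, false).keySet()) {
      if (results.size() >= max) {
        more[0] = true;   // the caller should ask again, passing the last name returned
        break;
      }
      results.add(name);
    }
    return results;
  }
}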
public void setCachedLocations(LocatedBlock block) { public void setCachedLocations(LocatedBlock block) {
@ -693,13 +693,6 @@ public final class CacheManager {
for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) { for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
Block block = new Block(iter.next()); Block block = new Block(iter.next());
BlockInfo blockInfo = blockManager.getStoredBlock(block); BlockInfo blockInfo = blockManager.getStoredBlock(block);
if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
// The NameNode will eventually remove or update the out-of-date block.
// Until then, we pretend that it isn't cached.
LOG.warn("Genstamp in cache report disagrees with our genstamp for " +
block + ": expected genstamp " + blockInfo.getGenerationStamp());
continue;
}
if (!blockInfo.isComplete()) { if (!blockInfo.isComplete()) {
LOG.warn("Ignoring block id " + block.getBlockId() + ", because " + LOG.warn("Ignoring block id " + block.getBlockId() + ", because " +
"it is in not complete yet. It is in state " + "it is in not complete yet. It is in state " +
@ -743,9 +736,9 @@ public final class CacheManager {
*/ */
public void saveState(DataOutput out, String sdPath) public void saveState(DataOutput out, String sdPath)
throws IOException { throws IOException {
out.writeLong(nextEntryId); out.writeLong(nextDirectiveId);
savePools(out, sdPath); savePools(out, sdPath);
saveEntries(out, sdPath); saveDirectives(out, sdPath);
} }
/** /**
@ -755,10 +748,10 @@ public final class CacheManager {
* @throws IOException * @throws IOException
*/ */
public void loadState(DataInput in) throws IOException { public void loadState(DataInput in) throws IOException {
nextEntryId = in.readLong(); nextDirectiveId = in.readLong();
// pools need to be loaded first since entries point to their parent pool // pools need to be loaded first since directives point to their parent pool
loadPools(in); loadPools(in);
loadEntries(in); loadDirectives(in);
} }
/** /**
@ -773,7 +766,7 @@ public final class CacheManager {
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(cachePools.size()); out.writeInt(cachePools.size());
for (CachePool pool: cachePools.values()) { for (CachePool pool: cachePools.values()) {
pool.getInfo(null).writeTo(out); pool.getInfo(true).writeTo(out);
counter.increment(); counter.increment();
} }
prog.endStep(Phase.SAVING_CHECKPOINT, step); prog.endStep(Phase.SAVING_CHECKPOINT, step);
@ -782,19 +775,19 @@ public final class CacheManager {
/* /*
* Save cache entries to fsimage * Save cache entries to fsimage
*/ */
private void saveEntries(DataOutput out, String sdPath) private void saveDirectives(DataOutput out, String sdPath)
throws IOException { throws IOException {
StartupProgress prog = NameNode.getStartupProgress(); StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath); Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step); prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, entriesById.size()); prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step); Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(entriesById.size()); out.writeInt(directivesById.size());
for (CacheDirective entry: entriesById.values()) { for (CacheDirective directive : directivesById.values()) {
out.writeLong(entry.getEntryId()); out.writeLong(directive.getId());
Text.writeString(out, entry.getPath()); Text.writeString(out, directive.getPath());
out.writeShort(entry.getReplication()); out.writeShort(directive.getReplication());
Text.writeString(out, entry.getPool().getPoolName()); Text.writeString(out, directive.getPool().getPoolName());
counter.increment(); counter.increment();
} }
prog.endStep(Phase.SAVING_CHECKPOINT, step); prog.endStep(Phase.SAVING_CHECKPOINT, step);
@ -819,38 +812,41 @@ public final class CacheManager {
} }
/** /**
* Load cache entries from the fsimage * Load cache directives from the fsimage
*/ */
private void loadEntries(DataInput in) throws IOException { private void loadDirectives(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress(); StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES); Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step); prog.beginStep(Phase.LOADING_FSIMAGE, step);
int numberOfEntries = in.readInt(); int numDirectives = in.readInt();
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfEntries); prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step); Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfEntries; i++) { for (int i = 0; i < numDirectives; i++) {
long entryId = in.readLong(); long directiveId = in.readLong();
String path = Text.readString(in); String path = Text.readString(in);
short replication = in.readShort(); short replication = in.readShort();
String poolName = Text.readString(in); String poolName = Text.readString(in);
// Get pool reference by looking it up in the map // Get pool reference by looking it up in the map
CachePool pool = cachePools.get(poolName); CachePool pool = cachePools.get(poolName);
if (pool == null) { if (pool == null) {
throw new IOException("Entry refers to pool " + poolName + throw new IOException("Directive refers to pool " + poolName +
", which does not exist."); ", which does not exist.");
} }
CacheDirective entry = CacheDirective directive =
new CacheDirective(entryId, path, replication, pool); new CacheDirective(directiveId, path, replication);
if (entriesById.put(entry.getEntryId(), entry) != null) { boolean addedDirective = pool.getDirectiveList().add(directive);
throw new IOException("An entry with ID " + entry.getEntryId() + assert addedDirective;
if (directivesById.put(directive.getId(), directive) != null) {
throw new IOException("A directive with ID " + directive.getId() +
" already exists"); " already exists");
} }
List<CacheDirective> entries = entriesByPath.get(entry.getPath()); List<CacheDirective> directives =
if (entries == null) { directivesByPath.get(directive.getPath());
entries = new LinkedList<CacheDirective>(); if (directives == null) {
entriesByPath.put(entry.getPath(), entries); directives = new LinkedList<CacheDirective>();
directivesByPath.put(directive.getPath(), directives);
} }
entries.add(entry); directives.add(directive);
counter.increment(); counter.increment();
} }
prog.endStep(Phase.LOADING_FSIMAGE, step); prog.endStep(Phase.LOADING_FSIMAGE, step);
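
For callers outside the NameNode, the visible effect of this change is that listCachePools() now hands back CachePoolEntry objects that pair the existing CachePoolInfo with the new CachePoolStats. A minimal client-side sketch, not part of this commit, assuming a CachePoolEntry#getStats() accessor and CachePoolStats getters named after the new proto fields:

    // Illustrative only. Assumes fs.defaultFS points at an HDFS cluster and
    // that CachePoolEntry exposes getStats() alongside getInfo().
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
    import org.apache.hadoop.hdfs.protocol.CachePoolStats;

    public class ListCachePoolStats {
      public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        RemoteIterator<CachePoolEntry> it = dfs.listCachePools();
        while (it.hasNext()) {
          CachePoolEntry entry = it.next();
          CachePoolInfo info = entry.getInfo();    // partial if the caller lacks read permission
          CachePoolStats stats = entry.getStats(); // assumed accessor for the new statistics
          System.out.println(info.getPoolName() + ": " + stats.getBytesCached() +
              "/" + stats.getBytesNeeded() + " bytes cached");
        }
      }
    }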

View File

@@ -26,9 +26,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.IntrusiveCollection;

 import com.google.common.base.Preconditions;
@@ -69,6 +73,22 @@ public final class CachePool {
   private int weight;

+  public final static class DirectiveList
+      extends IntrusiveCollection<CacheDirective> {
+    private CachePool cachePool;
+    private DirectiveList(CachePool cachePool) {
+      this.cachePool = cachePool;
+    }
+
+    public CachePool getCachePool() {
+      return cachePool;
+    }
+  }
+
+  @Nonnull
+  private final DirectiveList directiveList = new DirectiveList(this);
+
   /**
    * Create a new cache pool based on a CachePoolInfo object and the defaults.
    * We will fill in information that was not supplied according to the
@@ -171,7 +191,7 @@ public final class CachePool {
    * @return
    *          Cache pool information.
    */
-  private CachePoolInfo getInfo(boolean fullInfo) {
+  CachePoolInfo getInfo(boolean fullInfo) {
     CachePoolInfo info = new CachePoolInfo(poolName);
     if (!fullInfo) {
       return info;
@@ -182,6 +202,19 @@ public final class CachePool {
     setWeight(weight);
   }

+  /**
+   * Get statistics about this CachePool.
+   *
+   * @return   Cache pool statistics.
+   */
+  private CachePoolStats getStats() {
+    return new CachePoolStats.Builder().
+        setBytesNeeded(0).
+        setBytesCached(0).
+        setFilesAffected(0).
+        build();
+  }
+
   /**
    * Returns a CachePoolInfo describing this CachePool based on the permissions
    * of the calling user. Unprivileged users will see only minimal descriptive
@@ -189,9 +222,9 @@ public final class CachePool {
    *
    * @param pc Permission checker to be used to validate the user's permissions,
    *           or null
-   * @return CachePoolInfo describing this CachePool
+   * @return CachePoolEntry describing this CachePool
    */
-  public CachePoolInfo getInfo(FSPermissionChecker pc) {
+  public CachePoolEntry getEntry(FSPermissionChecker pc) {
     boolean hasPermission = true;
     if (pc != null) {
       try {
@@ -200,7 +233,8 @@ public final class CachePool {
         hasPermission = false;
       }
     }
-    return getInfo(hasPermission);
+    return new CachePoolEntry(getInfo(hasPermission),
+        hasPermission ? getStats() : new CachePoolStats.Builder().build());
   }

   public String toString() {
@@ -212,4 +246,8 @@ public final class CachePool {
         append(", weight:").append(weight).
         append(" }").toString();
   }
+
+  public DirectiveList getDirectiveList() {
+    return directiveList;
+  }
 }
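
The DirectiveList added above is what lets CacheManager walk a pool's directives directly (see removeCachePool and loadDirectives earlier in this diff). A rough sketch of that pattern, outside the commit itself and assuming the caller already holds the namesystem lock like the rest of CacheManager:

    // Illustrative fragment only: total up the replication factors of every
    // directive attached to a pool through the new DirectiveList.
    import org.apache.hadoop.hdfs.protocol.CacheDirective;
    import org.apache.hadoop.hdfs.server.namenode.CachePool;

    class CachePoolTotals {
      static int totalReplication(CachePool pool) {
        int total = 0;
        for (CacheDirective directive : pool.getDirectiveList()) {
          total += directive.getReplication();
        }
        return total;
      }
    }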

View File

@@ -164,6 +164,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -6429,6 +6430,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return datanodeStatistics.getCapacityRemainingPercent();
   }

+  @Override // NameNodeMXBean
+  public long getCacheCapacity() {
+    return datanodeStatistics.getCacheCapacity();
+  }
+
+  @Override // NameNodeMXBean
+  public long getCacheUsed() {
+    return datanodeStatistics.getCacheUsed();
+  }
+
   @Override // NameNodeMXBean
   public long getTotalBlocks() {
     return getBlocksTotal();
@@ -7285,11 +7296,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     getEditLog().logSync();
   }

-  public BatchedListEntries<CachePoolInfo> listCachePools(String prevKey)
+  public BatchedListEntries<CachePoolEntry> listCachePools(String prevKey)
       throws IOException {
     final FSPermissionChecker pc =
         isPermissionEnabled ? getPermissionChecker() : null;
-    BatchedListEntries<CachePoolInfo> results;
+    BatchedListEntries<CachePoolEntry> results;
     checkOperation(OperationCategory.READ);
     boolean success = false;
     readLock();

View File

@@ -101,6 +101,16 @@ public interface NameNodeMXBean {
    * @return  the percentage of the remaining space on the cluster
    */
   public float getPercentRemaining();

+  /**
+   * Returns the amount of cache used by the datanode (in bytes).
+   */
+  public long getCacheUsed();
+
+  /**
+   * Returns the total cache capacity of the datanode (in bytes).
+   */
+  public long getCacheCapacity();
+
   /**
    * Get the total space used by the block pools of this namenode
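
Since these getters are surfaced through the NameNode MXBean, the new values can be read with plain JMX; TestNameNodeMXBean later in this diff does exactly that in-process. A hedged sketch follows; the "Hadoop:service=NameNode,name=NameNodeInfo" ObjectName is the conventional registration and is an assumption here, so adjust it if your NameNode registers the bean under a different name:

    // Illustrative only: read the new cache attributes from the platform
    // MBeanServer of a JVM that hosts the NameNode.
    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    class NameNodeCacheJmxProbe {
      static void printCacheStats() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name =
            new ObjectName("Hadoop:service=NameNode,name=NameNodeInfo");
        long used = (Long) mbs.getAttribute(name, "CacheUsed");
        long capacity = (Long) mbs.getAttribute(name, "CacheCapacity");
        System.out.println("cache used " + used + " of " + capacity + " bytes");
      }
    }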

View File

@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -1298,26 +1299,26 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }

   private class ServerSideCachePoolIterator
-      extends BatchedRemoteIterator<String, CachePoolInfo> {
+      extends BatchedRemoteIterator<String, CachePoolEntry> {

     public ServerSideCachePoolIterator(String prevKey) {
       super(prevKey);
     }

     @Override
-    public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
+    public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
         throws IOException {
       return namesystem.listCachePools(prevKey);
     }

     @Override
-    public String elementToPrevKey(CachePoolInfo element) {
-      return element.getPoolName();
+    public String elementToPrevKey(CachePoolEntry entry) {
+      return entry.getInfo().getPoolName();
     }
   }

   @Override
-  public RemoteIterator<CachePoolInfo> listCachePools(String prevKey)
+  public RemoteIterator<CachePoolEntry> listCachePools(String prevKey)
       throws IOException {
     return new ServerSideCachePoolIterator(prevKey);
   }

View File

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
@@ -755,9 +756,10 @@ public class CacheAdmin extends Configured implements Tool {
           build();
       int numResults = 0;
       try {
-        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
         while (iter.hasNext()) {
-          CachePoolInfo info = iter.next();
+          CachePoolEntry entry = iter.next();
+          CachePoolInfo info = entry.getInfo();
           String[] row = new String[5];
           if (name == null || info.getPoolName().equals(name)) {
             row[0] = info.getPoolName();

View File

@@ -421,6 +421,12 @@ message CachePoolInfoProto {
   optional int32 weight = 5;
 }

+message CachePoolStatsProto {
+  required int64 bytesNeeded = 1;
+  required int64 bytesCached = 2;
+  required int64 filesAffected = 3;
+}
+
 message AddCachePoolRequestProto {
   required CachePoolInfoProto info = 1;
 }
@@ -447,12 +453,13 @@ message ListCachePoolsRequestProto {
 }

 message ListCachePoolsResponseProto {
-  repeated ListCachePoolsResponseElementProto elements = 1;
+  repeated CachePoolEntryProto entries = 1;
   required bool hasMore = 2;
 }

-message ListCachePoolsResponseElementProto {
+message CachePoolEntryProto {
   required CachePoolInfoProto info = 1;
+  required CachePoolStatsProto stats = 2;
 }

 message GetFileLinkInfoRequestProto {
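
For reference, the generated Java API for these messages follows the usual protobuf pattern. The sketch below is illustrative only and assumes the messages land in the existing ClientNamenodeProtocolProtos outer class and that CachePoolInfoProto keeps a poolName field; in the real code path this conversion is done by the protocol translator classes:

    // Illustrative only: building a CachePoolEntryProto by hand.
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
    import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;

    class CachePoolEntryProtoExample {
      static CachePoolEntryProto build(String poolName) {
        CachePoolStatsProto stats = CachePoolStatsProto.newBuilder()
            .setBytesNeeded(0)
            .setBytesCached(0)
            .setFilesAffected(0)
            .build();
        CachePoolInfoProto info = CachePoolInfoProto.newBuilder()
            .setPoolName(poolName)
            .build();
        return CachePoolEntryProto.newBuilder()
            .setInfo(info)
            .setStats(stats)
            .build();
      }
    }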

View File

@@ -505,6 +505,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return 0l;
   }

+  @Override // FSDatasetMBean
+  public long getNumBlocksCached() {
+    return 0l;
+  }
+
   @Override
   public long getNumBlocksFailedToCache() {
     return 0l;

View File

@@ -49,8 +49,8 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -72,6 +72,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;

 import com.google.common.base.Supplier;
@@ -95,6 +97,7 @@ public class TestFsDatasetCache {
   static {
     EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
+    LogManager.getLogger(FsDatasetCache.class).setLevel(Level.DEBUG);
   }

   @Before
@@ -201,17 +204,21 @@ public class TestFsDatasetCache {
   /**
    * Blocks until cache usage hits the expected new value.
    */
-  private long verifyExpectedCacheUsage(final long expected) throws Exception {
+  private long verifyExpectedCacheUsage(final long expectedCacheUsed,
+      final long expectedBlocks) throws Exception {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       private int tries = 0;

       @Override
       public Boolean get() {
-        long curDnCacheUsed = fsd.getCacheUsed();
-        if (curDnCacheUsed != expected) {
+        long curCacheUsed = fsd.getCacheUsed();
+        long curBlocks = fsd.getNumBlocksCached();
+        if ((curCacheUsed != expectedCacheUsed) ||
+            (curBlocks != expectedBlocks)) {
           if (tries++ > 10) {
-            LOG.info("verifyExpectedCacheUsage: expected " +
-                expected + ", got " + curDnCacheUsed + "; " +
+            LOG.info("verifyExpectedCacheUsage: have " +
+                curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
+                curBlocks + "/" + expectedBlocks + " blocks cached. " +
                 "memlock limit = " +
                 NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
                 ". Waiting...");
@@ -221,14 +228,15 @@ public class TestFsDatasetCache {
         return true;
       }
     }, 100, 60000);
-    return expected;
+    return expectedCacheUsed;
   }

   private void testCacheAndUncacheBlock() throws Exception {
     LOG.info("beginning testCacheAndUncacheBlock");
     final int NUM_BLOCKS = 5;

-    verifyExpectedCacheUsage(0);
+    verifyExpectedCacheUsage(0, 0);
+    assertEquals(0, fsd.getNumBlocksCached());

     // Write a test file
     final Path testFile = new Path("/testCacheBlock");
@@ -255,7 +263,7 @@ public class TestFsDatasetCache {
     // Cache each block in succession, checking each time
     for (int i=0; i<NUM_BLOCKS; i++) {
       setHeartbeatResponse(cacheBlock(locs[i]));
-      current = verifyExpectedCacheUsage(current + blockSizes[i]);
+      current = verifyExpectedCacheUsage(current + blockSizes[i], i + 1);
       dnMetrics = getMetrics(dn.getMetrics().name());
       long cmds = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
       assertTrue("Expected more cache requests from the NN ("
@@ -267,7 +275,8 @@ public class TestFsDatasetCache {
     // Uncache each block in succession, again checking each time
     for (int i=0; i<NUM_BLOCKS; i++) {
       setHeartbeatResponse(uncacheBlock(locs[i]));
-      current = verifyExpectedCacheUsage(current - blockSizes[i]);
+      current = verifyExpectedCacheUsage(current - blockSizes[i],
+          NUM_BLOCKS - 1 - i);
       dnMetrics = getMetrics(dn.getMetrics().name());
       long cmds = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
       assertTrue("Expected more uncache requests from the NN",
@@ -334,10 +343,11 @@ public class TestFsDatasetCache {
     // Cache the first n-1 files
     long total = 0;
-    verifyExpectedCacheUsage(0);
+    verifyExpectedCacheUsage(0, 0);
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(cacheBlocks(fileLocs[i]));
-      total = verifyExpectedCacheUsage(rounder.round(total + fileSizes[i]));
+      total = verifyExpectedCacheUsage(
+          rounder.round(total + fileSizes[i]), 4 * (i + 1));
     }

     // nth file should hit a capacity exception
@@ -363,7 +373,7 @@ public class TestFsDatasetCache {
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
       total -= rounder.round(fileSizes[i]);
-      verifyExpectedCacheUsage(total);
+      verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i));
     }
     LOG.info("finishing testFilesExceedMaxLockedMemory");
   }
@@ -373,7 +383,7 @@ public class TestFsDatasetCache {
     LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
     final int NUM_BLOCKS = 5;

-    verifyExpectedCacheUsage(0);
+    verifyExpectedCacheUsage(0, 0);

     // Write a test file
     final Path testFile = new Path("/testCacheBlock");
@@ -409,7 +419,7 @@ public class TestFsDatasetCache {
     // should increase, even though caching doesn't complete on any of them.
     for (int i=0; i<NUM_BLOCKS; i++) {
       setHeartbeatResponse(cacheBlock(locs[i]));
-      current = verifyExpectedCacheUsage(current + blockSizes[i]);
+      current = verifyExpectedCacheUsage(current + blockSizes[i], i + 1);
     }

     setHeartbeatResponse(new DatanodeCommand[] {
@@ -417,7 +427,7 @@ public class TestFsDatasetCache {
     });

     // wait until all caching jobs are finished cancelling.
-    current = verifyExpectedCacheUsage(0);
+    current = verifyExpectedCacheUsage(0, 0);
     LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
   }

View File

@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
@@ -263,8 +264,8 @@ public class TestCacheDirectives {
         setOwnerName(ownerName).setGroupName(groupName).
         setMode(mode).setWeight(weight));

-    RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
-    CachePoolInfo info = iter.next();
+    RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
+    CachePoolInfo info = iter.next().getInfo();
     assertEquals(poolName, info.getPoolName());
     assertEquals(ownerName, info.getOwnerName());
     assertEquals(groupName, info.getGroupName());
@@ -278,7 +279,7 @@ public class TestCacheDirectives {
         setMode(mode).setWeight(weight));

     iter = dfs.listCachePools();
-    info = iter.next();
+    info = iter.next().getInfo();
     assertEquals(poolName, info.getPoolName());
     assertEquals(ownerName, info.getOwnerName());
     assertEquals(groupName, info.getGroupName());
@@ -507,9 +508,9 @@ public class TestCacheDirectives {
         .setGroupName(groupName)
         .setMode(mode)
         .setWeight(weight));
-    RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
+    RemoteIterator<CachePoolEntry> pit = dfs.listCachePools();
     assertTrue("No cache pools found", pit.hasNext());
-    CachePoolInfo info = pit.next();
+    CachePoolInfo info = pit.next().getInfo();
     assertEquals(pool, info.getPoolName());
     assertEquals(groupName, info.getGroupName());
     assertEquals(mode, info.getMode());
@@ -542,7 +543,7 @@ public class TestCacheDirectives {
     // Check that state came back up
     pit = dfs.listCachePools();
     assertTrue("No cache pools found", pit.hasNext());
-    info = pit.next();
+    info = pit.next().getInfo();
     assertEquals(pool, info.getPoolName());
     assertEquals(pool, info.getPoolName());
     assertEquals(groupName, info.getGroupName());
@@ -713,7 +714,16 @@ public class TestCacheDirectives {
     try {
       cluster.waitActive();
       DistributedFileSystem dfs = cluster.getFileSystem();
-      NameNode namenode = cluster.getNameNode();
+      final NameNode namenode = cluster.getNameNode();
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return ((namenode.getNamesystem().getCacheCapacity() ==
+              (NUM_DATANODES * CACHE_CAPACITY)) &&
+              (namenode.getNamesystem().getCacheUsed() == 0));
+        }
+      }, 500, 60000);
       NamenodeProtocols nnRpc = namenode.getRpcServer();
       Path rootDir = helper.getDefaultWorkingDirectory(dfs);
       // Create the pool
@@ -967,8 +977,8 @@ public class TestCacheDirectives {
     dfs.addCachePool(new CachePoolInfo(poolName)
         .setMode(new FsPermission((short)0700)));
     // Should only see partial info
-    RemoteIterator<CachePoolInfo> it = myDfs.listCachePools();
-    CachePoolInfo info = it.next();
+    RemoteIterator<CachePoolEntry> it = myDfs.listCachePools();
+    CachePoolInfo info = it.next().getInfo();
     assertFalse(it.hasNext());
     assertEquals("Expected pool name", poolName, info.getPoolName());
     assertNull("Unexpected owner name", info.getOwnerName());
@@ -981,7 +991,7 @@ public class TestCacheDirectives {
         .setWeight(99));
     // Should see full info
     it = myDfs.listCachePools();
-    info = it.next();
+    info = it.next().getInfo();
     assertFalse(it.hasNext());
     assertEquals("Expected pool name", poolName, info.getPoolName());
     assertEquals("Mismatched owner name", myUser.getShortUserName(),

View File

@@ -31,7 +31,10 @@ import javax.management.ObjectName;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
 import org.apache.hadoop.util.VersionInfo;
 import org.junit.Test;
 import org.mortbay.util.ajax.JSON;
@@ -46,10 +49,16 @@ public class TestNameNodeMXBean {
    */
   private static final double DELTA = 0.000001;

+  static {
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+  }
+
   @SuppressWarnings({ "unchecked" })
   @Test
   public void testNameNodeMXBeanInfo() throws Exception {
     Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
     MiniDFSCluster cluster = null;

     try {
@@ -171,6 +180,10 @@ public class TestNameNodeMXBean {
       }
       assertEquals(1, statusMap.get("active").size());
       assertEquals(1, statusMap.get("failed").size());
+      assertEquals(0L, mbs.getAttribute(mxbeanName, "CacheUsed"));
+      assertEquals(NativeIO.POSIX.getCacheManipulator().getMemlockLimit() *
+          cluster.getDataNodes().size(),
+          mbs.getAttribute(mxbeanName, "CacheCapacity"));
     } finally {
       if (cluster != null) {
         for (URI dir : cluster.getNameDirs(0)) {

View File

@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -904,7 +905,7 @@ public class TestRetryCacheWithHA {
     @Override
     boolean checkNamenodeBeforeReturn() throws Exception {
       for (int i = 0; i < CHECKTIMES; i++) {
-        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
         if (iter.hasNext()) {
           return true;
         }
@@ -941,8 +942,8 @@ public class TestRetryCacheWithHA {
     @Override
     boolean checkNamenodeBeforeReturn() throws Exception {
       for (int i = 0; i < CHECKTIMES; i++) {
-        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
-        if (iter.hasNext() && iter.next().getWeight() == 99) {
+        RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
+        if (iter.hasNext() && iter.next().getInfo().getWeight() == 99) {
          return true;
        }
        Thread.sleep(1000);
@@ -978,7 +979,7 @@ public class TestRetryCacheWithHA {
     @Override
     boolean checkNamenodeBeforeReturn() throws Exception {
       for (int i = 0; i < CHECKTIMES; i++) {
-        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
         if (!iter.hasNext()) {
           return true;
         }