HDFS-5358. Add replication field to PathBasedCacheDirective. (Contributed by Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1532124 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2013-10-14 22:56:11 +00:00
parent 15d08c4778
commit efe545b0c2
17 changed files with 115 additions and 17 deletions

View File

@ -60,6 +60,9 @@ HDFS-4949 (Unreleased)
HDFS-5224. Refactor PathBasedCache* methods to use a Path rather than a
String. (cnauroth)
HDFS-5358. Add replication field to PathBasedCacheDirective.
(Contributed by Colin Patrick McCabe)
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@ -1595,6 +1595,7 @@ public class DistributedFileSystem extends FileSystem {
makeQualified(getUri(), getWorkingDirectory());
return dfs.addPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
setPath(path).
setReplication(directive.getReplication()).
setPool(directive.getPool()).
build());
}
@ -1634,7 +1635,7 @@ public class DistributedFileSystem extends FileSystem {
PathBasedCacheDescriptor desc = iter.next();
Path qualPath = desc.getPath().makeQualified(getUri(), path);
return new PathBasedCacheDescriptor(desc.getEntryId(), qualPath,
desc.getPool());
desc.getReplication(), desc.getPool());
}
};
}

View File

@ -33,8 +33,9 @@ import com.google.common.base.Preconditions;
public final class PathBasedCacheDescriptor extends PathBasedCacheDirective {
private final long entryId;
public PathBasedCacheDescriptor(long entryId, Path path, String pool) {
super(path, pool);
public PathBasedCacheDescriptor(long entryId, Path path,
short replication, String pool) {
super(path, replication, pool);
Preconditions.checkArgument(entryId > 0);
this.entryId = entryId;
}
@ -54,6 +55,7 @@ public final class PathBasedCacheDescriptor extends PathBasedCacheDirective {
PathBasedCacheDescriptor other = (PathBasedCacheDescriptor)o;
return new EqualsBuilder().append(entryId, other.entryId).
append(getPath(), other.getPath()).
append(getReplication(), other.getReplication()).
append(getPool(), other.getPool()).
isEquals();
}
@ -62,6 +64,7 @@ public final class PathBasedCacheDescriptor extends PathBasedCacheDirective {
public int hashCode() {
return new HashCodeBuilder().append(entryId).
append(getPath()).
append(getReplication()).
append(getPool()).
hashCode();
}
@ -71,6 +74,7 @@ public final class PathBasedCacheDescriptor extends PathBasedCacheDirective {
StringBuilder builder = new StringBuilder();
builder.append("{ entryId:").append(entryId).
append(", path:").append(getPath()).
append(", replication:").append(getReplication()).
append(", pool:").append(getPool()).
append(" }");
return builder.toString();

View File

@ -41,8 +41,8 @@ public class PathBasedCacheDirective {
* A builder for creating new PathBasedCacheDirective instances.
*/
public static class Builder {
private Path path;
private short replication = (short)1;
private String pool;
/**
@ -51,7 +51,7 @@ public class PathBasedCacheDirective {
* @return New PathBasedCacheDirective.
*/
public PathBasedCacheDirective build() {
return new PathBasedCacheDirective(path, pool);
return new PathBasedCacheDirective(path, replication, pool);
}
/**
@ -65,6 +65,17 @@ public class PathBasedCacheDirective {
return this;
}
/**
* Sets the replication used in this request.
*
* @param replication The replication used in this request.
* @return This builder, for call chaining.
*/
public Builder setReplication(short replication) {
this.replication = replication;
return this;
}
/**
* Sets the pool used in this request.
*
@ -78,6 +89,7 @@ public class PathBasedCacheDirective {
}
private final Path path;
private final short replication;
private final String pool;
/**
@ -87,6 +99,13 @@ public class PathBasedCacheDirective {
return path;
}
/**
* @return The number of times the block should be cached.
*/
public short getReplication() {
return replication;
}
/**
* @return The pool used in this request.
*/
@ -104,6 +123,10 @@ public class PathBasedCacheDirective {
if (!DFSUtil.isValidName(path.toUri().getPath())) {
throw new InvalidPathNameError(this);
}
if (replication <= 0) {
throw new IOException("Tried to request a cache replication " +
"factor of " + replication + ", but that is less than 1.");
}
if (pool.isEmpty()) {
throw new InvalidPoolNameError(this);
}
@ -119,6 +142,7 @@ public class PathBasedCacheDirective {
}
PathBasedCacheDirective other = (PathBasedCacheDirective)o;
return new EqualsBuilder().append(getPath(), other.getPath()).
append(getReplication(), other.getReplication()).
append(getPool(), other.getPool()).
isEquals();
}
@ -126,6 +150,7 @@ public class PathBasedCacheDirective {
@Override
public int hashCode() {
return new HashCodeBuilder().append(getPath()).
append(replication).
append(getPool()).
hashCode();
}
@ -134,6 +159,7 @@ public class PathBasedCacheDirective {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("{ path:").append(path).
append(", replication:").append(replication).
append(", pool:").append(pool).
append(" }");
return builder.toString();
@ -143,12 +169,14 @@ public class PathBasedCacheDirective {
* Protected constructor. Callers use Builder to create new instances.
*
* @param path The path used in this request.
* @param replication The replication used in this request.
* @param pool The pool used in this request.
*/
protected PathBasedCacheDirective(Path path, String pool) {
protected PathBasedCacheDirective(Path path, short replication, String pool) {
Preconditions.checkNotNull(path);
Preconditions.checkNotNull(pool);
this.path = path;
this.replication = replication;
this.pool = pool;
}
};

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
@ -32,14 +33,18 @@ import com.google.common.base.Preconditions;
public final class PathBasedCacheEntry {
private final long entryId;
private final String path;
private final short replication;
private final CachePool pool;
public PathBasedCacheEntry(long entryId, String path, CachePool pool) {
public PathBasedCacheEntry(long entryId, String path,
short replication, CachePool pool) {
Preconditions.checkArgument(entryId > 0);
this.entryId = entryId;
Preconditions.checkNotNull(path);
Preconditions.checkArgument(replication > 0);
this.path = path;
Preconditions.checkNotNull(pool);
this.replication = replication;
Preconditions.checkNotNull(path);
this.pool = pool;
}
@ -55,18 +60,37 @@ public final class PathBasedCacheEntry {
return pool;
}
public short getReplication() {
return replication;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("{ entryId:").append(entryId).
append(", path:").append(path).
append(", replication:").append(replication).
append(", pool:").append(pool).
append(" }");
return builder.toString();
}
public PathBasedCacheDescriptor getDescriptor() {
return new PathBasedCacheDescriptor(entryId, new Path(path),
return new PathBasedCacheDescriptor(entryId, new Path(path), replication,
pool.getPoolName());
}
@Override
public boolean equals(Object o) {
if (o.getClass() != this.getClass()) {
return false;
}
PathBasedCacheEntry other = (PathBasedCacheEntry)o;
return entryId == other.entryId;
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(entryId).toHashCode();
}
};

View File

@ -180,6 +180,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.commons.lang.StringUtils;
import com.google.common.primitives.Shorts;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -1044,6 +1045,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
setPath(new Path(proto.getPath())).
setReplication(Shorts.checkedCast(proto.getReplication())).
setPool(proto.getPool()).
build();
PathBasedCacheDescriptor descriptor =
@ -1090,6 +1092,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
ListPathBasedCacheDescriptorsElementProto.newBuilder().
setId(directive.getEntryId()).
setPath(directive.getPath().toUri().getPath()).
setReplication(directive.getReplication()).
setPool(directive.getPool()));
prevId = directive.getEntryId();
}

View File

@ -148,6 +148,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRespons
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.token.Token;
import com.google.common.primitives.Shorts;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
@ -1011,12 +1012,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
AddPathBasedCacheDirectiveRequestProto.newBuilder();
builder.setDirective(PathBasedCacheDirectiveProto.newBuilder()
.setPath(directive.getPath().toUri().getPath())
.setReplication(directive.getReplication())
.setPool(directive.getPool())
.build());
AddPathBasedCacheDirectiveResponseProto result =
rpcProxy.addPathBasedCacheDirective(null, builder.build());
return new PathBasedCacheDescriptor(result.getDescriptorId(),
directive.getPath(), directive.getPool());
directive.getPath(), directive.getReplication(),
directive.getPool());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -1048,7 +1051,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
ListPathBasedCacheDescriptorsElementProto elementProto =
response.getElements(i);
return new PathBasedCacheDescriptor(elementProto.getId(),
new Path(elementProto.getPath()), elementProto.getPool());
new Path(elementProto.getPath()),
Shorts.checkedCast(elementProto.getReplication()),
elementProto.getPool());
}
@Override

View File

@ -248,7 +248,8 @@ public final class CacheManager {
// Add a new entry with the next available ID.
PathBasedCacheEntry entry;
entry = new PathBasedCacheEntry(getNextEntryId(),
directive.getPath().toUri().getPath(), pool);
directive.getPath().toUri().getPath(),
directive.getReplication(), pool);
unprotectedAddEntry(entry);
@ -597,10 +598,12 @@ public final class CacheManager {
for (int i = 0; i < numberOfEntries; i++) {
long entryId = in.readLong();
String path = Text.readString(in);
short replication = in.readShort();
String poolName = Text.readString(in);
// Get pool reference by looking it up in the map
CachePool pool = cachePools.get(poolName);
PathBasedCacheEntry entry = new PathBasedCacheEntry(entryId, path, pool);
PathBasedCacheEntry entry =
new PathBasedCacheEntry(entryId, path, replication, pool);
unprotectedAddEntry(entry);
counter.increment();
}

View File

@ -959,6 +959,7 @@ public class FSEditLog implements LogsPurgeable {
AddPathBasedCacheDirectiveOp op = AddPathBasedCacheDirectiveOp.getInstance(
cache.get())
.setPath(directive.getPath().toUri().getPath())
.setReplication(directive.getReplication())
.setPool(directive.getPool());
logRpcIds(op, toLogRpcIds);
logEdit(op);

View File

@ -644,6 +644,7 @@ public class FSEditLogLoader {
AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
PathBasedCacheDirective d = new PathBasedCacheDirective.Builder().
setPath(new Path(addOp.path)).
setReplication(addOp.replication).
setPool(addOp.pool).
build();
PathBasedCacheDescriptor descriptor =

View File

@ -2862,8 +2862,8 @@ public abstract class FSEditLogOp {
}
static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
String path;
short replication;
String pool;
public AddPathBasedCacheDirectiveOp() {
@ -2880,6 +2880,11 @@ public abstract class FSEditLogOp {
return this;
}
public AddPathBasedCacheDirectiveOp setReplication(short replication) {
this.replication = replication;
return this;
}
public AddPathBasedCacheDirectiveOp setPool(String pool) {
this.pool = pool;
return this;
@ -2888,24 +2893,29 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
this.path = FSImageSerialization.readString(in);
this.replication = FSImageSerialization.readShort(in);
this.pool = FSImageSerialization.readString(in);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeShort(replication, out);
FSImageSerialization.writeString(pool, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(replication));
XMLUtils.addSaxString(contentHandler, "POOL", pool);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
path = st.getValue("PATH");
replication = Short.parseShort(st.getValue("REPLICATION"));
pool = st.getValue("POOL");
}
@ -2914,6 +2924,7 @@ public abstract class FSEditLogOp {
StringBuilder builder = new StringBuilder();
builder.append("AddPathBasedCacheDirective [");
builder.append("path=" + path + ",");
builder.append("replication=" + replication + ",");
builder.append("pool=" + pool + "]");
return builder.toString();
}

View File

@ -139,6 +139,8 @@ public class CacheAdmin extends Configured implements Tool {
TableListing listing = getOptionDescriptionListing();
listing.addRow("<path>", "A path to cache. The path can be " +
"a directory or a file.");
listing.addRow("<replication>", "The cache replication factor to use. " +
"Defaults to 1.");
listing.addRow("<pool-name>", "The pool to which the directive will be " +
"added. You must have write permission on the cache pool "
+ "in order to add new directives.");
@ -154,6 +156,12 @@ public class CacheAdmin extends Configured implements Tool {
System.err.println("You must specify a path with -path.");
return 1;
}
short replication = 1;
String replicationString =
StringUtils.popOptionWithArgument("-replication", args);
if (replicationString != null) {
replication = Short.parseShort(replicationString);
}
String poolName = StringUtils.popOptionWithArgument("-pool", args);
if (poolName == null) {
System.err.println("You must specify a pool name with -pool.");
@ -167,9 +175,9 @@ public class CacheAdmin extends Configured implements Tool {
DistributedFileSystem dfs = getDFS(conf);
PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
setPath(new Path(path)).
setReplication(replication).
setPool(poolName).
build();
try {
PathBasedCacheDescriptor descriptor =
dfs.addPathBasedCacheDirective(directive);

View File

@ -245,6 +245,7 @@ class ImageLoaderCurrent implements ImageLoader {
final int numEntries = in.readInt();
for (int i=0; i<numEntries; i++) {
v.visit(ImageElement.CACHE_ENTRY_PATH, Text.readString(in));
v.visit(ImageElement.CACHE_ENTRY_REPLICATION, in.readShort());
v.visit(ImageElement.CACHE_ENTRY_POOL_NAME, Text.readString(in));
}
}

View File

@ -128,6 +128,7 @@ abstract class ImageVisitor {
CACHE_POOL_WEIGHT,
CACHE_NUM_ENTRIES,
CACHE_ENTRY_PATH,
CACHE_ENTRY_REPLICATION,
CACHE_ENTRY_POOL_NAME
}

View File

@ -365,7 +365,8 @@ message IsFileClosedResponseProto {
message PathBasedCacheDirectiveProto {
required string path = 1;
required string pool = 2;
required uint32 replication = 2;
required string pool = 3;
}
message AddPathBasedCacheDirectiveRequestProto {
@ -392,7 +393,8 @@ message ListPathBasedCacheDescriptorsRequestProto {
message ListPathBasedCacheDescriptorsElementProto {
required int64 id = 1;
required string pool = 2;
required string path = 3;
required uint32 replication = 3;
required string path = 4;
}
message ListPathBasedCacheDescriptorsResponseProto {

View File

@ -245,6 +245,7 @@ public class OfflineEditsViewerHelper {
PathBasedCacheDescriptor descriptor =
dfs.addPathBasedCacheDirective(new PathBasedCacheDirective.Builder().
setPath(new Path("/bar")).
setReplication((short)1).
setPool(pool).
build());
// OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34

View File

@ -840,6 +840,7 @@
<DATA>
<TXID>63</TXID>
<PATH>/bar</PATH>
<REPLICATION>1</REPLICATION>
<POOL>poolparty</POOL>
</DATA>
</RECORD>