HDFS-10328. Add per-cache-pool default replication num configuration (xupeng via cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2016-06-17 12:44:36 -07:00
parent 5107a967fa
commit 5f6b4157a4
8 changed files with 157 additions and 19 deletions

View File

@ -54,6 +54,8 @@ public class CachePoolInfo {
public static final long LIMIT_UNLIMITED = Long.MAX_VALUE;
public static final long DEFAULT_LIMIT = LIMIT_UNLIMITED;
public static final short DEFAULT_REPLICATION_NUM = 1;
final String poolName;
@Nullable
@ -68,6 +70,9 @@ public class CachePoolInfo {
@Nullable
Long limit;
@Nullable
private Short defaultReplication;
@Nullable
Long maxRelativeExpiryMs;
@ -134,6 +139,18 @@ public class CachePoolInfo {
return this;
}
/**
* @return The default replication num for CacheDirective in this pool
*/
public Short getDefaultReplication() {
return defaultReplication;
}
public CachePoolInfo setDefaultReplication(Short repl) {
this.defaultReplication = repl;
return this;
}
/**
* @return The maximum relative expiration of directives of this pool in
* milliseconds
@ -161,6 +178,7 @@ public class CachePoolInfo {
+ ", mode:"
+ ((mode == null) ? "null" : String.format("0%03o", mode.toShort()))
+ ", limit:" + limit
+ ", defaultReplication:" + defaultReplication
+ ", maxRelativeExpiryMs:" + maxRelativeExpiryMs + "}";
}
@ -178,6 +196,7 @@ public class CachePoolInfo {
append(groupName, other.groupName).
append(mode, other.mode).
append(limit, other.limit).
append(defaultReplication, other.defaultReplication).
append(maxRelativeExpiryMs, other.maxRelativeExpiryMs).
isEquals();
}
@ -190,6 +209,7 @@ public class CachePoolInfo {
append(groupName).
append(mode).
append(limit).
append(defaultReplication).
append(maxRelativeExpiryMs).
hashCode();
}
@ -201,6 +221,11 @@ public class CachePoolInfo {
if ((info.getLimit() != null) && (info.getLimit() < 0)) {
throw new InvalidRequestException("Limit is negative.");
}
if ((info.getDefaultReplication() != null)
&& (info.getDefaultReplication() < 0)) {
throw new InvalidRequestException("Default Replication is negative");
}
if (info.getMaxRelativeExpiryMs() != null) {
long maxRelativeExpiryMs = info.getMaxRelativeExpiryMs();
if (maxRelativeExpiryMs < 0l) {

View File

@ -1203,6 +1203,10 @@ public class PBHelperClient {
if (proto.hasLimit()) {
info.setLimit(proto.getLimit());
}
if (proto.hasDefaultReplication()) {
info.setDefaultReplication(Shorts.checkedCast(
proto.getDefaultReplication()));
}
if (proto.hasMaxRelativeExpiry()) {
info.setMaxRelativeExpiryMs(proto.getMaxRelativeExpiry());
}
@ -1234,6 +1238,9 @@ public class PBHelperClient {
if (info.getLimit() != null) {
builder.setLimit(info.getLimit());
}
if (info.getDefaultReplication() != null) {
builder.setDefaultReplication(info.getDefaultReplication());
}
if (info.getMaxRelativeExpiryMs() != null) {
builder.setMaxRelativeExpiry(info.getMaxRelativeExpiryMs());
}

View File

@ -534,6 +534,7 @@ message CachePoolInfoProto {
optional int32 mode = 4;
optional int64 limit = 5;
optional int64 maxRelativeExpiry = 6;
optional uint32 defaultReplication = 7 [default=1];
}
message CachePoolStatsProto {

View File

@ -523,7 +523,8 @@ public final class CacheManager {
CachePool pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool);
String path = validatePath(info);
short replication = validateReplication(info, (short)1);
short replication = validateReplication(
info, pool.getDefaultReplication());
long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs());
// Do quota validation if required
if (!flags.contains(CacheFlag.FORCE)) {
@ -826,6 +827,13 @@ public final class CacheManager {
// New limit changes stats, need to set needs refresh
setNeedsRescan();
}
if (info.getDefaultReplication() != null) {
final short defaultReplication = info.getDefaultReplication();
pool.setDefaultReplication(defaultReplication);
bld.append(prefix).append("set default replication to "
+ defaultReplication);
prefix = "; ";
}
if (info.getMaxRelativeExpiryMs() != null) {
final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
pool.setMaxRelativeExpiryMs(maxRelativeExpiry);
@ -1083,6 +1091,10 @@ public final class CacheManager {
if (p.hasMode())
info.setMode(new FsPermission((short) p.getMode()));
if (p.hasDefaultReplication()) {
info.setDefaultReplication((short) p.getDefaultReplication());
}
if (p.hasLimit())
info.setLimit(p.getLimit());

View File

@ -72,6 +72,11 @@ public final class CachePool {
*/
private long limit;
/**
* Default replication num for CacheDirective in this pool.
*/
private short defaultReplication;
/**
* Maximum duration that a CacheDirective in this pool remains valid,
* in milliseconds.
@ -123,11 +128,15 @@ public final class CachePool {
FsPermission.getCachePoolDefault() : info.getMode();
long limit = info.getLimit() == null ?
CachePoolInfo.DEFAULT_LIMIT : info.getLimit();
short defaultReplication = info.getDefaultReplication() == null ?
CachePoolInfo.DEFAULT_REPLICATION_NUM :
info.getDefaultReplication();
long maxRelativeExpiry = info.getMaxRelativeExpiryMs() == null ?
CachePoolInfo.DEFAULT_MAX_RELATIVE_EXPIRY :
info.getMaxRelativeExpiryMs();
return new CachePool(info.getPoolName(),
ownerName, groupName, mode, limit, maxRelativeExpiry);
ownerName, groupName, mode, limit,
defaultReplication, maxRelativeExpiry);
}
/**
@ -137,11 +146,13 @@ public final class CachePool {
static CachePool createFromInfo(CachePoolInfo info) {
return new CachePool(info.getPoolName(),
info.getOwnerName(), info.getGroupName(),
info.getMode(), info.getLimit(), info.getMaxRelativeExpiryMs());
info.getMode(), info.getLimit(),
info.getDefaultReplication(), info.getMaxRelativeExpiryMs());
}
CachePool(String poolName, String ownerName, String groupName,
FsPermission mode, long limit, long maxRelativeExpiry) {
FsPermission mode, long limit,
short defaultReplication, long maxRelativeExpiry) {
Preconditions.checkNotNull(poolName);
Preconditions.checkNotNull(ownerName);
Preconditions.checkNotNull(groupName);
@ -151,6 +162,7 @@ public final class CachePool {
this.groupName = groupName;
this.mode = new FsPermission(mode);
this.limit = limit;
this.defaultReplication = defaultReplication;
this.maxRelativeExpiryMs = maxRelativeExpiry;
}
@ -194,6 +206,14 @@ public final class CachePool {
return this;
}
public short getDefaultReplication() {
return defaultReplication;
}
public void setDefaultReplication(short replication) {
this.defaultReplication = replication;
}
public long getMaxRelativeExpiryMs() {
return maxRelativeExpiryMs;
}
@ -221,6 +241,7 @@ public final class CachePool {
setGroupName(groupName).
setMode(new FsPermission(mode)).
setLimit(limit).
setDefaultReplication(defaultReplication).
setMaxRelativeExpiryMs(maxRelativeExpiryMs);
}
@ -314,6 +335,7 @@ public final class CachePool {
append(", groupName:").append(groupName).
append(", mode:").append(mode).
append(", limit:").append(limit).
append(", defaultReplication").append(defaultReplication).
append(", maxRelativeExpiryMs:").append(maxRelativeExpiryMs).
append(" }").toString();
}

View File

@ -619,20 +619,24 @@ public class FSImageSerialization {
final Long limit = info.getLimit();
final FsPermission mode = info.getMode();
final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
final Short defaultReplication = info.getDefaultReplication();
boolean hasOwner, hasGroup, hasMode, hasLimit, hasMaxRelativeExpiry;
boolean hasOwner, hasGroup, hasMode, hasLimit,
hasMaxRelativeExpiry, hasDefaultReplication;
hasOwner = ownerName != null;
hasGroup = groupName != null;
hasMode = mode != null;
hasLimit = limit != null;
hasMaxRelativeExpiry = maxRelativeExpiry != null;
hasDefaultReplication = defaultReplication != null;
int flags =
(hasOwner ? 0x1 : 0) |
(hasGroup ? 0x2 : 0) |
(hasMode ? 0x4 : 0) |
(hasLimit ? 0x8 : 0) |
(hasMaxRelativeExpiry ? 0x10 : 0);
(hasMaxRelativeExpiry ? 0x10 : 0) |
(hasDefaultReplication ? 0x20 : 0);
writeInt(flags, out);
@ -651,6 +655,9 @@ public class FSImageSerialization {
if (hasMaxRelativeExpiry) {
writeLong(maxRelativeExpiry, out);
}
if (hasDefaultReplication) {
writeShort(defaultReplication, out);
}
}
public static CachePoolInfo readCachePoolInfo(DataInput in)
@ -673,7 +680,10 @@ public class FSImageSerialization {
if ((flags & 0x10) != 0) {
info.setMaxRelativeExpiryMs(readLong(in));
}
if ((flags & ~0x1F) != 0) {
if ((flags & 0x20) != 0) {
info.setDefaultReplication(readShort(in));
}
if ((flags & ~0x2F) != 0) {
throw new IOException("Unknown flag in CachePoolInfo: " + flags);
}
return info;

View File

@ -555,7 +555,7 @@ public class CacheAdmin extends Configured implements Tool {
public String getShortUsage() {
return "[" + NAME + " <name> [-owner <owner>] " +
"[-group <group>] [-mode <mode>] [-limit <limit>] " +
"[-maxTtl <maxTtl>]" +
"[-defaultReplication <defaultReplication>] [-maxTtl <maxTtl>]" +
"]\n";
}
@ -575,6 +575,9 @@ public class CacheAdmin extends Configured implements Tool {
listing.addRow("<limit>", "The maximum number of bytes that can be " +
"cached by directives in this pool, in aggregate. By default, " +
"no limit is set.");
listing.addRow("<defaultReplication>", "The default replication " +
"number for cache directive in the pool. " +
"If not set, the replication is set to 1");
listing.addRow("<maxTtl>", "The maximum allowed time-to-live for " +
"directives being added to the pool. This can be specified in " +
"seconds, minutes, hours, and days, e.g. 120s, 30m, 4h, 2d. " +
@ -613,6 +616,12 @@ public class CacheAdmin extends Configured implements Tool {
if (limit != null) {
info.setLimit(limit);
}
String replicationString = StringUtils.
popOptionWithArgument("-defaultReplication", args);
if (replicationString != null) {
short defaultReplication = Short.parseShort(replicationString);
info.setDefaultReplication(defaultReplication);
}
String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args);
try {
Long maxTtl = AdminHelper.parseTtlString(maxTtlString);
@ -654,7 +663,7 @@ public class CacheAdmin extends Configured implements Tool {
public String getShortUsage() {
return "[" + getName() + " <name> [-owner <owner>] " +
"[-group <group>] [-mode <mode>] [-limit <limit>] " +
"[-maxTtl <maxTtl>]]\n";
"[-defaultReplication <defaultReplication>] [-maxTtl <maxTtl>]]\n";
}
@Override
@ -667,6 +676,8 @@ public class CacheAdmin extends Configured implements Tool {
listing.addRow("<mode>", "Unix-style permissions of the pool in octal.");
listing.addRow("<limit>", "Maximum number of bytes that can be cached " +
"by this pool.");
listing.addRow("<defaultReplication>", "Default replication num for " +
"directives in this pool");
listing.addRow("<maxTtl>", "The maximum allowed time-to-live for " +
"directives being added to the pool.");
@ -686,6 +697,12 @@ public class CacheAdmin extends Configured implements Tool {
null : Integer.parseInt(modeString, 8);
String limitString = StringUtils.popOptionWithArgument("-limit", args);
Long limit = AdminHelper.parseLimitString(limitString);
String replicationString =
StringUtils.popOptionWithArgument("-defaultReplication", args);
Short defaultReplication = null;
if (replicationString != null) {
defaultReplication = Short.parseShort(replicationString);
}
String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args);
Long maxTtl;
try {
@ -725,6 +742,10 @@ public class CacheAdmin extends Configured implements Tool {
info.setLimit(limit);
changed = true;
}
if (defaultReplication != null) {
info.setDefaultReplication(defaultReplication);
changed = true;
}
if (maxTtl != null) {
info.setMaxRelativeExpiryMs(maxTtl);
changed = true;
@ -759,6 +780,10 @@ public class CacheAdmin extends Configured implements Tool {
System.out.print(prefix + "limit " + limit);
prefix = " and ";
}
if (defaultReplication != null) {
System.out.println(prefix + "replication " + defaultReplication);
prefix = " replication ";
}
if (maxTtl != null) {
System.out.print(prefix + "max time-to-live " + maxTtlString);
}
@ -854,7 +879,8 @@ public class CacheAdmin extends Configured implements Tool {
addField("GROUP", Justification.LEFT).
addField("MODE", Justification.LEFT).
addField("LIMIT", Justification.RIGHT).
addField("MAXTTL", Justification.RIGHT);
addField("MAXTTL", Justification.RIGHT).
addField("DEFAULT_REPLICATION", Justification.RIGHT);
if (printStats) {
builder.
addField("BYTES_NEEDED", Justification.RIGHT).
@ -895,6 +921,8 @@ public class CacheAdmin extends Configured implements Tool {
}
}
row.add(maxTtlString);
row.add("" + info.getDefaultReplication());
if (printStats) {
CachePoolStats stats = entry.getStats();
row.add(Long.toString(stats.getBytesNeeded()));

View File

@ -80,8 +80,8 @@
<test> <!--Tested -->
<description>Testing modifying a cache pool</description>
<test-commands>
<cache-admin-command>-addPool poolparty -owner alice -group alicegroup -mode 0000 -limit 50</cache-admin-command>
<cache-admin-command>-modifyPool poolparty -owner bob -group bobgroup -mode 0777 -limit 51</cache-admin-command>
<cache-admin-command>-addPool poolparty -owner alice -group alicegroup -mode 0000 -limit 50 -defaultReplication 3</cache-admin-command>
<cache-admin-command>-modifyPool poolparty -owner bob -group bobgroup -mode 0777 -limit 51 -defaultReplication 2</cache-admin-command>
<cache-admin-command>-listPools</cache-admin-command>
</test-commands>
<cleanup-commands>
@ -90,11 +90,12 @@
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>poolparty bob bobgroup rwxrwxrwx 51</expected-output>
<expected-output>poolparty bob bobgroup rwxrwxrwx 51 never 2</expected-output>
</comparator>
</comparators>
</test>
<test> <!--Tested -->
<description>Testing deleting a cache pool</description>
<test-commands>
@ -114,7 +115,7 @@
<test> <!--Tested -->
<description>Testing listing all cache pools</description>
<test-commands>
<cache-admin-command>-addPool foo -owner bob -group bob -mode 0664</cache-admin-command>
<cache-admin-command>-addPool foo -owner bob -group bob -mode 0664 -defaultReplication 2</cache-admin-command>
<cache-admin-command>-addPool bar -owner alice -group alicegroup -mode 0755</cache-admin-command>
<cache-admin-command>-listPools</cache-admin-command>
</test-commands>
@ -129,11 +130,11 @@
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>bar alice alicegroup rwxr-xr-x unlimited</expected-output>
<expected-output>bar alice alicegroup rwxr-xr-x unlimited never 1</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>foo bob bob rw-rw-r-- unlimited</expected-output>
<expected-output>foo bob bob rw-rw-r-- unlimited never 2</expected-output>
</comparator>
</comparators>
</test>
@ -156,7 +157,7 @@
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>foo bob bob rw-rw-r-- unlimited</expected-output>
<expected-output>foo bob bob rw-rw-r-- unlimited never 1</expected-output>
</comparator>
</comparators>
</test>
@ -417,11 +418,11 @@
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>bar alice alicegroup rwxr-xr-x unlimited never 0 0 0 0 0</expected-output>
<expected-output>bar alice alicegroup rwxr-xr-x unlimited never 1 0 0 0 0 0</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>foo bob bob rw-rw-r-- unlimited never 0 0 0 0 0</expected-output>
<expected-output>foo bob bob rw-rw-r-- unlimited never 1 0 0 0 0 0</expected-output>
</comparator>
</comparators>
</test>
@ -543,5 +544,37 @@
</comparator>
</comparators>
</test>
<test> <!--Tested -->
<description>Testing overriding cache pool replication</description>
<test-commands>
<cache-admin-command>-addPool pool1 -defaultReplication 2</cache-admin-command>
<cache-admin-command>-addPool pool2 -defaultReplication 3</cache-admin-command>
<cache-admin-command>-addDirective -path /foo -pool pool1 -ttl 2d</cache-admin-command>
<cache-admin-command>-addDirective -path /bar -pool pool2 -ttl 24h</cache-admin-command>
<cache-admin-command>-modifyDirective -id 32 -replication 3</cache-admin-command>
<cache-admin-command>-modifyDirective -id 34 -pool pool1</cache-admin-command>
<cache-admin-command>-addDirective -path /baz -replication 3 -pool pool1 -ttl 60m</cache-admin-command>
<cache-admin-command>-listDirectives -pool pool1</cache-admin-command>
</test-commands>
<cleanup-commands>
<cache-admin-command>-removePool pool1</cache-admin-command>
<cache-admin-command>-removePool pool2</cache-admin-command>
</cleanup-commands>
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>Found 2 entries</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>32 pool1 3</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>34 pool1 3</expected-output>
</comparator>
</comparators>
</test>
</tests>
</configuration>