diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java index c8a70ace3bd..daa77be118a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java @@ -54,6 +54,8 @@ public class CachePoolInfo { public static final long LIMIT_UNLIMITED = Long.MAX_VALUE; public static final long DEFAULT_LIMIT = LIMIT_UNLIMITED; + public static final short DEFAULT_REPLICATION_NUM = 1; + final String poolName; @Nullable @@ -68,6 +70,9 @@ public class CachePoolInfo { @Nullable Long limit; + @Nullable + private Short defaultReplication; + @Nullable Long maxRelativeExpiryMs; @@ -134,6 +139,18 @@ public class CachePoolInfo { return this; } + /** + * @return The default replication num for CacheDirective in this pool + */ + public Short getDefaultReplication() { + return defaultReplication; + } + + public CachePoolInfo setDefaultReplication(Short repl) { + this.defaultReplication = repl; + return this; + } + /** * @return The maximum relative expiration of directives of this pool in * milliseconds @@ -161,6 +178,7 @@ public class CachePoolInfo { + ", mode:" + ((mode == null) ? "null" : String.format("0%03o", mode.toShort())) + ", limit:" + limit + + ", defaultReplication:" + defaultReplication + ", maxRelativeExpiryMs:" + maxRelativeExpiryMs + "}"; } @@ -178,6 +196,7 @@ public class CachePoolInfo { append(groupName, other.groupName). append(mode, other.mode). append(limit, other.limit). + append(defaultReplication, other.defaultReplication). append(maxRelativeExpiryMs, other.maxRelativeExpiryMs). isEquals(); } @@ -190,6 +209,7 @@ public class CachePoolInfo { append(groupName). append(mode). append(limit). + append(defaultReplication). append(maxRelativeExpiryMs). hashCode(); } @@ -201,6 +221,11 @@ public class CachePoolInfo { if ((info.getLimit() != null) && (info.getLimit() < 0)) { throw new InvalidRequestException("Limit is negative."); } + if ((info.getDefaultReplication() != null) + && (info.getDefaultReplication() < 0)) { + throw new InvalidRequestException("Default Replication is negative"); + } + if (info.getMaxRelativeExpiryMs() != null) { long maxRelativeExpiryMs = info.getMaxRelativeExpiryMs(); if (maxRelativeExpiryMs < 0l) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index d5bb1e711e7..3c5f583b3d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -1203,6 +1203,10 @@ public class PBHelperClient { if (proto.hasLimit()) { info.setLimit(proto.getLimit()); } + if (proto.hasDefaultReplication()) { + info.setDefaultReplication(Shorts.checkedCast( + proto.getDefaultReplication())); + } if (proto.hasMaxRelativeExpiry()) { info.setMaxRelativeExpiryMs(proto.getMaxRelativeExpiry()); } @@ -1234,6 +1238,9 @@ public class PBHelperClient { if (info.getLimit() != null) { builder.setLimit(info.getLimit()); } + if (info.getDefaultReplication() != null) { + builder.setDefaultReplication(info.getDefaultReplication()); + } if (info.getMaxRelativeExpiryMs() != null) { builder.setMaxRelativeExpiry(info.getMaxRelativeExpiryMs()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto index 7acb394e09e..aac48a4e5df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto @@ -534,6 +534,7 @@ message CachePoolInfoProto { optional int32 mode = 4; optional int64 limit = 5; optional int64 maxRelativeExpiry = 6; + optional uint32 defaultReplication = 7 [default=1]; } message CachePoolStatsProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java index b1f936bc4d9..cd57c56ff17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java @@ -523,7 +523,8 @@ public final class CacheManager { CachePool pool = getCachePool(validatePoolName(info)); checkWritePermission(pc, pool); String path = validatePath(info); - short replication = validateReplication(info, (short)1); + short replication = validateReplication( + info, pool.getDefaultReplication()); long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs()); // Do quota validation if required if (!flags.contains(CacheFlag.FORCE)) { @@ -826,6 +827,13 @@ public final class CacheManager { // New limit changes stats, need to set needs refresh setNeedsRescan(); } + if (info.getDefaultReplication() != null) { + final short defaultReplication = info.getDefaultReplication(); + pool.setDefaultReplication(defaultReplication); + bld.append(prefix).append("set default replication to " + + defaultReplication); + prefix = "; "; + } if (info.getMaxRelativeExpiryMs() != null) { final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs(); pool.setMaxRelativeExpiryMs(maxRelativeExpiry); @@ -1083,6 +1091,10 @@ public final class CacheManager { if (p.hasMode()) info.setMode(new FsPermission((short) p.getMode())); + if (p.hasDefaultReplication()) { + info.setDefaultReplication((short) p.getDefaultReplication()); + } + if (p.hasLimit()) info.setLimit(p.getLimit()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java index 585124f93b7..a2613d999da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java @@ -72,6 +72,11 @@ public final class CachePool { */ private long limit; + /** + * Default replication num for CacheDirective in this pool. + */ + private short defaultReplication; + /** * Maximum duration that a CacheDirective in this pool remains valid, * in milliseconds. @@ -123,11 +128,15 @@ public final class CachePool { FsPermission.getCachePoolDefault() : info.getMode(); long limit = info.getLimit() == null ? CachePoolInfo.DEFAULT_LIMIT : info.getLimit(); + short defaultReplication = info.getDefaultReplication() == null ? + CachePoolInfo.DEFAULT_REPLICATION_NUM : + info.getDefaultReplication(); long maxRelativeExpiry = info.getMaxRelativeExpiryMs() == null ? CachePoolInfo.DEFAULT_MAX_RELATIVE_EXPIRY : info.getMaxRelativeExpiryMs(); return new CachePool(info.getPoolName(), - ownerName, groupName, mode, limit, maxRelativeExpiry); + ownerName, groupName, mode, limit, + defaultReplication, maxRelativeExpiry); } /** @@ -137,11 +146,13 @@ public final class CachePool { static CachePool createFromInfo(CachePoolInfo info) { return new CachePool(info.getPoolName(), info.getOwnerName(), info.getGroupName(), - info.getMode(), info.getLimit(), info.getMaxRelativeExpiryMs()); + info.getMode(), info.getLimit(), + info.getDefaultReplication(), info.getMaxRelativeExpiryMs()); } CachePool(String poolName, String ownerName, String groupName, - FsPermission mode, long limit, long maxRelativeExpiry) { + FsPermission mode, long limit, + short defaultReplication, long maxRelativeExpiry) { Preconditions.checkNotNull(poolName); Preconditions.checkNotNull(ownerName); Preconditions.checkNotNull(groupName); @@ -151,6 +162,7 @@ public final class CachePool { this.groupName = groupName; this.mode = new FsPermission(mode); this.limit = limit; + this.defaultReplication = defaultReplication; this.maxRelativeExpiryMs = maxRelativeExpiry; } @@ -194,6 +206,14 @@ public final class CachePool { return this; } + public short getDefaultReplication() { + return defaultReplication; + } + + public void setDefaultReplication(short replication) { + this.defaultReplication = replication; + } + public long getMaxRelativeExpiryMs() { return maxRelativeExpiryMs; } @@ -221,6 +241,7 @@ public final class CachePool { setGroupName(groupName). setMode(new FsPermission(mode)). setLimit(limit). + setDefaultReplication(defaultReplication). setMaxRelativeExpiryMs(maxRelativeExpiryMs); } @@ -314,6 +335,7 @@ public final class CachePool { append(", groupName:").append(groupName). append(", mode:").append(mode). append(", limit:").append(limit). + append(", defaultReplication").append(defaultReplication). append(", maxRelativeExpiryMs:").append(maxRelativeExpiryMs). append(" }").toString(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java index 419a704d903..06ac6a74e31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java @@ -619,20 +619,24 @@ public class FSImageSerialization { final Long limit = info.getLimit(); final FsPermission mode = info.getMode(); final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs(); + final Short defaultReplication = info.getDefaultReplication(); - boolean hasOwner, hasGroup, hasMode, hasLimit, hasMaxRelativeExpiry; + boolean hasOwner, hasGroup, hasMode, hasLimit, + hasMaxRelativeExpiry, hasDefaultReplication; hasOwner = ownerName != null; hasGroup = groupName != null; hasMode = mode != null; hasLimit = limit != null; hasMaxRelativeExpiry = maxRelativeExpiry != null; + hasDefaultReplication = defaultReplication != null; int flags = (hasOwner ? 0x1 : 0) | (hasGroup ? 0x2 : 0) | (hasMode ? 0x4 : 0) | (hasLimit ? 0x8 : 0) | - (hasMaxRelativeExpiry ? 0x10 : 0); + (hasMaxRelativeExpiry ? 0x10 : 0) | + (hasDefaultReplication ? 0x20 : 0); writeInt(flags, out); @@ -651,6 +655,9 @@ public class FSImageSerialization { if (hasMaxRelativeExpiry) { writeLong(maxRelativeExpiry, out); } + if (hasDefaultReplication) { + writeShort(defaultReplication, out); + } } public static CachePoolInfo readCachePoolInfo(DataInput in) @@ -673,7 +680,10 @@ public class FSImageSerialization { if ((flags & 0x10) != 0) { info.setMaxRelativeExpiryMs(readLong(in)); } - if ((flags & ~0x1F) != 0) { + if ((flags & 0x20) != 0) { + info.setDefaultReplication(readShort(in)); + } + if ((flags & ~0x2F) != 0) { throw new IOException("Unknown flag in CachePoolInfo: " + flags); } return info; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java index 7912c3a16ac..270a662a9f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java @@ -555,7 +555,7 @@ public class CacheAdmin extends Configured implements Tool { public String getShortUsage() { return "[" + NAME + " [-owner ] " + "[-group ] [-mode ] [-limit ] " + - "[-maxTtl ]" + + "[-defaultReplication ] [-maxTtl ]" + "]\n"; } @@ -575,6 +575,9 @@ public class CacheAdmin extends Configured implements Tool { listing.addRow("", "The maximum number of bytes that can be " + "cached by directives in this pool, in aggregate. By default, " + "no limit is set."); + listing.addRow("", "The default replication " + + "number for cache directive in the pool. " + + "If not set, the replication is set to 1"); listing.addRow("", "The maximum allowed time-to-live for " + "directives being added to the pool. This can be specified in " + "seconds, minutes, hours, and days, e.g. 120s, 30m, 4h, 2d. " + @@ -613,6 +616,12 @@ public class CacheAdmin extends Configured implements Tool { if (limit != null) { info.setLimit(limit); } + String replicationString = StringUtils. + popOptionWithArgument("-defaultReplication", args); + if (replicationString != null) { + short defaultReplication = Short.parseShort(replicationString); + info.setDefaultReplication(defaultReplication); + } String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args); try { Long maxTtl = AdminHelper.parseTtlString(maxTtlString); @@ -654,7 +663,7 @@ public class CacheAdmin extends Configured implements Tool { public String getShortUsage() { return "[" + getName() + " [-owner ] " + "[-group ] [-mode ] [-limit ] " + - "[-maxTtl ]]\n"; + "[-defaultReplication ] [-maxTtl ]]\n"; } @Override @@ -667,6 +676,8 @@ public class CacheAdmin extends Configured implements Tool { listing.addRow("", "Unix-style permissions of the pool in octal."); listing.addRow("", "Maximum number of bytes that can be cached " + "by this pool."); + listing.addRow("", "Default replication num for " + + "directives in this pool"); listing.addRow("", "The maximum allowed time-to-live for " + "directives being added to the pool."); @@ -686,6 +697,12 @@ public class CacheAdmin extends Configured implements Tool { null : Integer.parseInt(modeString, 8); String limitString = StringUtils.popOptionWithArgument("-limit", args); Long limit = AdminHelper.parseLimitString(limitString); + String replicationString = + StringUtils.popOptionWithArgument("-defaultReplication", args); + Short defaultReplication = null; + if (replicationString != null) { + defaultReplication = Short.parseShort(replicationString); + } String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args); Long maxTtl; try { @@ -725,6 +742,10 @@ public class CacheAdmin extends Configured implements Tool { info.setLimit(limit); changed = true; } + if (defaultReplication != null) { + info.setDefaultReplication(defaultReplication); + changed = true; + } if (maxTtl != null) { info.setMaxRelativeExpiryMs(maxTtl); changed = true; @@ -759,6 +780,10 @@ public class CacheAdmin extends Configured implements Tool { System.out.print(prefix + "limit " + limit); prefix = " and "; } + if (defaultReplication != null) { + System.out.println(prefix + "replication " + defaultReplication); + prefix = " replication "; + } if (maxTtl != null) { System.out.print(prefix + "max time-to-live " + maxTtlString); } @@ -854,7 +879,8 @@ public class CacheAdmin extends Configured implements Tool { addField("GROUP", Justification.LEFT). addField("MODE", Justification.LEFT). addField("LIMIT", Justification.RIGHT). - addField("MAXTTL", Justification.RIGHT); + addField("MAXTTL", Justification.RIGHT). + addField("DEFAULT_REPLICATION", Justification.RIGHT); if (printStats) { builder. addField("BYTES_NEEDED", Justification.RIGHT). @@ -895,6 +921,8 @@ public class CacheAdmin extends Configured implements Tool { } } row.add(maxTtlString); + row.add("" + info.getDefaultReplication()); + if (printStats) { CachePoolStats stats = entry.getStats(); row.add(Long.toString(stats.getBytesNeeded())); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml index 058eec59aa5..479deb570a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml @@ -80,8 +80,8 @@ Testing modifying a cache pool - -addPool poolparty -owner alice -group alicegroup -mode 0000 -limit 50 - -modifyPool poolparty -owner bob -group bobgroup -mode 0777 -limit 51 + -addPool poolparty -owner alice -group alicegroup -mode 0000 -limit 50 -defaultReplication 3 + -modifyPool poolparty -owner bob -group bobgroup -mode 0777 -limit 51 -defaultReplication 2 -listPools @@ -90,11 +90,12 @@ SubstringComparator - poolparty bob bobgroup rwxrwxrwx 51 + poolparty bob bobgroup rwxrwxrwx 51 never 2 + Testing deleting a cache pool @@ -114,7 +115,7 @@ Testing listing all cache pools - -addPool foo -owner bob -group bob -mode 0664 + -addPool foo -owner bob -group bob -mode 0664 -defaultReplication 2 -addPool bar -owner alice -group alicegroup -mode 0755 -listPools @@ -129,11 +130,11 @@ SubstringComparator - bar alice alicegroup rwxr-xr-x unlimited + bar alice alicegroup rwxr-xr-x unlimited never 1 SubstringComparator - foo bob bob rw-rw-r-- unlimited + foo bob bob rw-rw-r-- unlimited never 2 @@ -156,7 +157,7 @@ SubstringComparator - foo bob bob rw-rw-r-- unlimited + foo bob bob rw-rw-r-- unlimited never 1 @@ -417,11 +418,11 @@ SubstringComparator - bar alice alicegroup rwxr-xr-x unlimited never 0 0 0 0 0 + bar alice alicegroup rwxr-xr-x unlimited never 1 0 0 0 0 0 SubstringComparator - foo bob bob rw-rw-r-- unlimited never 0 0 0 0 0 + foo bob bob rw-rw-r-- unlimited never 1 0 0 0 0 0 @@ -543,5 +544,37 @@ + + + Testing overriding cache pool replication + + -addPool pool1 -defaultReplication 2 + -addPool pool2 -defaultReplication 3 + -addDirective -path /foo -pool pool1 -ttl 2d + -addDirective -path /bar -pool pool2 -ttl 24h + -modifyDirective -id 32 -replication 3 + -modifyDirective -id 34 -pool pool1 + -addDirective -path /baz -replication 3 -pool pool1 -ttl 60m + -listDirectives -pool pool1 + + + -removePool pool1 + -removePool pool2 + + + + SubstringComparator + Found 2 entries + + + SubstringComparator + 32 pool1 3 + + + SubstringComparator + 34 pool1 3 + + +