Merge trunk to HDFS-4685.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1553226 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2013-12-23 22:19:12 +00:00
commit eaa5321619
49 changed files with 1057 additions and 337 deletions

View File

@ -404,6 +404,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10164. Allow UGI to login with a known Subject (bobby) HADOOP-10164. Allow UGI to login with a known Subject (bobby)
HADOOP-10169. Remove the unnecessary synchronized in JvmMetrics class.
(Liang Xie via jing9)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
@ -562,6 +565,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-10087. UserGroupInformation.getGroupNames() fails to return primary HADOOP-10087. UserGroupInformation.getGroupNames() fails to return primary
group first when JniBasedUnixGroupsMappingWithFallback is used (cmccabe) group first when JniBasedUnixGroupsMappingWithFallback is used (cmccabe)
HADOOP-10175. Har files system authority should preserve userinfo.
(Chuan Liu via cnauroth)
Release 2.2.0 - 2013-10-13 Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -294,6 +294,10 @@ public FsStatus getStatus(Path p) throws IOException {
private String getHarAuth(URI underLyingUri) { private String getHarAuth(URI underLyingUri) {
String auth = underLyingUri.getScheme() + "-"; String auth = underLyingUri.getScheme() + "-";
if (underLyingUri.getHost() != null) { if (underLyingUri.getHost() != null) {
if (underLyingUri.getUserInfo() != null) {
auth += underLyingUri.getUserInfo();
auth += "@";
}
auth += underLyingUri.getHost(); auth += underLyingUri.getHost();
if (underLyingUri.getPort() != -1) { if (underLyingUri.getPort() != -1) {
auth += ":"; auth += ":";

View File

@ -305,12 +305,13 @@ public HttpServer build() throws IOException {
} }
} }
if (endpoints.size() == 0) { if (endpoints.size() == 0 && connector == null) {
throw new HadoopIllegalArgumentException("No endpoints specified"); throw new HadoopIllegalArgumentException("No endpoints specified");
} }
if (hostName == null) { if (hostName == null) {
hostName = endpoints.get(0).getHost(); hostName = endpoints.size() == 0 ? connector.getHost() : endpoints.get(
0).getHost();
} }
if (this.conf == null) { if (this.conf == null) {

View File

@ -69,13 +69,6 @@ protected Random initialValue() {
*/ */
public static final RetryPolicy RETRY_FOREVER = new RetryForever(); public static final RetryPolicy RETRY_FOREVER = new RetryForever();
/**
* <p>
* Keep failing over forever
* </p>
*/
public static final RetryPolicy FAILOVER_FOREVER = new FailoverForever();
/** /**
* <p> * <p>
* Keep trying a limited number of times, waiting a fixed time between attempts, * Keep trying a limited number of times, waiting a fixed time between attempts,
@ -173,14 +166,6 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
return RetryAction.RETRY; return RetryAction.RETRY;
} }
} }
static class FailoverForever implements RetryPolicy {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) throws Exception {
return RetryAction.FAILOVER_AND_RETRY;
}
}
/** /**
* Retry up to maxRetries. * Retry up to maxRetries.

View File

@ -24,10 +24,8 @@
import java.lang.management.ThreadInfo; import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean; import java.lang.management.ThreadMXBean;
import java.lang.management.GarbageCollectorMXBean; import java.lang.management.GarbageCollectorMXBean;
import java.util.Map;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.Maps;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.log.metrics.EventCounter; import org.apache.hadoop.log.metrics.EventCounter;
@ -67,7 +65,8 @@ synchronized JvmMetrics init(String processName, String sessionId) {
ManagementFactory.getGarbageCollectorMXBeans(); ManagementFactory.getGarbageCollectorMXBeans();
final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
final String processName, sessionId; final String processName, sessionId;
final Map<String, MetricsInfo[]> gcInfoCache = Maps.newHashMap(); final ConcurrentHashMap<String, MetricsInfo[]> gcInfoCache =
new ConcurrentHashMap<String, MetricsInfo[]>();
JvmMetrics(String processName, String sessionId) { JvmMetrics(String processName, String sessionId) {
this.processName = processName; this.processName = processName;
@ -123,13 +122,17 @@ private void getGcUsage(MetricsRecordBuilder rb) {
.addCounter(GcTimeMillis, timeMillis); .addCounter(GcTimeMillis, timeMillis);
} }
private synchronized MetricsInfo[] getGcInfo(String gcName) { private MetricsInfo[] getGcInfo(String gcName) {
MetricsInfo[] gcInfo = gcInfoCache.get(gcName); MetricsInfo[] gcInfo = gcInfoCache.get(gcName);
if (gcInfo == null) { if (gcInfo == null) {
gcInfo = new MetricsInfo[2]; gcInfo = new MetricsInfo[2];
gcInfo[0] = Interns.info("GcCount"+ gcName, "GC Count for "+ gcName); gcInfo[0] = Interns.info("GcCount" + gcName, "GC Count for " + gcName);
gcInfo[1] = Interns.info("GcTimeMillis"+ gcName, "GC Time for "+ gcName); gcInfo[1] = Interns
gcInfoCache.put(gcName, gcInfo); .info("GcTimeMillis" + gcName, "GC Time for " + gcName);
MetricsInfo[] previousGcInfo = gcInfoCache.putIfAbsent(gcName, gcInfo);
if (previousGcInfo != null) {
return previousGcInfo;
}
} }
return gcInfo; return gcInfo;
} }

View File

@ -258,6 +258,22 @@ public void testListLocatedStatus() throws Exception {
0, expectedFileNames.size()); 0, expectedFileNames.size());
} }
@Test
public void testMakeQualifiedPath() throws Exception {
// Construct a valid har file system path with authority that
// contains userinfo and port. The userinfo and port are useless
// in local fs uri. They are only used to verify har file system
// can correctly preserve the information for the underlying file system.
String harPathWithUserinfo = "har://file-user:passwd@localhost:80"
+ harPath.toUri().getPath().toString();
Path path = new Path(harPathWithUserinfo);
Path qualifiedPath = path.getFileSystem(conf).makeQualified(path);
assertTrue(String.format(
"The qualified path (%s) did not match the expected path (%s).",
qualifiedPath.toString(), harPathWithUserinfo),
qualifiedPath.toString().equals(harPathWithUserinfo));
}
// ========== Negative: // ========== Negative:
@Test @Test

View File

@ -66,6 +66,8 @@
import org.mortbay.jetty.Connector; import org.mortbay.jetty.Connector;
import org.mortbay.util.ajax.JSON; import org.mortbay.util.ajax.JSON;
import static org.mockito.Mockito.*;
public class TestHttpServer extends HttpServerFunctionalTest { public class TestHttpServer extends HttpServerFunctionalTest {
static final Log LOG = LogFactory.getLog(TestHttpServer.class); static final Log LOG = LogFactory.getLog(TestHttpServer.class);
private static HttpServer server; private static HttpServer server;
@ -588,4 +590,15 @@ public void testNoCacheHeader() throws Exception {
assertEquals(conn.getHeaderField("Expires"), conn.getHeaderField("Date")); assertEquals(conn.getHeaderField("Expires"), conn.getHeaderField("Date"));
} }
/**
* HTTPServer.Builder should proceed if a external connector is available.
*/
@Test
public void testHttpServerBuilderWithExternalConnector() throws Exception {
Connector c = mock(Connector.class);
doReturn("localhost").when(c).getHost();
HttpServer s = new HttpServer.Builder().setName("test").setConnector(c)
.build();
s.stop();
}
} }

View File

@ -241,6 +241,8 @@ Trunk (Unreleased)
HDFS-5431. Support cachepool-based limit management in path-based caching HDFS-5431. Support cachepool-based limit management in path-based caching
(awang via cmccabe) (awang via cmccabe)
HDFS-5636. Enforce a max TTL per cache pool. (awang via cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
@ -763,6 +765,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5540. Fix intermittent failure in TestBlocksWithNotEnoughRacks. HDFS-5540. Fix intermittent failure in TestBlocksWithNotEnoughRacks.
(Binglin Chang via junping_du) (Binglin Chang via junping_du)
HDFS-2933. Improve DataNode Web UI Index Page. (Vivek Ganesan via
Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@ -770,6 +775,8 @@ Release 2.4.0 - UNRELEASED
HDFS-5341. Reduce fsdataset lock duration during directory scanning. HDFS-5341. Reduce fsdataset lock duration during directory scanning.
(Qus-Jiawei via kihwal) (Qus-Jiawei via kihwal)
HDFS-5681. renewLease should not hold fsn write lock. (daryn via Kihwal)
BUG FIXES BUG FIXES
HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin
@ -820,6 +827,12 @@ Release 2.4.0 - UNRELEASED
HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe) HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)
HDFS-5691. Fix typo in ShortCircuitLocalRead document.
(Akira Ajisaka via suresh)
HDFS-5690. DataNode fails to start in secure mode when dfs.http.policy equals to
HTTP_ONLY. (Haohui Mai via jing9)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -1132,9 +1145,6 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5047. Supress logging of full stack trace of quota and lease HDFS-5047. Supress logging of full stack trace of quota and lease
exceptions. (Robert Parker via kihwal) exceptions. (Robert Parker via kihwal)
HDFS-2933. Improve DataNode Web UI Index Page. (Vivek Ganesan via
Arpit Agarwal)
HDFS-5111. Remove duplicated error message for snapshot commands when HDFS-5111. Remove duplicated error message for snapshot commands when
processing invalid arguments. (jing9) processing invalid arguments. (jing9)

View File

@ -1546,7 +1546,11 @@ public static String dateToIso8601String(Date date) {
* Converts a time duration in milliseconds into DDD:HH:MM:SS format. * Converts a time duration in milliseconds into DDD:HH:MM:SS format.
*/ */
public static String durationToString(long durationMs) { public static String durationToString(long durationMs) {
Preconditions.checkArgument(durationMs >= 0, "Invalid negative duration"); boolean negative = false;
if (durationMs < 0) {
negative = true;
durationMs = -durationMs;
}
// Chop off the milliseconds // Chop off the milliseconds
long durationSec = durationMs / 1000; long durationSec = durationMs / 1000;
final int secondsPerMinute = 60; final int secondsPerMinute = 60;
@ -1559,7 +1563,12 @@ public static String durationToString(long durationMs) {
final long minutes = durationSec / secondsPerMinute; final long minutes = durationSec / secondsPerMinute;
durationSec -= minutes * secondsPerMinute; durationSec -= minutes * secondsPerMinute;
final long seconds = durationSec; final long seconds = durationSec;
return String.format("%03d:%02d:%02d:%02d", days, hours, minutes, seconds); final long milliseconds = durationMs % 1000;
String format = "%03d:%02d:%02d:%02d.%03d";
if (negative) {
format = "-" + format;
}
return String.format(format, days, hours, minutes, seconds, milliseconds);
} }
/** /**
@ -1571,9 +1580,9 @@ public static long parseRelativeTime(String relTime) throws IOException {
+ ": too short"); + ": too short");
} }
String ttlString = relTime.substring(0, relTime.length()-1); String ttlString = relTime.substring(0, relTime.length()-1);
int ttl; long ttl;
try { try {
ttl = Integer.parseInt(ttlString); ttl = Long.parseLong(ttlString);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new IOException("Unable to parse relative time value of " + relTime throw new IOException("Unable to parse relative time value of " + relTime
+ ": " + ttlString + " is not a number"); + ": " + ttlString + " is not a number");

View File

@ -52,6 +52,14 @@ public final class CacheDirective implements IntrusiveCollection.Element {
private Element prev; private Element prev;
private Element next; private Element next;
public CacheDirective(CacheDirectiveInfo info) {
this(
info.getId(),
info.getPath().toUri().getPath(),
info.getReplication(),
info.getExpiration().getAbsoluteMillis());
}
public CacheDirective(long id, String path, public CacheDirective(long id, String path,
short replication, long expiryTime) { short replication, long expiryTime) {
Preconditions.checkArgument(id > 0); Preconditions.checkArgument(id > 0);

View File

@ -26,6 +26,8 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import com.google.common.base.Preconditions;
/** /**
* Describes a path-based cache directive. * Describes a path-based cache directive.
*/ */
@ -138,11 +140,22 @@ public Builder setExpiration(Expiration expiration) {
*/ */
public static class Expiration { public static class Expiration {
/** Denotes a CacheDirectiveInfo that never expires **/ /**
public static final int EXPIRY_NEVER = -1; * The maximum value we accept for a relative expiry.
*/
public static final long MAX_RELATIVE_EXPIRY_MS =
Long.MAX_VALUE / 4; // This helps prevent weird overflow bugs
/**
* An relative Expiration that never expires.
*/
public static final Expiration NEVER = newRelative(MAX_RELATIVE_EXPIRY_MS);
/** /**
* Create a new relative Expiration. * Create a new relative Expiration.
* <p>
* Use {@link Expiration#NEVER} to indicate an Expiration that never
* expires.
* *
* @param ms how long until the CacheDirective expires, in milliseconds * @param ms how long until the CacheDirective expires, in milliseconds
* @return A relative Expiration * @return A relative Expiration
@ -153,6 +166,9 @@ public static Expiration newRelative(long ms) {
/** /**
* Create a new absolute Expiration. * Create a new absolute Expiration.
* <p>
* Use {@link Expiration#NEVER} to indicate an Expiration that never
* expires.
* *
* @param date when the CacheDirective expires * @param date when the CacheDirective expires
* @return An absolute Expiration * @return An absolute Expiration
@ -163,6 +179,9 @@ public static Expiration newAbsolute(Date date) {
/** /**
* Create a new absolute Expiration. * Create a new absolute Expiration.
* <p>
* Use {@link Expiration#NEVER} to indicate an Expiration that never
* expires.
* *
* @param ms when the CacheDirective expires, in milliseconds since the Unix * @param ms when the CacheDirective expires, in milliseconds since the Unix
* epoch. * epoch.
@ -176,6 +195,10 @@ public static Expiration newAbsolute(long ms) {
private final boolean isRelative; private final boolean isRelative;
private Expiration(long ms, boolean isRelative) { private Expiration(long ms, boolean isRelative) {
if (isRelative) {
Preconditions.checkArgument(ms <= MAX_RELATIVE_EXPIRY_MS,
"Expiration time is too far in the future!");
}
this.ms = ms; this.ms = ms;
this.isRelative = isRelative; this.isRelative = isRelative;
} }

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
/** /**
* CachePoolInfo describes a cache pool. * CachePoolInfo describes a cache pool.
@ -42,6 +43,20 @@
public class CachePoolInfo { public class CachePoolInfo {
public static final Log LOG = LogFactory.getLog(CachePoolInfo.class); public static final Log LOG = LogFactory.getLog(CachePoolInfo.class);
/**
* Indicates that the pool does not have a maximum relative expiry.
*/
public static final long RELATIVE_EXPIRY_NEVER =
Expiration.MAX_RELATIVE_EXPIRY_MS;
/**
* Default max relative expiry for cache pools.
*/
public static final long DEFAULT_MAX_RELATIVE_EXPIRY =
RELATIVE_EXPIRY_NEVER;
public static final long LIMIT_UNLIMITED = Long.MAX_VALUE;
public static final long DEFAULT_LIMIT = LIMIT_UNLIMITED;
final String poolName; final String poolName;
@Nullable @Nullable
@ -56,14 +71,24 @@ public class CachePoolInfo {
@Nullable @Nullable
Long limit; Long limit;
@Nullable
Long maxRelativeExpiryMs;
public CachePoolInfo(String poolName) { public CachePoolInfo(String poolName) {
this.poolName = poolName; this.poolName = poolName;
} }
/**
* @return Name of the pool.
*/
public String getPoolName() { public String getPoolName() {
return poolName; return poolName;
} }
/**
* @return The owner of the pool. Along with the group and mode, determines
* who has access to view and modify the pool.
*/
public String getOwnerName() { public String getOwnerName() {
return ownerName; return ownerName;
} }
@ -73,6 +98,10 @@ public CachePoolInfo setOwnerName(String ownerName) {
return this; return this;
} }
/**
* @return The group of the pool. Along with the owner and mode, determines
* who has access to view and modify the pool.
*/
public String getGroupName() { public String getGroupName() {
return groupName; return groupName;
} }
@ -81,7 +110,11 @@ public CachePoolInfo setGroupName(String groupName) {
this.groupName = groupName; this.groupName = groupName;
return this; return this;
} }
/**
* @return Unix-style permissions of the pool. Along with the owner and group,
* determines who has access to view and modify the pool.
*/
public FsPermission getMode() { public FsPermission getMode() {
return mode; return mode;
} }
@ -91,6 +124,10 @@ public CachePoolInfo setMode(FsPermission mode) {
return this; return this;
} }
/**
* @return The maximum aggregate number of bytes that can be cached by
* directives in this pool.
*/
public Long getLimit() { public Long getLimit() {
return limit; return limit;
} }
@ -100,6 +137,26 @@ public CachePoolInfo setLimit(Long bytes) {
return this; return this;
} }
/**
* @return The maximum relative expiration of directives of this pool in
* milliseconds
*/
public Long getMaxRelativeExpiryMs() {
return maxRelativeExpiryMs;
}
/**
* Set the maximum relative expiration of directives of this pool in
* milliseconds.
*
* @param ms in milliseconds
* @return This builder, for call chaining.
*/
public CachePoolInfo setMaxRelativeExpiryMs(Long ms) {
this.maxRelativeExpiryMs = ms;
return this;
}
public String toString() { public String toString() {
return new StringBuilder().append("{"). return new StringBuilder().append("{").
append("poolName:").append(poolName). append("poolName:").append(poolName).
@ -108,6 +165,7 @@ public String toString() {
append(", mode:").append((mode == null) ? "null" : append(", mode:").append((mode == null) ? "null" :
String.format("0%03o", mode.toShort())). String.format("0%03o", mode.toShort())).
append(", limit:").append(limit). append(", limit:").append(limit).
append(", maxRelativeExpiryMs:").append(maxRelativeExpiryMs).
append("}").toString(); append("}").toString();
} }
@ -125,6 +183,7 @@ public boolean equals(Object o) {
append(groupName, other.groupName). append(groupName, other.groupName).
append(mode, other.mode). append(mode, other.mode).
append(limit, other.limit). append(limit, other.limit).
append(maxRelativeExpiryMs, other.maxRelativeExpiryMs).
isEquals(); isEquals();
} }
@ -136,6 +195,7 @@ public int hashCode() {
append(groupName). append(groupName).
append(mode). append(mode).
append(limit). append(limit).
append(maxRelativeExpiryMs).
hashCode(); hashCode();
} }
@ -146,6 +206,15 @@ public static void validate(CachePoolInfo info) throws IOException {
if ((info.getLimit() != null) && (info.getLimit() < 0)) { if ((info.getLimit() != null) && (info.getLimit() < 0)) {
throw new InvalidRequestException("Limit is negative."); throw new InvalidRequestException("Limit is negative.");
} }
if (info.getMaxRelativeExpiryMs() != null) {
long maxRelativeExpiryMs = info.getMaxRelativeExpiryMs();
if (maxRelativeExpiryMs < 0l) {
throw new InvalidRequestException("Max relative expiry is negative.");
}
if (maxRelativeExpiryMs > Expiration.MAX_RELATIVE_EXPIRY_MS) {
throw new InvalidRequestException("Max relative expiry is too big.");
}
}
validateName(info.poolName); validateName(info.poolName);
} }

View File

@ -1839,6 +1839,9 @@ public static CachePoolInfoProto convert(CachePoolInfo info) {
if (info.getLimit() != null) { if (info.getLimit() != null) {
builder.setLimit(info.getLimit()); builder.setLimit(info.getLimit());
} }
if (info.getMaxRelativeExpiryMs() != null) {
builder.setMaxRelativeExpiry(info.getMaxRelativeExpiryMs());
}
return builder.build(); return builder.build();
} }
@ -1858,6 +1861,9 @@ public static CachePoolInfo convert (CachePoolInfoProto proto) {
if (proto.hasLimit()) { if (proto.hasLimit()) {
info.setLimit(proto.getLimit()); info.setLimit(proto.getLimit());
} }
if (proto.hasMaxRelativeExpiry()) {
info.setMaxRelativeExpiryMs(proto.getMaxRelativeExpiry());
}
return info; return info;
} }

View File

@ -365,7 +365,7 @@ private void rescanCacheDirectives() {
if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) { if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Skipping directive id " + directive.getId() LOG.debug("Skipping directive id " + directive.getId()
+ " because it has expired (" + directive.getExpiryTime() + ">=" + " because it has expired (" + directive.getExpiryTime() + "<="
+ now + ")"); + now + ")");
} }
continue; continue;

View File

@ -87,6 +87,7 @@ public void start() throws Exception {
public static SecureResources getSecureResources(Configuration conf) public static SecureResources getSecureResources(Configuration conf)
throws Exception { throws Exception {
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
boolean isSecure = UserGroupInformation.isSecurityEnabled();
// Obtain secure port for data streaming to datanode // Obtain secure port for data streaming to datanode
InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf); InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf);
@ -106,6 +107,11 @@ public static SecureResources getSecureResources(Configuration conf)
+ ss.getLocalPort()); + ss.getLocalPort());
} }
if (ss.getLocalPort() > 1023 && isSecure) {
throw new RuntimeException(
"Cannot start secure datanode with unprivileged RPC ports");
}
System.err.println("Opened streaming server at " + streamingAddr); System.err.println("Opened streaming server at " + streamingAddr);
// Bind a port for the web server. The code intends to bind HTTP server to // Bind a port for the web server. The code intends to bind HTTP server to
@ -126,9 +132,9 @@ public static SecureResources getSecureResources(Configuration conf)
System.err.println("Successfully obtained privileged resources (streaming port = " System.err.println("Successfully obtained privileged resources (streaming port = "
+ ss + " ) (http listener port = " + listener.getConnection() +")"); + ss + " ) (http listener port = " + listener.getConnection() +")");
if ((ss.getLocalPort() > 1023 || listener.getPort() > 1023) && if (listener.getPort() > 1023 && isSecure) {
UserGroupInformation.isSecurityEnabled()) { throw new RuntimeException(
throw new RuntimeException("Cannot start secure datanode with unprivileged ports"); "Cannot start secure datanode with unprivileged HTTP ports");
} }
System.err.println("Opened info server at " + infoSocAddr); System.err.println("Opened info server at " + infoSocAddr);
} }

View File

@ -32,6 +32,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
@ -55,6 +56,7 @@
import org.apache.hadoop.hdfs.protocol.CacheDirective; import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
@ -322,27 +324,48 @@ private static short validateReplication(CacheDirectiveInfo directive,
* {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
* into an absolute time based on the local clock. * into an absolute time based on the local clock.
* *
* @param directive from which to get the expiry time * @param info to validate.
* @param defaultValue to use if Expiration is not set * @param maxRelativeExpiryTime of the info's pool.
* @return Absolute expiry time in milliseconds since Unix epoch * @return the expiration time, or the pool's max absolute expiration if the
* @throws InvalidRequestException if the Expiration is invalid * info's expiration was not set.
* @throws InvalidRequestException if the info's Expiration is invalid.
*/ */
private static long validateExpiryTime(CacheDirectiveInfo directive, private static long validateExpiryTime(CacheDirectiveInfo info,
long defaultValue) throws InvalidRequestException { long maxRelativeExpiryTime) throws InvalidRequestException {
long expiryTime; if (LOG.isTraceEnabled()) {
CacheDirectiveInfo.Expiration expiration = directive.getExpiration(); LOG.trace("Validating directive " + info
if (expiration != null) { + " pool maxRelativeExpiryTime " + maxRelativeExpiryTime);
if (expiration.getMillis() < 0) {
throw new InvalidRequestException("Cannot set a negative expiration: "
+ expiration.getMillis());
}
// Converts a relative duration into an absolute time based on the local
// clock
expiryTime = expiration.getAbsoluteMillis();
} else {
expiryTime = defaultValue;
} }
return expiryTime; final long now = new Date().getTime();
final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime;
if (info == null || info.getExpiration() == null) {
return maxAbsoluteExpiryTime;
}
Expiration expiry = info.getExpiration();
if (expiry.getMillis() < 0l) {
throw new InvalidRequestException("Cannot set a negative expiration: "
+ expiry.getMillis());
}
long relExpiryTime, absExpiryTime;
if (expiry.isRelative()) {
relExpiryTime = expiry.getMillis();
absExpiryTime = now + relExpiryTime;
} else {
absExpiryTime = expiry.getMillis();
relExpiryTime = absExpiryTime - now;
}
// Need to cap the expiry so we don't overflow a long when doing math
if (relExpiryTime > Expiration.MAX_RELATIVE_EXPIRY_MS) {
throw new InvalidRequestException("Expiration "
+ expiry.toString() + " is too far in the future!");
}
// Fail if the requested expiry is greater than the max
if (relExpiryTime > maxRelativeExpiryTime) {
throw new InvalidRequestException("Expiration " + expiry.toString()
+ " exceeds the max relative expiration time of "
+ maxRelativeExpiryTime + " ms.");
}
return absExpiryTime;
} }
/** /**
@ -357,6 +380,9 @@ private static long validateExpiryTime(CacheDirectiveInfo directive,
private void checkLimit(CachePool pool, String path, private void checkLimit(CachePool pool, String path,
short replication) throws InvalidRequestException { short replication) throws InvalidRequestException {
CacheDirectiveStats stats = computeNeeded(path, replication); CacheDirectiveStats stats = computeNeeded(path, replication);
if (pool.getLimit() == CachePoolInfo.LIMIT_UNLIMITED) {
return;
}
if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool
.getLimit()) { .getLimit()) {
throw new InvalidRequestException("Caching path " + path + " of size " throw new InvalidRequestException("Caching path " + path + " of size "
@ -461,17 +487,13 @@ private void addInternal(CacheDirective directive, CachePool pool) {
} }
/** /**
* To be called only from the edit log loading code * Adds a directive, skipping most error checking. This should only be called
* internally in special scenarios like edit log replay.
*/ */
CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive) CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
throws InvalidRequestException { throws InvalidRequestException {
long id = directive.getId(); long id = directive.getId();
CacheDirective entry = CacheDirective entry = new CacheDirective(directive);
new CacheDirective(
directive.getId(),
directive.getPath().toUri().getPath(),
directive.getReplication(),
directive.getExpiration().getAbsoluteMillis());
CachePool pool = cachePools.get(directive.getPool()); CachePool pool = cachePools.get(directive.getPool());
addInternal(entry, pool); addInternal(entry, pool);
if (nextDirectiveId <= id) { if (nextDirectiveId <= id) {
@ -490,8 +512,7 @@ public CacheDirectiveInfo addDirective(
checkWritePermission(pc, pool); checkWritePermission(pc, pool);
String path = validatePath(info); String path = validatePath(info);
short replication = validateReplication(info, (short)1); short replication = validateReplication(info, (short)1);
long expiryTime = validateExpiryTime(info, long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs());
CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
// Do quota validation if required // Do quota validation if required
if (!flags.contains(CacheFlag.FORCE)) { if (!flags.contains(CacheFlag.FORCE)) {
// Can't kick and wait if caching is disabled // Can't kick and wait if caching is disabled
@ -513,6 +534,56 @@ public CacheDirectiveInfo addDirective(
return directive.toInfo(); return directive.toInfo();
} }
/**
* Factory method that makes a new CacheDirectiveInfo by applying fields in a
* CacheDirectiveInfo to an existing CacheDirective.
*
* @param info with some or all fields set.
* @param defaults directive providing default values for unset fields in
* info.
*
* @return new CacheDirectiveInfo of the info applied to the defaults.
*/
private static CacheDirectiveInfo createFromInfoAndDefaults(
CacheDirectiveInfo info, CacheDirective defaults) {
// Initialize the builder with the default values
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder(defaults.toInfo());
// Replace default with new value if present
if (info.getPath() != null) {
builder.setPath(info.getPath());
}
if (info.getReplication() != null) {
builder.setReplication(info.getReplication());
}
if (info.getPool() != null) {
builder.setPool(info.getPool());
}
if (info.getExpiration() != null) {
builder.setExpiration(info.getExpiration());
}
return builder.build();
}
/**
* Modifies a directive, skipping most error checking. This is for careful
* internal use only. modifyDirective can be non-deterministic since its error
* checking depends on current system time, which poses a problem for edit log
* replay.
*/
void modifyDirectiveFromEditLog(CacheDirectiveInfo info)
throws InvalidRequestException {
// Check for invalid IDs.
Long id = info.getId();
if (id == null) {
throw new InvalidRequestException("Must supply an ID.");
}
CacheDirective prevEntry = getById(id);
CacheDirectiveInfo newInfo = createFromInfoAndDefaults(info, prevEntry);
removeInternal(prevEntry);
addInternal(new CacheDirective(newInfo), getCachePool(newInfo.getPool()));
}
public void modifyDirective(CacheDirectiveInfo info, public void modifyDirective(CacheDirectiveInfo info,
FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException { FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
@ -527,33 +598,38 @@ public void modifyDirective(CacheDirectiveInfo info,
} }
CacheDirective prevEntry = getById(id); CacheDirective prevEntry = getById(id);
checkWritePermission(pc, prevEntry.getPool()); checkWritePermission(pc, prevEntry.getPool());
String path = prevEntry.getPath();
if (info.getPath() != null) {
path = validatePath(info);
}
short replication = prevEntry.getReplication(); // Fill in defaults
replication = validateReplication(info, replication); CacheDirectiveInfo infoWithDefaults =
createFromInfoAndDefaults(info, prevEntry);
CacheDirectiveInfo.Builder builder =
new CacheDirectiveInfo.Builder(infoWithDefaults);
long expiryTime = prevEntry.getExpiryTime(); // Do validation
expiryTime = validateExpiryTime(info, expiryTime); validatePath(infoWithDefaults);
validateReplication(infoWithDefaults, (short)-1);
CachePool pool = prevEntry.getPool(); // Need to test the pool being set here to avoid rejecting a modify for a
if (info.getPool() != null) { // directive that's already been forced into a pool
pool = getCachePool(validatePoolName(info)); CachePool srcPool = prevEntry.getPool();
checkWritePermission(pc, pool); CachePool destPool = getCachePool(validatePoolName(infoWithDefaults));
if (!srcPool.getPoolName().equals(destPool.getPoolName())) {
checkWritePermission(pc, destPool);
if (!flags.contains(CacheFlag.FORCE)) { if (!flags.contains(CacheFlag.FORCE)) {
// Can't kick and wait if caching is disabled checkLimit(destPool, infoWithDefaults.getPath().toUri().getPath(),
if (monitor != null) { infoWithDefaults.getReplication());
monitor.waitForRescan();
}
checkLimit(pool, path, replication);
} }
} }
// Verify the expiration against the destination pool
validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs());
// Indicate changes to the CRM
if (monitor != null) {
monitor.setNeedsRescan();
}
// Validation passed
removeInternal(prevEntry); removeInternal(prevEntry);
CacheDirective newEntry = addInternal(new CacheDirective(builder.build()), destPool);
new CacheDirective(id, path, replication, expiryTime);
addInternal(newEntry, pool);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("modifyDirective of " + idString + " failed: ", e); LOG.warn("modifyDirective of " + idString + " failed: ", e);
throw e; throw e;
@ -562,7 +638,7 @@ public void modifyDirective(CacheDirectiveInfo info,
info+ "."); info+ ".");
} }
public void removeInternal(CacheDirective directive) private void removeInternal(CacheDirective directive)
throws InvalidRequestException { throws InvalidRequestException {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
// Remove the corresponding entry in directivesByPath. // Remove the corresponding entry in directivesByPath.
@ -734,6 +810,13 @@ public void modifyCachePool(CachePoolInfo info)
monitor.setNeedsRescan(); monitor.setNeedsRescan();
} }
} }
if (info.getMaxRelativeExpiryMs() != null) {
final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
pool.setMaxRelativeExpiryMs(maxRelativeExpiry);
bld.append(prefix).append("set maxRelativeExpiry to "
+ maxRelativeExpiry);
prefix = "; ";
}
if (prefix.isEmpty()) { if (prefix.isEmpty()) {
bld.append("no changes."); bld.append("no changes.");
} }

View File

@ -49,8 +49,6 @@
public final class CachePool { public final class CachePool {
public static final Log LOG = LogFactory.getLog(CachePool.class); public static final Log LOG = LogFactory.getLog(CachePool.class);
public static final long DEFAULT_LIMIT = Long.MAX_VALUE;
@Nonnull @Nonnull
private final String poolName; private final String poolName;
@ -76,6 +74,12 @@ public final class CachePool {
*/ */
private long limit; private long limit;
/**
* Maximum duration that a CacheDirective in this pool remains valid,
* in milliseconds.
*/
private long maxRelativeExpiryMs;
private long bytesNeeded; private long bytesNeeded;
private long bytesCached; private long bytesCached;
private long filesNeeded; private long filesNeeded;
@ -122,9 +126,12 @@ static CachePool createFromInfoAndDefaults(CachePoolInfo info)
FsPermission mode = (info.getMode() == null) ? FsPermission mode = (info.getMode() == null) ?
FsPermission.getCachePoolDefault() : info.getMode(); FsPermission.getCachePoolDefault() : info.getMode();
long limit = info.getLimit() == null ? long limit = info.getLimit() == null ?
DEFAULT_LIMIT : info.getLimit(); CachePoolInfo.DEFAULT_LIMIT : info.getLimit();
long maxRelativeExpiry = info.getMaxRelativeExpiryMs() == null ?
CachePoolInfo.DEFAULT_MAX_RELATIVE_EXPIRY :
info.getMaxRelativeExpiryMs();
return new CachePool(info.getPoolName(), return new CachePool(info.getPoolName(),
ownerName, groupName, mode, limit); ownerName, groupName, mode, limit, maxRelativeExpiry);
} }
/** /**
@ -134,11 +141,11 @@ static CachePool createFromInfoAndDefaults(CachePoolInfo info)
static CachePool createFromInfo(CachePoolInfo info) { static CachePool createFromInfo(CachePoolInfo info) {
return new CachePool(info.getPoolName(), return new CachePool(info.getPoolName(),
info.getOwnerName(), info.getGroupName(), info.getOwnerName(), info.getGroupName(),
info.getMode(), info.getLimit()); info.getMode(), info.getLimit(), info.getMaxRelativeExpiryMs());
} }
CachePool(String poolName, String ownerName, String groupName, CachePool(String poolName, String ownerName, String groupName,
FsPermission mode, long limit) { FsPermission mode, long limit, long maxRelativeExpiry) {
Preconditions.checkNotNull(poolName); Preconditions.checkNotNull(poolName);
Preconditions.checkNotNull(ownerName); Preconditions.checkNotNull(ownerName);
Preconditions.checkNotNull(groupName); Preconditions.checkNotNull(groupName);
@ -148,6 +155,7 @@ static CachePool createFromInfo(CachePoolInfo info) {
this.groupName = groupName; this.groupName = groupName;
this.mode = new FsPermission(mode); this.mode = new FsPermission(mode);
this.limit = limit; this.limit = limit;
this.maxRelativeExpiryMs = maxRelativeExpiry;
} }
public String getPoolName() { public String getPoolName() {
@ -190,6 +198,15 @@ public CachePool setLimit(long bytes) {
return this; return this;
} }
public long getMaxRelativeExpiryMs() {
return maxRelativeExpiryMs;
}
public CachePool setMaxRelativeExpiryMs(long expiry) {
this.maxRelativeExpiryMs = expiry;
return this;
}
/** /**
* Get either full or partial information about this CachePool. * Get either full or partial information about this CachePool.
* *
@ -207,7 +224,8 @@ CachePoolInfo getInfo(boolean fullInfo) {
return info.setOwnerName(ownerName). return info.setOwnerName(ownerName).
setGroupName(groupName). setGroupName(groupName).
setMode(new FsPermission(mode)). setMode(new FsPermission(mode)).
setLimit(limit); setLimit(limit).
setMaxRelativeExpiryMs(maxRelativeExpiryMs);
} }
/** /**
@ -300,6 +318,7 @@ public String toString() {
append(", groupName:").append(groupName). append(", groupName:").append(groupName).
append(", mode:").append(mode). append(", mode:").append(mode).
append(", limit:").append(limit). append(", limit:").append(limit).
append(", maxRelativeExpiryMs:").append(maxRelativeExpiryMs).
append(" }").toString(); append(" }").toString();
} }

View File

@ -652,8 +652,8 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
case OP_MODIFY_CACHE_DIRECTIVE: { case OP_MODIFY_CACHE_DIRECTIVE: {
ModifyCacheDirectiveInfoOp modifyOp = ModifyCacheDirectiveInfoOp modifyOp =
(ModifyCacheDirectiveInfoOp) op; (ModifyCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().modifyDirective( fsNamesys.getCacheManager().modifyDirectiveFromEditLog(
modifyOp.directive, null, EnumSet.of(CacheFlag.FORCE)); modifyOp.directive);
if (toAddRetryCache) { if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId); fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
} }

View File

@ -604,18 +604,22 @@ public static void writeCachePoolInfo(DataOutputStream out, CachePoolInfo info)
final String groupName = info.getGroupName(); final String groupName = info.getGroupName();
final Long limit = info.getLimit(); final Long limit = info.getLimit();
final FsPermission mode = info.getMode(); final FsPermission mode = info.getMode();
final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
boolean hasOwner, hasGroup, hasMode, hasLimit; boolean hasOwner, hasGroup, hasMode, hasLimit, hasMaxRelativeExpiry;
hasOwner = ownerName != null; hasOwner = ownerName != null;
hasGroup = groupName != null; hasGroup = groupName != null;
hasMode = mode != null; hasMode = mode != null;
hasLimit = limit != null; hasLimit = limit != null;
hasMaxRelativeExpiry = maxRelativeExpiry != null;
int flags = int flags =
(hasOwner ? 0x1 : 0) | (hasOwner ? 0x1 : 0) |
(hasGroup ? 0x2 : 0) | (hasGroup ? 0x2 : 0) |
(hasMode ? 0x4 : 0) | (hasMode ? 0x4 : 0) |
(hasLimit ? 0x8 : 0); (hasLimit ? 0x8 : 0) |
(hasMaxRelativeExpiry ? 0x10 : 0);
writeInt(flags, out); writeInt(flags, out);
if (hasOwner) { if (hasOwner) {
@ -630,6 +634,9 @@ public static void writeCachePoolInfo(DataOutputStream out, CachePoolInfo info)
if (hasLimit) { if (hasLimit) {
writeLong(limit, out); writeLong(limit, out);
} }
if (hasMaxRelativeExpiry) {
writeLong(maxRelativeExpiry, out);
}
} }
public static CachePoolInfo readCachePoolInfo(DataInput in) public static CachePoolInfo readCachePoolInfo(DataInput in)
@ -649,7 +656,10 @@ public static CachePoolInfo readCachePoolInfo(DataInput in)
if ((flags & 0x8) != 0) { if ((flags & 0x8) != 0) {
info.setLimit(readLong(in)); info.setLimit(readLong(in));
} }
if ((flags & ~0xF) != 0) { if ((flags & 0x10) != 0) {
info.setMaxRelativeExpiryMs(readLong(in));
}
if ((flags & ~0x1F) != 0) {
throw new IOException("Unknown flag in CachePoolInfo: " + flags); throw new IOException("Unknown flag in CachePoolInfo: " + flags);
} }
return info; return info;
@ -663,6 +673,7 @@ public static void writeCachePoolInfo(ContentHandler contentHandler,
final String groupName = info.getGroupName(); final String groupName = info.getGroupName();
final Long limit = info.getLimit(); final Long limit = info.getLimit();
final FsPermission mode = info.getMode(); final FsPermission mode = info.getMode();
final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
if (ownerName != null) { if (ownerName != null) {
XMLUtils.addSaxString(contentHandler, "OWNERNAME", ownerName); XMLUtils.addSaxString(contentHandler, "OWNERNAME", ownerName);
@ -677,6 +688,10 @@ public static void writeCachePoolInfo(ContentHandler contentHandler,
XMLUtils.addSaxString(contentHandler, "LIMIT", XMLUtils.addSaxString(contentHandler, "LIMIT",
Long.toString(limit)); Long.toString(limit));
} }
if (maxRelativeExpiry != null) {
XMLUtils.addSaxString(contentHandler, "MAXRELATIVEEXPIRY",
Long.toString(maxRelativeExpiry));
}
} }
public static CachePoolInfo readCachePoolInfo(Stanza st) public static CachePoolInfo readCachePoolInfo(Stanza st)
@ -695,6 +710,10 @@ public static CachePoolInfo readCachePoolInfo(Stanza st)
if (st.hasChildren("LIMIT")) { if (st.hasChildren("LIMIT")) {
info.setLimit(Long.parseLong(st.getValue("LIMIT"))); info.setLimit(Long.parseLong(st.getValue("LIMIT")));
} }
if (st.hasChildren("MAXRELATIVEEXPIRY")) {
info.setMaxRelativeExpiryMs(
Long.parseLong(st.getValue("MAXRELATIVEEXPIRY")));
}
return info; return info;
} }

View File

@ -4001,13 +4001,13 @@ String persistBlocks(INodeFile pendingFile, boolean logRetryCache)
*/ */
void renewLease(String holder) throws IOException { void renewLease(String holder) throws IOException {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
writeLock(); readLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot renew lease for " + holder); checkNameNodeSafeMode("Cannot renew lease for " + holder);
leaseManager.renewLease(holder); leaseManager.renewLease(holder);
} finally { } finally {
writeUnlock(); readUnlock();
} }
} }

View File

@ -35,14 +35,12 @@
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats; import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolStats; import org.apache.hadoop.hdfs.protocol.CachePoolStats;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.tools.TableListing.Justification; import org.apache.hadoop.hdfs.tools.TableListing.Justification;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
@ -120,6 +118,23 @@ private static TableListing getOptionDescriptionListing() {
return listing; return listing;
} }
/**
* Parses a time-to-live value from a string
* @return The ttl in milliseconds
* @throws IOException if it could not be parsed
*/
private static Long parseTtlString(String maxTtlString) throws IOException {
Long maxTtl = null;
if (maxTtlString != null) {
if (maxTtlString.equalsIgnoreCase("never")) {
maxTtl = CachePoolInfo.RELATIVE_EXPIRY_NEVER;
} else {
maxTtl = DFSUtil.parseRelativeTime(maxTtlString);
}
}
return maxTtl;
}
interface Command { interface Command {
String getName(); String getName();
String getShortUsage(); String getShortUsage();
@ -154,7 +169,7 @@ public String getLongUsage() {
listing.addRow("<replication>", "The cache replication factor to use. " + listing.addRow("<replication>", "The cache replication factor to use. " +
"Defaults to 1."); "Defaults to 1.");
listing.addRow("<time-to-live>", "How long the directive is " + listing.addRow("<time-to-live>", "How long the directive is " +
"valid. Can be specified in minutes, hours, and days via e.g. " + "valid. Can be specified in minutes, hours, and days, e.g. " +
"30m, 4h, 2d. Valid units are [smhd]." + "30m, 4h, 2d. Valid units are [smhd]." +
" If unspecified, the directive never expires."); " If unspecified, the directive never expires.");
return getShortUsage() + "\n" + return getShortUsage() + "\n" +
@ -309,7 +324,7 @@ public String getLongUsage() {
"added. You must have write permission on the cache pool " "added. You must have write permission on the cache pool "
+ "in order to move a directive into it. (optional)"); + "in order to move a directive into it. (optional)");
listing.addRow("<time-to-live>", "How long the directive is " + listing.addRow("<time-to-live>", "How long the directive is " +
"valid. Can be specified in minutes, hours, and days via e.g. " + "valid. Can be specified in minutes, hours, and days, e.g. " +
"30m, 4h, 2d. Valid units are [smhd]." + "30m, 4h, 2d. Valid units are [smhd]." +
" If unspecified, the directive never expires."); " If unspecified, the directive never expires.");
return getShortUsage() + "\n" + return getShortUsage() + "\n" +
@ -419,22 +434,27 @@ public int run(Configuration conf, List<String> args) throws IOException {
System.err.println("Usage is " + getShortUsage()); System.err.println("Usage is " + getShortUsage());
return 1; return 1;
} }
DistributedFileSystem dfs = getDFS(conf);
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(
new CacheDirectiveInfo.Builder().
setPath(new Path(path)).build());
int exitCode = 0; int exitCode = 0;
while (iter.hasNext()) { try {
CacheDirectiveEntry entry = iter.next(); DistributedFileSystem dfs = getDFS(conf);
try { RemoteIterator<CacheDirectiveEntry> iter =
dfs.removeCacheDirective(entry.getInfo().getId()); dfs.listCacheDirectives(
System.out.println("Removed cache directive " + new CacheDirectiveInfo.Builder().
entry.getInfo().getId()); setPath(new Path(path)).build());
} catch (IOException e) { while (iter.hasNext()) {
System.err.println(prettifyException(e)); CacheDirectiveEntry entry = iter.next();
exitCode = 2; try {
dfs.removeCacheDirective(entry.getInfo().getId());
System.out.println("Removed cache directive " +
entry.getInfo().getId());
} catch (IOException e) {
System.err.println(prettifyException(e));
exitCode = 2;
}
} }
} catch (IOException e) {
System.err.println(prettifyException(e));
exitCode = 2;
} }
if (exitCode == 0) { if (exitCode == 0) {
System.out.println("Removed every cache directive with path " + System.out.println("Removed every cache directive with path " +
@ -500,41 +520,46 @@ public int run(Configuration conf, List<String> args) throws IOException {
addField("FILES_CACHED", Justification.RIGHT); addField("FILES_CACHED", Justification.RIGHT);
} }
TableListing tableListing = tableBuilder.build(); TableListing tableListing = tableBuilder.build();
try {
DistributedFileSystem dfs = getDFS(conf); DistributedFileSystem dfs = getDFS(conf);
RemoteIterator<CacheDirectiveEntry> iter = RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(builder.build()); dfs.listCacheDirectives(builder.build());
int numEntries = 0; int numEntries = 0;
while (iter.hasNext()) { while (iter.hasNext()) {
CacheDirectiveEntry entry = iter.next(); CacheDirectiveEntry entry = iter.next();
CacheDirectiveInfo directive = entry.getInfo(); CacheDirectiveInfo directive = entry.getInfo();
CacheDirectiveStats stats = entry.getStats(); CacheDirectiveStats stats = entry.getStats();
List<String> row = new LinkedList<String>(); List<String> row = new LinkedList<String>();
row.add("" + directive.getId()); row.add("" + directive.getId());
row.add(directive.getPool()); row.add(directive.getPool());
row.add("" + directive.getReplication()); row.add("" + directive.getReplication());
String expiry; String expiry;
if (directive.getExpiration().getMillis() == // This is effectively never, round for nice printing
CacheDirectiveInfo.Expiration.EXPIRY_NEVER) { if (directive.getExpiration().getMillis() >
expiry = "never"; Expiration.MAX_RELATIVE_EXPIRY_MS / 2) {
} else { expiry = "never";
expiry = directive.getExpiration().toString(); } else {
expiry = directive.getExpiration().toString();
}
row.add(expiry);
row.add(directive.getPath().toUri().getPath());
if (printStats) {
row.add("" + stats.getBytesNeeded());
row.add("" + stats.getBytesCached());
row.add("" + stats.getFilesNeeded());
row.add("" + stats.getFilesCached());
}
tableListing.addRow(row.toArray(new String[0]));
numEntries++;
} }
row.add(expiry); System.out.print(String.format("Found %d entr%s\n",
row.add(directive.getPath().toUri().getPath()); numEntries, numEntries == 1 ? "y" : "ies"));
if (printStats) { if (numEntries > 0) {
row.add("" + stats.getBytesNeeded()); System.out.print(tableListing);
row.add("" + stats.getBytesCached());
row.add("" + stats.getFilesNeeded());
row.add("" + stats.getFilesCached());
} }
tableListing.addRow(row.toArray(new String[0])); } catch (IOException e) {
numEntries++; System.err.println(prettifyException(e));
} return 2;
System.out.print(String.format("Found %d entr%s\n",
numEntries, numEntries == 1 ? "y" : "ies"));
if (numEntries > 0) {
System.out.print(tableListing);
} }
return 0; return 0;
} }
@ -552,7 +577,8 @@ public String getName() {
@Override @Override
public String getShortUsage() { public String getShortUsage() {
return "[" + NAME + " <name> [-owner <owner>] " + return "[" + NAME + " <name> [-owner <owner>] " +
"[-group <group>] [-mode <mode>] [-limit <limit>]]\n"; "[-group <group>] [-mode <mode>] [-limit <limit>] " +
"[-maxttl <maxTtl>]\n";
} }
@Override @Override
@ -571,7 +597,11 @@ public String getLongUsage() {
listing.addRow("<limit>", "The maximum number of bytes that can be " + listing.addRow("<limit>", "The maximum number of bytes that can be " +
"cached by directives in this pool, in aggregate. By default, " + "cached by directives in this pool, in aggregate. By default, " +
"no limit is set."); "no limit is set.");
listing.addRow("<maxTtl>", "The maximum allowed time-to-live for " +
"directives being added to the pool. This can be specified in " +
"seconds, minutes, hours, and days, e.g. 120s, 30m, 4h, 2d. " +
"Valid units are [smhd]. By default, no maximum is set. " +
"This can also be manually specified by \"never\".");
return getShortUsage() + "\n" + return getShortUsage() + "\n" +
"Add a new cache pool.\n\n" + "Add a new cache pool.\n\n" +
listing.toString(); listing.toString();
@ -605,6 +635,18 @@ public int run(Configuration conf, List<String> args) throws IOException {
long limit = Long.parseLong(limitString); long limit = Long.parseLong(limitString);
info.setLimit(limit); info.setLimit(limit);
} }
String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args);
try {
Long maxTtl = parseTtlString(maxTtlString);
if (maxTtl != null) {
info.setMaxRelativeExpiryMs(maxTtl);
}
} catch (IOException e) {
System.err.println(
"Error while parsing maxTtl value: " + e.getMessage());
return 1;
}
if (!args.isEmpty()) { if (!args.isEmpty()) {
System.err.print("Can't understand arguments: " + System.err.print("Can't understand arguments: " +
Joiner.on(" ").join(args) + "\n"); Joiner.on(" ").join(args) + "\n");
@ -615,7 +657,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
try { try {
dfs.addCachePool(info); dfs.addCachePool(info);
} catch (IOException e) { } catch (IOException e) {
throw new RemoteException(e.getClass().getName(), e.getMessage()); System.err.println(prettifyException(e));
return 2;
} }
System.out.println("Successfully added cache pool " + name + "."); System.out.println("Successfully added cache pool " + name + ".");
return 0; return 0;
@ -632,7 +675,8 @@ public String getName() {
@Override @Override
public String getShortUsage() { public String getShortUsage() {
return "[" + getName() + " <name> [-owner <owner>] " + return "[" + getName() + " <name> [-owner <owner>] " +
"[-group <group>] [-mode <mode>] [-limit <limit>]]\n"; "[-group <group>] [-mode <mode>] [-limit <limit>] " +
"[-maxTtl <maxTtl>]]\n";
} }
@Override @Override
@ -645,6 +689,8 @@ public String getLongUsage() {
listing.addRow("<mode>", "Unix-style permissions of the pool in octal."); listing.addRow("<mode>", "Unix-style permissions of the pool in octal.");
listing.addRow("<limit>", "Maximum number of bytes that can be cached " + listing.addRow("<limit>", "Maximum number of bytes that can be cached " +
"by this pool."); "by this pool.");
listing.addRow("<maxTtl>", "The maximum allowed time-to-live for " +
"directives being added to the pool.");
return getShortUsage() + "\n" + return getShortUsage() + "\n" +
WordUtils.wrap("Modifies the metadata of an existing cache pool. " + WordUtils.wrap("Modifies the metadata of an existing cache pool. " +
@ -663,6 +709,15 @@ public int run(Configuration conf, List<String> args) throws IOException {
String limitString = StringUtils.popOptionWithArgument("-limit", args); String limitString = StringUtils.popOptionWithArgument("-limit", args);
Long limit = (limitString == null) ? Long limit = (limitString == null) ?
null : Long.parseLong(limitString); null : Long.parseLong(limitString);
String maxTtlString = StringUtils.popOptionWithArgument("-maxTtl", args);
Long maxTtl = null;
try {
maxTtl = parseTtlString(maxTtlString);
} catch (IOException e) {
System.err.println(
"Error while parsing maxTtl value: " + e.getMessage());
return 1;
}
String name = StringUtils.popFirstNonOption(args); String name = StringUtils.popFirstNonOption(args);
if (name == null) { if (name == null) {
System.err.println("You must specify a name when creating a " + System.err.println("You must specify a name when creating a " +
@ -693,6 +748,10 @@ public int run(Configuration conf, List<String> args) throws IOException {
info.setLimit(limit); info.setLimit(limit);
changed = true; changed = true;
} }
if (maxTtl != null) {
info.setMaxRelativeExpiryMs(maxTtl);
changed = true;
}
if (!changed) { if (!changed) {
System.err.println("You must specify at least one attribute to " + System.err.println("You must specify at least one attribute to " +
"change in the cache pool."); "change in the cache pool.");
@ -702,7 +761,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
try { try {
dfs.modifyCachePool(info); dfs.modifyCachePool(info);
} catch (IOException e) { } catch (IOException e) {
throw new RemoteException(e.getClass().getName(), e.getMessage()); System.err.println(prettifyException(e));
return 2;
} }
System.out.print("Successfully modified cache pool " + name); System.out.print("Successfully modified cache pool " + name);
String prefix = " to have "; String prefix = " to have ";
@ -722,6 +782,9 @@ public int run(Configuration conf, List<String> args) throws IOException {
System.out.print(prefix + "limit " + limit); System.out.print(prefix + "limit " + limit);
prefix = " and "; prefix = " and ";
} }
if (maxTtl != null) {
System.out.print(prefix + "max time-to-live " + maxTtlString);
}
System.out.print("\n"); System.out.print("\n");
return 0; return 0;
} }
@ -765,7 +828,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
try { try {
dfs.removeCachePool(name); dfs.removeCachePool(name);
} catch (IOException e) { } catch (IOException e) {
throw new RemoteException(e.getClass().getName(), e.getMessage()); System.err.println(prettifyException(e));
return 2;
} }
System.out.println("Successfully removed cache pool " + name + "."); System.out.println("Successfully removed cache pool " + name + ".");
return 0; return 0;
@ -813,7 +877,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
addField("OWNER", Justification.LEFT). addField("OWNER", Justification.LEFT).
addField("GROUP", Justification.LEFT). addField("GROUP", Justification.LEFT).
addField("MODE", Justification.LEFT). addField("MODE", Justification.LEFT).
addField("LIMIT", Justification.RIGHT); addField("LIMIT", Justification.RIGHT).
addField("MAXTTL", Justification.RIGHT);
if (printStats) { if (printStats) {
builder. builder.
addField("BYTES_NEEDED", Justification.RIGHT). addField("BYTES_NEEDED", Justification.RIGHT).
@ -837,12 +902,23 @@ public int run(Configuration conf, List<String> args) throws IOException {
row.add(info.getMode() != null ? info.getMode().toString() : null); row.add(info.getMode() != null ? info.getMode().toString() : null);
Long limit = info.getLimit(); Long limit = info.getLimit();
String limitString; String limitString;
if (limit != null && limit.equals(CachePool.DEFAULT_LIMIT)) { if (limit != null && limit.equals(CachePoolInfo.LIMIT_UNLIMITED)) {
limitString = "unlimited"; limitString = "unlimited";
} else { } else {
limitString = "" + limit; limitString = "" + limit;
} }
row.add(limitString); row.add(limitString);
Long maxTtl = info.getMaxRelativeExpiryMs();
String maxTtlString = null;
if (maxTtl != null) {
if (maxTtl.longValue() == CachePoolInfo.RELATIVE_EXPIRY_NEVER) {
maxTtlString = "never";
} else {
maxTtlString = DFSUtil.durationToString(maxTtl);
}
}
row.add(maxTtlString);
if (printStats) { if (printStats) {
CachePoolStats stats = entry.getStats(); CachePoolStats stats = entry.getStats();
row.add(Long.toString(stats.getBytesNeeded())); row.add(Long.toString(stats.getBytesNeeded()));
@ -859,7 +935,8 @@ public int run(Configuration conf, List<String> args) throws IOException {
} }
} }
} catch (IOException e) { } catch (IOException e) {
throw new RemoteException(e.getClass().getName(), e.getMessage()); System.err.println(prettifyException(e));
return 2;
} }
System.out.print(String.format("Found %d result%s.\n", numResults, System.out.print(String.format("Found %d result%s.\n", numResults,
(numResults == 1 ? "" : "s"))); (numResults == 1 ? "" : "s")));

View File

@ -435,6 +435,7 @@ message CachePoolInfoProto {
optional string groupName = 3; optional string groupName = 3;
optional int32 mode = 4; optional int32 mode = 4;
optional int64 limit = 5; optional int64 limit = 5;
optional int64 maxRelativeExpiry = 6;
} }
message CachePoolStatsProto { message CachePoolStatsProto {

View File

@ -73,7 +73,7 @@ HDFS Short-Circuit Local Reads
This configuration parameter turns on short-circuit local reads. This configuration parameter turns on short-circuit local reads.
* dfs.client.read.shortcircuit.skip.checkusm * dfs.client.read.shortcircuit.skip.checksum
If this configuration parameter is set, short-circuit local reads will skip If this configuration parameter is set, short-circuit local reads will skip
checksums. This is normally not recommended, but it may be useful for checksums. This is normally not recommended, but it may be useful for

View File

@ -62,7 +62,6 @@
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.junit.Assume; import org.junit.Assume;
import org.junit.Before; import org.junit.Before;
@ -730,16 +729,15 @@ public void testGetSpnegoKeytabKey() {
@Test(timeout=1000) @Test(timeout=1000)
public void testDurationToString() throws Exception { public void testDurationToString() throws Exception {
assertEquals("000:00:00:00", DFSUtil.durationToString(0)); assertEquals("000:00:00:00.000", DFSUtil.durationToString(0));
try { assertEquals("001:01:01:01.000",
DFSUtil.durationToString(-199);
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Invalid negative duration", e);
}
assertEquals("001:01:01:01",
DFSUtil.durationToString(((24*60*60)+(60*60)+(60)+1)*1000)); DFSUtil.durationToString(((24*60*60)+(60*60)+(60)+1)*1000));
assertEquals("000:23:59:59", assertEquals("000:23:59:59.999",
DFSUtil.durationToString(((23*60*60)+(59*60)+(59))*1000)); DFSUtil.durationToString(((23*60*60)+(59*60)+(59))*1000+999));
assertEquals("-001:01:01:01.000",
DFSUtil.durationToString(-((24*60*60)+(60*60)+(60)+1)*1000));
assertEquals("-000:23:59:59.574",
DFSUtil.durationToString(-(((23*60*60)+(59*60)+(59))*1000+574)));
} }
@Test(timeout=5000) @Test(timeout=5000)
@ -763,7 +761,7 @@ public void testRelativeTimeConversion() throws Exception {
assertEquals(61*60*1000, DFSUtil.parseRelativeTime("61m")); assertEquals(61*60*1000, DFSUtil.parseRelativeTime("61m"));
assertEquals(0, DFSUtil.parseRelativeTime("0s")); assertEquals(0, DFSUtil.parseRelativeTime("0s"));
assertEquals(25*60*60*1000, DFSUtil.parseRelativeTime("25h")); assertEquals(25*60*60*1000, DFSUtil.parseRelativeTime("25h"));
assertEquals(4*24*60*60*1000, DFSUtil.parseRelativeTime("4d")); assertEquals(4*24*60*60*1000l, DFSUtil.parseRelativeTime("4d"));
assertEquals(999*24*60*60*1000, DFSUtil.parseRelativeTime("999d")); assertEquals(999*24*60*60*1000l, DFSUtil.parseRelativeTime("999d"));
} }
} }

View File

@ -23,6 +23,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
import static org.apache.hadoop.hdfs.protocol.CachePoolInfo.RELATIVE_EXPIRY_NEVER;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -137,6 +139,8 @@ public void setup() throws Exception {
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator()); NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel( LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel(
Level.TRACE); Level.TRACE);
LogManager.getLogger(CacheManager.class.getName()).setLevel(
Level.TRACE);
} }
@After @After
@ -1189,4 +1193,185 @@ public void testLimit() throws Exception {
new CacheDirectiveInfo.Builder().setPool(inadequate.getPoolName()) new CacheDirectiveInfo.Builder().setPool(inadequate.getPoolName())
.setPath(path1).build(), EnumSet.of(CacheFlag.FORCE)); .setPath(path1).build(), EnumSet.of(CacheFlag.FORCE));
} }
@Test(timeout=30000)
public void testMaxRelativeExpiry() throws Exception {
// Test that negative and really big max expirations can't be set during add
try {
dfs.addCachePool(new CachePoolInfo("failpool").setMaxRelativeExpiryMs(-1l));
fail("Added a pool with a negative max expiry.");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("negative", e);
}
try {
dfs.addCachePool(new CachePoolInfo("failpool")
.setMaxRelativeExpiryMs(Long.MAX_VALUE - 1));
fail("Added a pool with too big of a max expiry.");
} catch (InvalidRequestException e) {
GenericTestUtils.assertExceptionContains("too big", e);
}
// Test that setting a max relative expiry on a pool works
CachePoolInfo coolPool = new CachePoolInfo("coolPool");
final long poolExpiration = 1000 * 60 * 10l;
dfs.addCachePool(coolPool.setMaxRelativeExpiryMs(poolExpiration));
RemoteIterator<CachePoolEntry> poolIt = dfs.listCachePools();
CachePoolInfo listPool = poolIt.next().getInfo();
assertFalse("Should only be one pool", poolIt.hasNext());
assertEquals("Expected max relative expiry to match set value",
poolExpiration, listPool.getMaxRelativeExpiryMs().longValue());
// Test that negative and really big max expirations can't be modified
try {
dfs.addCachePool(coolPool.setMaxRelativeExpiryMs(-1l));
fail("Added a pool with a negative max expiry.");
} catch (InvalidRequestException e) {
assertExceptionContains("negative", e);
}
try {
dfs.modifyCachePool(coolPool
.setMaxRelativeExpiryMs(CachePoolInfo.RELATIVE_EXPIRY_NEVER+1));
fail("Added a pool with too big of a max expiry.");
} catch (InvalidRequestException e) {
assertExceptionContains("too big", e);
}
// Test that adding a directives without an expiration uses the pool's max
CacheDirectiveInfo defaultExpiry = new CacheDirectiveInfo.Builder()
.setPath(new Path("/blah"))
.setPool(coolPool.getPoolName())
.build();
dfs.addCacheDirective(defaultExpiry);
RemoteIterator<CacheDirectiveEntry> dirIt =
dfs.listCacheDirectives(defaultExpiry);
CacheDirectiveInfo listInfo = dirIt.next().getInfo();
assertFalse("Should only have one entry in listing", dirIt.hasNext());
long listExpiration = listInfo.getExpiration().getAbsoluteMillis()
- new Date().getTime();
assertTrue("Directive expiry should be approximately the pool's max expiry",
Math.abs(listExpiration - poolExpiration) < 10*1000);
// Test that the max is enforced on add for relative and absolute
CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder()
.setPath(new Path("/lolcat"))
.setPool(coolPool.getPoolName());
try {
dfs.addCacheDirective(builder
.setExpiration(Expiration.newRelative(poolExpiration+1))
.build());
fail("Added a directive that exceeds pool's max relative expiration");
} catch (InvalidRequestException e) {
assertExceptionContains("exceeds the max relative expiration", e);
}
try {
dfs.addCacheDirective(builder
.setExpiration(Expiration.newAbsolute(
new Date().getTime() + poolExpiration + (10*1000)))
.build());
fail("Added a directive that exceeds pool's max relative expiration");
} catch (InvalidRequestException e) {
assertExceptionContains("exceeds the max relative expiration", e);
}
// Test that max is enforced on modify for relative and absolute Expirations
try {
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
.setId(listInfo.getId())
.setExpiration(Expiration.newRelative(poolExpiration+1))
.build());
fail("Modified a directive to exceed pool's max relative expiration");
} catch (InvalidRequestException e) {
assertExceptionContains("exceeds the max relative expiration", e);
}
try {
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
.setId(listInfo.getId())
.setExpiration(Expiration.newAbsolute(
new Date().getTime() + poolExpiration + (10*1000)))
.build());
fail("Modified a directive to exceed pool's max relative expiration");
} catch (InvalidRequestException e) {
assertExceptionContains("exceeds the max relative expiration", e);
}
// Test some giant limit values with add
try {
dfs.addCacheDirective(builder
.setExpiration(Expiration.newRelative(
Long.MAX_VALUE))
.build());
fail("Added a directive with a gigantic max value");
} catch (IllegalArgumentException e) {
assertExceptionContains("is too far in the future", e);
}
try {
dfs.addCacheDirective(builder
.setExpiration(Expiration.newAbsolute(
Long.MAX_VALUE))
.build());
fail("Added a directive with a gigantic max value");
} catch (InvalidRequestException e) {
assertExceptionContains("is too far in the future", e);
}
// Test some giant limit values with modify
try {
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
.setId(listInfo.getId())
.setExpiration(Expiration.NEVER)
.build());
fail("Modified a directive to exceed pool's max relative expiration");
} catch (InvalidRequestException e) {
assertExceptionContains("exceeds the max relative expiration", e);
}
try {
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
.setId(listInfo.getId())
.setExpiration(Expiration.newAbsolute(
Long.MAX_VALUE))
.build());
fail("Modified a directive to exceed pool's max relative expiration");
} catch (InvalidRequestException e) {
assertExceptionContains("is too far in the future", e);
}
// Test that the max is enforced on modify correctly when changing pools
CachePoolInfo destPool = new CachePoolInfo("destPool");
dfs.addCachePool(destPool.setMaxRelativeExpiryMs(poolExpiration / 2));
try {
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
.setId(listInfo.getId())
.setPool(destPool.getPoolName())
.build());
fail("Modified a directive to a pool with a lower max expiration");
} catch (InvalidRequestException e) {
assertExceptionContains("exceeds the max relative expiration", e);
}
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(defaultExpiry)
.setId(listInfo.getId())
.setPool(destPool.getPoolName())
.setExpiration(Expiration.newRelative(poolExpiration / 2))
.build());
dirIt = dfs.listCacheDirectives(new CacheDirectiveInfo.Builder()
.setPool(destPool.getPoolName())
.build());
listInfo = dirIt.next().getInfo();
listExpiration = listInfo.getExpiration().getAbsoluteMillis()
- new Date().getTime();
assertTrue("Unexpected relative expiry " + listExpiration
+ " expected approximately " + poolExpiration/2,
Math.abs(poolExpiration/2 - listExpiration) < 10*1000);
// Test that cache pool and directive expiry can be modified back to never
dfs.modifyCachePool(destPool
.setMaxRelativeExpiryMs(CachePoolInfo.RELATIVE_EXPIRY_NEVER));
poolIt = dfs.listCachePools();
listPool = poolIt.next().getInfo();
while (!listPool.getPoolName().equals(destPool.getPoolName())) {
listPool = poolIt.next().getInfo();
}
assertEquals("Expected max relative expiry to match set value",
CachePoolInfo.RELATIVE_EXPIRY_NEVER,
listPool.getMaxRelativeExpiryMs().longValue());
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder()
.setId(listInfo.getId())
.setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER))
.build());
// Test modifying close to the limit
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder()
.setId(listInfo.getId())
.setExpiration(Expiration.newRelative(RELATIVE_EXPIRY_NEVER - 1))
.build());
}
} }

View File

@ -13,8 +13,8 @@
<TXID>2</TXID> <TXID>2</TXID>
<DELEGATION_KEY> <DELEGATION_KEY>
<KEY_ID>1</KEY_ID> <KEY_ID>1</KEY_ID>
<EXPIRY_DATE>1387701670577</EXPIRY_DATE> <EXPIRY_DATE>1388171826188</EXPIRY_DATE>
<KEY>7bb5467995769b59</KEY> <KEY>c7d869c22c8afce1</KEY>
</DELEGATION_KEY> </DELEGATION_KEY>
</DATA> </DATA>
</RECORD> </RECORD>
@ -24,8 +24,8 @@
<TXID>3</TXID> <TXID>3</TXID>
<DELEGATION_KEY> <DELEGATION_KEY>
<KEY_ID>2</KEY_ID> <KEY_ID>2</KEY_ID>
<EXPIRY_DATE>1387701670580</EXPIRY_DATE> <EXPIRY_DATE>1388171826191</EXPIRY_DATE>
<KEY>a5a3a2755e36827b</KEY> <KEY>a3c41446507dfca9</KEY>
</DELEGATION_KEY> </DELEGATION_KEY>
</DATA> </DATA>
</RECORD> </RECORD>
@ -37,17 +37,17 @@
<INODEID>16386</INODEID> <INODEID>16386</INODEID>
<PATH>/file_create_u\0001;F431</PATH> <PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471220</MTIME> <MTIME>1387480626844</MTIME>
<ATIME>1387010471220</ATIME> <ATIME>1387480626844</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME> <CLIENT_NAME>DFSClient_NONMAPREDUCE_1147796111_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE> <CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS> <PERMISSION_STATUS>
<USERNAME>andrew</USERNAME> <USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME> <GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE> <MODE>420</MODE>
</PERMISSION_STATUS> </PERMISSION_STATUS>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>7</RPC_CALLID> <RPC_CALLID>7</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -59,8 +59,8 @@
<INODEID>0</INODEID> <INODEID>0</INODEID>
<PATH>/file_create_u\0001;F431</PATH> <PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471276</MTIME> <MTIME>1387480626885</MTIME>
<ATIME>1387010471220</ATIME> <ATIME>1387480626844</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME> <CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE> <CLIENT_MACHINE></CLIENT_MACHINE>
@ -78,8 +78,8 @@
<LENGTH>0</LENGTH> <LENGTH>0</LENGTH>
<SRC>/file_create_u\0001;F431</SRC> <SRC>/file_create_u\0001;F431</SRC>
<DST>/file_moved</DST> <DST>/file_moved</DST>
<TIMESTAMP>1387010471286</TIMESTAMP> <TIMESTAMP>1387480626894</TIMESTAMP>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>9</RPC_CALLID> <RPC_CALLID>9</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -89,8 +89,8 @@
<TXID>7</TXID> <TXID>7</TXID>
<LENGTH>0</LENGTH> <LENGTH>0</LENGTH>
<PATH>/file_moved</PATH> <PATH>/file_moved</PATH>
<TIMESTAMP>1387010471299</TIMESTAMP> <TIMESTAMP>1387480626905</TIMESTAMP>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>10</RPC_CALLID> <RPC_CALLID>10</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -101,7 +101,7 @@
<LENGTH>0</LENGTH> <LENGTH>0</LENGTH>
<INODEID>16387</INODEID> <INODEID>16387</INODEID>
<PATH>/directory_mkdir</PATH> <PATH>/directory_mkdir</PATH>
<TIMESTAMP>1387010471312</TIMESTAMP> <TIMESTAMP>1387480626917</TIMESTAMP>
<PERMISSION_STATUS> <PERMISSION_STATUS>
<USERNAME>andrew</USERNAME> <USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME> <GROUPNAME>supergroup</GROUPNAME>
@ -136,7 +136,7 @@
<TXID>12</TXID> <TXID>12</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT> <SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot1</SNAPSHOTNAME> <SNAPSHOTNAME>snapshot1</SNAPSHOTNAME>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>15</RPC_CALLID> <RPC_CALLID>15</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -147,7 +147,7 @@
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT> <SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME> <SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME>
<SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME> <SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>16</RPC_CALLID> <RPC_CALLID>16</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -157,7 +157,7 @@
<TXID>14</TXID> <TXID>14</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT> <SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot2</SNAPSHOTNAME> <SNAPSHOTNAME>snapshot2</SNAPSHOTNAME>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>17</RPC_CALLID> <RPC_CALLID>17</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -169,17 +169,17 @@
<INODEID>16388</INODEID> <INODEID>16388</INODEID>
<PATH>/file_create_u\0001;F431</PATH> <PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471373</MTIME> <MTIME>1387480626978</MTIME>
<ATIME>1387010471373</ATIME> <ATIME>1387480626978</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME> <CLIENT_NAME>DFSClient_NONMAPREDUCE_1147796111_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE> <CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS> <PERMISSION_STATUS>
<USERNAME>andrew</USERNAME> <USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME> <GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE> <MODE>420</MODE>
</PERMISSION_STATUS> </PERMISSION_STATUS>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>18</RPC_CALLID> <RPC_CALLID>18</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -191,8 +191,8 @@
<INODEID>0</INODEID> <INODEID>0</INODEID>
<PATH>/file_create_u\0001;F431</PATH> <PATH>/file_create_u\0001;F431</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471380</MTIME> <MTIME>1387480626985</MTIME>
<ATIME>1387010471373</ATIME> <ATIME>1387480626978</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME> <CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE> <CLIENT_MACHINE></CLIENT_MACHINE>
@ -253,9 +253,9 @@
<LENGTH>0</LENGTH> <LENGTH>0</LENGTH>
<SRC>/file_create_u\0001;F431</SRC> <SRC>/file_create_u\0001;F431</SRC>
<DST>/file_moved</DST> <DST>/file_moved</DST>
<TIMESTAMP>1387010471428</TIMESTAMP> <TIMESTAMP>1387480627035</TIMESTAMP>
<OPTIONS>NONE</OPTIONS> <OPTIONS>NONE</OPTIONS>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>25</RPC_CALLID> <RPC_CALLID>25</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -267,17 +267,17 @@
<INODEID>16389</INODEID> <INODEID>16389</INODEID>
<PATH>/file_concat_target</PATH> <PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471438</MTIME> <MTIME>1387480627043</MTIME>
<ATIME>1387010471438</ATIME> <ATIME>1387480627043</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME> <CLIENT_NAME>DFSClient_NONMAPREDUCE_1147796111_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE> <CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS> <PERMISSION_STATUS>
<USERNAME>andrew</USERNAME> <USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME> <GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE> <MODE>420</MODE>
</PERMISSION_STATUS> </PERMISSION_STATUS>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>27</RPC_CALLID> <RPC_CALLID>27</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -388,8 +388,8 @@
<INODEID>0</INODEID> <INODEID>0</INODEID>
<PATH>/file_concat_target</PATH> <PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471540</MTIME> <MTIME>1387480627148</MTIME>
<ATIME>1387010471438</ATIME> <ATIME>1387480627043</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME> <CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE> <CLIENT_MACHINE></CLIENT_MACHINE>
@ -423,17 +423,17 @@
<INODEID>16390</INODEID> <INODEID>16390</INODEID>
<PATH>/file_concat_0</PATH> <PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471547</MTIME> <MTIME>1387480627155</MTIME>
<ATIME>1387010471547</ATIME> <ATIME>1387480627155</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME> <CLIENT_NAME>DFSClient_NONMAPREDUCE_1147796111_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE> <CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS> <PERMISSION_STATUS>
<USERNAME>andrew</USERNAME> <USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME> <GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE> <MODE>420</MODE>
</PERMISSION_STATUS> </PERMISSION_STATUS>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>40</RPC_CALLID> <RPC_CALLID>40</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -544,8 +544,8 @@
<INODEID>0</INODEID> <INODEID>0</INODEID>
<PATH>/file_concat_0</PATH> <PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471588</MTIME> <MTIME>1387480627193</MTIME>
<ATIME>1387010471547</ATIME> <ATIME>1387480627155</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME> <CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE> <CLIENT_MACHINE></CLIENT_MACHINE>
@ -579,17 +579,17 @@
<INODEID>16391</INODEID> <INODEID>16391</INODEID>
<PATH>/file_concat_1</PATH> <PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471595</MTIME> <MTIME>1387480627200</MTIME>
<ATIME>1387010471595</ATIME> <ATIME>1387480627200</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME> <CLIENT_NAME>DFSClient_NONMAPREDUCE_1147796111_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE> <CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS> <PERMISSION_STATUS>
<USERNAME>andrew</USERNAME> <USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME> <GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE> <MODE>420</MODE>
</PERMISSION_STATUS> </PERMISSION_STATUS>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>52</RPC_CALLID> <RPC_CALLID>52</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -700,8 +700,8 @@
<INODEID>0</INODEID> <INODEID>0</INODEID>
<PATH>/file_concat_1</PATH> <PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471651</MTIME> <MTIME>1387480627238</MTIME>
<ATIME>1387010471595</ATIME> <ATIME>1387480627200</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME> <CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE> <CLIENT_MACHINE></CLIENT_MACHINE>
@ -733,12 +733,12 @@
<TXID>56</TXID> <TXID>56</TXID>
<LENGTH>0</LENGTH> <LENGTH>0</LENGTH>
<TRG>/file_concat_target</TRG> <TRG>/file_concat_target</TRG>
<TIMESTAMP>1387010471663</TIMESTAMP> <TIMESTAMP>1387480627246</TIMESTAMP>
<SOURCES> <SOURCES>
<SOURCE1>/file_concat_0</SOURCE1> <SOURCE1>/file_concat_0</SOURCE1>
<SOURCE2>/file_concat_1</SOURCE2> <SOURCE2>/file_concat_1</SOURCE2>
</SOURCES> </SOURCES>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>63</RPC_CALLID> <RPC_CALLID>63</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -750,14 +750,14 @@
<INODEID>16392</INODEID> <INODEID>16392</INODEID>
<PATH>/file_symlink</PATH> <PATH>/file_symlink</PATH>
<VALUE>/file_concat_target</VALUE> <VALUE>/file_concat_target</VALUE>
<MTIME>1387010471674</MTIME> <MTIME>1387480627255</MTIME>
<ATIME>1387010471674</ATIME> <ATIME>1387480627255</ATIME>
<PERMISSION_STATUS> <PERMISSION_STATUS>
<USERNAME>andrew</USERNAME> <USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME> <GROUPNAME>supergroup</GROUPNAME>
<MODE>511</MODE> <MODE>511</MODE>
</PERMISSION_STATUS> </PERMISSION_STATUS>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>64</RPC_CALLID> <RPC_CALLID>64</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -771,11 +771,11 @@
<OWNER>andrew</OWNER> <OWNER>andrew</OWNER>
<RENEWER>JobTracker</RENEWER> <RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER> <REALUSER></REALUSER>
<ISSUE_DATE>1387010471682</ISSUE_DATE> <ISSUE_DATE>1387480627262</ISSUE_DATE>
<MAX_DATE>1387615271682</MAX_DATE> <MAX_DATE>1388085427262</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID> <MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER> </DELEGATION_TOKEN_IDENTIFIER>
<EXPIRY_TIME>1387096871682</EXPIRY_TIME> <EXPIRY_TIME>1387567027262</EXPIRY_TIME>
</DATA> </DATA>
</RECORD> </RECORD>
<RECORD> <RECORD>
@ -788,11 +788,11 @@
<OWNER>andrew</OWNER> <OWNER>andrew</OWNER>
<RENEWER>JobTracker</RENEWER> <RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER> <REALUSER></REALUSER>
<ISSUE_DATE>1387010471682</ISSUE_DATE> <ISSUE_DATE>1387480627262</ISSUE_DATE>
<MAX_DATE>1387615271682</MAX_DATE> <MAX_DATE>1388085427262</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID> <MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER> </DELEGATION_TOKEN_IDENTIFIER>
<EXPIRY_TIME>1387096871717</EXPIRY_TIME> <EXPIRY_TIME>1387567027281</EXPIRY_TIME>
</DATA> </DATA>
</RECORD> </RECORD>
<RECORD> <RECORD>
@ -805,8 +805,8 @@
<OWNER>andrew</OWNER> <OWNER>andrew</OWNER>
<RENEWER>JobTracker</RENEWER> <RENEWER>JobTracker</RENEWER>
<REALUSER></REALUSER> <REALUSER></REALUSER>
<ISSUE_DATE>1387010471682</ISSUE_DATE> <ISSUE_DATE>1387480627262</ISSUE_DATE>
<MAX_DATE>1387615271682</MAX_DATE> <MAX_DATE>1388085427262</MAX_DATE>
<MASTER_KEY_ID>2</MASTER_KEY_ID> <MASTER_KEY_ID>2</MASTER_KEY_ID>
</DELEGATION_TOKEN_IDENTIFIER> </DELEGATION_TOKEN_IDENTIFIER>
</DATA> </DATA>
@ -820,7 +820,8 @@
<GROUPNAME>andrew</GROUPNAME> <GROUPNAME>andrew</GROUPNAME>
<MODE>493</MODE> <MODE>493</MODE>
<LIMIT>9223372036854775807</LIMIT> <LIMIT>9223372036854775807</LIMIT>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <MAXRELATIVEEXPIRY>2305843009213693951</MAXRELATIVEEXPIRY>
<RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>68</RPC_CALLID> <RPC_CALLID>68</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -833,7 +834,7 @@
<GROUPNAME>party</GROUPNAME> <GROUPNAME>party</GROUPNAME>
<MODE>448</MODE> <MODE>448</MODE>
<LIMIT>1989</LIMIT> <LIMIT>1989</LIMIT>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>69</RPC_CALLID> <RPC_CALLID>69</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -845,8 +846,8 @@
<PATH>/bar</PATH> <PATH>/bar</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<POOL>poolparty</POOL> <POOL>poolparty</POOL>
<EXPIRATION>-1</EXPIRATION> <EXPIRATION>2305844396694321272</EXPIRATION>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>70</RPC_CALLID> <RPC_CALLID>70</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -856,7 +857,7 @@
<TXID>64</TXID> <TXID>64</TXID>
<ID>1</ID> <ID>1</ID>
<PATH>/bar2</PATH> <PATH>/bar2</PATH>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>71</RPC_CALLID> <RPC_CALLID>71</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -865,7 +866,7 @@
<DATA> <DATA>
<TXID>65</TXID> <TXID>65</TXID>
<ID>1</ID> <ID>1</ID>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>72</RPC_CALLID> <RPC_CALLID>72</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -874,7 +875,7 @@
<DATA> <DATA>
<TXID>66</TXID> <TXID>66</TXID>
<POOLNAME>poolparty</POOLNAME> <POOLNAME>poolparty</POOLNAME>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>73</RPC_CALLID> <RPC_CALLID>73</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -886,17 +887,17 @@
<INODEID>16393</INODEID> <INODEID>16393</INODEID>
<PATH>/hard-lease-recovery-test</PATH> <PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010471802</MTIME> <MTIME>1387480627356</MTIME>
<ATIME>1387010471802</ATIME> <ATIME>1387480627356</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_-52011019_1</CLIENT_NAME> <CLIENT_NAME>DFSClient_NONMAPREDUCE_1147796111_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE> <CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<PERMISSION_STATUS> <PERMISSION_STATUS>
<USERNAME>andrew</USERNAME> <USERNAME>andrew</USERNAME>
<GROUPNAME>supergroup</GROUPNAME> <GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE> <MODE>420</MODE>
</PERMISSION_STATUS> </PERMISSION_STATUS>
<RPC_CLIENTID>508263bb-692e-4439-8738-ff89b8b03923</RPC_CLIENTID> <RPC_CLIENTID>a90261a0-3759-4480-ba80-e10c9ae331e6</RPC_CLIENTID>
<RPC_CALLID>74</RPC_CALLID> <RPC_CALLID>74</RPC_CALLID>
</DATA> </DATA>
</RECORD> </RECORD>
@ -953,7 +954,7 @@
<OPCODE>OP_REASSIGN_LEASE</OPCODE> <OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA> <DATA>
<TXID>73</TXID> <TXID>73</TXID>
<LEASEHOLDER>DFSClient_NONMAPREDUCE_-52011019_1</LEASEHOLDER> <LEASEHOLDER>DFSClient_NONMAPREDUCE_1147796111_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH> <PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER> <NEWHOLDER>HDFS_NameNode</NEWHOLDER>
</DATA> </DATA>
@ -966,8 +967,8 @@
<INODEID>0</INODEID> <INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH> <PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION> <REPLICATION>1</REPLICATION>
<MTIME>1387010474126</MTIME> <MTIME>1387480629729</MTIME>
<ATIME>1387010471802</ATIME> <ATIME>1387480627356</ATIME>
<BLOCKSIZE>512</BLOCKSIZE> <BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME> <CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE> <CLIENT_MACHINE></CLIENT_MACHINE>

View File

@ -417,11 +417,11 @@
</comparator> </comparator>
<comparator> <comparator>
<type>SubstringComparator</type> <type>SubstringComparator</type>
<expected-output>bar alice alicegroup rwxr-xr-x unlimited 0 0 0 0 0</expected-output> <expected-output>bar alice alicegroup rwxr-xr-x unlimited never 0 0 0 0 0</expected-output>
</comparator> </comparator>
<comparator> <comparator>
<type>SubstringComparator</type> <type>SubstringComparator</type>
<expected-output>foo bob bob rw-rw-r-- unlimited 0 0 0 0 0</expected-output> <expected-output>foo bob bob rw-rw-r-- unlimited never 0 0 0 0 0</expected-output>
</comparator> </comparator>
</comparators> </comparators>
</test> </test>
@ -457,5 +457,37 @@
</comparator> </comparator>
</comparators> </comparators>
</test> </test>
<test> <!--Tested -->
<description>Testing pool max ttl settings</description>
<test-commands>
<cache-admin-command>-addPool pool1 -owner andrew -group andrew</cache-admin-command>
<cache-admin-command>-addPool pool2 -owner andrew -group andrew -maxTtl 999d</cache-admin-command>
<cache-admin-command>-modifyPool pool2 -maxTtl never</cache-admin-command>
<cache-admin-command>-addPool pool3 -owner andrew -group andrew -maxTtl 4h</cache-admin-command>
<cache-admin-command>-listPools</cache-admin-command>
</test-commands>
<cleanup-commands>
<cache-admin-command>-removePool pool1</cache-admin-command>
</cleanup-commands>
<comparators>
<comparator>
<type>SubstringComparator</type>
<expected-output>Found 3 results</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>pool1 andrew andrew rwxr-xr-x unlimited never</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>pool2 andrew andrew rwxr-xr-x unlimited never</expected-output>
</comparator>
<comparator>
<type>SubstringComparator</type>
<expected-output>pool3 andrew andrew rwxr-xr-x unlimited 000:04:00:00.000</expected-output>
</comparator>
</comparators>
</test>
</tests> </tests>
</configuration> </configuration>

View File

@ -187,6 +187,12 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5052. Job History UI and web services confusing job start time and MAPREDUCE-5052. Job History UI and web services confusing job start time and
job submit time (Chen He via jeagles) job submit time (Chen He via jeagles)
MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due
to speculative execution (Gera Shegalov via Sandy Ryza)
MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

View File

@ -1552,6 +1552,12 @@ public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) { TaskAttemptEvent event) {
//set the finish time //set the finish time
taskAttempt.setFinishTime(); taskAttempt.setFinishTime();
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
//send the deallocate event to ContainerAllocator //send the deallocate event to ContainerAllocator
taskAttempt.eventHandler.handle( taskAttempt.eventHandler.handle(
new ContainerAllocatorEvent(taskAttempt.attemptId, new ContainerAllocatorEvent(taskAttempt.attemptId,
@ -1855,6 +1861,12 @@ public void transition(TaskAttemptImpl taskAttempt,
LOG.debug("Not generating HistoryFinish event since start event not " + LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID()); "generated for taskAttempt: " + taskAttempt.getID());
} }
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure. // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, taskAttempt.attemptId,
@ -1872,6 +1884,12 @@ public void transition(TaskAttemptImpl taskAttempt,
// for it // for it
taskAttempt.taskAttemptListener.unregister( taskAttempt.taskAttemptListener.unregister(
taskAttempt.attemptId, taskAttempt.jvmID); taskAttempt.attemptId, taskAttempt.jvmID);
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
taskAttempt.reportedStatus.progress = 1.0f; taskAttempt.reportedStatus.progress = 1.0f;
taskAttempt.updateProgressSplits(); taskAttempt.updateProgressSplits();
//send the cleanup event to containerLauncher //send the cleanup event to containerLauncher

View File

@ -69,6 +69,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@ -100,6 +101,7 @@
public abstract class TaskImpl implements Task, EventHandler<TaskEvent> { public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final Log LOG = LogFactory.getLog(TaskImpl.class); private static final Log LOG = LogFactory.getLog(TaskImpl.class);
private static final String SPECULATION = "Speculation: ";
protected final JobConf conf; protected final JobConf conf;
protected final Path jobFile; protected final Path jobFile;
@ -374,11 +376,15 @@ public TaskReport getReport() {
TaskReport report = recordFactory.newRecordInstance(TaskReport.class); TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
readLock.lock(); readLock.lock();
try { try {
TaskAttempt bestAttempt = selectBestAttempt();
report.setTaskId(taskId); report.setTaskId(taskId);
report.setStartTime(getLaunchTime()); report.setStartTime(getLaunchTime());
report.setFinishTime(getFinishTime()); report.setFinishTime(getFinishTime());
report.setTaskState(getState()); report.setTaskState(getState());
report.setProgress(getProgress()); report.setProgress(bestAttempt == null ? 0f : bestAttempt.getProgress());
report.setStatus(bestAttempt == null
? ""
: bestAttempt.getReport().getStateString());
for (TaskAttempt attempt : attempts.values()) { for (TaskAttempt attempt : attempts.values()) {
if (TaskAttemptState.RUNNING.equals(attempt.getState())) { if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
@ -398,7 +404,9 @@ public TaskReport getReport() {
// Add a copy of counters as the last step so that their lifetime on heap // Add a copy of counters as the last step so that their lifetime on heap
// is as small as possible. // is as small as possible.
report.setCounters(TypeConverter.toYarn(getCounters())); report.setCounters(TypeConverter.toYarn(bestAttempt == null
? TaskAttemptImpl.EMPTY_COUNTERS
: bestAttempt.getCounters()));
return report; return report;
} finally { } finally {
@ -906,8 +914,8 @@ public void transition(TaskImpl task, TaskEvent event) {
LOG.info(task.commitAttempt LOG.info(task.commitAttempt
+ " already given a go for committing the task output, so killing " + " already given a go for committing the task output, so killing "
+ attemptID); + attemptID);
task.eventHandler.handle(new TaskAttemptEvent( task.eventHandler.handle(new TaskAttemptKillEvent(attemptID,
attemptID, TaskAttemptEventType.TA_KILL)); SPECULATION + task.commitAttempt + " committed first!"));
} }
} }
} }
@ -932,9 +940,8 @@ public void transition(TaskImpl task, TaskEvent event) {
// other reasons. // other reasons.
!attempt.isFinished()) { !attempt.isFinished()) {
LOG.info("Issuing kill to other attempt " + attempt.getID()); LOG.info("Issuing kill to other attempt " + attempt.getID());
task.eventHandler.handle( task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(),
new TaskAttemptEvent(attempt.getID(), SPECULATION + task.successfulAttempt + " succeeded first!"));
TaskAttemptEventType.TA_KILL));
} }
} }
task.finished(TaskStateInternal.SUCCEEDED); task.finished(TaskStateInternal.SUCCEEDED);
@ -1199,8 +1206,7 @@ public void transition(TaskImpl task, TaskEvent event) {
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) { private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
if (attempt != null && !attempt.isFinished()) { if (attempt != null && !attempt.isFinished()) {
eventHandler.handle( eventHandler.handle(
new TaskAttemptEvent(attempt.getID(), new TaskAttemptKillEvent(attempt.getID(), logMsg));
TaskAttemptEventType.TA_KILL));
} }
} }

View File

@ -63,6 +63,7 @@ protected void render(Block html) {
th(".id", "Attempt"). th(".id", "Attempt").
th(".progress", "Progress"). th(".progress", "Progress").
th(".state", "State"). th(".state", "State").
th(".status", "Status").
th(".node", "Node"). th(".node", "Node").
th(".logs", "Logs"). th(".logs", "Logs").
th(".tsh", "Started"). th(".tsh", "Started").
@ -84,6 +85,7 @@ protected void render(Block html) {
.append(ta.getId()).append("\",\"") .append(ta.getId()).append("\",\"")
.append(progress).append("\",\"") .append(progress).append("\",\"")
.append(ta.getState().toString()).append("\",\"") .append(ta.getState().toString()).append("\",\"")
.append(ta.getStatus()).append("\",\"")
.append(nodeHttpAddr == null ? "N/A" : .append(nodeHttpAddr == null ? "N/A" :
"<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>" "<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>"
@ -144,13 +146,13 @@ private String attemptsTableInit() {
.append("\n,aoColumnDefs:[\n") .append("\n,aoColumnDefs:[\n")
//logs column should not filterable (it includes container ID which may pollute searches) //logs column should not filterable (it includes container ID which may pollute searches)
.append("\n{'aTargets': [ 4 ]") .append("\n{'aTargets': [ 5 ]")
.append(", 'bSearchable': false }") .append(", 'bSearchable': false }")
.append("\n, {'sType':'numeric', 'aTargets': [ 5, 6") .append("\n, {'sType':'numeric', 'aTargets': [ 6, 7")
.append(" ], 'mRender': renderHadoopDate }") .append(" ], 'mRender': renderHadoopDate }")
.append("\n, {'sType':'numeric', 'aTargets': [ 7") .append("\n, {'sType':'numeric', 'aTargets': [ 8")
.append(" ], 'mRender': renderHadoopElapsedTime }]") .append(" ], 'mRender': renderHadoopElapsedTime }]")
// Sort by id upon page load // Sort by id upon page load

View File

@ -59,6 +59,7 @@ public class TasksBlock extends HtmlBlock {
tr(). tr().
th("Task"). th("Task").
th("Progress"). th("Progress").
th("Status").
th("State"). th("State").
th("Start Time"). th("Start Time").
th("Finish Time"). th("Finish Time").
@ -81,6 +82,7 @@ public class TasksBlock extends HtmlBlock {
.append(join(pct, '%')).append("'> ").append("<div class='") .append(join(pct, '%')).append("'> ").append("<div class='")
.append(C_PROGRESSBAR_VALUE).append("' style='") .append(C_PROGRESSBAR_VALUE).append("' style='")
.append(join("width:", pct, '%')).append("'> </div> </div>\",\"") .append(join("width:", pct, '%')).append("'> </div> </div>\",\"")
.append(info.getStatus()).append("\",\"")
.append(info.getState()).append("\",\"") .append(info.getState()).append("\",\"")
.append(info.getStartTime()).append("\",\"") .append(info.getStartTime()).append("\",\"")

View File

@ -50,10 +50,10 @@ private String tasksTableInit() {
.append(", 'mRender': parseHadoopProgress }") .append(", 'mRender': parseHadoopProgress }")
.append("\n, {'sType':'numeric', 'aTargets': [3, 4]") .append("\n, {'sType':'numeric', 'aTargets': [4, 5]")
.append(", 'mRender': renderHadoopDate }") .append(", 'mRender': renderHadoopDate }")
.append("\n, {'sType':'numeric', 'aTargets': [5]") .append("\n, {'sType':'numeric', 'aTargets': [6]")
.append(", 'mRender': renderHadoopElapsedTime }]") .append(", 'mRender': renderHadoopElapsedTime }]")
// Sort by id upon page load // Sort by id upon page load

View File

@ -25,6 +25,7 @@
import javax.xml.bind.annotation.XmlSeeAlso; import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@ -45,6 +46,7 @@ public class TaskAttemptInfo {
protected String id; protected String id;
protected String rack; protected String rack;
protected TaskAttemptState state; protected TaskAttemptState state;
protected String status;
protected String nodeHttpAddress; protected String nodeHttpAddress;
protected String diagnostics; protected String diagnostics;
protected String type; protected String type;
@ -61,29 +63,23 @@ public TaskAttemptInfo(TaskAttempt ta, Boolean isRunning) {
} }
public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) { public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) {
final TaskAttemptReport report = ta.getReport();
this.type = type.toString(); this.type = type.toString();
this.id = MRApps.toString(ta.getID()); this.id = MRApps.toString(ta.getID());
this.nodeHttpAddress = ta.getNodeHttpAddress(); this.nodeHttpAddress = ta.getNodeHttpAddress();
this.startTime = ta.getLaunchTime(); this.startTime = report.getStartTime();
this.finishTime = ta.getFinishTime(); this.finishTime = report.getFinishTime();
this.assignedContainerId = ConverterUtils.toString(ta this.assignedContainerId = ConverterUtils.toString(report.getContainerId());
.getAssignedContainerID()); this.assignedContainer = report.getContainerId();
this.assignedContainer = ta.getAssignedContainerID(); this.progress = report.getProgress() * 100;
this.progress = ta.getProgress() * 100; this.status = report.getStateString();
this.state = ta.getState(); this.state = report.getTaskAttemptState();
this.elapsedTime = Times this.elapsedTime = Times
.elapsed(this.startTime, this.finishTime, isRunning); .elapsed(this.startTime, this.finishTime, isRunning);
if (this.elapsedTime == -1) { if (this.elapsedTime == -1) {
this.elapsedTime = 0; this.elapsedTime = 0;
} }
List<String> diagnostics = ta.getDiagnostics(); this.diagnostics = report.getDiagnosticInfo();
if (diagnostics != null && !diagnostics.isEmpty()) {
StringBuffer b = new StringBuffer();
for (String diag : diagnostics) {
b.append(diag);
}
this.diagnostics = b.toString();
}
this.rack = ta.getNodeRackName(); this.rack = ta.getNodeRackName();
} }
@ -99,6 +95,10 @@ public String getState() {
return this.state.toString(); return this.state.toString();
} }
public String getStatus() {
return status;
}
public String getId() { public String getId() {
return this.id; return this.id;
} }

View File

@ -43,6 +43,7 @@ public class TaskInfo {
protected TaskState state; protected TaskState state;
protected String type; protected String type;
protected String successfulAttempt; protected String successfulAttempt;
protected String status;
@XmlTransient @XmlTransient
int taskNum; int taskNum;
@ -66,6 +67,7 @@ public TaskInfo(Task task) {
this.elapsedTime = 0; this.elapsedTime = 0;
} }
this.progress = report.getProgress() * 100; this.progress = report.getProgress() * 100;
this.status = report.getStatus();
this.id = MRApps.toString(task.getID()); this.id = MRApps.toString(task.getID());
this.taskNum = task.getID().getId(); this.taskNum = task.getID().getId();
this.successful = getSuccessfulAttempt(task); this.successful = getSuccessfulAttempt(task);
@ -121,4 +123,7 @@ private TaskAttempt getSuccessfulAttempt(Task task) {
return null; return null;
} }
public String getStatus() {
return status;
}
} }

View File

@ -174,22 +174,37 @@ public static TaskReport newTaskReport(TaskId id) {
report.setFinishTime(System.currentTimeMillis() report.setFinishTime(System.currentTimeMillis()
+ (int) (Math.random() * DT) + 1); + (int) (Math.random() * DT) + 1);
report.setProgress((float) Math.random()); report.setProgress((float) Math.random());
report.setStatus("Moving average: " + Math.random());
report.setCounters(TypeConverter.toYarn(newCounters())); report.setCounters(TypeConverter.toYarn(newCounters()));
report.setTaskState(TASK_STATES.next()); report.setTaskState(TASK_STATES.next());
return report; return report;
} }
public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) { public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
id.getTaskId().getJobId().getAppId(), 0);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class); TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
report.setTaskAttemptId(id); report.setTaskAttemptId(id);
report report
.setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT)); .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
report.setFinishTime(System.currentTimeMillis() report.setFinishTime(System.currentTimeMillis()
+ (int) (Math.random() * DT) + 1); + (int) (Math.random() * DT) + 1);
if (id.getTaskId().getTaskType() == TaskType.REDUCE) {
report.setShuffleFinishTime(
(report.getFinishTime() + report.getStartTime()) / 2);
report.setSortFinishTime(
(report.getFinishTime() + report.getShuffleFinishTime()) / 2);
}
report.setPhase(PHASES.next()); report.setPhase(PHASES.next());
report.setTaskAttemptState(TASK_ATTEMPT_STATES.next()); report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
report.setProgress((float) Math.random()); report.setProgress((float) Math.random());
report.setCounters(TypeConverter.toYarn(newCounters())); report.setCounters(TypeConverter.toYarn(newCounters()));
report.setContainerId(containerId);
report.setDiagnosticInfo(DIAGS.next());
report.setStateString("Moving average " + Math.random());
return report; return report;
} }
@ -230,8 +245,6 @@ public static TaskAttempt newTaskAttempt(TaskId tid, int i) {
taid.setTaskId(tid); taid.setTaskId(tid);
taid.setId(i); taid.setId(i);
final TaskAttemptReport report = newTaskAttemptReport(taid); final TaskAttemptReport report = newTaskAttemptReport(taid);
final List<String> diags = Lists.newArrayList();
diags.add(DIAGS.next());
return new TaskAttempt() { return new TaskAttempt() {
@Override @Override
public NodeId getNodeId() throws UnsupportedOperationException{ public NodeId getNodeId() throws UnsupportedOperationException{
@ -250,12 +263,12 @@ public TaskAttemptReport getReport() {
@Override @Override
public long getLaunchTime() { public long getLaunchTime() {
return 0; return report.getStartTime();
} }
@Override @Override
public long getFinishTime() { public long getFinishTime() {
return 0; return report.getFinishTime();
} }
@Override @Override
@ -313,7 +326,7 @@ public String getNodeHttpAddress() {
@Override @Override
public List<String> getDiagnostics() { public List<String> getDiagnostics() {
return diags; return Lists.newArrayList(report.getDiagnosticInfo());
} }
@Override @Override
@ -323,12 +336,12 @@ public String getAssignedContainerMgrAddress() {
@Override @Override
public long getShuffleFinishTime() { public long getShuffleFinishTime() {
return 0; return report.getShuffleFinishTime();
} }
@Override @Override
public long getSortFinishTime() { public long getSortFinishTime() {
return 0; return report.getSortFinishTime();
} }
@Override @Override

View File

@ -1,3 +1,4 @@
/**
/** /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
@ -425,9 +426,9 @@ public void verifyAMTaskAttemptXML(Element element, TaskAttempt att,
public void verifyAMTaskAttempt(JSONObject info, TaskAttempt att, public void verifyAMTaskAttempt(JSONObject info, TaskAttempt att,
TaskType ttype) throws JSONException { TaskType ttype) throws JSONException {
if (ttype == TaskType.REDUCE) { if (ttype == TaskType.REDUCE) {
assertEquals("incorrect number of elements", 16, info.length()); assertEquals("incorrect number of elements", 17, info.length());
} else { } else {
assertEquals("incorrect number of elements", 11, info.length()); assertEquals("incorrect number of elements", 12, info.length());
} }
verifyTaskAttemptGeneric(att, ttype, info.getString("id"), verifyTaskAttemptGeneric(att, ttype, info.getString("id"),
@ -532,11 +533,11 @@ public void verifyReduceTaskAttemptGeneric(TaskAttempt ta,
assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(), assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(),
mergeFinishTime); mergeFinishTime);
assertEquals("elapsedShuffleTime wrong", assertEquals("elapsedShuffleTime wrong",
ta.getLaunchTime() - ta.getShuffleFinishTime(), elapsedShuffleTime); ta.getShuffleFinishTime() - ta.getLaunchTime(), elapsedShuffleTime);
assertEquals("elapsedMergeTime wrong", assertEquals("elapsedMergeTime wrong",
ta.getShuffleFinishTime() - ta.getSortFinishTime(), elapsedMergeTime); ta.getSortFinishTime() - ta.getShuffleFinishTime(), elapsedMergeTime);
assertEquals("elapsedReduceTime wrong", assertEquals("elapsedReduceTime wrong",
ta.getSortFinishTime() - ta.getFinishTime(), elapsedReduceTime); ta.getFinishTime() - ta.getSortFinishTime(), elapsedReduceTime);
} }
@Test @Test

View File

@ -525,12 +525,13 @@ public void testTaskIdXML() throws JSONException, Exception {
public void verifyAMSingleTask(JSONObject info, Task task) public void verifyAMSingleTask(JSONObject info, Task task)
throws JSONException { throws JSONException {
assertEquals("incorrect number of elements", 8, info.length()); assertEquals("incorrect number of elements", 9, info.length());
verifyTaskGeneric(task, info.getString("id"), info.getString("state"), verifyTaskGeneric(task, info.getString("id"), info.getString("state"),
info.getString("type"), info.getString("successfulAttempt"), info.getString("type"), info.getString("successfulAttempt"),
info.getLong("startTime"), info.getLong("finishTime"), info.getLong("startTime"), info.getLong("finishTime"),
info.getLong("elapsedTime"), (float) info.getDouble("progress")); info.getLong("elapsedTime"), (float) info.getDouble("progress"),
info.getString("status"));
} }
public void verifyAMTask(JSONArray arr, Job job, String type) public void verifyAMTask(JSONArray arr, Job job, String type)
@ -555,7 +556,7 @@ public void verifyAMTask(JSONArray arr, Job job, String type)
public void verifyTaskGeneric(Task task, String id, String state, public void verifyTaskGeneric(Task task, String id, String state,
String type, String successfulAttempt, long startTime, long finishTime, String type, String successfulAttempt, long startTime, long finishTime,
long elapsedTime, float progress) { long elapsedTime, float progress, String status) {
TaskId taskid = task.getID(); TaskId taskid = task.getID();
String tid = MRApps.toString(taskid); String tid = MRApps.toString(taskid);
@ -572,6 +573,7 @@ public void verifyTaskGeneric(Task task, String id, String state,
assertEquals("finishTime wrong", report.getFinishTime(), finishTime); assertEquals("finishTime wrong", report.getFinishTime(), finishTime);
assertEquals("elapsedTime wrong", finishTime - startTime, elapsedTime); assertEquals("elapsedTime wrong", finishTime - startTime, elapsedTime);
assertEquals("progress wrong", report.getProgress() * 100, progress, 1e-3f); assertEquals("progress wrong", report.getProgress() * 100, progress, 1e-3f);
assertEquals("status wrong", report.getStatus(), status);
} }
public void verifyAMSingleTaskXML(Element element, Task task) { public void verifyAMSingleTaskXML(Element element, Task task) {
@ -582,7 +584,8 @@ public void verifyAMSingleTaskXML(Element element, Task task) {
WebServicesTestUtils.getXmlLong(element, "startTime"), WebServicesTestUtils.getXmlLong(element, "startTime"),
WebServicesTestUtils.getXmlLong(element, "finishTime"), WebServicesTestUtils.getXmlLong(element, "finishTime"),
WebServicesTestUtils.getXmlLong(element, "elapsedTime"), WebServicesTestUtils.getXmlLong(element, "elapsedTime"),
WebServicesTestUtils.getXmlFloat(element, "progress")); WebServicesTestUtils.getXmlFloat(element, "progress"),
WebServicesTestUtils.getXmlString(element, "status"));
} }
public void verifyAMTaskXML(NodeList nodes, Job job) { public void verifyAMTaskXML(NodeList nodes, Job job) {

View File

@ -24,10 +24,10 @@ public interface TaskReport {
public abstract TaskId getTaskId(); public abstract TaskId getTaskId();
public abstract TaskState getTaskState(); public abstract TaskState getTaskState();
public abstract float getProgress(); public abstract float getProgress();
public abstract String getStatus();
public abstract long getStartTime(); public abstract long getStartTime();
public abstract long getFinishTime(); public abstract long getFinishTime();
public abstract Counters getCounters(); public abstract Counters getCounters();
public abstract List<TaskAttemptId> getRunningAttemptsList(); public abstract List<TaskAttemptId> getRunningAttemptsList();
public abstract TaskAttemptId getRunningAttempt(int index); public abstract TaskAttemptId getRunningAttempt(int index);
public abstract int getRunningAttemptsCount(); public abstract int getRunningAttemptsCount();
@ -42,6 +42,7 @@ public interface TaskReport {
public abstract void setTaskId(TaskId taskId); public abstract void setTaskId(TaskId taskId);
public abstract void setTaskState(TaskState taskState); public abstract void setTaskState(TaskState taskState);
public abstract void setProgress(float progress); public abstract void setProgress(float progress);
public abstract void setStatus(String status);
public abstract void setStartTime(long startTime); public abstract void setStartTime(long startTime);
public abstract void setFinishTime(long finishTime); public abstract void setFinishTime(long finishTime);
public abstract void setCounters(Counters counters); public abstract void setCounters(Counters counters);

View File

@ -49,6 +49,7 @@ public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements Task
private List<TaskAttemptId> runningAttempts = null; private List<TaskAttemptId> runningAttempts = null;
private TaskAttemptId successfulAttemptId = null; private TaskAttemptId successfulAttemptId = null;
private List<String> diagnostics = null; private List<String> diagnostics = null;
private String status;
public TaskReportPBImpl() { public TaskReportPBImpl() {
@ -171,11 +172,22 @@ public float getProgress() {
return (p.getProgress()); return (p.getProgress());
} }
@Override
public String getStatus() {
return status;
}
@Override @Override
public void setProgress(float progress) { public void setProgress(float progress) {
maybeInitBuilder(); maybeInitBuilder();
builder.setProgress((progress)); builder.setProgress((progress));
} }
@Override
public void setStatus(String status) {
this.status = status;
}
@Override @Override
public TaskState getTaskState() { public TaskState getTaskState() {
TaskReportProtoOrBuilder p = viaProto ? proto : builder; TaskReportProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.App; import org.apache.hadoop.mapreduce.v2.app.webapp.App;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil; import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
@ -89,6 +90,7 @@ protected void render(Block html) {
headRow. headRow.
th(".id", "Attempt"). th(".id", "Attempt").
th(".state", "State"). th(".state", "State").
th(".status", "Status").
th(".node", "Node"). th(".node", "Node").
th(".logs", "Logs"). th(".logs", "Logs").
th(".tsh", "Start Time"); th(".tsh", "Start Time");
@ -113,15 +115,16 @@ protected void render(Block html) {
// DataTables to display // DataTables to display
StringBuilder attemptsTableData = new StringBuilder("[\n"); StringBuilder attemptsTableData = new StringBuilder("[\n");
for (TaskAttempt ta : getTaskAttempts()) { for (TaskAttempt attempt : getTaskAttempts()) {
String taid = MRApps.toString(ta.getID()); final TaskAttemptInfo ta = new TaskAttemptInfo(attempt, false);
String taid = ta.getId();
String nodeHttpAddr = ta.getNodeHttpAddress(); String nodeHttpAddr = ta.getNode();
String containerIdString = ta.getAssignedContainerID().toString(); String containerIdString = ta.getAssignedContainerIdStr();
String nodeIdString = ta.getAssignedContainerMgrAddress(); String nodeIdString = attempt.getAssignedContainerMgrAddress();
String nodeRackName = ta.getNodeRackName(); String nodeRackName = ta.getRack();
long attemptStartTime = ta.getLaunchTime(); long attemptStartTime = ta.getStartTime();
long shuffleFinishTime = -1; long shuffleFinishTime = -1;
long sortFinishTime = -1; long sortFinishTime = -1;
long attemptFinishTime = ta.getFinishTime(); long attemptFinishTime = ta.getFinishTime();
@ -129,8 +132,8 @@ protected void render(Block html) {
long elapsedSortTime = -1; long elapsedSortTime = -1;
long elapsedReduceTime = -1; long elapsedReduceTime = -1;
if(type == TaskType.REDUCE) { if(type == TaskType.REDUCE) {
shuffleFinishTime = ta.getShuffleFinishTime(); shuffleFinishTime = attempt.getShuffleFinishTime();
sortFinishTime = ta.getSortFinishTime(); sortFinishTime = attempt.getSortFinishTime();
elapsedShuffleTime = elapsedShuffleTime =
Times.elapsed(attemptStartTime, shuffleFinishTime, false); Times.elapsed(attemptStartTime, shuffleFinishTime, false);
elapsedSortTime = elapsedSortTime =
@ -140,11 +143,13 @@ protected void render(Block html) {
} }
long attemptElapsed = long attemptElapsed =
Times.elapsed(attemptStartTime, attemptFinishTime, false); Times.elapsed(attemptStartTime, attemptFinishTime, false);
int sortId = ta.getID().getId() + (ta.getID().getTaskId().getId() * 10000); int sortId = attempt.getID().getId()
+ (attempt.getID().getTaskId().getId() * 10000);
attemptsTableData.append("[\"") attemptsTableData.append("[\"")
.append(sortId + " ").append(taid).append("\",\"") .append(sortId + " ").append(taid).append("\",\"")
.append(ta.getState().toString()).append("\",\"") .append(ta.getState()).append("\",\"")
.append(ta.getStatus()).append("\",\"")
.append("<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>") .append("<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>")
.append(nodeRackName + "/" + nodeHttpAddr + "</a>\",\"") .append(nodeRackName + "/" + nodeHttpAddr + "</a>\",\"")
@ -167,8 +172,9 @@ protected void render(Block html) {
.append(elapsedReduceTime).append("\",\""); .append(elapsedReduceTime).append("\",\"");
} }
attemptsTableData.append(attemptElapsed).append("\",\"") attemptsTableData.append(attemptElapsed).append("\",\"")
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml( .append(StringEscapeUtils.escapeJavaScript(
Joiner.on('\n').join(ta.getDiagnostics())))).append("\"],\n"); StringEscapeUtils.escapeHtml(ta.getNote())))
.append("\"],\n");
} }
//Remove the last comma and close off the array of arrays //Remove the last comma and close off the array of arrays
if(attemptsTableData.charAt(attemptsTableData.length() - 2) == ',') { if(attemptsTableData.charAt(attemptsTableData.length() - 2) == ',') {
@ -184,6 +190,8 @@ protected void render(Block html) {
$name("attempt_name").$value("Attempt")._()._(). $name("attempt_name").$value("Attempt")._()._().
th().input("search_init").$type(InputType.text). th().input("search_init").$type(InputType.text).
$name("attempt_state").$value("State")._()._(). $name("attempt_state").$value("State")._()._().
th().input("search_init").$type(InputType.text).
$name("attempt_status").$value("Status")._()._().
th().input("search_init").$type(InputType.text). th().input("search_init").$type(InputType.text).
$name("attempt_node").$value("Node")._()._(). $name("attempt_node").$value("Node")._()._().
th().input("search_init").$type(InputType.text). th().input("search_init").$type(InputType.text).
@ -283,19 +291,19 @@ private String attemptsTableInit() {
.append("\n,aoColumnDefs:[\n") .append("\n,aoColumnDefs:[\n")
//logs column should not filterable (it includes container ID which may pollute searches) //logs column should not filterable (it includes container ID which may pollute searches)
.append("\n{'aTargets': [ 3 ]") .append("\n{'aTargets': [ 4 ]")
.append(", 'bSearchable': false }") .append(", 'bSearchable': false }")
.append("\n, {'sType':'numeric', 'aTargets': [ 0 ]") .append("\n, {'sType':'numeric', 'aTargets': [ 0 ]")
.append(", 'mRender': parseHadoopAttemptID }") .append(", 'mRender': parseHadoopAttemptID }")
.append("\n, {'sType':'numeric', 'aTargets': [ 4, 5") .append("\n, {'sType':'numeric', 'aTargets': [ 5, 6")
//Column numbers are different for maps and reduces //Column numbers are different for maps and reduces
.append(type == TaskType.REDUCE ? ", 6, 7" : "") .append(type == TaskType.REDUCE ? ", 7, 8" : "")
.append(" ], 'mRender': renderHadoopDate }") .append(" ], 'mRender': renderHadoopDate }")
.append("\n, {'sType':'numeric', 'aTargets': [") .append("\n, {'sType':'numeric', 'aTargets': [")
.append(type == TaskType.REDUCE ? "8, 9, 10, 11" : "6") .append(type == TaskType.REDUCE ? "9, 10, 11, 12" : "7")
.append(" ], 'mRender': renderHadoopElapsedTime }]") .append(" ], 'mRender': renderHadoopElapsedTime }]")
// Sort by id upon page load // Sort by id upon page load

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport; import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
@ -138,11 +139,31 @@ public void testAttemptsBlock() {
when(attempt.getAssignedContainerMgrAddress()).thenReturn( when(attempt.getAssignedContainerMgrAddress()).thenReturn(
"assignedContainerMgrAddress"); "assignedContainerMgrAddress");
when(attempt.getNodeRackName()).thenReturn("nodeRackName"); when(attempt.getNodeRackName()).thenReturn("nodeRackName");
when(attempt.getLaunchTime()).thenReturn(100002L);
when(attempt.getFinishTime()).thenReturn(100012L); final long taStartTime = 100002L;
when(attempt.getShuffleFinishTime()).thenReturn(100010L); final long taFinishTime = 100012L;
when(attempt.getSortFinishTime()).thenReturn(100011L); final long taShuffleFinishTime = 100010L;
when(attempt.getState()).thenReturn(TaskAttemptState.SUCCEEDED); final long taSortFinishTime = 100011L;
final TaskAttemptState taState = TaskAttemptState.SUCCEEDED;
when(attempt.getLaunchTime()).thenReturn(taStartTime);
when(attempt.getFinishTime()).thenReturn(taFinishTime);
when(attempt.getShuffleFinishTime()).thenReturn(taShuffleFinishTime);
when(attempt.getSortFinishTime()).thenReturn(taSortFinishTime);
when(attempt.getState()).thenReturn(taState);
TaskAttemptReport taReport = mock(TaskAttemptReport.class);
when(taReport.getStartTime()).thenReturn(taStartTime);
when(taReport.getFinishTime()).thenReturn(taFinishTime);
when(taReport.getShuffleFinishTime()).thenReturn(taShuffleFinishTime);
when(taReport.getSortFinishTime()).thenReturn(taSortFinishTime);
when(taReport.getContainerId()).thenReturn(containerId);
when(taReport.getProgress()).thenReturn(1.0f);
when(taReport.getStateString()).thenReturn("Processed 128/128 records");
when(taReport.getTaskAttemptState()).thenReturn(taState);
when(taReport.getDiagnosticInfo()).thenReturn("");
when(attempt.getReport()).thenReturn(taReport);
attempts.put(taId, attempt); attempts.put(taId, attempt);
when(task.getAttempts()).thenReturn(attempts); when(task.getAttempts()).thenReturn(attempts);

View File

@ -444,9 +444,9 @@ public void verifyHsTaskAttemptXML(Element element, TaskAttempt att,
public void verifyHsTaskAttempt(JSONObject info, TaskAttempt att, public void verifyHsTaskAttempt(JSONObject info, TaskAttempt att,
TaskType ttype) throws JSONException { TaskType ttype) throws JSONException {
if (ttype == TaskType.REDUCE) { if (ttype == TaskType.REDUCE) {
assertEquals("incorrect number of elements", 16, info.length()); assertEquals("incorrect number of elements", 17, info.length());
} else { } else {
assertEquals("incorrect number of elements", 11, info.length()); assertEquals("incorrect number of elements", 12, info.length());
} }
verifyTaskAttemptGeneric(att, ttype, info.getString("id"), verifyTaskAttemptGeneric(att, ttype, info.getString("id"),
@ -551,11 +551,11 @@ public void verifyReduceTaskAttemptGeneric(TaskAttempt ta,
assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(), assertEquals("mergeFinishTime wrong", ta.getSortFinishTime(),
mergeFinishTime); mergeFinishTime);
assertEquals("elapsedShuffleTime wrong", assertEquals("elapsedShuffleTime wrong",
ta.getLaunchTime() - ta.getShuffleFinishTime(), elapsedShuffleTime); ta.getShuffleFinishTime() - ta.getLaunchTime(), elapsedShuffleTime);
assertEquals("elapsedMergeTime wrong", assertEquals("elapsedMergeTime wrong",
ta.getShuffleFinishTime() - ta.getSortFinishTime(), elapsedMergeTime); ta.getSortFinishTime() - ta.getShuffleFinishTime(), elapsedMergeTime);
assertEquals("elapsedReduceTime wrong", assertEquals("elapsedReduceTime wrong",
ta.getSortFinishTime() - ta.getFinishTime(), elapsedReduceTime); ta.getFinishTime() - ta.getSortFinishTime(), elapsedReduceTime);
} }
@Test @Test

View File

@ -538,7 +538,7 @@ public void testTaskIdXML() throws JSONException, Exception {
public void verifyHsSingleTask(JSONObject info, Task task) public void verifyHsSingleTask(JSONObject info, Task task)
throws JSONException { throws JSONException {
assertEquals("incorrect number of elements", 8, info.length()); assertEquals("incorrect number of elements", 9, info.length());
verifyTaskGeneric(task, info.getString("id"), info.getString("state"), verifyTaskGeneric(task, info.getString("id"), info.getString("state"),
info.getString("type"), info.getString("successfulAttempt"), info.getString("type"), info.getString("successfulAttempt"),

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2; package org.apache.hadoop.mapreduce.v2;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@ -106,17 +107,21 @@ public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
int maxTimeWait = 10; int maxTimeWait = 10;
boolean successfullySpeculated = false; boolean successfullySpeculated = false;
TaskAttempt[] ta = null;
while (maxTimeWait > 0 && !successfullySpeculated) { while (maxTimeWait > 0 && !successfullySpeculated) {
if (taskToBeSpeculated.getAttempts().size() != 2) { if (taskToBeSpeculated.getAttempts().size() != 2) {
Thread.sleep(1000); Thread.sleep(1000);
clock.setTime(System.currentTimeMillis() + 20000); clock.setTime(System.currentTimeMillis() + 20000);
} else { } else {
successfullySpeculated = true; successfullySpeculated = true;
// finish 1st TA, 2nd will be killed
ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
} }
maxTimeWait--; maxTimeWait--;
} }
Assert Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated); .assertTrue("Couldn't speculate successfully", successfullySpeculated);
verifySpeculationMessage(app, ta);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@ -197,16 +202,47 @@ public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
int maxTimeWait = 5; int maxTimeWait = 5;
boolean successfullySpeculated = false; boolean successfullySpeculated = false;
TaskAttempt[] ta = null;
while (maxTimeWait > 0 && !successfullySpeculated) { while (maxTimeWait > 0 && !successfullySpeculated) {
if (speculatedTask.getAttempts().size() != 2) { if (speculatedTask.getAttempts().size() != 2) {
Thread.sleep(1000); Thread.sleep(1000);
} else { } else {
successfullySpeculated = true; successfullySpeculated = true;
ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
} }
maxTimeWait--; maxTimeWait--;
} }
Assert Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated); .assertTrue("Couldn't speculate successfully", successfullySpeculated);
verifySpeculationMessage(app, ta);
}
private static TaskAttempt[] makeFirstAttemptWin(
EventHandler appEventHandler, Task speculatedTask) {
// finish 1st TA, 2nd will be killed
Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values();
TaskAttempt[] ta = new TaskAttempt[attempts.size()];
attempts.toArray(ta);
appEventHandler.handle(
new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
return ta;
}
private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
throws Exception {
app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
app.waitForState(ta[1], TaskAttemptState.KILLED);
boolean foundSpecMsg = false;
for (String msg : ta[1].getDiagnostics()) {
if (msg.contains("Speculation")) {
foundSpecMsg = true;
break;
}
}
Assert.assertTrue("No speculation diagnostics!", foundSpecMsg);
} }
private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id, private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,

View File

@ -1756,7 +1756,22 @@ Release 2.0.2-alpha - 2012-09-07
YARN-138. Ensure default values for minimum/maximum container sizes is YARN-138. Ensure default values for minimum/maximum container sizes is
sane. (harsh & sseth via acmurthy) sane. (harsh & sseth via acmurthy)
Release 0.23.10 - UNRELEASED Release 0.23.11 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
YARN-1180. Update capacity scheduler docs to include types on the configs
(Chen He via jeagles)
Release 0.23.10 - 2013-12-09
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -86,9 +86,7 @@ public static void setup() throws IOException {
setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 100);
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS, 1000L);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);

View File

@ -225,19 +225,17 @@ public static RetryPolicy createRetryPolicy(Configuration conf) {
int maxFailoverAttempts = conf.getInt( int maxFailoverAttempts = conf.getInt(
YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1); YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
RetryPolicy basePolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
if (maxFailoverAttempts == -1) { if (maxFailoverAttempts == -1) {
if (waitForEver) { if (waitForEver) {
basePolicy = RetryPolicies.FAILOVER_FOREVER; maxFailoverAttempts = Integer.MAX_VALUE;
} else { } else {
basePolicy = new FailoverUptoMaximumTimePolicy( maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
System.currentTimeMillis() + rmConnectWaitMS);
} }
maxFailoverAttempts = 0;
} }
return RetryPolicies.failoverOnNetworkException(basePolicy, return RetryPolicies.failoverOnNetworkException(
maxFailoverAttempts, failoverSleepBaseMs, failoverSleepMaxMs); RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
failoverSleepBaseMs, failoverSleepMaxMs);
} }
if (waitForEver) { if (waitForEver) {

View File

@ -220,7 +220,7 @@ Hadoop MapReduce Next Generation - Capacity Scheduler
| | application, no single user can use more than 33% of the queue resources. | | | application, no single user can use more than 33% of the queue resources. |
| | With 4 or more users, no user can use more than 25% of the queues | | | With 4 or more users, no user can use more than 25% of the queues |
| | resources. A value of 100 implies no user limits are imposed. The default | | | resources. A value of 100 implies no user limits are imposed. The default |
| | is 100.| | | is 100. Value is specified as a integer.|
*--------------------------------------+--------------------------------------+ *--------------------------------------+--------------------------------------+
| <<<yarn.scheduler.capacity.<queue-path>.user-limit-factor>>> | | | <<<yarn.scheduler.capacity.<queue-path>.user-limit-factor>>> | |
| | The multiple of the queue capacity which can be configured to allow a | | | The multiple of the queue capacity which can be configured to allow a |
@ -249,6 +249,7 @@ Hadoop MapReduce Next Generation - Capacity Scheduler
| | be rejected. Default is 10000. This can be set for all queues with | | | be rejected. Default is 10000. This can be set for all queues with |
| | <<<yarn.scheduler.capacity.maximum-applications>>> and can also be overridden on a | | | <<<yarn.scheduler.capacity.maximum-applications>>> and can also be overridden on a |
| | per queue basis by setting <<<yarn.scheduler.capacity.<queue-path>.maximum-applications>>>. | | | per queue basis by setting <<<yarn.scheduler.capacity.<queue-path>.maximum-applications>>>. |
| | Integer value expected.|
*--------------------------------------+--------------------------------------+ *--------------------------------------+--------------------------------------+
| <<<yarn.scheduler.capacity.maximum-am-resource-percent>>> / | | <<<yarn.scheduler.capacity.maximum-am-resource-percent>>> / |
| <<<yarn.scheduler.capacity.<queue-path>.maximum-am-resource-percent>>> | | | <<<yarn.scheduler.capacity.<queue-path>.maximum-am-resource-percent>>> | |
@ -276,7 +277,7 @@ Hadoop MapReduce Next Generation - Capacity Scheduler
| | Thus, if the <root> queue is <<<STOPPED>>> no applications can be | | | Thus, if the <root> queue is <<<STOPPED>>> no applications can be |
| | submitted to the entire cluster. | | | submitted to the entire cluster. |
| | Existing applications continue to completion, thus the queue can be | | Existing applications continue to completion, thus the queue can be
| | <drained> gracefully. | | | <drained> gracefully. Value is specified as Enumeration. |
*--------------------------------------+--------------------------------------+ *--------------------------------------+--------------------------------------+
| <<<yarn.scheduler.capacity.root.<queue-path>.acl_submit_applications>>> | | | <<<yarn.scheduler.capacity.root.<queue-path>.acl_submit_applications>>> | |
| | The <ACL> which controls who can <submit> applications to the given queue. | | | The <ACL> which controls who can <submit> applications to the given queue. |