Merging r1540548 through r1540909 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1540910 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-11-12 01:16:10 +00:00
commit ec5eebc450
27 changed files with 834 additions and 533 deletions

View File

@ -364,6 +364,8 @@ Trunk (Unreleased)
HDFS-5482. DistributedFileSystem#listPathBasedCacheDirectives must support HDFS-5482. DistributedFileSystem#listPathBasedCacheDirectives must support
relative paths. (Colin Patrick McCabe via cnauroth) relative paths. (Colin Patrick McCabe via cnauroth)
HDFS-5320. Add datanode caching metrics. (wang)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -463,6 +465,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5371. Let client retry the same NN when HDFS-5371. Let client retry the same NN when
"dfs.client.test.drop.namenode.response.number" is enabled. (jing9) "dfs.client.test.drop.namenode.response.number" is enabled. (jing9)
HDFS-5467. Remove tab characters in hdfs-default.xml.
(Shinichi Yamashita via Andrew Wang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@ -519,6 +524,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9) HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9)
HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9)
Release 2.2.1 - UNRELEASED Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -155,7 +155,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
@Override @Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed, StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes) int xmitsInProgress, int xceiverCount, int failedVolumes)
throws IOException { throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
@ -165,11 +165,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
for (StorageReport r : reports) { for (StorageReport r : reports) {
builder.addReports(PBHelper.convert(r)); builder.addReports(PBHelper.convert(r));
} }
if (dnCacheCapacity != 0) { if (cacheCapacity != 0) {
builder.setDnCacheCapacity(dnCacheCapacity); builder.setCacheCapacity(cacheCapacity);
} }
if (dnCacheUsed != 0) { if (cacheUsed != 0) {
builder.setDnCacheUsed(dnCacheUsed); builder.setCacheUsed(cacheUsed);
} }
HeartbeatResponseProto resp; HeartbeatResponseProto resp;
try { try {

View File

@ -104,7 +104,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
final StorageReport[] report = PBHelper.convertStorageReports( final StorageReport[] report = PBHelper.convertStorageReports(
request.getReportsList()); request.getReportsList());
response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
report, request.getDnCacheCapacity(), request.getDnCacheUsed(), report, request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(), request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes()); request.getXceiverCount(), request.getFailedVolumes());
} catch (IOException e) { } catch (IOException e) {

View File

@ -476,7 +476,7 @@ class BPServiceActor implements Runnable {
DatanodeCommand cacheReport() throws IOException { DatanodeCommand cacheReport() throws IOException {
// If caching is disabled, do not send a cache report // If caching is disabled, do not send a cache report
if (dn.getFSDataset().getDnCacheCapacity() == 0) { if (dn.getFSDataset().getCacheCapacity() == 0) {
return null; return null;
} }
// send cache report if timer has expired. // send cache report if timer has expired.
@ -514,8 +514,8 @@ class BPServiceActor implements Runnable {
return bpNamenode.sendHeartbeat(bpRegistration, return bpNamenode.sendHeartbeat(bpRegistration,
reports, reports,
dn.getFSDataset().getDnCacheCapacity(), dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getDnCacheUsed(), dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(), dn.getXmitsInProgress(),
dn.getXceiverCount(), dn.getXceiverCount(),
dn.getFSDataset().getNumFailedVolumes()); dn.getFSDataset().getNumFailedVolumes());

View File

@ -226,6 +226,15 @@ public class FsDatasetCache {
*/ */
private final long maxBytes; private final long maxBytes;
/**
* Number of cache commands that could not be completed successfully
*/
AtomicLong numBlocksFailedToCache = new AtomicLong(0);
/**
* Number of uncache commands that could not be completed successfully
*/
AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
public FsDatasetCache(FsDatasetImpl dataset) { public FsDatasetCache(FsDatasetImpl dataset) {
this.dataset = dataset; this.dataset = dataset;
this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory(); this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
@ -274,6 +283,7 @@ public class FsDatasetCache {
" already exists in the FsDatasetCache with state " + " already exists in the FsDatasetCache with state " +
prevValue.state); prevValue.state);
} }
numBlocksFailedToCache.incrementAndGet();
return; return;
} }
mappableBlockMap.put(key, new Value(null, State.CACHING)); mappableBlockMap.put(key, new Value(null, State.CACHING));
@ -291,6 +301,7 @@ public class FsDatasetCache {
"does not need to be uncached, because it is not currently " + "does not need to be uncached, because it is not currently " +
"in the mappableBlockMap."); "in the mappableBlockMap.");
} }
numBlocksFailedToUncache.incrementAndGet();
return; return;
} }
switch (prevValue.state) { switch (prevValue.state) {
@ -317,6 +328,7 @@ public class FsDatasetCache {
"does not need to be uncached, because it is " + "does not need to be uncached, because it is " +
"in state " + prevValue.state + "."); "in state " + prevValue.state + ".");
} }
numBlocksFailedToUncache.incrementAndGet();
break; break;
} }
} }
@ -349,7 +361,8 @@ public class FsDatasetCache {
LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid + LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid +
": could not reserve " + length + " more bytes in the " + ": could not reserve " + length + " more bytes in the " +
"cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
" of " + maxBytes + " exceeded."); " of " + maxBytes + " exceeded.");
numBlocksFailedToCache.incrementAndGet();
return; return;
} }
try { try {
@ -413,6 +426,7 @@ public class FsDatasetCache {
if (mappableBlock != null) { if (mappableBlock != null) {
mappableBlock.close(); mappableBlock.close();
} }
numBlocksFailedToCache.incrementAndGet();
} }
} }
} }
@ -449,7 +463,7 @@ public class FsDatasetCache {
} }
} }
// Stats related methods for FsDatasetMBean // Stats related methods for FSDatasetMBean
/** /**
* Get the approximate amount of cache space used. * Get the approximate amount of cache space used.
@ -464,4 +478,13 @@ public class FsDatasetCache {
public long getDnCacheCapacity() { public long getDnCacheCapacity() {
return maxBytes; return maxBytes;
} }
public long getNumBlocksFailedToCache() {
return numBlocksFailedToCache.get();
}
public long getNumBlocksFailedToUncache() {
return numBlocksFailedToUncache.get();
}
} }

View File

@ -339,22 +339,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return volumes.numberOfFailedVolumes(); return volumes.numberOfFailedVolumes();
} }
/**
* Returns the total cache used by the datanode (in bytes).
*/
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getDnCacheUsed() { public long getCacheUsed() {
return cacheManager.getDnCacheUsed(); return cacheManager.getDnCacheUsed();
} }
/**
* Returns the total cache capacity of the datanode (in bytes).
*/
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getDnCacheCapacity() { public long getCacheCapacity() {
return cacheManager.getDnCacheCapacity(); return cacheManager.getDnCacheCapacity();
} }
@Override // FSDatasetMBean
public long getNumBlocksFailedToCache() {
return cacheManager.getNumBlocksFailedToCache();
}
@Override // FSDatasetMBean
public long getNumBlocksFailedToUncache() {
return cacheManager.getNumBlocksFailedToUncache();
}
/** /**
* Find the block's on-disk length * Find the block's on-disk length
*/ */
@ -1269,28 +1273,36 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
synchronized (this) { synchronized (this) {
ReplicaInfo info = volumeMap.get(bpid, blockId); ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info == null) { boolean success = false;
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": ReplicaInfo not found.");
return;
}
if (info.getState() != ReplicaState.FINALIZED) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": replica is not finalized; it is in state " +
info.getState());
return;
}
try { try {
volume = (FsVolumeImpl)info.getVolume(); if (info == null) {
if (volume == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " + LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": volume not found."); bpid + ": ReplicaInfo not found.");
return; return;
} }
} catch (ClassCastException e) { if (info.getState() != ReplicaState.FINALIZED) {
LOG.warn("Failed to cache block with id " + blockId + LOG.warn("Failed to cache block with id " + blockId + ", pool " +
": volume was not an instance of FsVolumeImpl."); bpid + ": replica is not finalized; it is in state " +
return; info.getState());
return;
}
try {
volume = (FsVolumeImpl)info.getVolume();
if (volume == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": volume not found.");
return;
}
} catch (ClassCastException e) {
LOG.warn("Failed to cache block with id " + blockId +
": volume was not an instance of FsVolumeImpl.");
return;
}
success = true;
} finally {
if (!success) {
cacheManager.numBlocksFailedToCache.incrementAndGet();
}
} }
blockFileName = info.getBlockFile().getAbsolutePath(); blockFileName = info.getBlockFile().getAbsolutePath();
length = info.getVisibleLength(); length = info.getVisibleLength();

View File

@ -79,12 +79,22 @@ public interface FSDatasetMBean {
public int getNumFailedVolumes(); public int getNumFailedVolumes();
/** /**
* Returns the total cache used by the datanode (in bytes). * Returns the amount of cache used by the datanode (in bytes).
*/ */
public long getDnCacheUsed(); public long getCacheUsed();
/** /**
* Returns the total cache capacity of the datanode (in bytes). * Returns the total cache capacity of the datanode (in bytes).
*/ */
public long getDnCacheCapacity(); public long getCacheCapacity();
/**
* Returns the number of blocks that the datanode was unable to cache
*/
public long getNumBlocksFailedToCache();
/**
* Returns the number of blocks that the datanode was unable to uncache
*/
public long getNumBlocksFailedToUncache();
} }

View File

@ -191,8 +191,8 @@ message HeartbeatRequestProto {
optional uint32 xmitsInProgress = 3 [ default = 0 ]; optional uint32 xmitsInProgress = 3 [ default = 0 ];
optional uint32 xceiverCount = 4 [ default = 0 ]; optional uint32 xceiverCount = 4 [ default = 0 ];
optional uint32 failedVolumes = 5 [ default = 0 ]; optional uint32 failedVolumes = 5 [ default = 0 ];
optional uint64 dnCacheCapacity = 6 [ default = 0 ]; optional uint64 cacheCapacity = 6 [ default = 0 ];
optional uint64 dnCacheUsed = 7 [default = 0 ]; optional uint64 cacheUsed = 7 [default = 0 ];
} }
message StorageReportProto { message StorageReportProto {

View File

@ -1393,43 +1393,43 @@
</property> </property>
<property> <property>
<name>dfs.namenode.enable.retrycache</name> <name>dfs.namenode.enable.retrycache</name>
<value>true</value> <value>true</value>
<description> <description>
This enables the retry cache on the namenode. Namenode tracks for This enables the retry cache on the namenode. Namenode tracks for
non-idempotent requests the corresponding response. If a client retries the non-idempotent requests the corresponding response. If a client retries the
request, the response from the retry cache is sent. Such operations request, the response from the retry cache is sent. Such operations
are tagged with annotation @AtMostOnce in namenode protocols. It is are tagged with annotation @AtMostOnce in namenode protocols. It is
recommended that this flag be set to true. Setting it to false, will result recommended that this flag be set to true. Setting it to false, will result
in clients getting failure responses to retried request. This flag must in clients getting failure responses to retried request. This flag must
be enabled in HA setup for transparent fail-overs. be enabled in HA setup for transparent fail-overs.
The entries in the cache have expiration time configurable The entries in the cache have expiration time configurable
using dfs.namenode.retrycache.expirytime.millis. using dfs.namenode.retrycache.expirytime.millis.
</description> </description>
</property> </property>
<property> <property>
<name>dfs.namenode.retrycache.expirytime.millis</name> <name>dfs.namenode.retrycache.expirytime.millis</name>
<value>600000</value> <value>600000</value>
<description> <description>
The time for which retry cache entries are retained. The time for which retry cache entries are retained.
</description> </description>
</property> </property>
<property> <property>
<name>dfs.namenode.retrycache.heap.percent</name> <name>dfs.namenode.retrycache.heap.percent</name>
<value>0.03f</value> <value>0.03f</value>
<description> <description>
This parameter configures the heap size allocated for retry cache This parameter configures the heap size allocated for retry cache
(excluding the response cached). This corresponds to approximately (excluding the response cached). This corresponds to approximately
4096 entries for every 64MB of namenode process java heap size. 4096 entries for every 64MB of namenode process java heap size.
Assuming retry cache entry expiration time (configured using Assuming retry cache entry expiration time (configured using
dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this
enables retry cache to support 7 operations per second sustained enables retry cache to support 7 operations per second sustained
for 10 minutes. As the heap size is increased, the operation rate for 10 minutes. As the heap size is increased, the operation rate
linearly increases. linearly increases.
</description> </description>
</property> </property>
<property> <property>

View File

@ -524,12 +524,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getDnCacheUsed() { public long getCacheUsed() {
return 0l; return 0l;
} }
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getDnCacheCapacity() { public long getCacheCapacity() {
return 0l;
}
@Override
public long getNumBlocksFailedToCache() {
return 0l;
}
@Override
public long getNumBlocksFailedToUncache() {
return 0l; return 0l;
} }

View File

@ -17,11 +17,13 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static junit.framework.Assert.assertTrue;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -57,14 +59,15 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
public class TestFsDatasetCache { public class TestFsDatasetCache {
@ -94,6 +97,7 @@ public class TestFsDatasetCache {
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
CACHE_CAPACITY); CACHE_CAPACITY);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build(); .numDataNodes(1).build();
@ -187,7 +191,7 @@ public class TestFsDatasetCache {
@Override @Override
public Boolean get() { public Boolean get() {
long curDnCacheUsed = fsd.getDnCacheUsed(); long curDnCacheUsed = fsd.getCacheUsed();
if (curDnCacheUsed != expected) { if (curDnCacheUsed != expected) {
if (tries++ > 10) { if (tries++ > 10) {
LOG.info("verifyExpectedCacheUsage: expected " + LOG.info("verifyExpectedCacheUsage: expected " +
@ -222,22 +226,37 @@ public class TestFsDatasetCache {
final long[] blockSizes = getBlockSizes(locs); final long[] blockSizes = getBlockSizes(locs);
// Check initial state // Check initial state
final long cacheCapacity = fsd.getDnCacheCapacity(); final long cacheCapacity = fsd.getCacheCapacity();
long cacheUsed = fsd.getDnCacheUsed(); long cacheUsed = fsd.getCacheUsed();
long current = 0; long current = 0;
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
assertEquals("Unexpected amount of cache used", current, cacheUsed); assertEquals("Unexpected amount of cache used", current, cacheUsed);
MetricsRecordBuilder dnMetrics;
long numCacheCommands = 0;
long numUncacheCommands = 0;
// Cache each block in succession, checking each time // Cache each block in succession, checking each time
for (int i=0; i<NUM_BLOCKS; i++) { for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(cacheBlock(locs[i])); setHeartbeatResponse(cacheBlock(locs[i]));
current = verifyExpectedCacheUsage(current + blockSizes[i]); current = verifyExpectedCacheUsage(current + blockSizes[i]);
dnMetrics = getMetrics(dn.getMetrics().name());
long cmds = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
assertTrue("Expected more cache requests from the NN ("
+ cmds + " <= " + numCacheCommands + ")",
cmds > numCacheCommands);
numCacheCommands = cmds;
} }
// Uncache each block in succession, again checking each time // Uncache each block in succession, again checking each time
for (int i=0; i<NUM_BLOCKS; i++) { for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(uncacheBlock(locs[i])); setHeartbeatResponse(uncacheBlock(locs[i]));
current = verifyExpectedCacheUsage(current - blockSizes[i]); current = verifyExpectedCacheUsage(current - blockSizes[i]);
dnMetrics = getMetrics(dn.getMetrics().name());
long cmds = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
assertTrue("Expected more uncache requests from the NN",
cmds > numUncacheCommands);
numUncacheCommands = cmds;
} }
LOG.info("finishing testCacheAndUncacheBlock"); LOG.info("finishing testCacheAndUncacheBlock");
} }
@ -293,6 +312,9 @@ public class TestFsDatasetCache {
return lines > 0; return lines > 0;
} }
}, 500, 30000); }, 500, 30000);
// Also check the metrics for the failure
assertTrue("Expected more than 0 failed cache attempts",
fsd.getNumBlocksFailedToCache() > 0);
// Uncache the n-1 files // Uncache the n-1 files
for (int i=0; i<numFiles-1; i++) { for (int i=0; i<numFiles-1; i++) {
@ -322,8 +344,8 @@ public class TestFsDatasetCache {
final long[] blockSizes = getBlockSizes(locs); final long[] blockSizes = getBlockSizes(locs);
// Check initial state // Check initial state
final long cacheCapacity = fsd.getDnCacheCapacity(); final long cacheCapacity = fsd.getCacheCapacity();
long cacheUsed = fsd.getDnCacheUsed(); long cacheUsed = fsd.getCacheUsed();
long current = 0; long current = 0;
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
assertEquals("Unexpected amount of cache used", current, cacheUsed); assertEquals("Unexpected amount of cache used", current, cacheUsed);
@ -354,4 +376,24 @@ public class TestFsDatasetCache {
current = verifyExpectedCacheUsage(0); current = verifyExpectedCacheUsage(0);
LOG.info("finishing testUncachingBlocksBeforeCachingFinishes"); LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
} }
@Test(timeout=60000)
public void testUncacheUnknownBlock() throws Exception {
// Create a file
Path fileName = new Path("/testUncacheUnknownBlock");
int fileLen = 4096;
DFSTestUtil.createFile(fs, fileName, fileLen, (short)1, 0xFDFD);
HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
fileName, 0, fileLen);
// Try to uncache it without caching it first
setHeartbeatResponse(uncacheBlocks(locs));
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return fsd.getNumBlocksFailedToUncache() > 0;
}
}, 100, 10000);
}
} }

View File

@ -28,6 +28,7 @@ import java.net.HttpURLConnection;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.net.URLConnection;
import java.util.Random; import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
@ -40,15 +41,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
import org.apache.hadoop.util.ServletUtil; import org.apache.hadoop.util.ServletUtil;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.*; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestHftpFileSystem { public class TestHftpFileSystem {
private static final Random RAN = new Random(); private static final Random RAN = new Random();
@ -65,32 +67,24 @@ public class TestHftpFileSystem {
new Path("/foo;bar"), new Path("/foo;bar"),
// URI does not encode, Request#getPathInfo returns verbatim // URI does not encode, Request#getPathInfo returns verbatim
new Path("/foo+"), new Path("/foo+"), new Path("/foo+bar/foo+bar"),
new Path("/foo+bar/foo+bar"), new Path("/foo=bar/foo=bar"), new Path("/foo,bar/foo,bar"),
new Path("/foo=bar/foo=bar"), new Path("/foo@bar/foo@bar"), new Path("/foo&bar/foo&bar"),
new Path("/foo,bar/foo,bar"), new Path("/foo$bar/foo$bar"), new Path("/foo_bar/foo_bar"),
new Path("/foo@bar/foo@bar"), new Path("/foo~bar/foo~bar"), new Path("/foo.bar/foo.bar"),
new Path("/foo&bar/foo&bar"), new Path("/foo../bar/foo../bar"), new Path("/foo.../bar/foo.../bar"),
new Path("/foo$bar/foo$bar"),
new Path("/foo_bar/foo_bar"),
new Path("/foo~bar/foo~bar"),
new Path("/foo.bar/foo.bar"),
new Path("/foo../bar/foo../bar"),
new Path("/foo.../bar/foo.../bar"),
new Path("/foo'bar/foo'bar"), new Path("/foo'bar/foo'bar"),
new Path("/foo#bar/foo#bar"), new Path("/foo#bar/foo#bar"),
new Path("/foo!bar/foo!bar"), new Path("/foo!bar/foo!bar"),
// HDFS file names may not contain ":" // HDFS file names may not contain ":"
// URI percent encodes, Request#getPathInfo decodes // URI percent encodes, Request#getPathInfo decodes
new Path("/foo bar/foo bar"), new Path("/foo bar/foo bar"), new Path("/foo?bar/foo?bar"),
new Path("/foo?bar/foo?bar"), new Path("/foo\">bar/foo\">bar"), };
new Path("/foo\">bar/foo\">bar"),
};
@BeforeClass @BeforeClass
public static void setUp() throws IOException { public static void setUp() throws IOException {
((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger) HftpFileSystem.LOG).getLogger().setLevel(Level.ALL);
final long seed = RAN.nextLong(); final long seed = RAN.nextLong();
System.out.println("seed=" + seed); System.out.println("seed=" + seed);
@ -99,8 +93,8 @@ public class TestHftpFileSystem {
config = new Configuration(); config = new Configuration();
cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build(); cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
blockPoolId = cluster.getNamesystem().getBlockPoolId(); blockPoolId = cluster.getNamesystem().getBlockPoolId();
hftpUri = hftpUri = "hftp://"
"hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
} }
@AfterClass @AfterClass
@ -140,7 +134,8 @@ public class TestHftpFileSystem {
// Check the file status matches the path. Hftp returns a FileStatus // Check the file status matches the path. Hftp returns a FileStatus
// with the entire URI, extract the path part. // with the entire URI, extract the path part.
assertEquals(p, new Path(hftpFs.getFileStatus(p).getPath().toUri().getPath())); assertEquals(p, new Path(hftpFs.getFileStatus(p).getPath().toUri()
.getPath()));
// Test list status (listPath servlet) // Test list status (listPath servlet)
assertEquals(1, hftpFs.listStatus(p).length); assertEquals(1, hftpFs.listStatus(p).length);
@ -158,21 +153,20 @@ public class TestHftpFileSystem {
if (hdfs.exists(path)) { if (hdfs.exists(path)) {
hdfs.delete(path, true); hdfs.delete(path, true);
} }
FSDataOutputStream out = hdfs.create(path, (short)1); FSDataOutputStream out = hdfs.create(path, (short) 1);
out.writeBytes("0123456789"); out.writeBytes("0123456789");
out.close(); out.close();
// Get the path's block location so we can determine // Get the path's block location so we can determine
// if we were redirected to the right DN. // if we were redirected to the right DN.
BlockLocation[] locations = BlockLocation[] locations = hdfs.getFileBlockLocations(path, 0, 10);
hdfs.getFileBlockLocations(path, 0, 10);
String xferAddr = locations[0].getNames()[0]; String xferAddr = locations[0].getNames()[0];
// Connect to the NN to get redirected // Connect to the NN to get redirected
URL u = hftpFs.getNamenodeURL( URL u = hftpFs.getNamenodeURL(
"/data" + ServletUtil.encodePath(path.toUri().getPath()), "/data" + ServletUtil.encodePath(path.toUri().getPath()),
"ugi=userx,groupy"); "ugi=userx,groupy");
HttpURLConnection conn = (HttpURLConnection)u.openConnection(); HttpURLConnection conn = (HttpURLConnection) u.openConnection();
HttpURLConnection.setFollowRedirects(true); HttpURLConnection.setFollowRedirects(true);
conn.connect(); conn.connect();
conn.getInputStream(); conn.getInputStream();
@ -181,15 +175,15 @@ public class TestHftpFileSystem {
// Find the datanode that has the block according to locations // Find the datanode that has the block according to locations
// and check that the URL was redirected to this DN's info port // and check that the URL was redirected to this DN's info port
for (DataNode node : cluster.getDataNodes()) { for (DataNode node : cluster.getDataNodes()) {
DatanodeRegistration dnR = DatanodeRegistration dnR = DataNodeTestUtils.getDNRegistrationForBP(node,
DataNodeTestUtils.getDNRegistrationForBP(node, blockPoolId); blockPoolId);
if (dnR.getXferAddr().equals(xferAddr)) { if (dnR.getXferAddr().equals(xferAddr)) {
checked = true; checked = true;
assertEquals(dnR.getInfoPort(), conn.getURL().getPort()); assertEquals(dnR.getInfoPort(), conn.getURL().getPort());
} }
} }
assertTrue("The test never checked that location of " + assertTrue("The test never checked that location of "
"the block and hftp desitnation are the same", checked); + "the block and hftp desitnation are the same", checked);
} }
/** /**
@ -260,7 +254,7 @@ public class TestHftpFileSystem {
os.writeBytes("0123456789"); os.writeBytes("0123456789");
os.close(); os.close();
// ByteRangeInputStream delays opens until reads. Make sure it doesn't // ByteRangeInputStream delays opens until reads. Make sure it doesn't
// open a closed stream that has never been opened // open a closed stream that has never been opened
FSDataInputStream in = hftpFs.open(testFile); FSDataInputStream in = hftpFs.open(testFile);
in.close(); in.close();
@ -298,16 +292,15 @@ public class TestHftpFileSystem {
URI uri = URI.create("hftp://localhost"); URI uri = URI.create("hftp://localhost");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
// HFTP uses http to get the token so canonical service name should // HFTP uses http to get the token so canonical service name should
// return the http port. // return the http port.
assertEquals( assertEquals("127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
"127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getCanonicalServiceName());
fs.getCanonicalServiceName()
);
} }
@Test @Test
@ -324,10 +317,7 @@ public class TestHftpFileSystem {
// HFTP uses http to get the token so canonical service name should // HFTP uses http to get the token so canonical service name should
// return the http port. // return the http port.
assertEquals( assertEquals("127.0.0.1:123", fs.getCanonicalServiceName());
"127.0.0.1:123",
fs.getCanonicalServiceName()
);
} }
@Test @Test
@ -336,13 +326,11 @@ public class TestHftpFileSystem {
URI uri = URI.create("hftp://localhost:123"); URI uri = URI.create("hftp://localhost:123");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals("127.0.0.1:123", fs.getCanonicalServiceName());
"127.0.0.1:123",
fs.getCanonicalServiceName()
);
} }
@Test @Test
@ -356,13 +344,20 @@ public class TestHftpFileSystem {
assertEquals(123, fs.getDefaultPort()); assertEquals(123, fs.getDefaultPort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals("127.0.0.1:789", fs.getCanonicalServiceName());
"127.0.0.1:789",
fs.getCanonicalServiceName()
);
} }
/// @Test
public void testTimeout() throws IOException {
Configuration conf = new Configuration();
URI uri = URI.create("hftp://localhost");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
URLConnection conn = fs.connectionFactory.openConnection(new URL("http://localhost"));
assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT, conn.getConnectTimeout());
assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT, conn.getReadTimeout());
}
// /
@Test @Test
public void testHsftpDefaultPorts() throws IOException { public void testHsftpDefaultPorts() throws IOException {
@ -370,13 +365,12 @@ public class TestHftpFileSystem {
URI uri = URI.create("hsftp://localhost"); URI uri = URI.create("hsftp://localhost");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals("127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getCanonicalServiceName());
fs.getCanonicalServiceName()
);
} }
@Test @Test
@ -391,10 +385,7 @@ public class TestHftpFileSystem {
assertEquals(456, fs.getDefaultPort()); assertEquals(456, fs.getDefaultPort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals("127.0.0.1:456", fs.getCanonicalServiceName());
"127.0.0.1:456",
fs.getCanonicalServiceName()
);
} }
@Test @Test
@ -403,13 +394,11 @@ public class TestHftpFileSystem {
URI uri = URI.create("hsftp://localhost:123"); URI uri = URI.create("hsftp://localhost:123");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getDefaultPort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals("127.0.0.1:123", fs.getCanonicalServiceName());
"127.0.0.1:123",
fs.getCanonicalServiceName()
);
} }
@Test @Test
@ -424,9 +413,6 @@ public class TestHftpFileSystem {
assertEquals(456, fs.getDefaultPort()); assertEquals(456, fs.getDefaultPort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals("127.0.0.1:789", fs.getCanonicalServiceName());
"127.0.0.1:789",
fs.getCanonicalServiceName()
);
} }
} }

View File

@ -1,140 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.HsftpFileSystem;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.junit.Test;
public class TestHftpURLTimeouts {
@Test
public void testHftpSocketTimeout() throws Exception {
Configuration conf = new Configuration();
ServerSocket socket = new ServerSocket(0,1);
URI uri = new URI("hftp", null,
InetAddress.getByName(null).getHostAddress(),
socket.getLocalPort(),
null, null, null);
HftpFileSystem fs = (HftpFileSystem)FileSystem.get(uri, conf);
fs.connectionFactory = new URLConnectionFactory(5);
boolean timedout = false;
try {
HttpURLConnection conn = fs.openConnection("/", "");
timedout = false;
try {
// this will consume the only slot in the backlog
conn.getInputStream();
} catch (SocketTimeoutException ste) {
timedout = true;
assertEquals("Read timed out", ste.getMessage());
} finally {
if (conn != null) conn.disconnect();
}
assertTrue("read timedout", timedout);
assertTrue("connect timedout", checkConnectTimeout(fs, false));
} finally {
fs.connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
fs.close();
}
}
@Test
public void testHsftpSocketTimeout() throws Exception {
Configuration conf = new Configuration();
ServerSocket socket = new ServerSocket(0,1);
URI uri = new URI("hsftp", null,
InetAddress.getByName(null).getHostAddress(),
socket.getLocalPort(),
null, null, null);
boolean timedout = false;
HsftpFileSystem fs = (HsftpFileSystem)FileSystem.get(uri, conf);
fs.connectionFactory = new URLConnectionFactory(5);
try {
HttpURLConnection conn = null;
timedout = false;
try {
// this will consume the only slot in the backlog
conn = fs.openConnection("/", "");
} catch (SocketTimeoutException ste) {
// SSL expects a negotiation, so it will timeout on read, unlike hftp
timedout = true;
assertEquals("Read timed out", ste.getMessage());
} finally {
if (conn != null) conn.disconnect();
}
assertTrue("ssl read connect timedout", timedout);
assertTrue("connect timedout", checkConnectTimeout(fs, true));
} finally {
fs.connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
fs.close();
}
}
private boolean checkConnectTimeout(HftpFileSystem fs, boolean ignoreReadTimeout)
throws IOException {
boolean timedout = false;
List<HttpURLConnection> conns = new LinkedList<HttpURLConnection>();
try {
// with a listen backlog of 1, should only have to make one connection
// to trigger a connection timeout. however... linux doesn't honor the
// socket's listen backlog so we have to try a bunch of times
for (int n=32; !timedout && n > 0; n--) {
try {
conns.add(fs.openConnection("/", ""));
} catch (SocketTimeoutException ste) {
String message = ste.getMessage();
assertNotNull(message);
// https will get a read timeout due to SSL negotiation, but
// a normal http will not, so need to ignore SSL read timeouts
// until a connect timeout occurs
if (!(ignoreReadTimeout && "Read timed out".equals(message))) {
timedout = true;
assertEquals("connect timed out", message);
}
}
}
} finally {
for (HttpURLConnection conn : conns) {
conn.disconnect();
}
}
return timedout;
}
}

View File

@ -197,6 +197,10 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5585. TestCopyCommitter#testNoCommitAction Fails on JDK7 MAPREDUCE-5585. TestCopyCommitter#testNoCommitAction Fails on JDK7
(jeagles) (jeagles)
MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits
created by CombineFileInputFormat to fail (Robert Parker and Jason Lowe
via jlowe)
Release 2.2.1 - UNRELEASED Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.split;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -39,6 +40,9 @@ import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* The class that is used by the Job clients to write splits (both the meta * The class that is used by the Job clients to write splits (both the meta
* and the raw bytes parts) * and the raw bytes parts)
@ -47,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class JobSplitWriter { public class JobSplitWriter {
private static final Log LOG = LogFactory.getLog(JobSplitWriter.class);
private static final int splitVersion = JobSplit.META_SPLIT_VERSION; private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
private static final byte[] SPLIT_FILE_HEADER; private static final byte[] SPLIT_FILE_HEADER;
@ -129,9 +134,10 @@ public class JobSplitWriter {
long currCount = out.getPos(); long currCount = out.getPos();
String[] locations = split.getLocations(); String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) { if (locations.length > maxBlockLocations) {
throw new IOException("Max block location exceeded for split: " LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length + + split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations); " maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations, maxBlockLocations);
} }
info[i++] = info[i++] =
new JobSplit.SplitMetaInfo( new JobSplit.SplitMetaInfo(
@ -159,9 +165,10 @@ public class JobSplitWriter {
long currLen = out.getPos(); long currLen = out.getPos();
String[] locations = split.getLocations(); String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) { if (locations.length > maxBlockLocations) {
throw new IOException("Max block location exceeded for split: " LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length + + split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations); " maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations,maxBlockLocations);
} }
info[i++] = new JobSplit.SplitMetaInfo( info[i++] = new JobSplit.SplitMetaInfo(
locations, offset, locations, offset,

View File

@ -82,6 +82,14 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.job.max.split.locations</name>
<value>10</value>
<description>The max number of block locations to store for each split for
locality calculation.
</description>
</property>
<property> <property>
<name>mapreduce.job.split.metainfo.maxsize</name> <name>mapreduce.job.split.metainfo.maxsize</name>
<value>10000000</value> <value>10000000</value>

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.split;
import static org.junit.Assert.assertEquals;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.junit.Test;
public class TestJobSplitWriter {
private static final File TEST_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")), "TestJobSplitWriter");
@Test
public void testMaxBlockLocationsNewSplits() throws Exception {
TEST_DIR.mkdirs();
try {
Configuration conf = new Configuration();
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
FileSystem fs = FileSystem.getLocal(conf);
FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
new FileSplit[] { split });
JobSplit.TaskSplitMetaInfo[] infos =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("unexpected number of splits", 1, infos.length);
assertEquals("unexpected number of split locations",
4, infos[0].getLocations().length);
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
TEST_DIR.mkdirs();
try {
Configuration conf = new Configuration();
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
FileSystem fs = FileSystem.getLocal(conf);
org.apache.hadoop.mapred.FileSplit split =
new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
new org.apache.hadoop.mapred.InputSplit[] { split });
JobSplit.TaskSplitMetaInfo[] infos =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("unexpected number of splits", 1, infos.length);
assertEquals("unexpected number of split locations",
4, infos[0].getLocations().length);
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
}

View File

@ -1,176 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
/**
* A JUnit test to test limits on block locations
*/
public class TestBlockLimits extends TestCase {
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
public void testWithLimits() throws IOException, InterruptedException,
ClassNotFoundException {
MiniMRClientCluster mr = null;
try {
mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
new Configuration());
runCustomFormat(mr);
} finally {
if (mr != null) {
mr.stop();
}
}
}
private void runCustomFormat(MiniMRClientCluster mr) throws IOException {
JobConf job = new JobConf(mr.getConfig());
FileSystem fileSys = FileSystem.get(job);
Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
Path outDir = new Path(testDir, "out");
System.out.println("testDir= " + testDir);
fileSys.delete(testDir, true);
job.setInputFormat(MyInputFormat.class);
job.setOutputFormat(MyOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setNumMapTasks(100);
job.setNumReduceTasks(1);
job.set("non.std.out", outDir.toString());
try {
JobClient.runJob(job);
assertTrue(false);
} catch (IOException ie) {
System.out.println("Failed job " + StringUtils.stringifyException(ie));
} finally {
fileSys.delete(testDir, true);
}
}
static class MyMapper extends MapReduceBase implements
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
public void map(WritableComparable key, Writable value,
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
throws IOException {
}
}
static class MyReducer extends MapReduceBase implements
Reducer<WritableComparable, Writable, WritableComparable, Writable> {
public void reduce(WritableComparable key, Iterator<Writable> values,
OutputCollector<WritableComparable, Writable> output, Reporter reporter)
throws IOException {
}
}
private static class MyInputFormat implements InputFormat<IntWritable, Text> {
private static class MySplit implements InputSplit {
int first;
int length;
public MySplit() {
}
public MySplit(int first, int length) {
this.first = first;
this.length = length;
}
public String[] getLocations() {
return new String[200];
}
public long getLength() {
return length;
}
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, first);
WritableUtils.writeVInt(out, length);
}
public void readFields(DataInput in) throws IOException {
first = WritableUtils.readVInt(in);
length = WritableUtils.readVInt(in);
}
}
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3),
new MySplit(4, 2) };
}
public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
return null;
}
}
static class MyOutputFormat implements OutputFormat {
static class MyRecordWriter implements RecordWriter<Object, Object> {
public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
}
public void write(Object key, Object value) throws IOException {
return;
}
public void close(Reporter reporter) throws IOException {
}
}
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) throws IOException {
return new MyRecordWriter(new Path(job.get("non.std.out")), job);
}
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws IOException {
}
}
}

View File

@ -91,6 +91,9 @@ Release 2.3.0 - UNRELEASED
YARN-1121. Changed ResourceManager's state-store to drain all events on YARN-1121. Changed ResourceManager's state-store to drain all events on
shut-down. (Jian He via vinodkv) shut-down. (Jian He via vinodkv)
YARN-1387. RMWebServices should use ClientRMService for filtering
applications (Karthik Kambatla via Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -118,6 +121,9 @@ Release 2.3.0 - UNRELEASED
YARN-1374. Changed ResourceManager to start the preemption policy monitors YARN-1374. Changed ResourceManager to start the preemption policy monitors
as active services. (Karthik Kambatla via vinodkv) as active services. (Karthik Kambatla via vinodkv)
YARN-1395. Distributed shell application master launched with debug flag can
hang waiting for external ls process. (cnauroth)
Release 2.2.1 - UNRELEASED Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Set; import java.util.Set;
import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
import org.apache.commons.lang.math.LongRange;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
@ -150,4 +152,109 @@ public abstract class GetApplicationsRequest {
@Unstable @Unstable
public abstract void public abstract void
setApplicationStates(EnumSet<YarnApplicationState> applicationStates); setApplicationStates(EnumSet<YarnApplicationState> applicationStates);
/**
* Set the application states to filter applications on
*
* @param applicationStates all lower-case string representation of the
* application states to filter on
*/
@Private
@Unstable
public abstract void setApplicationStates(Set<String> applicationStates);
/**
* Get the users to filter applications on
*
* @return set of users to filter applications on
*/
@Private
@Unstable
public abstract Set<String> getUsers();
/**
* Set the users to filter applications on
*
* @param users set of users to filter applications on
*/
@Private
@Unstable
public abstract void setUsers(Set<String> users);
/**
* Get the queues to filter applications on
*
* @return set of queues to filter applications on
*/
@Private
@Unstable
public abstract Set<String> getQueues();
/**
* Set the queue to filter applications on
*
* @param queue user to filter applications on
*/
@Private
@Unstable
public abstract void setQueues(Set<String> queue);
/**
* Get the limit on the number applications to return
*
* @return number of applications to limit to
*/
@Private
@Unstable
public abstract long getLimit();
/**
* Limit the number applications to return
*
* @param limit number of applications to limit to
*/
@Private
@Unstable
public abstract void setLimit(long limit);
/**
* Get the range of start times to filter applications on
*
* @return {@link LongRange} of start times to filter applications on
*/
@Private
@Unstable
public abstract LongRange getStartRange();
/**
* Set the range of start times to filter applications on
*
* @param begin beginning of the range
* @param end end of the range
* @throws IllegalArgumentException
*/
@Private
@Unstable
public abstract void setStartRange(long begin, long end)
throws IllegalArgumentException;
/**
* Get the range of finish times to filter applications on
*
* @return {@link LongRange} of finish times to filter applications on
*/
@Private
@Unstable
public abstract LongRange getFinishRange();
/**
* Set the range of finish times to filter applications on
*
* @param begin beginning of the range
* @param end end of the range
* @throws IllegalArgumentException
*/
@Private
@Unstable
public abstract void setFinishRange(long begin, long end);
} }

View File

@ -125,6 +125,13 @@ message GetClusterMetricsResponseProto {
message GetApplicationsRequestProto { message GetApplicationsRequestProto {
repeated string application_types = 1; repeated string application_types = 1;
repeated YarnApplicationStateProto application_states = 2; repeated YarnApplicationStateProto application_states = 2;
repeated string users = 3;
repeated string queues = 4;
optional int64 limit = 5;
optional int64 start_begin = 6;
optional int64 start_end = 7;
optional int64 finish_begin = 8;
optional int64 finish_end = 9;
} }
message GetApplicationsResponseProto { message GetApplicationsResponseProto {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -46,10 +47,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -262,25 +265,20 @@ public class ApplicationMaster {
+ env.getValue()); + env.getValue());
} }
String cmd = "ls -al"; BufferedReader buf = null;
Runtime run = Runtime.getRuntime();
Process pr = null;
try { try {
pr = run.exec(cmd); String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
pr.waitFor(); Shell.execCommand("ls", "-al");
buf = new BufferedReader(new StringReader(lines));
BufferedReader buf = new BufferedReader(new InputStreamReader(
pr.getInputStream()));
String line = ""; String line = "";
while ((line = buf.readLine()) != null) { while ((line = buf.readLine()) != null) {
LOG.info("System CWD content: " + line); LOG.info("System CWD content: " + line);
System.out.println("System CWD content: " + line); System.out.println("System CWD content: " + line);
} }
buf.close();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InterruptedException e) { } finally {
e.printStackTrace(); IOUtils.cleanup(LOG, buf);
} }
} }

View File

@ -306,5 +306,31 @@ public class TestDistributedShell {
} }
@Test(timeout=90000)
public void testDebugFlag() throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"2",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1",
"--debug"
};
LOG.info("Initializing DS Client");
Client client = new Client(new Configuration(yarnCluster.getConfig()));
Assert.assertTrue(client.init(args));
LOG.info("Running DS Client");
Assert.assertTrue(client.run());
}
} }

View File

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.commons.lang.math.LongRange;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@ -44,6 +45,10 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
Set<String> applicationTypes = null; Set<String> applicationTypes = null;
EnumSet<YarnApplicationState> applicationStates = null; EnumSet<YarnApplicationState> applicationStates = null;
Set<String> users = null;
Set<String> queues = null;
long limit = Long.MAX_VALUE;
LongRange start = null, finish = null;
public GetApplicationsRequestPBImpl() { public GetApplicationsRequestPBImpl() {
builder = GetApplicationsRequestProto.newBuilder(); builder = GetApplicationsRequestProto.newBuilder();
@ -148,6 +153,26 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
} }
} }
private void initUsers() {
if (this.users != null) {
return;
}
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
List<String> usersList = p.getUsersList();
this.users = new HashSet<String>();
this.users.addAll(usersList);
}
private void initQueues() {
if (this.queues != null) {
return;
}
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
List<String> queuesList = p.getQueuesList();
this.queues = new HashSet<String>();
this.queues.addAll(queuesList);
}
@Override @Override
public Set<String> getApplicationTypes() { public Set<String> getApplicationTypes() {
initApplicationTypes(); initApplicationTypes();
@ -177,6 +202,111 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
this.applicationStates = applicationStates; this.applicationStates = applicationStates;
} }
@Override
public void setApplicationStates(Set<String> applicationStates) {
EnumSet<YarnApplicationState> appStates = null;
for (YarnApplicationState state : YarnApplicationState.values()) {
if (applicationStates.contains(state.name().toLowerCase())) {
if (appStates == null) {
appStates = EnumSet.of(state);
} else {
appStates.add(state);
}
}
}
setApplicationStates(appStates);
}
@Override
public Set<String> getUsers() {
initUsers();
return this.users;
}
@Override
public void setUsers(Set<String> users) {
maybeInitBuilder();
if (users == null) {
builder.clearUsers();
}
this.users = users;
}
@Override
public Set<String> getQueues() {
initQueues();
return this.queues;
}
@Override
public void setQueues(Set<String> queues) {
maybeInitBuilder();
if (queues == null) {
builder.clearQueues();
}
this.queues = queues;
}
@Override
public long getLimit() {
if (this.limit == Long.MAX_VALUE) {
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
this.limit = p.hasLimit() ? p.getLimit() : Long.MAX_VALUE;
}
return this.limit;
}
@Override
public void setLimit(long limit) {
maybeInitBuilder();
this.limit = limit;
}
@Override
public LongRange getStartRange() {
if (this.start == null) {
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto: builder;
if (p.hasStartBegin() || p.hasFinishBegin()) {
long begin = p.hasStartBegin() ? p.getStartBegin() : 0L;
long end = p.hasStartEnd() ? p.getStartEnd() : Long.MAX_VALUE;
this.start = new LongRange(begin, end);
}
}
return this.start;
}
@Override
public void setStartRange(long begin, long end)
throws IllegalArgumentException {
if (begin > end) {
throw new IllegalArgumentException("begin > end in range (begin, " +
"end): (" + begin + ", " + end + ")");
}
this.start = new LongRange(begin, end);
}
@Override
public LongRange getFinishRange() {
if (this.finish == null) {
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto: builder;
if (p.hasFinishBegin() || p.hasFinishEnd()) {
long begin = p.hasFinishBegin() ? p.getFinishBegin() : 0L;
long end = p.hasFinishEnd() ? p.getFinishEnd() : Long.MAX_VALUE;
this.finish = new LongRange(begin, end);
}
}
return this.finish;
}
@Override
public void setFinishRange(long begin, long end) {
if (begin > end) {
throw new IllegalArgumentException("begin > end in range (begin, " +
"end): (" + begin + ", " + end + ")");
}
this.finish = new LongRange(begin, end);
}
@Override @Override
public int hashCode() { public int hashCode() {
return getProto().hashCode(); return getProto().hashCode();

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.math.LongRange;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -401,6 +402,18 @@ public class ClientRMService extends AbstractService implements
@Override @Override
public GetApplicationsResponse getApplications( public GetApplicationsResponse getApplications(
GetApplicationsRequest request) throws YarnException { GetApplicationsRequest request) throws YarnException {
return getApplications(request, true);
}
/**
* Get applications matching the {@link GetApplicationsRequest}. If
* caseSensitive is set to false, applicationTypes in
* GetApplicationRequest are expected to be in all-lowercase
*/
@Private
public GetApplicationsResponse getApplications(
GetApplicationsRequest request, boolean caseSensitive)
throws YarnException {
UserGroupInformation callerUGI; UserGroupInformation callerUGI;
try { try {
callerUGI = UserGroupInformation.getCurrentUser(); callerUGI = UserGroupInformation.getCurrentUser();
@ -412,11 +425,23 @@ public class ClientRMService extends AbstractService implements
Set<String> applicationTypes = request.getApplicationTypes(); Set<String> applicationTypes = request.getApplicationTypes();
EnumSet<YarnApplicationState> applicationStates = EnumSet<YarnApplicationState> applicationStates =
request.getApplicationStates(); request.getApplicationStates();
Set<String> users = request.getUsers();
Set<String> queues = request.getQueues();
long limit = request.getLimit();
LongRange start = request.getStartRange();
LongRange finish = request.getFinishRange();
List<ApplicationReport> reports = new ArrayList<ApplicationReport>(); List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
long count = 0;
for (RMApp application : this.rmContext.getRMApps().values()) { for (RMApp application : this.rmContext.getRMApps().values()) {
if (++count > limit) {
break;
}
if (applicationTypes != null && !applicationTypes.isEmpty()) { if (applicationTypes != null && !applicationTypes.isEmpty()) {
if (!applicationTypes.contains(application.getApplicationType())) { String appTypeToMatch = caseSensitive
? application.getApplicationType()
: application.getApplicationType().toLowerCase();
if (!applicationTypes.contains(appTypeToMatch)) {
continue; continue;
} }
} }
@ -427,6 +452,25 @@ public class ClientRMService extends AbstractService implements
continue; continue;
} }
} }
if (users != null && !users.isEmpty() &&
!users.contains(application.getUser())) {
continue;
}
if (queues != null && !queues.isEmpty() &&
!queues.contains(application.getQueue())) {
continue;
}
if (start != null && !start.containsLong(application.getStartTime())) {
continue;
}
if (finish != null && !finish.containsLong(application.getFinishTime())) {
continue;
}
boolean allowAccess = checkAccess(callerUGI, application.getUser(), boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application); ApplicationAccessType.VIEW_APP, application);
reports.add(application.createAndGetApplicationReport( reports.add(application.createAndGetApplicationReport(

View File

@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -38,14 +39,20 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
@ -85,6 +92,8 @@ import com.google.inject.Singleton;
@Singleton @Singleton
@Path("/ws/v1/cluster") @Path("/ws/v1/cluster")
public class RMWebServices { public class RMWebServices {
private static final Log LOG =
LogFactory.getLog(RMWebServices.class.getName());
private static final String EMPTY = ""; private static final String EMPTY = "";
private static final String ANY = "*"; private static final String ANY = "*";
private final ResourceManager rm; private final ResourceManager rm;
@ -253,7 +262,6 @@ public class RMWebServices {
@QueryParam("finishedTimeBegin") String finishBegin, @QueryParam("finishedTimeBegin") String finishBegin,
@QueryParam("finishedTimeEnd") String finishEnd, @QueryParam("finishedTimeEnd") String finishEnd,
@QueryParam("applicationTypes") Set<String> applicationTypes) { @QueryParam("applicationTypes") Set<String> applicationTypes) {
long num = 0;
boolean checkCount = false; boolean checkCount = false;
boolean checkStart = false; boolean checkStart = false;
boolean checkEnd = false; boolean checkEnd = false;
@ -328,19 +336,66 @@ public class RMWebServices {
checkAppStates = true; checkAppStates = true;
} }
final ConcurrentMap<ApplicationId, RMApp> apps = rm.getRMContext() GetApplicationsRequest request = GetApplicationsRequest.newInstance();
.getRMApps();
if (checkStart) {
request.setStartRange(sBegin, sEnd);
}
if (checkEnd) {
request.setFinishRange(fBegin, fEnd);
}
if (checkCount) {
request.setLimit(countNum);
}
if (checkAppTypes) {
request.setApplicationTypes(appTypes);
}
if (checkAppStates) {
request.setApplicationStates(appStates);
}
if (queueQuery != null && !queueQuery.isEmpty()) {
ResourceScheduler rs = rm.getResourceScheduler();
if (rs instanceof CapacityScheduler) {
CapacityScheduler cs = (CapacityScheduler) rs;
// validate queue exists
try {
cs.getQueueInfo(queueQuery, false, false);
} catch (IOException e) {
throw new BadRequestException(e.getMessage());
}
}
Set<String> queues = new HashSet<String>(1);
queues.add(queueQuery);
request.setQueues(queues);
}
if (userQuery != null && !userQuery.isEmpty()) {
Set<String> users = new HashSet<String>(1);
users.add(userQuery);
request.setUsers(users);
}
List<ApplicationReport> appReports = null;
try {
appReports = rm.getClientRMService()
.getApplications(request, false).getApplicationList();
} catch (YarnException e) {
LOG.error("Unable to retrieve apps from ClientRMService", e);
throw new YarnRuntimeException(
"Unable to retrieve apps from ClientRMService", e);
}
final ConcurrentMap<ApplicationId, RMApp> apps =
rm.getRMContext().getRMApps();
AppsInfo allApps = new AppsInfo(); AppsInfo allApps = new AppsInfo();
for (RMApp rmapp : apps.values()) { for (ApplicationReport report : appReports) {
RMApp rmapp = apps.get(report.getApplicationId());
if (checkCount && num == countNum) {
break;
}
if (checkAppStates && !appStates.contains(
rmapp.createApplicationState().toString().toLowerCase())) {
continue;
}
if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) { if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) {
FinalApplicationStatus.valueOf(finalStatusQuery); FinalApplicationStatus.valueOf(finalStatusQuery);
if (!rmapp.getFinalApplicationStatus().toString() if (!rmapp.getFinalApplicationStatus().toString()
@ -348,43 +403,9 @@ public class RMWebServices {
continue; continue;
} }
} }
if (userQuery != null && !userQuery.isEmpty()) {
if (!rmapp.getUser().equals(userQuery)) {
continue;
}
}
if (queueQuery != null && !queueQuery.isEmpty()) {
ResourceScheduler rs = rm.getResourceScheduler();
if (rs instanceof CapacityScheduler) {
CapacityScheduler cs = (CapacityScheduler) rs;
// validate queue exists
try {
cs.getQueueInfo(queueQuery, false, false);
} catch (IOException e) {
throw new BadRequestException(e.getMessage());
}
}
if (!rmapp.getQueue().equals(queueQuery)) {
continue;
}
}
if (checkAppTypes && !appTypes.contains(
rmapp.getApplicationType().trim().toLowerCase())) {
continue;
}
if (checkStart
&& (rmapp.getStartTime() < sBegin || rmapp.getStartTime() > sEnd)) {
continue;
}
if (checkEnd
&& (rmapp.getFinishTime() < fBegin || rmapp.getFinishTime() > fEnd)) {
continue;
}
AppInfo app = new AppInfo(rmapp, hasAccess(rmapp, hsr)); AppInfo app = new AppInfo(rmapp, hasAccess(rmapp, hsr));
allApps.add(app); allApps.add(app);
num++;
} }
return allApps; return allApps;
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
@ -409,6 +409,89 @@ public class TestClientRMService {
getAllApplicationsResponse.getApplicationList() getAllApplicationsResponse.getApplicationList()
.get(0).getApplicationId()); .get(0).getApplicationId());
} }
@Test
public void testGetApplications() throws IOException, YarnException {
/**
* 1. Submit 3 applications alternately in two queues
* 2. Test each of the filters
*/
// Basic setup
YarnScheduler yarnScheduler = mockYarnScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
new EventHandler<Event>() {
public void handle(Event event) {}
});
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), anyString())).thenReturn(true);
ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager,
mockAclsManager, mockQueueACLsManager, null);
// Initialize appnames and queues
String[] queues = {"Q-1", "Q-2"};
String[] appNames =
{MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()};
ApplicationId[] appIds =
{getApplicationId(101), getApplicationId(102), getApplicationId(103)};
// Submit applications
for (int i = 0; i < appIds.length; i++) {
ApplicationId appId = appIds[i];
when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId)).thenReturn(true);
SubmitApplicationRequest submitRequest = mockSubmitAppRequest(
appId, appNames[i], queues[i % queues.length]);
rmService.submitApplication(submitRequest);
}
// Test different cases of ClientRMService#getApplications()
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
assertEquals("Incorrect total number of apps", 6,
rmService.getApplications(request).getApplicationList().size());
// Check limit
request.setLimit(1L);
assertEquals("Failed to limit applications", 1,
rmService.getApplications(request).getApplicationList().size());
// Check queue
request = GetApplicationsRequest.newInstance();
Set<String> queueSet = new HashSet<String>();
request.setQueues(queueSet);
queueSet.add(queues[0]);
assertEquals("Incorrect number of applications in queue", 2,
rmService.getApplications(request).getApplicationList().size());
assertEquals("Incorrect number of applications in queue", 2,
rmService.getApplications(request, false).getApplicationList().size());
queueSet.add(queues[1]);
assertEquals("Incorrect number of applications in queue", 3,
rmService.getApplications(request).getApplicationList().size());
// Check user
request = GetApplicationsRequest.newInstance();
Set<String> userSet = new HashSet<String>();
request.setUsers(userSet);
userSet.add("random-user-name");
assertEquals("Incorrect number of applications for user", 0,
rmService.getApplications(request).getApplicationList().size());
userSet.add(UserGroupInformation.getCurrentUser().getShortUserName());
assertEquals("Incorrect number of applications for user", 3,
rmService.getApplications(request).getApplicationList().size());
}
@Test(timeout=4000) @Test(timeout=4000)
public void testConcurrentAppSubmit() public void testConcurrentAppSubmit()
@ -492,10 +575,10 @@ public class TestClientRMService {
submissionContext.setResource(resource); submissionContext.setResource(resource);
submissionContext.setApplicationType(appType); submissionContext.setApplicationType(appType);
SubmitApplicationRequest submitRequest = SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class); recordFactory.newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(submissionContext); submitRequest.setApplicationSubmissionContext(submissionContext);
return submitRequest; return submitRequest;
} }
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)