From 9673baa7e8b43fa6300080f72ebce0189ea775e5 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Mon, 11 Nov 2013 18:30:07 +0000 Subject: [PATCH 1/6] HDFS-5320. Add datanode caching metrics. Contributed by Andrew Wang. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540796 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + ...atanodeProtocolClientSideTranslatorPB.java | 10 +-- ...atanodeProtocolServerSideTranslatorPB.java | 3 +- .../hdfs/server/datanode/BPServiceActor.java | 6 +- .../fsdataset/impl/FsDatasetCache.java | 27 +++++++- .../fsdataset/impl/FsDatasetImpl.java | 64 +++++++++++-------- .../datanode/metrics/FSDatasetMBean.java | 16 ++++- .../src/main/proto/DatanodeProtocol.proto | 4 +- .../server/datanode/SimulatedFSDataset.java | 15 ++++- .../server/datanode/TestFsDatasetCache.java | 56 ++++++++++++++-- 10 files changed, 150 insertions(+), 53 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1918cac2e2a..637c77404c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -364,6 +364,8 @@ Trunk (Unreleased) HDFS-5482. DistributedFileSystem#listPathBasedCacheDirectives must support relative paths. (Colin Patrick McCabe via cnauroth) + HDFS-5320. Add datanode caching metrics. (wang) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index f12a92ff0ec..ef44e0cd158 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -155,7 +155,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements @Override public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, - StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed, + StorageReport[] reports, long cacheCapacity, long cacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() @@ -165,11 +165,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements for (StorageReport r : reports) { builder.addReports(PBHelper.convert(r)); } - if (dnCacheCapacity != 0) { - builder.setDnCacheCapacity(dnCacheCapacity); + if (cacheCapacity != 0) { + builder.setCacheCapacity(cacheCapacity); } - if (dnCacheUsed != 0) { - builder.setDnCacheUsed(dnCacheUsed); + if (cacheUsed != 0) { + builder.setCacheUsed(cacheUsed); } HeartbeatResponseProto resp; try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index ab067709519..3f0c437c90c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -57,7 +57,6 @@ import 
org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; -import com.google.common.primitives.Longs; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -112,7 +111,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements p.getBlockPoolUsed()); } response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), - report, request.getDnCacheCapacity(), request.getDnCacheUsed(), + report, request.getCacheCapacity(), request.getCacheUsed(), request.getXmitsInProgress(), request.getXceiverCount(), request.getFailedVolumes()); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 73d3cdffadb..9d2b36d9823 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -435,7 +435,7 @@ class BPServiceActor implements Runnable { DatanodeCommand cacheReport() throws IOException { // If caching is disabled, do not send a cache report - if (dn.getFSDataset().getDnCacheCapacity() == 0) { + if (dn.getFSDataset().getCacheCapacity() == 0) { return null; } // send cache report if timer has expired. @@ -475,8 +475,8 @@ class BPServiceActor implements Runnable { dn.getFSDataset().getRemaining(), dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) }; return bpNamenode.sendHeartbeat(bpRegistration, report, - dn.getFSDataset().getDnCacheCapacity(), - dn.getFSDataset().getDnCacheUsed(), + dn.getFSDataset().getCacheCapacity(), + dn.getFSDataset().getCacheUsed(), dn.getXmitsInProgress(), dn.getXceiverCount(), dn.getFSDataset().getNumFailedVolumes()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index 2af46bb8915..3e12168e22d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -226,6 +226,15 @@ public class FsDatasetCache { */ private final long maxBytes; + /** + * Number of cache commands that could not be completed successfully + */ + AtomicLong numBlocksFailedToCache = new AtomicLong(0); + /** + * Number of uncache commands that could not be completed successfully + */ + AtomicLong numBlocksFailedToUncache = new AtomicLong(0); + public FsDatasetCache(FsDatasetImpl dataset) { this.dataset = dataset; this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory(); @@ -274,6 +283,7 @@ public class FsDatasetCache { " already exists in the FsDatasetCache with state " + prevValue.state); } + numBlocksFailedToCache.incrementAndGet(); return; } mappableBlockMap.put(key, new Value(null, State.CACHING)); @@ -291,6 +301,7 @@ public class FsDatasetCache { "does not need to be uncached, because it is not currently " + "in the mappableBlockMap."); } + numBlocksFailedToUncache.incrementAndGet(); return; } switch (prevValue.state) { @@ -317,6 +328,7 
@@ public class FsDatasetCache { "does not need to be uncached, because it is " + "in state " + prevValue.state + "."); } + numBlocksFailedToUncache.incrementAndGet(); break; } } @@ -349,7 +361,8 @@ public class FsDatasetCache { LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid + ": could not reserve " + length + " more bytes in the " + "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + - " of " + maxBytes + " exceeded."); + " of " + maxBytes + " exceeded."); + numBlocksFailedToCache.incrementAndGet(); return; } try { @@ -413,6 +426,7 @@ public class FsDatasetCache { if (mappableBlock != null) { mappableBlock.close(); } + numBlocksFailedToCache.incrementAndGet(); } } } @@ -449,7 +463,7 @@ public class FsDatasetCache { } } - // Stats related methods for FsDatasetMBean + // Stats related methods for FSDatasetMBean /** * Get the approximate amount of cache space used. @@ -464,4 +478,13 @@ public class FsDatasetCache { public long getDnCacheCapacity() { return maxBytes; } + + public long getNumBlocksFailedToCache() { + return numBlocksFailedToCache.get(); + } + + public long getNumBlocksFailedToUncache() { + return numBlocksFailedToUncache.get(); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 65f57712ec3..d8c0155f719 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -290,22 +290,26 @@ class FsDatasetImpl implements FsDatasetSpi { return volumes.numberOfFailedVolumes(); } - /** - * Returns the total cache used by the datanode (in bytes). - */ @Override // FSDatasetMBean - public long getDnCacheUsed() { + public long getCacheUsed() { return cacheManager.getDnCacheUsed(); } - /** - * Returns the total cache capacity of the datanode (in bytes). 
- */ @Override // FSDatasetMBean - public long getDnCacheCapacity() { + public long getCacheCapacity() { return cacheManager.getDnCacheCapacity(); } + @Override // FSDatasetMBean + public long getNumBlocksFailedToCache() { + return cacheManager.getNumBlocksFailedToCache(); + } + + @Override // FSDatasetMBean + public long getNumBlocksFailedToUncache() { + return cacheManager.getNumBlocksFailedToUncache(); + } + /** * Find the block's on-disk length */ @@ -1193,28 +1197,36 @@ class FsDatasetImpl implements FsDatasetSpi { synchronized (this) { ReplicaInfo info = volumeMap.get(bpid, blockId); - if (info == null) { - LOG.warn("Failed to cache block with id " + blockId + ", pool " + - bpid + ": ReplicaInfo not found."); - return; - } - if (info.getState() != ReplicaState.FINALIZED) { - LOG.warn("Failed to cache block with id " + blockId + ", pool " + - bpid + ": replica is not finalized; it is in state " + - info.getState()); - return; - } + boolean success = false; try { - volume = (FsVolumeImpl)info.getVolume(); - if (volume == null) { + if (info == null) { LOG.warn("Failed to cache block with id " + blockId + ", pool " + - bpid + ": volume not found."); + bpid + ": ReplicaInfo not found."); return; } - } catch (ClassCastException e) { - LOG.warn("Failed to cache block with id " + blockId + - ": volume was not an instance of FsVolumeImpl."); - return; + if (info.getState() != ReplicaState.FINALIZED) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": replica is not finalized; it is in state " + + info.getState()); + return; + } + try { + volume = (FsVolumeImpl)info.getVolume(); + if (volume == null) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": volume not found."); + return; + } + } catch (ClassCastException e) { + LOG.warn("Failed to cache block with id " + blockId + + ": volume was not an instance of FsVolumeImpl."); + return; + } + success = true; + } finally { + if (!success) { + cacheManager.numBlocksFailedToCache.incrementAndGet(); + } } blockFileName = info.getBlockFile().getAbsolutePath(); length = info.getVisibleLength(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java index 82757d065bb..40ccefb6c3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java @@ -79,12 +79,22 @@ public interface FSDatasetMBean { public int getNumFailedVolumes(); /** - * Returns the total cache used by the datanode (in bytes). + * Returns the amount of cache used by the datanode (in bytes). */ - public long getDnCacheUsed(); + public long getCacheUsed(); /** * Returns the total cache capacity of the datanode (in bytes). 
*/ - public long getDnCacheCapacity(); + public long getCacheCapacity(); + + /** + * Returns the number of blocks that the datanode was unable to cache + */ + public long getNumBlocksFailedToCache(); + + /** + * Returns the number of blocks that the datanode was unable to uncache + */ + public long getNumBlocksFailedToUncache(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index d64d97abca3..28077a85ff2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -188,8 +188,8 @@ message HeartbeatRequestProto { optional uint32 xmitsInProgress = 3 [ default = 0 ]; optional uint32 xceiverCount = 4 [ default = 0 ]; optional uint32 failedVolumes = 5 [ default = 0 ]; - optional uint64 dnCacheCapacity = 6 [ default = 0 ]; - optional uint64 dnCacheUsed = 7 [default = 0 ]; + optional uint64 cacheCapacity = 6 [ default = 0 ]; + optional uint64 cacheUsed = 7 [default = 0 ]; } message StorageReportProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index d5df755167c..a855f126420 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -497,12 +496,22 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override // FSDatasetMBean - public long getDnCacheUsed() { + public long getCacheUsed() { return 0l; } @Override // FSDatasetMBean - public long getDnCacheCapacity() { + public long getCacheCapacity() { + return 0l; + } + + @Override + public long getNumBlocksFailedToCache() { + return 0l; + } + + @Override + public long getNumBlocksFailedToUncache() { return 0l; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index e889413ec47..7f5a9101b65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import static junit.framework.Assert.assertTrue; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import static org.junit.Assume.assumeTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doReturn; import java.io.FileInputStream; @@ -57,14 +59,15 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import 
org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.MetricsAsserts; import org.apache.log4j.Logger; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Preconditions; import com.google.common.base.Supplier; public class TestFsDatasetCache { @@ -94,6 +97,7 @@ public class TestFsDatasetCache { conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true); cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(1).build(); @@ -187,7 +191,7 @@ public class TestFsDatasetCache { @Override public Boolean get() { - long curDnCacheUsed = fsd.getDnCacheUsed(); + long curDnCacheUsed = fsd.getCacheUsed(); if (curDnCacheUsed != expected) { if (tries++ > 10) { LOG.info("verifyExpectedCacheUsage: expected " + @@ -222,22 +226,37 @@ public class TestFsDatasetCache { final long[] blockSizes = getBlockSizes(locs); // Check initial state - final long cacheCapacity = fsd.getDnCacheCapacity(); - long cacheUsed = fsd.getDnCacheUsed(); + final long cacheCapacity = fsd.getCacheCapacity(); + long cacheUsed = fsd.getCacheUsed(); long current = 0; assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity); assertEquals("Unexpected amount of cache used", current, cacheUsed); + MetricsRecordBuilder dnMetrics; + long numCacheCommands = 0; + long numUncacheCommands = 0; + // Cache each block in succession, checking each time for (int i=0; i numCacheCommands); + numCacheCommands = cmds; } // Uncache each block in succession, again checking each time for (int i=0; i numUncacheCommands); + numUncacheCommands = cmds; } LOG.info("finishing testCacheAndUncacheBlock"); } @@ -293,6 +312,9 @@ public class TestFsDatasetCache { return lines > 0; } }, 500, 30000); + // Also check the metrics for the failure + assertTrue("Expected more than 0 failed cache attempts", + fsd.getNumBlocksFailedToCache() > 0); // Uncache the n-1 files for (int i=0; i() { + @Override + public Boolean get() { + return fsd.getNumBlocksFailedToUncache() > 0; + } + }, 100, 10000); + } } From 38a3b925e9cd1cf0889719894acf65e1c3e25c15 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Mon, 11 Nov 2013 19:22:38 +0000 Subject: [PATCH 2/6] MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits created by CombineFileInputFormat to fail. 
Contributed by Robert Parker and Jason Lowe git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540813 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../mapreduce/split/JobSplitWriter.java | 11 +- .../src/main/resources/mapred-default.xml | 8 + .../mapreduce/split/TestJobSplitWriter.java | 86 +++++++++ .../apache/hadoop/mapred/TestBlockLimits.java | 176 ------------------ 5 files changed, 107 insertions(+), 178 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 261a1de3e47..240a63a605b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -197,6 +197,10 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5585. TestCopyCommitter#testNoCommitAction Fails on JDK7 (jeagles) + MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits + created by CombineFileInputFormat to fail (Robert Parker and Jason Lowe + via jlowe) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java index e6ecac5b012..eb10ae50694 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.split; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -39,6 +40,9 @@ import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * The class that is used by the Job clients to write splits (both the meta * and the raw bytes parts) @@ -47,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable public class JobSplitWriter { + private static final Log LOG = LogFactory.getLog(JobSplitWriter.class); private static final int splitVersion = JobSplit.META_SPLIT_VERSION; private static final byte[] SPLIT_FILE_HEADER; @@ -129,9 +134,10 @@ public class JobSplitWriter { long currCount = out.getPos(); String[] locations = split.getLocations(); if (locations.length > maxBlockLocations) { - throw new IOException("Max block location exceeded for split: " + LOG.warn("Max block location exceeded for split: " + split + " splitsize: " + locations.length + " maxsize: " + maxBlockLocations); + locations = Arrays.copyOf(locations, maxBlockLocations); } info[i++] = new JobSplit.SplitMetaInfo( @@ -159,9 +165,10 @@ public class JobSplitWriter { long currLen = out.getPos(); String[] locations = split.getLocations(); if 
(locations.length > maxBlockLocations) {
-        throw new IOException("Max block location exceeded for split: "
+        LOG.warn("Max block location exceeded for split: "
             + split + " splitsize: " + locations.length +
             " maxsize: " + maxBlockLocations);
+        locations = Arrays.copyOf(locations,maxBlockLocations);
       }
       info[i++] = new JobSplit.SplitMetaInfo(
           locations, offset,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index c8974726018..4a228820b23 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -82,6 +82,14 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.max.split.locations</name>
+  <value>10</value>
+  <description>The max number of block locations to store for each split for
+    locality calculation.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.job.split.metainfo.maxsize</name>
   <value>10000000</value>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
new file mode 100644
index 00000000000..fb1a255cf56
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.mapreduce.split; + +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.junit.Test; + +public class TestJobSplitWriter { + + private static final File TEST_DIR = new File( + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), "TestJobSplitWriter"); + + @Test + public void testMaxBlockLocationsNewSplits() throws Exception { + TEST_DIR.mkdirs(); + try { + Configuration conf = new Configuration(); + conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4); + Path submitDir = new Path(TEST_DIR.getAbsolutePath()); + FileSystem fs = FileSystem.getLocal(conf); + FileSplit split = new FileSplit(new Path("/some/path"), 0, 1, + new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" }); + JobSplitWriter.createSplitFiles(submitDir, conf, fs, + new FileSplit[] { split }); + JobSplit.TaskSplitMetaInfo[] infos = + SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf, + submitDir); + assertEquals("unexpected number of splits", 1, infos.length); + assertEquals("unexpected number of split locations", + 4, infos[0].getLocations().length); + } finally { + FileUtil.fullyDelete(TEST_DIR); + } + } + + @Test + public void testMaxBlockLocationsOldSplits() throws Exception { + TEST_DIR.mkdirs(); + try { + Configuration conf = new Configuration(); + conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4); + Path submitDir = new Path(TEST_DIR.getAbsolutePath()); + FileSystem fs = FileSystem.getLocal(conf); + org.apache.hadoop.mapred.FileSplit split = + new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1, + new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" }); + JobSplitWriter.createSplitFiles(submitDir, conf, fs, + new org.apache.hadoop.mapred.InputSplit[] { split }); + JobSplit.TaskSplitMetaInfo[] infos = + SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf, + submitDir); + assertEquals("unexpected number of splits", 1, infos.length); + assertEquals("unexpected number of split locations", + 4, infos[0].getLocations().length); + } finally { + FileUtil.fullyDelete(TEST_DIR); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java deleted file mode 100644 index d8b250ad45c..00000000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.File; -import java.io.IOException; -import java.util.Iterator; - -import junit.framework.TestCase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.StringUtils; - -/** - * A JUnit test to test limits on block locations - */ -public class TestBlockLimits extends TestCase { - private static String TEST_ROOT_DIR = new File(System.getProperty( - "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); - - public void testWithLimits() throws IOException, InterruptedException, - ClassNotFoundException { - MiniMRClientCluster mr = null; - try { - mr = MiniMRClientClusterFactory.create(this.getClass(), 2, - new Configuration()); - runCustomFormat(mr); - } finally { - if (mr != null) { - mr.stop(); - } - } - } - - private void runCustomFormat(MiniMRClientCluster mr) throws IOException { - JobConf job = new JobConf(mr.getConfig()); - FileSystem fileSys = FileSystem.get(job); - Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local"); - Path outDir = new Path(testDir, "out"); - System.out.println("testDir= " + testDir); - fileSys.delete(testDir, true); - job.setInputFormat(MyInputFormat.class); - job.setOutputFormat(MyOutputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(MyMapper.class); - job.setReducerClass(MyReducer.class); - job.setNumMapTasks(100); - job.setNumReduceTasks(1); - job.set("non.std.out", outDir.toString()); - try { - JobClient.runJob(job); - assertTrue(false); - } catch (IOException ie) { - System.out.println("Failed job " + StringUtils.stringifyException(ie)); - } finally { - fileSys.delete(testDir, true); - } - - } - - static class MyMapper extends MapReduceBase implements - Mapper { - - public void map(WritableComparable key, Writable value, - OutputCollector out, Reporter reporter) - throws IOException { - } - } - - static class MyReducer extends MapReduceBase implements - Reducer { - public void reduce(WritableComparable key, Iterator values, - OutputCollector output, Reporter reporter) - throws IOException { - } - } - - private static class MyInputFormat implements InputFormat { - - private static class MySplit implements InputSplit { - int first; - int length; - - public MySplit() { - } - - public MySplit(int first, int length) { - this.first = first; - this.length = length; - } - - public String[] getLocations() { - return new String[200]; - } - - public long getLength() { - return length; - } - - public void write(DataOutput out) throws IOException { - WritableUtils.writeVInt(out, first); - WritableUtils.writeVInt(out, length); - } - - public void readFields(DataInput in) throws IOException { - first = 
WritableUtils.readVInt(in); - length = WritableUtils.readVInt(in); - } - } - - public InputSplit[] getSplits(JobConf job, int numSplits) - throws IOException { - return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3), - new MySplit(4, 2) }; - } - - public RecordReader getRecordReader(InputSplit split, - JobConf job, Reporter reporter) throws IOException { - return null; - } - - } - - static class MyOutputFormat implements OutputFormat { - static class MyRecordWriter implements RecordWriter { - - public MyRecordWriter(Path outputFile, JobConf job) throws IOException { - } - - public void write(Object key, Object value) throws IOException { - return; - } - - public void close(Reporter reporter) throws IOException { - } - } - - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, - String name, Progressable progress) throws IOException { - return new MyRecordWriter(new Path(job.get("non.std.out")), job); - } - - public void checkOutputSpecs(FileSystem ignored, JobConf job) - throws IOException { - } - } - -} \ No newline at end of file From 2b3e1abcda9f218b04cd7cd75cced91be501358e Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Mon, 11 Nov 2013 19:25:47 +0000 Subject: [PATCH 3/6] HDFS-5467. Remove tab characters in hdfs-default.xml. Contributed by Shinichi Yamashita. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540816 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../src/main/resources/hdfs-default.xml | 62 +++++++++---------- 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 637c77404c8..1dcc8b2bc4b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -465,6 +465,9 @@ Release 2.3.0 - UNRELEASED HDFS-5371. Let client retry the same NN when "dfs.client.test.drop.namenode.response.number" is enabled. (jing9) + HDFS-5467. Remove tab characters in hdfs-default.xml. + (Shinichi Yamashita via Andrew Wang) + OPTIMIZATIONS HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index b42e5928ace..45133d09967 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1393,43 +1393,43 @@ - dfs.namenode.enable.retrycache - true - - This enables the retry cache on the namenode. Namenode tracks for - non-idempotent requests the corresponding response. If a client retries the - request, the response from the retry cache is sent. Such operations - are tagged with annotation @AtMostOnce in namenode protocols. It is - recommended that this flag be set to true. Setting it to false, will result - in clients getting failure responses to retried request. This flag must - be enabled in HA setup for transparent fail-overs. - - The entries in the cache have expiration time configurable - using dfs.namenode.retrycache.expirytime.millis. - + dfs.namenode.enable.retrycache + true + + This enables the retry cache on the namenode. Namenode tracks for + non-idempotent requests the corresponding response. If a client retries the + request, the response from the retry cache is sent. Such operations + are tagged with annotation @AtMostOnce in namenode protocols. It is + recommended that this flag be set to true. 
Setting it to false, will result + in clients getting failure responses to retried request. This flag must + be enabled in HA setup for transparent fail-overs. + + The entries in the cache have expiration time configurable + using dfs.namenode.retrycache.expirytime.millis. + - dfs.namenode.retrycache.expirytime.millis - 600000 - - The time for which retry cache entries are retained. - + dfs.namenode.retrycache.expirytime.millis + 600000 + + The time for which retry cache entries are retained. + - dfs.namenode.retrycache.heap.percent - 0.03f - - This parameter configures the heap size allocated for retry cache - (excluding the response cached). This corresponds to approximately - 4096 entries for every 64MB of namenode process java heap size. - Assuming retry cache entry expiration time (configured using - dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this - enables retry cache to support 7 operations per second sustained - for 10 minutes. As the heap size is increased, the operation rate - linearly increases. - + dfs.namenode.retrycache.heap.percent + 0.03f + + This parameter configures the heap size allocated for retry cache + (excluding the response cached). This corresponds to approximately + 4096 entries for every 64MB of namenode process java heap size. + Assuming retry cache entry expiration time (configured using + dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this + enables retry cache to support 7 operations per second sustained + for 10 minutes. As the heap size is increased, the operation rate + linearly increases. + From 5a5ba62a851f0b29434b76a5530a64f7714d3f95 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Mon, 11 Nov 2013 20:46:17 +0000 Subject: [PATCH 4/6] YARN-1395. Distributed shell application master launched with debug flag can hang waiting for external ls process. Contributed by Chris Nauroth. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540839 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../distributedshell/ApplicationMaster.java | 20 +++++++------- .../TestDistributedShell.java | 26 +++++++++++++++++++ 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6c5d808517a..c23b1a528fd 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -118,6 +118,9 @@ Release 2.3.0 - UNRELEASED YARN-1374. Changed ResourceManager to start the preemption policy monitors as active services. (Karthik Kambatla via vinodkv) + YARN-1395. Distributed shell application master launched with debug flag can + hang waiting for external ls process. 
(cnauroth) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 364476171f5..06d96cbd198 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.applications.distributedshell; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.StringReader; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -46,10 +47,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -262,25 +265,20 @@ public class ApplicationMaster { + env.getValue()); } - String cmd = "ls -al"; - Runtime run = Runtime.getRuntime(); - Process pr = null; + BufferedReader buf = null; try { - pr = run.exec(cmd); - pr.waitFor(); - - BufferedReader buf = new BufferedReader(new InputStreamReader( - pr.getInputStream())); + String lines = Shell.WINDOWS ? 
Shell.execCommand("cmd", "/c", "dir") : + Shell.execCommand("ls", "-al"); + buf = new BufferedReader(new StringReader(lines)); String line = ""; while ((line = buf.readLine()) != null) { LOG.info("System CWD content: " + line); System.out.println("System CWD content: " + line); } - buf.close(); } catch (IOException e) { e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); + } finally { + IOUtils.cleanup(LOG, buf); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 2f311b5114c..a8b546d752b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -306,5 +306,31 @@ public class TestDistributedShell { } + @Test(timeout=90000) + public void testDebugFlag() throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "2", + "--shell_command", + Shell.WINDOWS ? "dir" : "ls", + "--master_memory", + "512", + "--master_vcores", + "2", + "--container_memory", + "128", + "--container_vcores", + "1", + "--debug" + }; + + LOG.info("Initializing DS Client"); + Client client = new Client(new Configuration(yarnCluster.getConfig())); + Assert.assertTrue(client.init(args)); + LOG.info("Running DS Client"); + Assert.assertTrue(client.run()); + } } From 72c6d6255a86225ae1771fcc15e46aff7a4cc384 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Mon, 11 Nov 2013 21:49:43 +0000 Subject: [PATCH 5/6] YARN-1387. RMWebServices should use ClientRMService for filtering applications (Karthik Kambatla via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540851 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../GetApplicationsRequest.java | 107 ++++++++++++++ .../src/main/proto/yarn_service_protos.proto | 7 + .../impl/pb/GetApplicationsRequestPBImpl.java | 130 ++++++++++++++++++ .../resourcemanager/ClientRMService.java | 46 ++++++- .../resourcemanager/webapp/RMWebServices.java | 113 ++++++++------- .../resourcemanager/TestClientRMService.java | 93 ++++++++++++- 7 files changed, 447 insertions(+), 52 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c23b1a528fd..1e982df92b1 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -91,6 +91,9 @@ Release 2.3.0 - UNRELEASED YARN-1121. Changed ResourceManager's state-store to drain all events on shut-down. (Jian He via vinodkv) + YARN-1387. 
RMWebServices should use ClientRMService for filtering + applications (Karthik Kambatla via Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsRequest.java index e53bf88a4fc..95254b2cb13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetApplicationsRequest.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.api.protocolrecords; import java.util.EnumSet; import java.util.Set; +import org.apache.commons.collections.buffer.UnboundedFifoBuffer; +import org.apache.commons.lang.math.LongRange; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; @@ -150,4 +152,109 @@ public abstract class GetApplicationsRequest { @Unstable public abstract void setApplicationStates(EnumSet applicationStates); + + /** + * Set the application states to filter applications on + * + * @param applicationStates all lower-case string representation of the + * application states to filter on + */ + @Private + @Unstable + public abstract void setApplicationStates(Set applicationStates); + + /** + * Get the users to filter applications on + * + * @return set of users to filter applications on + */ + @Private + @Unstable + public abstract Set getUsers(); + + /** + * Set the users to filter applications on + * + * @param users set of users to filter applications on + */ + @Private + @Unstable + public abstract void setUsers(Set users); + + /** + * Get the queues to filter applications on + * + * @return set of queues to filter applications on + */ + @Private + @Unstable + public abstract Set getQueues(); + + /** + * Set the queue to filter applications on + * + * @param queue user to filter applications on + */ + @Private + @Unstable + public abstract void setQueues(Set queue); + + /** + * Get the limit on the number applications to return + * + * @return number of applications to limit to + */ + @Private + @Unstable + public abstract long getLimit(); + + /** + * Limit the number applications to return + * + * @param limit number of applications to limit to + */ + @Private + @Unstable + public abstract void setLimit(long limit); + + /** + * Get the range of start times to filter applications on + * + * @return {@link LongRange} of start times to filter applications on + */ + @Private + @Unstable + public abstract LongRange getStartRange(); + + /** + * Set the range of start times to filter applications on + * + * @param begin beginning of the range + * @param end end of the range + * @throws IllegalArgumentException + */ + @Private + @Unstable + public abstract void setStartRange(long begin, long end) + throws IllegalArgumentException; + + /** + * Get the range of finish times to filter applications on + * + * @return {@link LongRange} of finish times to filter applications on + */ + @Private + @Unstable + public abstract LongRange getFinishRange(); + + /** + * Set the range of finish times to filter applications on + * + * @param begin beginning of the range + * @param end end of the range + * @throws 
IllegalArgumentException + */ + @Private + @Unstable + public abstract void setFinishRange(long begin, long end); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 391019ade79..cfe71d44c07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -125,6 +125,13 @@ message GetClusterMetricsResponseProto { message GetApplicationsRequestProto { repeated string application_types = 1; repeated YarnApplicationStateProto application_states = 2; + repeated string users = 3; + repeated string queues = 4; + optional int64 limit = 5; + optional int64 start_begin = 6; + optional int64 start_end = 7; + optional int64 finish_begin = 8; + optional int64 finish_end = 9; } message GetApplicationsResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetApplicationsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetApplicationsRequestPBImpl.java index 33f74f0903e..10513a3dc18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetApplicationsRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetApplicationsRequestPBImpl.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.commons.lang.math.LongRange; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; @@ -44,6 +45,10 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest { Set applicationTypes = null; EnumSet applicationStates = null; + Set users = null; + Set queues = null; + long limit = Long.MAX_VALUE; + LongRange start = null, finish = null; public GetApplicationsRequestPBImpl() { builder = GetApplicationsRequestProto.newBuilder(); @@ -148,6 +153,26 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest { } } + private void initUsers() { + if (this.users != null) { + return; + } + GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder; + List usersList = p.getUsersList(); + this.users = new HashSet(); + this.users.addAll(usersList); + } + + private void initQueues() { + if (this.queues != null) { + return; + } + GetApplicationsRequestProtoOrBuilder p = viaProto ? 
proto : builder; + List queuesList = p.getQueuesList(); + this.queues = new HashSet(); + this.queues.addAll(queuesList); + } + @Override public Set getApplicationTypes() { initApplicationTypes(); @@ -177,6 +202,111 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest { this.applicationStates = applicationStates; } + @Override + public void setApplicationStates(Set applicationStates) { + EnumSet appStates = null; + for (YarnApplicationState state : YarnApplicationState.values()) { + if (applicationStates.contains(state.name().toLowerCase())) { + if (appStates == null) { + appStates = EnumSet.of(state); + } else { + appStates.add(state); + } + } + } + setApplicationStates(appStates); + } + + @Override + public Set getUsers() { + initUsers(); + return this.users; + } + + @Override + public void setUsers(Set users) { + maybeInitBuilder(); + if (users == null) { + builder.clearUsers(); + } + this.users = users; + } + + @Override + public Set getQueues() { + initQueues(); + return this.queues; + } + + @Override + public void setQueues(Set queues) { + maybeInitBuilder(); + if (queues == null) { + builder.clearQueues(); + } + this.queues = queues; + } + + @Override + public long getLimit() { + if (this.limit == Long.MAX_VALUE) { + GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder; + this.limit = p.hasLimit() ? p.getLimit() : Long.MAX_VALUE; + } + return this.limit; + } + + @Override + public void setLimit(long limit) { + maybeInitBuilder(); + this.limit = limit; + } + + @Override + public LongRange getStartRange() { + if (this.start == null) { + GetApplicationsRequestProtoOrBuilder p = viaProto ? proto: builder; + if (p.hasStartBegin() || p.hasFinishBegin()) { + long begin = p.hasStartBegin() ? p.getStartBegin() : 0L; + long end = p.hasStartEnd() ? p.getStartEnd() : Long.MAX_VALUE; + this.start = new LongRange(begin, end); + } + } + return this.start; + } + + @Override + public void setStartRange(long begin, long end) + throws IllegalArgumentException { + if (begin > end) { + throw new IllegalArgumentException("begin > end in range (begin, " + + "end): (" + begin + ", " + end + ")"); + } + this.start = new LongRange(begin, end); + } + + @Override + public LongRange getFinishRange() { + if (this.finish == null) { + GetApplicationsRequestProtoOrBuilder p = viaProto ? proto: builder; + if (p.hasFinishBegin() || p.hasFinishEnd()) { + long begin = p.hasFinishBegin() ? p.getFinishBegin() : 0L; + long end = p.hasFinishEnd() ? 
p.getFinishEnd() : Long.MAX_VALUE; + this.finish = new LongRange(begin, end); + } + } + return this.finish; + } + + @Override + public void setFinishRange(long begin, long end) { + if (begin > end) { + throw new IllegalArgumentException("begin > end in range (begin, " + + "end): (" + begin + ", " + end + ")"); + } + this.finish = new LongRange(begin, end); + } + @Override public int hashCode() { return getProto().hashCode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 90c88c20200..d94bc9e5186 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang.math.LongRange; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -401,6 +402,18 @@ public class ClientRMService extends AbstractService implements @Override public GetApplicationsResponse getApplications( GetApplicationsRequest request) throws YarnException { + return getApplications(request, true); + } + + /** + * Get applications matching the {@link GetApplicationsRequest}. If + * caseSensitive is set to false, applicationTypes in + * GetApplicationRequest are expected to be in all-lowercase + */ + @Private + public GetApplicationsResponse getApplications( + GetApplicationsRequest request, boolean caseSensitive) + throws YarnException { UserGroupInformation callerUGI; try { callerUGI = UserGroupInformation.getCurrentUser(); @@ -412,11 +425,23 @@ public class ClientRMService extends AbstractService implements Set applicationTypes = request.getApplicationTypes(); EnumSet applicationStates = request.getApplicationStates(); + Set users = request.getUsers(); + Set queues = request.getQueues(); + long limit = request.getLimit(); + LongRange start = request.getStartRange(); + LongRange finish = request.getFinishRange(); List reports = new ArrayList(); + long count = 0; for (RMApp application : this.rmContext.getRMApps().values()) { + if (++count > limit) { + break; + } if (applicationTypes != null && !applicationTypes.isEmpty()) { - if (!applicationTypes.contains(application.getApplicationType())) { + String appTypeToMatch = caseSensitive + ? 
application.getApplicationType() + : application.getApplicationType().toLowerCase(); + if (!applicationTypes.contains(appTypeToMatch)) { continue; } } @@ -427,6 +452,25 @@ public class ClientRMService extends AbstractService implements continue; } } + + if (users != null && !users.isEmpty() && + !users.contains(application.getUser())) { + continue; + } + + if (queues != null && !queues.isEmpty() && + !queues.contains(application.getQueue())) { + continue; + } + + if (start != null && !start.containsLong(application.getStartTime())) { + continue; + } + + if (finish != null && !finish.containsLong(application.getFinishTime())) { + continue; + } + boolean allowAccess = checkAccess(callerUGI, application.getUser(), ApplicationAccessType.VIEW_APP, application); reports.add(application.createAndGetApplicationReport( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 174038d7ba2..e18c04740cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -38,14 +39,20 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; @@ -85,6 +92,8 @@ import com.google.inject.Singleton; @Singleton @Path("/ws/v1/cluster") public class RMWebServices { + private static final Log LOG = + LogFactory.getLog(RMWebServices.class.getName()); private static final String EMPTY = ""; private static final String ANY = "*"; private final ResourceManager rm; @@ -253,7 +262,6 @@ public class RMWebServices { @QueryParam("finishedTimeBegin") String finishBegin, @QueryParam("finishedTimeEnd") String finishEnd, @QueryParam("applicationTypes") Set applicationTypes) { - long num = 0; boolean checkCount = false; boolean checkStart = false; boolean checkEnd = false; 
@@ -328,19 +336,66 @@ public class RMWebServices { checkAppStates = true; } - final ConcurrentMap apps = rm.getRMContext() - .getRMApps(); + GetApplicationsRequest request = GetApplicationsRequest.newInstance(); + + if (checkStart) { + request.setStartRange(sBegin, sEnd); + } + + if (checkEnd) { + request.setFinishRange(fBegin, fEnd); + } + + if (checkCount) { + request.setLimit(countNum); + } + + if (checkAppTypes) { + request.setApplicationTypes(appTypes); + } + + if (checkAppStates) { + request.setApplicationStates(appStates); + } + + if (queueQuery != null && !queueQuery.isEmpty()) { + ResourceScheduler rs = rm.getResourceScheduler(); + if (rs instanceof CapacityScheduler) { + CapacityScheduler cs = (CapacityScheduler) rs; + // validate queue exists + try { + cs.getQueueInfo(queueQuery, false, false); + } catch (IOException e) { + throw new BadRequestException(e.getMessage()); + } + } + Set queues = new HashSet(1); + queues.add(queueQuery); + request.setQueues(queues); + } + + if (userQuery != null && !userQuery.isEmpty()) { + Set users = new HashSet(1); + users.add(userQuery); + request.setUsers(users); + } + + List appReports = null; + try { + appReports = rm.getClientRMService() + .getApplications(request, false).getApplicationList(); + } catch (YarnException e) { + LOG.error("Unable to retrieve apps from ClientRMService", e); + throw new YarnRuntimeException( + "Unable to retrieve apps from ClientRMService", e); + } + + final ConcurrentMap apps = + rm.getRMContext().getRMApps(); AppsInfo allApps = new AppsInfo(); - for (RMApp rmapp : apps.values()) { + for (ApplicationReport report : appReports) { + RMApp rmapp = apps.get(report.getApplicationId()); - if (checkCount && num == countNum) { - break; - } - - if (checkAppStates && !appStates.contains( - rmapp.createApplicationState().toString().toLowerCase())) { - continue; - } if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) { FinalApplicationStatus.valueOf(finalStatusQuery); if (!rmapp.getFinalApplicationStatus().toString() @@ -348,43 +403,9 @@ public class RMWebServices { continue; } } - if (userQuery != null && !userQuery.isEmpty()) { - if (!rmapp.getUser().equals(userQuery)) { - continue; - } - } - if (queueQuery != null && !queueQuery.isEmpty()) { - ResourceScheduler rs = rm.getResourceScheduler(); - if (rs instanceof CapacityScheduler) { - CapacityScheduler cs = (CapacityScheduler) rs; - // validate queue exists - try { - cs.getQueueInfo(queueQuery, false, false); - } catch (IOException e) { - throw new BadRequestException(e.getMessage()); - } - } - if (!rmapp.getQueue().equals(queueQuery)) { - continue; - } - } - if (checkAppTypes && !appTypes.contains( - rmapp.getApplicationType().trim().toLowerCase())) { - continue; - } - if (checkStart - && (rmapp.getStartTime() < sBegin || rmapp.getStartTime() > sEnd)) { - continue; - } - if (checkEnd - && (rmapp.getFinishTime() < fBegin || rmapp.getFinishTime() > fEnd)) { - continue; - } AppInfo app = new AppInfo(rmapp, hasAccess(rmapp, hsr)); - allApps.add(app); - num++; } return allApps; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 9ec82c4b7dd..491454a53ee 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.any; @@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; @@ -409,6 +409,89 @@ public class TestClientRMService { getAllApplicationsResponse.getApplicationList() .get(0).getApplicationId()); } + + @Test + public void testGetApplications() throws IOException, YarnException { + /** + * 1. Submit 3 applications alternately in two queues + * 2. Test each of the filters + */ + // Basic setup + YarnScheduler yarnScheduler = mockYarnScheduler(); + RMContext rmContext = mock(RMContext.class); + mockRMContext(yarnScheduler, rmContext); + RMStateStore stateStore = mock(RMStateStore.class); + when(rmContext.getStateStore()).thenReturn(stateStore); + RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, + null, mock(ApplicationACLsManager.class), new Configuration()); + when(rmContext.getDispatcher().getEventHandler()).thenReturn( + new EventHandler() { + public void handle(Event event) {} + }); + + ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); + QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); + when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), + any(QueueACL.class), anyString())).thenReturn(true); + ClientRMService rmService = + new ClientRMService(rmContext, yarnScheduler, appManager, + mockAclsManager, mockQueueACLsManager, null); + + // Initialize appnames and queues + String[] queues = {"Q-1", "Q-2"}; + String[] appNames = + {MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()}; + ApplicationId[] appIds = + {getApplicationId(101), getApplicationId(102), getApplicationId(103)}; + + // Submit applications + for (int i = 0; i < appIds.length; i++) { + ApplicationId appId = appIds[i]; + when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), + ApplicationAccessType.VIEW_APP, null, appId)).thenReturn(true); + SubmitApplicationRequest submitRequest = mockSubmitAppRequest( + appId, appNames[i], queues[i % queues.length]); + rmService.submitApplication(submitRequest); + } + + // Test different cases of ClientRMService#getApplications() + GetApplicationsRequest request = GetApplicationsRequest.newInstance(); + assertEquals("Incorrect total number of apps", 6, + rmService.getApplications(request).getApplicationList().size()); + + // Check limit + request.setLimit(1L); + assertEquals("Failed to limit applications", 1, + 
rmService.getApplications(request).getApplicationList().size()); + + // Check queue + request = GetApplicationsRequest.newInstance(); + Set queueSet = new HashSet(); + request.setQueues(queueSet); + + queueSet.add(queues[0]); + assertEquals("Incorrect number of applications in queue", 2, + rmService.getApplications(request).getApplicationList().size()); + assertEquals("Incorrect number of applications in queue", 2, + rmService.getApplications(request, false).getApplicationList().size()); + + queueSet.add(queues[1]); + assertEquals("Incorrect number of applications in queue", 3, + rmService.getApplications(request).getApplicationList().size()); + + // Check user + request = GetApplicationsRequest.newInstance(); + Set userSet = new HashSet(); + request.setUsers(userSet); + + userSet.add("random-user-name"); + assertEquals("Incorrect number of applications for user", 0, + rmService.getApplications(request).getApplicationList().size()); + + userSet.add(UserGroupInformation.getCurrentUser().getShortUserName()); + assertEquals("Incorrect number of applications for user", 3, + rmService.getApplications(request).getApplicationList().size()); + } @Test(timeout=4000) public void testConcurrentAppSubmit() @@ -492,10 +575,10 @@ public class TestClientRMService { submissionContext.setResource(resource); submissionContext.setApplicationType(appType); - SubmitApplicationRequest submitRequest = - recordFactory.newRecordInstance(SubmitApplicationRequest.class); - submitRequest.setApplicationSubmissionContext(submissionContext); - return submitRequest; + SubmitApplicationRequest submitRequest = + recordFactory.newRecordInstance(SubmitApplicationRequest.class); + submitRequest.setApplicationSubmissionContext(submissionContext); + return submitRequest; } private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) From f10642a8fd9b75385f7814d84557564d9944a2e9 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Tue, 12 Nov 2013 00:21:57 +0000 Subject: [PATCH 6/6] HDFS-5488. Clean up TestHftpURLTimeout. Contributed by Haohui Mai. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540894 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop/hdfs/web/TestHftpFileSystem.java | 126 +++++++--------- .../hadoop/hdfs/web/TestHftpURLTimeouts.java | 140 ------------------ 3 files changed, 58 insertions(+), 210 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpURLTimeouts.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1dcc8b2bc4b..d61ef530991 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -524,6 +524,8 @@ Release 2.3.0 - UNRELEASED HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9) + HDFS-5488. Clean up TestHftpURLTimeout. 
(Haohui Mai via jing9) + Release 2.2.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java index 0bece3286fb..1bd3c3974f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java @@ -28,6 +28,7 @@ import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.net.URLConnection; import java.util.Random; import org.apache.commons.logging.impl.Log4JLogger; @@ -40,15 +41,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSCluster.Builder; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.web.HftpFileSystem; -import org.apache.hadoop.hdfs.web.HsftpFileSystem; import org.apache.hadoop.util.ServletUtil; import org.apache.log4j.Level; -import org.junit.*; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; public class TestHftpFileSystem { private static final Random RAN = new Random(); @@ -65,32 +67,24 @@ public class TestHftpFileSystem { new Path("/foo;bar"), // URI does not encode, Request#getPathInfo returns verbatim - new Path("/foo+"), - new Path("/foo+bar/foo+bar"), - new Path("/foo=bar/foo=bar"), - new Path("/foo,bar/foo,bar"), - new Path("/foo@bar/foo@bar"), - new Path("/foo&bar/foo&bar"), - new Path("/foo$bar/foo$bar"), - new Path("/foo_bar/foo_bar"), - new Path("/foo~bar/foo~bar"), - new Path("/foo.bar/foo.bar"), - new Path("/foo../bar/foo../bar"), - new Path("/foo.../bar/foo.../bar"), + new Path("/foo+"), new Path("/foo+bar/foo+bar"), + new Path("/foo=bar/foo=bar"), new Path("/foo,bar/foo,bar"), + new Path("/foo@bar/foo@bar"), new Path("/foo&bar/foo&bar"), + new Path("/foo$bar/foo$bar"), new Path("/foo_bar/foo_bar"), + new Path("/foo~bar/foo~bar"), new Path("/foo.bar/foo.bar"), + new Path("/foo../bar/foo../bar"), new Path("/foo.../bar/foo.../bar"), new Path("/foo'bar/foo'bar"), new Path("/foo#bar/foo#bar"), new Path("/foo!bar/foo!bar"), // HDFS file names may not contain ":" // URI percent encodes, Request#getPathInfo decodes - new Path("/foo bar/foo bar"), - new Path("/foo?bar/foo?bar"), - new Path("/foo\">bar/foo\">bar"), - }; + new Path("/foo bar/foo bar"), new Path("/foo?bar/foo?bar"), + new Path("/foo\">bar/foo\">bar"), }; @BeforeClass public static void setUp() throws IOException { - ((Log4JLogger)HftpFileSystem.LOG).getLogger().setLevel(Level.ALL); + ((Log4JLogger) HftpFileSystem.LOG).getLogger().setLevel(Level.ALL); final long seed = RAN.nextLong(); System.out.println("seed=" + seed); @@ -99,8 +93,8 @@ public class TestHftpFileSystem { config = new Configuration(); cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build(); blockPoolId = cluster.getNamesystem().getBlockPoolId(); - hftpUri = - "hftp://" + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); + hftpUri = "hftp://" + + config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); } @AfterClass @@ 
-140,7 +134,8 @@ public class TestHftpFileSystem { // Check the file status matches the path. Hftp returns a FileStatus // with the entire URI, extract the path part. - assertEquals(p, new Path(hftpFs.getFileStatus(p).getPath().toUri().getPath())); + assertEquals(p, new Path(hftpFs.getFileStatus(p).getPath().toUri() + .getPath())); // Test list status (listPath servlet) assertEquals(1, hftpFs.listStatus(p).length); @@ -158,21 +153,20 @@ public class TestHftpFileSystem { if (hdfs.exists(path)) { hdfs.delete(path, true); } - FSDataOutputStream out = hdfs.create(path, (short)1); + FSDataOutputStream out = hdfs.create(path, (short) 1); out.writeBytes("0123456789"); out.close(); // Get the path's block location so we can determine // if we were redirected to the right DN. - BlockLocation[] locations = - hdfs.getFileBlockLocations(path, 0, 10); + BlockLocation[] locations = hdfs.getFileBlockLocations(path, 0, 10); String xferAddr = locations[0].getNames()[0]; // Connect to the NN to get redirected URL u = hftpFs.getNamenodeURL( "/data" + ServletUtil.encodePath(path.toUri().getPath()), "ugi=userx,groupy"); - HttpURLConnection conn = (HttpURLConnection)u.openConnection(); + HttpURLConnection conn = (HttpURLConnection) u.openConnection(); HttpURLConnection.setFollowRedirects(true); conn.connect(); conn.getInputStream(); @@ -181,15 +175,15 @@ public class TestHftpFileSystem { // Find the datanode that has the block according to locations // and check that the URL was redirected to this DN's info port for (DataNode node : cluster.getDataNodes()) { - DatanodeRegistration dnR = - DataNodeTestUtils.getDNRegistrationForBP(node, blockPoolId); + DatanodeRegistration dnR = DataNodeTestUtils.getDNRegistrationForBP(node, + blockPoolId); if (dnR.getXferAddr().equals(xferAddr)) { checked = true; assertEquals(dnR.getInfoPort(), conn.getURL().getPort()); } } - assertTrue("The test never checked that location of " + - "the block and hftp desitnation are the same", checked); + assertTrue("The test never checked that location of " + + "the block and hftp desitnation are the same", checked); } /** @@ -260,7 +254,7 @@ public class TestHftpFileSystem { os.writeBytes("0123456789"); os.close(); - // ByteRangeInputStream delays opens until reads. Make sure it doesn't + // ByteRangeInputStream delays opens until reads. Make sure it doesn't // open a closed stream that has never been opened FSDataInputStream in = hftpFs.open(testFile); in.close(); @@ -298,16 +292,15 @@ public class TestHftpFileSystem { URI uri = URI.create("hftp://localhost"); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); - assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort()); + assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, + fs.getDefaultPort()); assertEquals(uri, fs.getUri()); // HFTP uses http to get the token so canonical service name should // return the http port. - assertEquals( - "127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, - fs.getCanonicalServiceName() - ); + assertEquals("127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, + fs.getCanonicalServiceName()); } @Test @@ -324,10 +317,7 @@ public class TestHftpFileSystem { // HFTP uses http to get the token so canonical service name should // return the http port. 
- assertEquals( - "127.0.0.1:123", - fs.getCanonicalServiceName() - ); + assertEquals("127.0.0.1:123", fs.getCanonicalServiceName()); } @Test @@ -336,13 +326,11 @@ public class TestHftpFileSystem { URI uri = URI.create("hftp://localhost:123"); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); - assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort()); + assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, + fs.getDefaultPort()); assertEquals(uri, fs.getUri()); - assertEquals( - "127.0.0.1:123", - fs.getCanonicalServiceName() - ); + assertEquals("127.0.0.1:123", fs.getCanonicalServiceName()); } @Test @@ -356,13 +344,20 @@ public class TestHftpFileSystem { assertEquals(123, fs.getDefaultPort()); assertEquals(uri, fs.getUri()); - assertEquals( - "127.0.0.1:789", - fs.getCanonicalServiceName() - ); + assertEquals("127.0.0.1:789", fs.getCanonicalServiceName()); } - /// + @Test + public void testTimeout() throws IOException { + Configuration conf = new Configuration(); + URI uri = URI.create("hftp://localhost"); + HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); + URLConnection conn = fs.connectionFactory.openConnection(new URL("http://localhost")); + assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT, conn.getConnectTimeout()); + assertEquals(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT, conn.getReadTimeout()); + } + + // / @Test public void testHsftpDefaultPorts() throws IOException { @@ -370,13 +365,12 @@ public class TestHftpFileSystem { URI uri = URI.create("hsftp://localhost"); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); - assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort()); + assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, + fs.getDefaultPort()); assertEquals(uri, fs.getUri()); - assertEquals( - "127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, - fs.getCanonicalServiceName() - ); + assertEquals("127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, + fs.getCanonicalServiceName()); } @Test @@ -391,10 +385,7 @@ public class TestHftpFileSystem { assertEquals(456, fs.getDefaultPort()); assertEquals(uri, fs.getUri()); - assertEquals( - "127.0.0.1:456", - fs.getCanonicalServiceName() - ); + assertEquals("127.0.0.1:456", fs.getCanonicalServiceName()); } @Test @@ -403,13 +394,11 @@ public class TestHftpFileSystem { URI uri = URI.create("hsftp://localhost:123"); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); - assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort()); + assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, + fs.getDefaultPort()); assertEquals(uri, fs.getUri()); - assertEquals( - "127.0.0.1:123", - fs.getCanonicalServiceName() - ); + assertEquals("127.0.0.1:123", fs.getCanonicalServiceName()); } @Test @@ -424,9 +413,6 @@ public class TestHftpFileSystem { assertEquals(456, fs.getDefaultPort()); assertEquals(uri, fs.getUri()); - assertEquals( - "127.0.0.1:789", - fs.getCanonicalServiceName() - ); + assertEquals("127.0.0.1:789", fs.getCanonicalServiceName()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpURLTimeouts.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpURLTimeouts.java deleted file mode 100644 index a201ffe1c50..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpURLTimeouts.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.web; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.util.LinkedList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.web.HftpFileSystem; -import org.apache.hadoop.hdfs.web.HsftpFileSystem; -import org.apache.hadoop.hdfs.web.URLConnectionFactory; -import org.junit.Test; - -public class TestHftpURLTimeouts { - - @Test - public void testHftpSocketTimeout() throws Exception { - Configuration conf = new Configuration(); - ServerSocket socket = new ServerSocket(0,1); - URI uri = new URI("hftp", null, - InetAddress.getByName(null).getHostAddress(), - socket.getLocalPort(), - null, null, null); - - HftpFileSystem fs = (HftpFileSystem)FileSystem.get(uri, conf); - fs.connectionFactory = new URLConnectionFactory(5); - - boolean timedout = false; - try { - HttpURLConnection conn = fs.openConnection("/", ""); - timedout = false; - try { - // this will consume the only slot in the backlog - conn.getInputStream(); - } catch (SocketTimeoutException ste) { - timedout = true; - assertEquals("Read timed out", ste.getMessage()); - } finally { - if (conn != null) conn.disconnect(); - } - assertTrue("read timedout", timedout); - assertTrue("connect timedout", checkConnectTimeout(fs, false)); - } finally { - fs.connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY; - fs.close(); - } - } - - @Test - public void testHsftpSocketTimeout() throws Exception { - Configuration conf = new Configuration(); - ServerSocket socket = new ServerSocket(0,1); - URI uri = new URI("hsftp", null, - InetAddress.getByName(null).getHostAddress(), - socket.getLocalPort(), - null, null, null); - boolean timedout = false; - - HsftpFileSystem fs = (HsftpFileSystem)FileSystem.get(uri, conf); - fs.connectionFactory = new URLConnectionFactory(5); - - try { - HttpURLConnection conn = null; - timedout = false; - try { - // this will consume the only slot in the backlog - conn = fs.openConnection("/", ""); - } catch (SocketTimeoutException ste) { - // SSL expects a negotiation, so it will timeout on read, unlike hftp - timedout = true; - assertEquals("Read timed out", ste.getMessage()); - } finally { - if (conn != null) conn.disconnect(); - } - assertTrue("ssl read connect timedout", timedout); - assertTrue("connect timedout", checkConnectTimeout(fs, true)); - } finally { - fs.connectionFactory = 
URLConnectionFactory.DEFAULT_CONNECTION_FACTORY; - fs.close(); - } - } - - private boolean checkConnectTimeout(HftpFileSystem fs, boolean ignoreReadTimeout) - throws IOException { - boolean timedout = false; - List conns = new LinkedList(); - try { - // with a listen backlog of 1, should only have to make one connection - // to trigger a connection timeout. however... linux doesn't honor the - // socket's listen backlog so we have to try a bunch of times - for (int n=32; !timedout && n > 0; n--) { - try { - conns.add(fs.openConnection("/", "")); - } catch (SocketTimeoutException ste) { - String message = ste.getMessage(); - assertNotNull(message); - // https will get a read timeout due to SSL negotiation, but - // a normal http will not, so need to ignore SSL read timeouts - // until a connect timeout occurs - if (!(ignoreReadTimeout && "Read timed out".equals(message))) { - timedout = true; - assertEquals("connect timed out", message); - } - } - } - } finally { - for (HttpURLConnection conn : conns) { - conn.disconnect(); - } - } - return timedout; - } -}
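The deleted TestHftpURLTimeouts above forced real connect and read timeouts against a backlog-1 ServerSocket, an approach its own comment concedes Linux does not honor reliably; the replacement testTimeout in TestHftpFileSystem instead asserts that connections opened through the default URLConnectionFactory carry DEFAULT_SOCKET_TIMEOUT for both connect and read. The sketch below (not part of this patch) checks the same property for a factory built with a custom timeout, reusing only what the patch itself shows: the URLConnectionFactory(int) constructor and 5 ms value from the deleted test, and the openConnection(URL) call from testTimeout. The class name is an illustrative assumption.

package org.apache.hadoop.hdfs.web;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;

import org.junit.Test;

public class TestUrlConnectionFactoryTimeouts {

  // Sketch: a factory created with a 5 ms socket timeout should stamp
  // that value onto both the connect and read timeouts of the
  // connections it opens, mirroring what testTimeout verifies for the
  // default factory and DEFAULT_SOCKET_TIMEOUT.
  @Test
  public void testCustomTimeout() throws IOException {
    URLConnectionFactory factory = new URLConnectionFactory(5);
    URLConnection conn = factory.openConnection(new URL("http://localhost"));
    assertEquals(5, conn.getConnectTimeout());
    assertEquals(5, conn.getReadTimeout());
  }
}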