From 551024915d487957d9e829493ab319c8e31dfa81 Mon Sep 17 00:00:00 2001 From: Daryn Sharp Date: Fri, 18 Jul 2014 17:58:07 +0000 Subject: [PATCH 1/7] HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611737 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hadoop/hdfs/protocol/DatanodeInfo.java | 2 +- .../BlockPlacementPolicyDefault.java | 14 +- .../blockmanagement/DatanodeManager.java | 4 +- .../blockmanagement/DatanodeStatistics.java | 6 + .../blockmanagement/HeartbeatManager.java | 21 +- .../hdfs/server/namenode/FSClusterStats.java | 9 + .../hdfs/server/namenode/FSNamesystem.java | 13 +- .../TestReplicationPolicyConsiderLoad.java | 24 ++- .../namenode/TestNamenodeCapacityReport.java | 186 +++++++++++++++++- 10 files changed, 259 insertions(+), 22 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6c815fc30a0..9433d62a8ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -595,6 +595,8 @@ Release 2.5.0 - UNRELEASED HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9) + HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn) + BUG FIXES HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java index cb0f081b99f..9fcada734ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java @@ -339,7 +339,7 @@ public class DatanodeInfo extends DatanodeID implements Node { buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n"); buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n"); buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n"); - + buffer.append("Xceivers: "+getXceiverCount()+"\n"); buffer.append("Last contact: "+new Date(lastUpdate)+"\n"); return buffer.toString(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 35333363ece..accdddf5250 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -636,15 +636,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // check the communication traffic of the target machine if (considerLoad) { - double avgLoad = 0; - if (stats != null) { - int size = stats.getNumDatanodesInService(); - if (size != 0) { - avgLoad = (double)stats.getTotalLoad()/size; - } - } - if (node.getXceiverCount() > (2.0 * avgLoad)) { - logNodeIsNotChosen(storage, "the node is too busy "); + final double maxLoad = 2.0 * stats.getInServiceXceiverAverage(); + final int nodeLoad = node.getXceiverCount(); + if (nodeLoad > maxLoad) { + logNodeIsNotChosen(storage, + "the node is too busy 
(load:"+nodeLoad+" > "+maxLoad+") "); return false; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index aea34ecbc90..791c6dff5be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -820,7 +820,9 @@ public class DatanodeManager { } /** Start decommissioning the specified datanode. */ - private void startDecommission(DatanodeDescriptor node) { + @InterfaceAudience.Private + @VisibleForTesting + public void startDecommission(DatanodeDescriptor node) { if (!node.isDecommissionInProgress() && !node.isDecommissioned()) { for (DatanodeStorageInfo storage : node.getStorageInfos()) { LOG.info("Start Decommissioning " + node + " " + storage diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java index 9ccc5b1ff4a..c9bc3e5ea67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java @@ -52,6 +52,12 @@ public interface DatanodeStatistics { /** @return the xceiver count */ public int getXceiverCount(); + /** @return average xceiver count for non-decommission(ing|ed) nodes */ + public int getInServiceXceiverCount(); + + /** @return number of non-decommission(ing|ed) nodes */ + public int getNumDatanodesInService(); + /** * @return the total used space by data nodes for non-DFS purposes * such as storing temporary files on the local file system diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index e7017a30a93..901f7e3653d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -150,6 +150,16 @@ class HeartbeatManager implements DatanodeStatistics { return stats.xceiverCount; } + @Override + public synchronized int getInServiceXceiverCount() { + return stats.nodesInServiceXceiverCount; + } + + @Override + public synchronized int getNumDatanodesInService() { + return stats.nodesInService; + } + @Override public synchronized long getCacheCapacity() { return stats.cacheCapacity; @@ -178,7 +188,7 @@ class HeartbeatManager implements DatanodeStatistics { } synchronized void register(final DatanodeDescriptor d) { - if (!datanodes.contains(d)) { + if (!d.isAlive) { addDatanode(d); //update its timestamp @@ -191,6 +201,8 @@ class HeartbeatManager implements DatanodeStatistics { } synchronized void addDatanode(final DatanodeDescriptor d) { + // update in-service node count + stats.add(d); datanodes.add(d); d.isAlive = true; } @@ -323,6 +335,9 @@ class HeartbeatManager implements DatanodeStatistics { private long cacheCapacity = 0L; private long cacheUsed = 0L; + 
private int nodesInService = 0; + private int nodesInServiceXceiverCount = 0; + private int expiredHeartbeats = 0; private void add(final DatanodeDescriptor node) { @@ -330,6 +345,8 @@ class HeartbeatManager implements DatanodeStatistics { blockPoolUsed += node.getBlockPoolUsed(); xceiverCount += node.getXceiverCount(); if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService++; + nodesInServiceXceiverCount += node.getXceiverCount(); capacityTotal += node.getCapacity(); capacityRemaining += node.getRemaining(); } else { @@ -344,6 +361,8 @@ class HeartbeatManager implements DatanodeStatistics { blockPoolUsed -= node.getBlockPoolUsed(); xceiverCount -= node.getXceiverCount(); if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService--; + nodesInServiceXceiverCount -= node.getXceiverCount(); capacityTotal -= node.getCapacity(); capacityRemaining -= node.getRemaining(); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java index 676aa0826c0..1a859a7b93e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java @@ -48,6 +48,15 @@ public interface FSClusterStats { * @return Number of datanodes that are both alive and not decommissioned. */ public int getNumDatanodesInService(); + + /** + * an indication of the average load of non-decommission(ing|ed) nodes + * eligible for block placement + * + * @return average of the in service number of block transfers and block + * writes that are currently occurring on the cluster. 
+ */ + public double getInServiceXceiverAverage(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 81db6786dd4..7e0394a022f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -7320,7 +7320,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats, @Override // FSClusterStats public int getNumDatanodesInService() { - return getNumLiveDataNodes() - getNumDecomLiveDataNodes(); + return datanodeStatistics.getNumDatanodesInService(); + } + + @Override // for block placement strategy + public double getInServiceXceiverAverage() { + double avgLoad = 0; + final int nodes = getNumDatanodesInService(); + if (nodes != 0) { + final int xceivers = datanodeStatistics.getInServiceXceiverCount(); + avgLoad = (double)xceivers/nodes; + } + return avgLoad; } public SnapshotManager getSnapshotManager() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java index 73d397760a4..bf972c03ca7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.test.PathUtils; @@ -101,6 +102,7 @@ public class TestReplicationPolicyConsiderLoad { } } + private final double EPSILON = 0.0001; /** * Tests that chooseTarget with considerLoad set to true correctly calculates * load with decommissioned nodes. 
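Taken together, the changes above replace the per-placement scan of all datanodes with counters maintained incrementally by HeartbeatManager.Stats.add() and subtract(). A simplified sketch of the resulting load check, using only the methods introduced in this patch (local variable names are illustrative):

    // FSNamesystem (FSClusterStats): average xceivers over in-service nodes only
    double avgLoad = 0;
    final int nodes = datanodeStatistics.getNumDatanodesInService();
    if (nodes != 0) {
      avgLoad = (double) datanodeStatistics.getInServiceXceiverCount() / nodes;
    }

    // BlockPlacementPolicyDefault: a candidate carrying more than twice the
    // in-service average is logged as "too busy" and not chosen as a target
    if (node.getXceiverCount() > 2.0 * stats.getInServiceXceiverAverage()) {
      return false;
    }

Because decommission(ing|ed) nodes drop out of both the numerator and the denominator, the test below expects an average of (2 + 4 + 4)/6 before and (2 + 4 + 4)/3 after three of the six datanodes are decommissioned.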
@@ -109,14 +111,6 @@ public class TestReplicationPolicyConsiderLoad { public void testChooseTargetWithDecomNodes() throws IOException { namenode.getNamesystem().writeLock(); try { - // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget() - // returns false - for (int i = 0; i < 3; i++) { - DatanodeInfo d = dnManager.getDatanodeByXferAddr( - dnrList.get(i).getIpAddr(), - dnrList.get(i).getXferPort()); - d.setDecommissioned(); - } String blockPoolId = namenode.getNamesystem().getBlockPoolId(); dnManager.handleHeartbeat(dnrList.get(3), BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]), @@ -133,6 +127,20 @@ public class TestReplicationPolicyConsiderLoad { blockPoolId, dataNodes[5].getCacheCapacity(), dataNodes[5].getCacheRemaining(), 4, 0, 0); + // value in the above heartbeats + final int load = 2 + 4 + 4; + + FSNamesystem fsn = namenode.getNamesystem(); + assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON); + + // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget() + // returns false + for (int i = 0; i < 3; i++) { + DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i)); + dnManager.startDecommission(d); + d.setDecommissioned(); + } + assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON); // Call chooseTarget() DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java index 2883f99f692..48a4b3c6c76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hdfs.server.namenode; -import static org.junit.Assert.assertTrue; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY; +import static org.junit.Assert.*; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -28,12 +30,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.junit.Test; @@ -153,4 +164,177 @@ public class TestNamenodeCapacityReport { if (cluster != null) {cluster.shutdown();} } } + + private static final float EPSILON = 0.0001f; + @Test + public void testXceiverCount() throws Exception { + Configuration conf = new HdfsConfiguration(); + // 
don't waste time retrying if close fails + conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0); + MiniDFSCluster cluster = null; + + final int nodes = 8; + final int fileCount = 5; + final short fileRepl = 3; + + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build(); + cluster.waitActive(); + + final FSNamesystem namesystem = cluster.getNamesystem(); + final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager(); + List datanodes = cluster.getDataNodes(); + final DistributedFileSystem fs = cluster.getFileSystem(); + + // trigger heartbeats in case not already sent + triggerHeartbeats(datanodes); + + // check that all nodes are live and in service + int expectedTotalLoad = nodes; // xceiver server adds 1 to load + int expectedInServiceNodes = nodes; + int expectedInServiceLoad = nodes; + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceLoad, + namesystem.getInServiceXceiverAverage(), EPSILON); + + // shutdown half the nodes and force a heartbeat check to ensure + // counts are accurate + for (int i=0; i < nodes/2; i++) { + DataNode dn = datanodes.get(i); + DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId()); + dn.shutdown(); + dnd.setLastUpdate(0L); + BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager()); + expectedInServiceNodes--; + assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService()); + } + + // restart the nodes to verify that counts are correct after + // node re-registration + cluster.restartDataNodes(); + cluster.waitActive(); + datanodes = cluster.getDataNodes(); + expectedInServiceNodes = nodes; + assertEquals(nodes, datanodes.size()); + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceLoad, + namesystem.getInServiceXceiverAverage(), EPSILON); + + // create streams and hsync to force datastreamers to start + DFSOutputStream[] streams = new DFSOutputStream[fileCount]; + for (int i=0; i < fileCount; i++) { + streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl) + .getWrappedStream(); + streams[i].write("1".getBytes()); + streams[i].hsync(); + // the load for writers is 2 because both the write xceiver & packet + // responder threads are counted in the load + expectedTotalLoad += 2*fileRepl; + expectedInServiceLoad += 2*fileRepl; + } + // force nodes to send load update + triggerHeartbeats(datanodes); + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, + namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceNodes, + namesystem.getInServiceXceiverAverage(), EPSILON); + + // decomm a few nodes, substract their load from the expected load, + // trigger heartbeat to force load update + for (int i=0; i < fileRepl; i++) { + expectedInServiceNodes--; + DatanodeDescriptor dnd = + dnm.getDatanode(datanodes.get(i).getDatanodeId()); + expectedInServiceLoad -= dnd.getXceiverCount(); + dnm.startDecommission(dnd); + 
DataNodeTestUtils.triggerHeartbeat(datanodes.get(i)); + Thread.sleep(100); + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, + namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceNodes, + namesystem.getInServiceXceiverAverage(), EPSILON); + } + + // check expected load while closing each stream. recalc expected + // load based on whether the nodes in the pipeline are decomm + for (int i=0; i < fileCount; i++) { + int decomm = 0; + for (DatanodeInfo dni : streams[i].getPipeline()) { + DatanodeDescriptor dnd = dnm.getDatanode(dni); + expectedTotalLoad -= 2; + if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) { + decomm++; + } else { + expectedInServiceLoad -= 2; + } + } + try { + streams[i].close(); + } catch (IOException ioe) { + // nodes will go decommissioned even if there's a UC block whose + // other locations are decommissioned too. we'll ignore that + // bug for now + if (decomm < fileRepl) { + throw ioe; + } + } + triggerHeartbeats(datanodes); + // verify node count and loads + assertEquals(nodes, namesystem.getNumLiveDataNodes()); + assertEquals(expectedInServiceNodes, + namesystem.getNumDatanodesInService()); + assertEquals(expectedTotalLoad, namesystem.getTotalLoad()); + assertEquals((double)expectedInServiceLoad/expectedInServiceNodes, + namesystem.getInServiceXceiverAverage(), EPSILON); + } + + // shutdown each node, verify node counts based on decomm state + for (int i=0; i < nodes; i++) { + DataNode dn = datanodes.get(i); + dn.shutdown(); + // force it to appear dead so live count decreases + DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId()); + dnDesc.setLastUpdate(0L); + BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager()); + assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes()); + // first few nodes are already out of service + if (i >= fileRepl) { + expectedInServiceNodes--; + } + assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService()); + + // live nodes always report load of 1. no nodes is load 0 + double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0; + assertEquals((double)expectedXceiverAvg, + namesystem.getInServiceXceiverAverage(), EPSILON); + } + + // final sanity check + assertEquals(0, namesystem.getNumLiveDataNodes()); + assertEquals(0, namesystem.getNumDatanodesInService()); + assertEquals(0.0, namesystem.getTotalLoad(), EPSILON); + assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void triggerHeartbeats(List datanodes) + throws IOException, InterruptedException { + for (DataNode dn : datanodes) { + DataNodeTestUtils.triggerHeartbeat(dn); + } + Thread.sleep(100); + } } From 7c18f8d55b899dc4a6e118d3b54447a9b36b960a Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Fri, 18 Jul 2014 18:20:20 +0000 Subject: [PATCH 2/7] HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it will not redirect retries to the same datanode. 
Contributed by zhaoyunjiong git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611750 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../web/resources/NamenodeWebHdfsMethods.java | 117 ++++++++++++------ .../hadoop/hdfs/web/WebHdfsFileSystem.java | 29 ++++- .../web/resources/ExcludeDatanodesParam.java | 42 +++++++ .../resources/TestWebHdfsDataLocality.java | 89 ++++++++++++- 5 files changed, 237 insertions(+), 43 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9433d62a8ca..37878727536 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -304,6 +304,9 @@ Release 2.6.0 - UNRELEASED HDFS-6700. BlockPlacementPolicy shoud choose storage but not datanode for deletion. (szetszwo) + HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it + will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index 2871c987622..92a58f9822e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -28,6 +28,7 @@ import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import javax.servlet.ServletContext; @@ -84,6 +85,7 @@ import org.apache.hadoop.hdfs.web.resources.DelegationParam; import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; import org.apache.hadoop.hdfs.web.resources.DestinationParam; import org.apache.hadoop.hdfs.web.resources.DoAsParam; +import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam; import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.GroupParam; import org.apache.hadoop.hdfs.web.resources.HttpOpParam; @@ -113,11 +115,13 @@ import org.apache.hadoop.hdfs.web.resources.XAttrValueParam; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; +import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; @@ -190,12 +194,26 @@ public class NamenodeWebHdfsMethods { } return np; } - + @VisibleForTesting static DatanodeInfo chooseDatanode(final NameNode namenode, final String path, final HttpOpParam.Op op, final long openOffset, - final long blocksize) throws IOException { + final long blocksize, final String excludeDatanodes) throws IOException { final BlockManager bm = 
namenode.getNamesystem().getBlockManager(); + + HashSet excludes = new HashSet(); + if (excludeDatanodes != null) { + for (String host : StringUtils + .getTrimmedStringCollection(excludeDatanodes)) { + int idx = host.indexOf(":"); + if (idx != -1) { + excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr( + host.substring(0, idx), Integer.parseInt(host.substring(idx + 1)))); + } else { + excludes.add(bm.getDatanodeManager().getDatanodeByHost(host)); + } + } + } if (op == PutOpParam.Op.CREATE) { //choose a datanode near to client @@ -204,7 +222,7 @@ public class NamenodeWebHdfsMethods { if (clientNode != null) { final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy() .chooseTarget(path, 1, clientNode, - new ArrayList(), false, null, blocksize, + new ArrayList(), false, excludes, blocksize, // TODO: get storage type from the file StorageType.DEFAULT); if (storages.length > 0) { @@ -233,7 +251,7 @@ public class NamenodeWebHdfsMethods { final LocatedBlocks locations = np.getBlockLocations(path, offset, 1); final int count = locations.locatedBlockCount(); if (count > 0) { - return bestNode(locations.get(0).getLocations()); + return bestNode(locations.get(0).getLocations(), excludes); } } } @@ -247,11 +265,14 @@ public class NamenodeWebHdfsMethods { * sorted based on availability and network distances, thus it is sufficient * to return the first element of the node here. */ - private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException { - if (nodes.length == 0 || nodes[0].isDecommissioned()) { - throw new IOException("No active nodes contain this block"); + private static DatanodeInfo bestNode(DatanodeInfo[] nodes, + HashSet excludes) throws IOException { + for (DatanodeInfo dn: nodes) { + if (false == dn.isDecommissioned() && false == excludes.contains(dn)) { + return dn; + } } - return nodes[0]; + throw new IOException("No active nodes contain this block"); } private Token generateDelegationToken( @@ -270,11 +291,12 @@ public class NamenodeWebHdfsMethods { final UserGroupInformation ugi, final DelegationParam delegation, final UserParam username, final DoAsParam doAsUser, final String path, final HttpOpParam.Op op, final long openOffset, - final long blocksize, + final long blocksize, final String excludeDatanodes, final Param... 
parameters) throws URISyntaxException, IOException { final DatanodeInfo dn; try { - dn = chooseDatanode(namenode, path, op, openOffset, blocksize); + dn = chooseDatanode(namenode, path, op, openOffset, blocksize, + excludeDatanodes); } catch (InvalidTopologyException ite) { throw new IOException("Failed to find datanode, suggest to check cluster health.", ite); } @@ -361,13 +383,15 @@ public class NamenodeWebHdfsMethods { @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT) final SnapshotNameParam snapshotName, @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT) - final OldSnapshotNameParam oldSnapshotName - )throws IOException, InterruptedException { + final OldSnapshotNameParam oldSnapshotName, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes + ) throws IOException, InterruptedException { return put(ugi, delegation, username, doAsUser, ROOT, op, destination, owner, group, permission, overwrite, bufferSize, replication, blockSize, modificationTime, accessTime, renameOptions, createParent, delegationTokenArgument, aclPermission, xattrName, xattrValue, - xattrSetFlag, snapshotName, oldSnapshotName); + xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes); } /** Handle HTTP PUT request. */ @@ -423,14 +447,16 @@ public class NamenodeWebHdfsMethods { @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT) final SnapshotNameParam snapshotName, @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT) - final OldSnapshotNameParam oldSnapshotName + final OldSnapshotNameParam oldSnapshotName, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, InterruptedException { init(ugi, delegation, username, doAsUser, path, op, destination, owner, group, permission, overwrite, bufferSize, replication, blockSize, modificationTime, accessTime, renameOptions, delegationTokenArgument, aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName, - oldSnapshotName); + oldSnapshotName, excludeDatanodes); return ugi.doAs(new PrivilegedExceptionAction() { @Override @@ -441,7 +467,7 @@ public class NamenodeWebHdfsMethods { permission, overwrite, bufferSize, replication, blockSize, modificationTime, accessTime, renameOptions, createParent, delegationTokenArgument, aclPermission, xattrName, xattrValue, - xattrSetFlag, snapshotName, oldSnapshotName); + xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes); } finally { reset(); } @@ -474,7 +500,8 @@ public class NamenodeWebHdfsMethods { final XAttrValueParam xattrValue, final XAttrSetFlagParam xattrSetFlag, final SnapshotNameParam snapshotName, - final OldSnapshotNameParam oldSnapshotName + final OldSnapshotNameParam oldSnapshotName, + final ExcludeDatanodesParam exclDatanodes ) throws IOException, URISyntaxException { final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF); @@ -484,9 +511,10 @@ public class NamenodeWebHdfsMethods { switch(op.getValue()) { case CREATE: { - final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), -1L, blockSize.getValue(conf), - permission, overwrite, bufferSize, replication, blockSize); + final URI uri = redirectURI(namenode, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf), + exclDatanodes.getValue(), 
permission, overwrite, bufferSize, + replication, blockSize); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case MKDIRS: @@ -619,9 +647,12 @@ public class NamenodeWebHdfsMethods { @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT) final ConcatSourcesParam concatSrcs, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, InterruptedException { - return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize); + return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, + bufferSize, excludeDatanodes); } /** Handle HTTP POST request. */ @@ -643,17 +674,21 @@ public class NamenodeWebHdfsMethods { @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT) final ConcatSourcesParam concatSrcs, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, InterruptedException { - init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize); + init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize, + excludeDatanodes); return ugi.doAs(new PrivilegedExceptionAction() { @Override public Response run() throws IOException, URISyntaxException { try { return post(ugi, delegation, username, doAsUser, - path.getAbsolutePath(), op, concatSrcs, bufferSize); + path.getAbsolutePath(), op, concatSrcs, bufferSize, + excludeDatanodes); } finally { reset(); } @@ -669,15 +704,17 @@ public class NamenodeWebHdfsMethods { final String fullpath, final PostOpParam op, final ConcatSourcesParam concatSrcs, - final BufferSizeParam bufferSize + final BufferSizeParam bufferSize, + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, URISyntaxException { final NameNode namenode = (NameNode)context.getAttribute("name.node"); switch(op.getValue()) { case APPEND: { - final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), -1L, -1L, bufferSize); + final URI uri = redirectURI(namenode, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), -1L, -1L, + excludeDatanodes.getValue(), bufferSize); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case CONCAT: @@ -715,10 +752,12 @@ public class NamenodeWebHdfsMethods { @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) final List xattrNames, @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT) - final XAttrEncodingParam xattrEncoding + final XAttrEncodingParam xattrEncoding, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, InterruptedException { return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length, - renewer, bufferSize, xattrNames, xattrEncoding); + renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes); } /** Handle HTTP GET request. 
*/ @@ -747,11 +786,13 @@ public class NamenodeWebHdfsMethods { @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT) final List xattrNames, @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT) - final XAttrEncodingParam xattrEncoding + final XAttrEncodingParam xattrEncoding, + @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT) + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, InterruptedException { init(ugi, delegation, username, doAsUser, path, op, offset, length, - renewer, bufferSize, xattrEncoding); + renewer, bufferSize, xattrEncoding, excludeDatanodes); return ugi.doAs(new PrivilegedExceptionAction() { @Override @@ -759,7 +800,7 @@ public class NamenodeWebHdfsMethods { try { return get(ugi, delegation, username, doAsUser, path.getAbsolutePath(), op, offset, length, renewer, bufferSize, - xattrNames, xattrEncoding); + xattrNames, xattrEncoding, excludeDatanodes); } finally { reset(); } @@ -779,7 +820,8 @@ public class NamenodeWebHdfsMethods { final RenewerParam renewer, final BufferSizeParam bufferSize, final List xattrNames, - final XAttrEncodingParam xattrEncoding + final XAttrEncodingParam xattrEncoding, + final ExcludeDatanodesParam excludeDatanodes ) throws IOException, URISyntaxException { final NameNode namenode = (NameNode)context.getAttribute("name.node"); final NamenodeProtocols np = getRPCServer(namenode); @@ -787,8 +829,9 @@ public class NamenodeWebHdfsMethods { switch(op.getValue()) { case OPEN: { - final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize); + final URI uri = redirectURI(namenode, ugi, delegation, username, + doAsUser, fullpath, op.getValue(), offset.getValue(), -1L, + excludeDatanodes.getValue(), offset, length, bufferSize); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case GET_BLOCK_LOCATIONS: @@ -824,7 +867,7 @@ public class NamenodeWebHdfsMethods { case GETFILECHECKSUM: { final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser, - fullpath, op.getValue(), -1L, -1L); + fullpath, op.getValue(), -1L, -1L, null); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } case GETDELEGATIONTOKEN: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 6eb09f61340..78062ad0b5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -448,6 +448,7 @@ public class WebHdfsFileSystem extends FileSystem protected final HttpOpParam.Op op; private final boolean redirected; + protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam(""); private boolean checkRetry; @@ -499,6 +500,10 @@ public class WebHdfsFileSystem extends FileSystem * a DN such as open and checksum */ private HttpURLConnection connect(URL url) throws IOException { + //redirect hostname and port + String redirectHost = null; + + // resolve redirects for a DN operation unless already resolved if (op.getRedirect() && !redirected) { final HttpOpParam.Op redirectOp = @@ -511,11 +516,24 @@ public class WebHdfsFileSystem extends FileSystem try { validateResponse(redirectOp, conn, false); url = 
new URL(conn.getHeaderField("Location")); + redirectHost = url.getHost() + ":" + url.getPort(); } finally { conn.disconnect(); } } - return connect(op, url); + try { + return connect(op, url); + } catch (IOException ioe) { + if (redirectHost != null) { + if (excludeDatanodes.getValue() != null) { + excludeDatanodes = new ExcludeDatanodesParam(redirectHost + "," + + excludeDatanodes.getValue()); + } else { + excludeDatanodes = new ExcludeDatanodesParam(redirectHost); + } + } + throw ioe; + } } private HttpURLConnection connect(final HttpOpParam.Op op, final URL url) @@ -652,7 +670,14 @@ public class WebHdfsFileSystem extends FileSystem @Override protected URL getUrl() throws IOException { - return toUrl(op, fspath, parameters); + if (excludeDatanodes.getValue() != null) { + Param[] tmpParam = new Param[parameters.length + 1]; + System.arraycopy(parameters, 0, tmpParam, 0, parameters.length); + tmpParam[parameters.length] = excludeDatanodes; + return toUrl(op, fspath, tmpParam); + } else { + return toUrl(op, fspath, parameters); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java new file mode 100644 index 00000000000..3f44fae948a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExcludeDatanodesParam.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web.resources; + + +/** Exclude datanodes param */ +public class ExcludeDatanodesParam extends StringParam { + /** Parameter name. */ + public static final String NAME = "excludedatanodes"; + /** Default parameter value. */ + public static final String DEFAULT = ""; + + private static final Domain DOMAIN = new Domain(NAME, null); + + /** + * Constructor. + * @param str a string representation of the parameter value. + */ + public ExcludeDatanodesParam(final String str) { + super(DOMAIN, str == null || str.equals(DEFAULT)? 
null: DOMAIN.parse(str)); + } + + @Override + public String getName() { + return NAME; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java index 9fe3deab1bd..c1169dc3b19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java @@ -92,7 +92,7 @@ public class TestWebHdfsDataLocality { //The chosen datanode must be the same as the client address final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, PutOpParam.Op.CREATE, -1L, blocksize); + namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null); Assert.assertEquals(ipAddr, chosen.getIpAddr()); } } @@ -117,23 +117,104 @@ public class TestWebHdfsDataLocality { { //test GETFILECHECKSUM final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize); + namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null); Assert.assertEquals(expected, chosen); } { //test OPEN final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, GetOpParam.Op.OPEN, 0, blocksize); + namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null); Assert.assertEquals(expected, chosen); } { //test APPEND final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( - namenode, f, PostOpParam.Op.APPEND, -1L, blocksize); + namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null); Assert.assertEquals(expected, chosen); } } finally { cluster.shutdown(); } } + + @Test + public void testExcludeDataNodes() throws Exception { + final Configuration conf = WebHdfsTestUtil.createConf(); + final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2}; + final String[] hosts = {"DataNode1", "DataNode2", "DataNode3","DataNode4","DataNode5","DataNode6"}; + final int nDataNodes = hosts.length; + LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks) + + ", hosts=" + Arrays.asList(hosts)); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .hosts(hosts).numDataNodes(nDataNodes).racks(racks).build(); + + try { + cluster.waitActive(); + + final DistributedFileSystem dfs = cluster.getFileSystem(); + final NameNode namenode = cluster.getNameNode(); + final DatanodeManager dm = namenode.getNamesystem().getBlockManager( + ).getDatanodeManager(); + LOG.info("dm=" + dm); + + final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT; + final String f = "/foo"; + + //create a file with three replica. + final Path p = new Path(f); + final FSDataOutputStream out = dfs.create(p, (short)3); + out.write(1); + out.close(); + + //get replica location. + final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations( + namenode, f, 0, 1); + final List lb = locatedblocks.getLocatedBlocks(); + Assert.assertEquals(1, lb.size()); + final DatanodeInfo[] locations = lb.get(0).getLocations(); + Assert.assertEquals(3, locations.length); + + + //For GETFILECHECKSUM, OPEN and APPEND, + //the chosen datanode must be different with exclude nodes. 
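The parameter defined above is the comma-separated list that chooseDatanode() parses: each entry is either a host:port pair (resolved via getDatanodeByXferAddr) or a bare host name (resolved via getDatanodeByHost). On the client side, WebHdfsFileSystem remembers the host:port of a failed redirect and adds it to this parameter before retrying, so a retried OPEN would carry a query string roughly like the following (namenode address, path and ports are illustrative):

    http://namenode:50070/webhdfs/v1/foo?op=OPEN&excludedatanodes=DataNode1:50010,DataNode2:50010

Each additional failure appends another address to the same excludedatanodes value, which is what the loop below simulates by growing the StringBuffer one xfer address at a time.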
+ + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < 2; i++) { + sb.append(locations[i].getXferAddr()); + { // test GETFILECHECKSUM + final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( + namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, + sb.toString()); + for (int j = 0; j <= i; j++) { + Assert.assertNotEquals(locations[j].getHostName(), + chosen.getHostName()); + } + } + + { // test OPEN + final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode( + namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString()); + for (int j = 0; j <= i; j++) { + Assert.assertNotEquals(locations[j].getHostName(), + chosen.getHostName()); + } + } + + { // test APPEND + final DatanodeInfo chosen = NamenodeWebHdfsMethods + .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L, + blocksize, sb.toString()); + for (int j = 0; j <= i; j++) { + Assert.assertNotEquals(locations[j].getHostName(), + chosen.getHostName()); + } + } + + sb.append(","); + } + } finally { + cluster.shutdown(); + } + } } \ No newline at end of file From 64ed72a047a1ff20e07aaf18ebdb5f5d29569829 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Fri, 18 Jul 2014 19:42:01 +0000 Subject: [PATCH 3/7] HADOOP-10817. ProxyUsers configuration should support configurable prefixes. (tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611780 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../DefaultImpersonationProvider.java | 65 ++++++--- .../authorize/ImpersonationProvider.java | 15 +++ .../hadoop/security/authorize/ProxyUsers.java | 29 +++- .../apache/hadoop/ipc/MiniRPCBenchmark.java | 8 +- .../security/TestDoAsEffectiveUser.java | 23 ++-- .../security/authorize/TestProxyUsers.java | 126 ++++++++++++++---- .../hadoop/hdfs/nfs/nfs3/TestReaddir.java | 8 +- .../hadoop/hdfs/nfs/nfs3/TestWrites.java | 6 +- .../TestDelegationTokenForProxyUser.java | 6 +- .../hdfs/server/common/TestJspHelper.java | 6 +- .../security/TestRefreshUserMappings.java | 6 +- 12 files changed, 227 insertions(+), 74 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index bace31168b4..5805d005547 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -432,6 +432,9 @@ Release 2.6.0 - UNRELEASED HADOOP-10610. Upgrade S3n s3.fs.buffer.dir to support multi directories. (Ted Malaska via atm) + HADOOP-10817. ProxyUsers configuration should support configurable + prefixes. 
(tucu) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java index afa46582d32..ab1c390f464 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/DefaultImpersonationProvider.java @@ -24,37 +24,64 @@ import java.util.Map; import java.util.Map.Entry; import java.util.regex.Pattern; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.MachineList; import com.google.common.annotations.VisibleForTesting; +@InterfaceStability.Unstable +@InterfaceAudience.Public public class DefaultImpersonationProvider implements ImpersonationProvider { private static final String CONF_HOSTS = ".hosts"; private static final String CONF_USERS = ".users"; private static final String CONF_GROUPS = ".groups"; - private static final String CONF_HADOOP_PROXYUSER = "hadoop.proxyuser."; - private static final String CONF_HADOOP_PROXYUSER_RE = "hadoop\\.proxyuser\\."; - private static final String CONF_HADOOP_PROXYUSER_RE_USERS_GROUPS = - CONF_HADOOP_PROXYUSER_RE+"[^.]*(" + Pattern.quote(CONF_USERS) + - "|" + Pattern.quote(CONF_GROUPS) + ")"; - private static final String CONF_HADOOP_PROXYUSER_RE_HOSTS = - CONF_HADOOP_PROXYUSER_RE+"[^.]*"+ Pattern.quote(CONF_HOSTS); // acl and list of hosts per proxyuser private Map proxyUserAcl = new HashMap(); - private static Map proxyHosts = + private Map proxyHosts = new HashMap(); private Configuration conf; + + private static DefaultImpersonationProvider testProvider; + + public static synchronized DefaultImpersonationProvider getTestProvider() { + if (testProvider == null) { + testProvider = new DefaultImpersonationProvider(); + testProvider.setConf(new Configuration()); + testProvider.init(ProxyUsers.CONF_HADOOP_PROXYUSER); + } + return testProvider; + } + @Override public void setConf(Configuration conf) { this.conf = conf; + } - // get list of users and groups per proxyuser + private String configPrefix; + + @Override + public void init(String configurationPrefix) { + configPrefix = configurationPrefix + + (configurationPrefix.endsWith(".") ? 
"" : "."); + + // constructing regex to match the following patterns: + // $configPrefix.[ANY].users + // $configPrefix.[ANY].groups + // $configPrefix.[ANY].hosts + // + String prefixRegEx = configPrefix.replace(".", "\\."); + String usersGroupsRegEx = prefixRegEx + "[^.]*(" + + Pattern.quote(CONF_USERS) + "|" + Pattern.quote(CONF_GROUPS) + ")"; + String hostsRegEx = prefixRegEx + "[^.]*" + Pattern.quote(CONF_HOSTS); + + // get list of users and groups per proxyuser Map allMatchKeys = - conf.getValByRegex(CONF_HADOOP_PROXYUSER_RE_USERS_GROUPS); + conf.getValByRegex(usersGroupsRegEx); for(Entry entry : allMatchKeys.entrySet()) { String aclKey = getAclKey(entry.getKey()); if (!proxyUserAcl.containsKey(aclKey)) { @@ -65,7 +92,7 @@ public class DefaultImpersonationProvider implements ImpersonationProvider { } // get hosts per proxyuser - allMatchKeys = conf.getValByRegex(CONF_HADOOP_PROXYUSER_RE_HOSTS); + allMatchKeys = conf.getValByRegex(hostsRegEx); for(Entry entry : allMatchKeys.entrySet()) { proxyHosts.put(entry.getKey(), new MachineList(entry.getValue())); @@ -86,8 +113,8 @@ public class DefaultImpersonationProvider implements ImpersonationProvider { return; } - AccessControlList acl = proxyUserAcl.get( - CONF_HADOOP_PROXYUSER+realUser.getShortUserName()); + AccessControlList acl = proxyUserAcl.get(configPrefix + + realUser.getShortUserName()); if (acl == null || !acl.isUserAllowed(user)) { throw new AuthorizationException("User: " + realUser.getUserName() + " is not allowed to impersonate " + user.getUserName()); @@ -116,8 +143,8 @@ public class DefaultImpersonationProvider implements ImpersonationProvider { * @param userName name of the superuser * @return configuration key for superuser usergroups */ - public static String getProxySuperuserUserConfKey(String userName) { - return CONF_HADOOP_PROXYUSER+userName+CONF_USERS; + public String getProxySuperuserUserConfKey(String userName) { + return configPrefix + userName + CONF_USERS; } /** @@ -126,8 +153,8 @@ public class DefaultImpersonationProvider implements ImpersonationProvider { * @param userName name of the superuser * @return configuration key for superuser groups */ - public static String getProxySuperuserGroupConfKey(String userName) { - return CONF_HADOOP_PROXYUSER+userName+CONF_GROUPS; + public String getProxySuperuserGroupConfKey(String userName) { + return configPrefix + userName + CONF_GROUPS; } /** @@ -136,8 +163,8 @@ public class DefaultImpersonationProvider implements ImpersonationProvider { * @param userName name of the superuser * @return configuration key for superuser ip-addresses */ - public static String getProxySuperuserIpConfKey(String userName) { - return CONF_HADOOP_PROXYUSER+userName+CONF_HOSTS; + public String getProxySuperuserIpConfKey(String userName) { + return configPrefix + userName + CONF_HOSTS; } @VisibleForTesting diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java index 6e7a39565df..8b483f0336f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ImpersonationProvider.java @@ -18,10 +18,25 @@ package org.apache.hadoop.security.authorize; +import org.apache.hadoop.classification.InterfaceAudience; +import 
org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.security.UserGroupInformation; +@InterfaceStability.Unstable +@InterfaceAudience.Public public interface ImpersonationProvider extends Configurable { + + + /** + * Specifies the configuration prefix for the proxy user properties and + * initializes the provider. + * + * @param configurationPrefix the configuration prefix for the proxy user + * properties + */ + public void init(String configurationPrefix); + /** * Authorize the superuser which is doing doAs * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java index e221ae390f2..60d82cbdcf7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/authorize/ProxyUsers.java @@ -18,7 +18,9 @@ package org.apache.hadoop.security.authorize; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; @@ -26,9 +28,12 @@ import org.apache.hadoop.util.ReflectionUtils; import com.google.common.annotations.VisibleForTesting; +@InterfaceStability.Unstable @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "HBase", "Hive"}) public class ProxyUsers { + public static final String CONF_HADOOP_PROXYUSER = "hadoop.proxyuser"; + private static volatile ImpersonationProvider sip ; /** @@ -54,15 +59,31 @@ public class ProxyUsers { } /** - * refresh configuration - * @param conf + * Refreshes configuration using the specified Proxy user prefix for + * properties. + * + * @param conf configuration + * @param proxyUserPrefix proxy user configuration prefix */ - public static void refreshSuperUserGroupsConfiguration(Configuration conf) { + public static void refreshSuperUserGroupsConfiguration(Configuration conf, + String proxyUserPrefix) { + Preconditions.checkArgument(proxyUserPrefix != null && + !proxyUserPrefix.isEmpty(), "prefix cannot be NULL or empty"); // sip is volatile. Any assignment to it as well as the object's state // will be visible to all the other threads. - sip = getInstance(conf); + ImpersonationProvider ip = getInstance(conf); + ip.init(proxyUserPrefix); + sip = ip; ProxyServers.refresh(conf); } + + /** + * Refreshes configuration using the default Proxy user prefix for properties. 
+ * @param conf configuration + */ + public static void refreshSuperUserGroupsConfiguration(Configuration conf) { + refreshSuperUserGroupsConfiguration(conf, CONF_HADOOP_PROXYUSER); + } /** * Authorize the superuser which is doing doAs diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java index 0bc0b047dad..cdbd5570f1b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java @@ -327,8 +327,8 @@ public class MiniRPCBenchmark { String shortUserName = UserGroupInformation.createRemoteUser(user).getShortUserName(); try { - conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(shortUserName), - GROUP_NAME_1); + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(shortUserName), GROUP_NAME_1); configureSuperUserIPAddresses(conf, shortUserName); // start the server miniServer = new MiniServer(conf, user, keytabFile); @@ -411,7 +411,7 @@ public class MiniRPCBenchmark { } builder.append("127.0.1.1,"); builder.append(InetAddress.getLocalHost().getCanonicalHostName()); - conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName), - builder.toString()); + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(superUserShortName), builder.toString()); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java index 42e7881d3a9..b44fa8b85af 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java @@ -101,7 +101,8 @@ public class TestDoAsEffectiveUser { builder.append("127.0.1.1,"); builder.append(InetAddress.getLocalHost().getCanonicalHostName()); LOG.info("Local Ip addresses: "+builder.toString()); - conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName), + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(superUserShortName), builder.toString()); } @@ -181,8 +182,8 @@ public class TestDoAsEffectiveUser { @Test(timeout=4000) public void testRealUserSetup() throws IOException { final Configuration conf = new Configuration(); - conf.setStrings(DefaultImpersonationProvider - .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) @@ -214,7 +215,8 @@ public class TestDoAsEffectiveUser { public void testRealUserAuthorizationSuccess() throws IOException { final Configuration conf = new Configuration(); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). 
+ getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) @@ -248,9 +250,11 @@ public class TestDoAsEffectiveUser { @Test public void testRealUserIPAuthorizationFailure() throws IOException { final Configuration conf = new Configuration(); - conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_SHORT_NAME), + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_SHORT_NAME), "20.20.20.20"); //Authorized IP address - conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) @@ -293,8 +297,8 @@ public class TestDoAsEffectiveUser { @Test public void testRealUserIPNotSpecified() throws IOException { final Configuration conf = new Configuration(); - conf.setStrings(DefaultImpersonationProvider - .getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group1"); Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) .setNumHandlers(2).setVerbose(false).build(); @@ -377,7 +381,8 @@ public class TestDoAsEffectiveUser { public void testRealUserGroupAuthorizationFailure() throws IOException { final Configuration conf = new Configuration(); configureSuperUserIPAddresses(conf, REAL_USER_SHORT_NAME); - conf.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_SHORT_NAME), "group3"); Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class) .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java index 3823d70391a..dbcac676fab 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/authorize/TestProxyUsers.java @@ -111,10 +111,12 @@ public class TestProxyUsers { groupMappingClassName); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(NETGROUP_NAMES))); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), PROXY_IP); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -135,10 +137,12 @@ public class TestProxyUsers { public void testProxyUsers() throws Exception { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). 
+ getProxySuperuserGroupConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(GROUP_NAMES))); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), PROXY_IP); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -168,10 +172,12 @@ public class TestProxyUsers { public void testProxyUsersWithUserConf() throws Exception { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserUserConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(AUTHORIZED_PROXY_USER_NAME))); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), PROXY_IP); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -202,10 +208,12 @@ public class TestProxyUsers { public void testWildcardGroup() { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_NAME), "*"); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), PROXY_IP); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -236,10 +244,12 @@ public class TestProxyUsers { public void testWildcardUser() { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserUserConfKey(REAL_USER_NAME), "*"); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), PROXY_IP); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -270,10 +280,12 @@ public class TestProxyUsers { public void testWildcardIP() { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(GROUP_NAMES))); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), "*"); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -301,10 +313,12 @@ public class TestProxyUsers { public void testIPRange() { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_NAME), "*"); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). 
+ getProxySuperuserIpConfKey(REAL_USER_NAME), PROXY_IP_RANGE); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); @@ -324,16 +338,19 @@ public class TestProxyUsers { public void testWithDuplicateProxyGroups() throws Exception { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(GROUP_NAMES,GROUP_NAMES))); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), PROXY_IP); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); Collection groupsToBeProxied = ProxyUsers.getDefaultImpersonationProvider().getProxyGroups().get( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME)); + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_NAME)); assertEquals (1,groupsToBeProxied.size()); } @@ -342,16 +359,19 @@ public class TestProxyUsers { public void testWithDuplicateProxyHosts() throws Exception { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider() + .getProxySuperuserGroupConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(GROUP_NAMES))); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(PROXY_IP,PROXY_IP))); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); Collection hosts = ProxyUsers.getDefaultImpersonationProvider().getProxyHosts().get( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME)); + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME)); assertEquals (1,hosts.size()); } @@ -391,26 +411,73 @@ public class TestProxyUsers { public void testWithProxyGroupsAndUsersWithSpaces() throws Exception { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserUserConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserUserConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(PROXY_USER_NAME + " ",AUTHORIZED_PROXY_USER_NAME, "ONEMORE"))); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(GROUP_NAMES))); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), PROXY_IP); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); Collection groupsToBeProxied = ProxyUsers.getDefaultImpersonationProvider().getProxyGroups().get( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME)); + DefaultImpersonationProvider.getTestProvider(). 
+ getProxySuperuserGroupConfKey(REAL_USER_NAME)); assertEquals (GROUP_NAMES.length, groupsToBeProxied.size()); } + @Test(expected = IllegalArgumentException.class) + public void testProxyUsersWithNullPrefix() throws Exception { + ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration(false), + null); + } + + @Test(expected = IllegalArgumentException.class) + public void testProxyUsersWithEmptyPrefix() throws Exception { + ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration(false), + ""); + } + + @Test + public void testProxyUsersWithCustomPrefix() throws Exception { + Configuration conf = new Configuration(false); + conf.set("x." + REAL_USER_NAME + ".users", + StringUtils.join(",", Arrays.asList(AUTHORIZED_PROXY_USER_NAME))); + conf.set("x." + REAL_USER_NAME+ ".hosts", PROXY_IP); + ProxyUsers.refreshSuperUserGroupsConfiguration(conf, "x"); + + + // First try proxying a user that's allowed + UserGroupInformation realUserUgi = UserGroupInformation + .createRemoteUser(REAL_USER_NAME); + UserGroupInformation proxyUserUgi = UserGroupInformation.createProxyUserForTesting( + AUTHORIZED_PROXY_USER_NAME, realUserUgi, GROUP_NAMES); + + // From good IP + assertAuthorized(proxyUserUgi, "1.2.3.4"); + // From bad IP + assertNotAuthorized(proxyUserUgi, "1.2.3.5"); + + // Now try proxying a user that's not allowed + realUserUgi = UserGroupInformation.createRemoteUser(REAL_USER_NAME); + proxyUserUgi = UserGroupInformation.createProxyUserForTesting( + PROXY_USER_NAME, realUserUgi, GROUP_NAMES); + + // From good IP + assertNotAuthorized(proxyUserUgi, "1.2.3.4"); + // From bad IP + assertNotAuthorized(proxyUserUgi, "1.2.3.5"); + } + private void assertNotAuthorized(UserGroupInformation proxyUgi, String host) { try { @@ -430,6 +497,11 @@ public class TestProxyUsers { } static class TestDummyImpersonationProvider implements ImpersonationProvider { + + @Override + public void init(String configurationPrefix) { + } + /** * Authorize a user (superuser) to impersonate another user (user1) if the * superuser belongs to the group "sudo_user1" . @@ -460,11 +532,13 @@ public class TestProxyUsers { public static void loadTest(String ipString, int testRange) { Configuration conf = new Configuration(); conf.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(REAL_USER_NAME), StringUtils.join(",", Arrays.asList(GROUP_NAMES))); conf.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(REAL_USER_NAME), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(REAL_USER_NAME), ipString ); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java index 617c31d8005..05ba2db05a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java @@ -72,11 +72,11 @@ public class TestReaddir { public static void setup() throws Exception { String currentUser = System.getProperty("user.name"); config.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(currentUser), - "*"); + DefaultImpersonationProvider.getTestProvider(). 
+ getProxySuperuserGroupConfKey(currentUser), "*"); config.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(currentUser), - "*"); + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(currentUser), "*"); ProxyUsers.refreshSuperUserGroupsConfiguration(config); cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster.waitActive(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index 3945b298f5e..32f9b2bcddf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -312,10 +312,12 @@ public class TestWrites { System.getProperty("user.name")); String currentUser = System.getProperty("user.name"); config.set( - DefaultImpersonationProvider.getProxySuperuserGroupConfKey(currentUser), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(currentUser), "*"); config.set( - DefaultImpersonationProvider.getProxySuperuserIpConfKey(currentUser), + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(currentUser), "*"); ProxyUsers.refreshSuperUserGroupsConfiguration(config); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java index fa29b4b03f2..e6493a24ff3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java @@ -89,7 +89,8 @@ public class TestDelegationTokenForProxyUser { builder.append("127.0.1.1,"); builder.append(InetAddress.getLocalHost().getCanonicalHostName()); LOG.info("Local Ip addresses: " + builder.toString()); - conf.setStrings(DefaultImpersonationProvider.getProxySuperuserIpConfKey(superUserShortName), + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(superUserShortName), builder.toString()); } @@ -101,7 +102,8 @@ public class TestDelegationTokenForProxyUser { DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000); config.setLong( DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000); - config.setStrings(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(REAL_USER), + config.setStrings(DefaultImpersonationProvider.getTestProvider(). 
+ getProxySuperuserGroupConfKey(REAL_USER), "group1"); config.setBoolean( DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java index 827a33aa5ba..d0d8d3e3189 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestJspHelper.java @@ -285,8 +285,10 @@ public class TestJspHelper { String user = "TheNurse"; conf.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - conf.set(DefaultImpersonationProvider.getProxySuperuserGroupConfKey(realUser), "*"); - conf.set(DefaultImpersonationProvider.getProxySuperuserIpConfKey(realUser), "*"); + conf.set(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(realUser), "*"); + conf.set(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(realUser), "*"); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); UserGroupInformation.setConfiguration(conf); UserGroupInformation ugi; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java index aadc6b0a417..72776e03ceb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/security/TestRefreshUserMappings.java @@ -151,8 +151,10 @@ public class TestRefreshUserMappings { final String [] GROUP_NAMES2 = new String [] {"gr3" , "gr4"}; //keys in conf - String userKeyGroups = DefaultImpersonationProvider.getProxySuperuserGroupConfKey(SUPER_USER); - String userKeyHosts = DefaultImpersonationProvider.getProxySuperuserIpConfKey (SUPER_USER); + String userKeyGroups = DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(SUPER_USER); + String userKeyHosts = DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey (SUPER_USER); config.set(userKeyGroups, "gr3,gr4,gr5"); // superuser can proxy for this group config.set(userKeyHosts,"127.0.0.1"); From f1b831ccfbec37712522bef7a44f51bff0369003 Mon Sep 17 00:00:00 2001 From: Xuan Gong Date: Fri, 18 Jul 2014 21:46:29 +0000 Subject: [PATCH 4/7] YARN-2208. AMRMTokenManager need to have a way to roll over AMRMToken. Contributed by Xuan Gong git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611820 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 2 + .../yarn/security/AMRMTokenIdentifier.java | 14 + .../rmapp/attempt/RMAppAttemptImpl.java | 6 +- .../security/AMRMTokenSecretManager.java | 246 +++++++++++++----- .../server/resourcemanager/TestRMRestart.java | 9 +- .../recovery/RMStateStoreTestBase.java | 14 +- .../attempt/TestRMAppAttemptTransitions.java | 3 + .../security/TestAMRMTokens.java | 81 +++++- 8 files changed, 287 insertions(+), 88 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4665703b286..99ecd9971f0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -49,6 +49,8 @@ Release 2.6.0 - UNRELEASED YARN-1341. Recover NMTokens upon nodemanager restart. 
(Jason Lowe via junping_du) + YARN-2208. AMRMTokenManager need to have a way to roll over AMRMToken. (xgong) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java index 99495d7ad0c..bc2d7c5624a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java @@ -44,6 +44,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier { public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN"); private ApplicationAttemptId applicationAttemptId; + private int keyId = Integer.MIN_VALUE; public AMRMTokenIdentifier() { } @@ -53,6 +54,13 @@ public class AMRMTokenIdentifier extends TokenIdentifier { this.applicationAttemptId = appAttemptId; } + public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId, + int masterKeyId) { + this(); + this.applicationAttemptId = appAttemptId; + this.keyId = masterKeyId; + } + @Private public ApplicationAttemptId getApplicationAttemptId() { return this.applicationAttemptId; @@ -64,6 +72,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier { out.writeLong(appId.getClusterTimestamp()); out.writeInt(appId.getId()); out.writeInt(this.applicationAttemptId.getAttemptId()); + out.writeInt(this.keyId); } @Override @@ -75,6 +84,7 @@ public class AMRMTokenIdentifier extends TokenIdentifier { ApplicationId.newInstance(clusterTimeStamp, appId); this.applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, attemptId); + this.keyId = in.readInt(); } @Override @@ -92,6 +102,10 @@ public class AMRMTokenIdentifier extends TokenIdentifier { .toString()); } + public int getKeyId() { + return this.keyId; + } + // TODO: Needed? @InterfaceAudience.Private public static class Renewer extends Token.TrivialRenewer { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index dbcf64fc391..64657ad7ccf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -774,11 +774,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { } // create AMRMToken - AMRMTokenIdentifier id = - new AMRMTokenIdentifier(appAttempt.applicationAttemptId); appAttempt.amrmToken = - new Token(id, - appAttempt.rmContext.getAMRMTokenSecretManager()); + appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttempt.applicationAttemptId); // Add the applicationAttempt to the scheduler and inform the scheduler // whether to transfer the state from previous attempt. 
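
For reference, the pieces above fit together roughly as follows. This is a minimal sketch assuming the default YarnConfiguration, not code taken from the patch: the secret manager now mints the AMRMToken and stamps the identifier with the id of the master key that signed it, which is what lets validation survive a key roll-over. The class and method names (AMRMTokenSecretManager, createAndGetAMRMToken, AMRMTokenIdentifier.getKeyId) are the ones introduced or modified in this change; the driver class itself is hypothetical.

import java.io.IOException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;

public class AMRMTokenFlowSketch {                 // hypothetical driver for illustration
  public static void main(String[] args) throws IOException {
    AMRMTokenSecretManager secretManager =
        new AMRMTokenSecretManager(new YarnConfiguration());
    secretManager.start();                         // creates the current master key and schedules rolling

    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(System.currentTimeMillis(), 1), 1);

    // RMAppAttemptImpl now asks the secret manager for the token instead of
    // constructing the AMRMTokenIdentifier itself (see the hunk above).
    Token<AMRMTokenIdentifier> amrmToken =
        secretManager.createAndGetAMRMToken(attemptId);

    // The identifier records which master key signed it, so retrievePassword()
    // can match the token against the current or the next key after a roll-over.
    int signingKeyId = amrmToken.decodeIdentifier().getKeyId();
    System.out.println("AMRMToken signed with master key id " + signingKeyId);

    secretManager.stop();
  }
}

The key design point, visible in the AMRMTokenSecretManager diff that follows, is that the manager no longer keeps a per-attempt password map; it only remembers which attempts are live and re-derives the password from whichever master key (current or next) matches the key id carried in the identifier.
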
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java index 5d21ec08885..c498b529bf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java @@ -19,22 +19,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.security.SecureRandom; +import java.util.HashSet; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; - -import javax.crypto.SecretKey; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.security.MasterKeyData; + +import com.google.common.annotations.VisibleForTesting; /** * AMRM-tokens are per ApplicationAttempt. If users redistribute their @@ -49,40 +55,66 @@ public class AMRMTokenSecretManager extends private static final Log LOG = LogFactory .getLog(AMRMTokenSecretManager.class); - private SecretKey masterKey; + private int serialNo = new SecureRandom().nextInt(); + private MasterKeyData nextMasterKey; + private MasterKeyData currentMasterKey; + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + private final Timer timer; private final long rollingInterval; + private final long activationDelay; - private final Map passwords = - new HashMap(); + private final Set appAttemptSet = + new HashSet(); /** * Create an {@link AMRMTokenSecretManager} */ public AMRMTokenSecretManager(Configuration conf) { - rollMasterKey(); this.timer = new Timer(); this.rollingInterval = conf .getLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000; + // Adding delay = 1.5 * expiry interval makes sure that all active AMs get + // the updated shared-key. 
+ this.activationDelay = + (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5); + LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval + + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms"); + if (rollingInterval <= activationDelay * 2) { + throw new IllegalArgumentException( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS + + " should be more than 2 X " + + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS); + } } public void start() { - this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval); + if (this.currentMasterKey == null) { + this.currentMasterKey = createNewMasterKey(); + } + this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, + rollingInterval); } public void stop() { this.timer.cancel(); } - public synchronized void applicationMasterFinished( - ApplicationAttemptId appAttemptId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Application finished, removing password for " + appAttemptId); + public void applicationMasterFinished(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Application finished, removing password for " + appAttemptId); + this.appAttemptSet.remove(appAttemptId); + } finally { + this.writeLock.unlock(); } - this.passwords.remove(appAttemptId); } private class MasterKeyRoller extends TimerTask { @@ -93,49 +125,89 @@ public class AMRMTokenSecretManager extends } @Private - public synchronized void setMasterKey(SecretKey masterKey) { - this.masterKey = masterKey; - } - - @Private - public synchronized SecretKey getMasterKey() { - return this.masterKey; - } - - @Private - synchronized void rollMasterKey() { - LOG.info("Rolling master-key for amrm-tokens"); - this.masterKey = generateSecret(); - } - - /** - * Create a password for a given {@link AMRMTokenIdentifier}. Used to - * send to the AppicationAttempt which can give it back during authentication. 
- */ - @Override - public synchronized byte[] createPassword( - AMRMTokenIdentifier identifier) { - ApplicationAttemptId applicationAttemptId = - identifier.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating password for " + applicationAttemptId); + void rollMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Rolling master-key for amrm-tokens"); + this.nextMasterKey = createNewMasterKey(); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } finally { + this.writeLock.unlock(); + } + } + + private class NextKeyActivator extends TimerTask { + @Override + public void run() { + activateNextMasterKey(); + } + } + + public void activateNextMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Activating next master key with id: " + + this.nextMasterKey.getMasterKey().getKeyId()); + this.currentMasterKey = this.nextMasterKey; + this.nextMasterKey = null; + } finally { + this.writeLock.unlock(); + } + } + + @Private + @VisibleForTesting + public MasterKeyData createNewMasterKey() { + this.writeLock.lock(); + try { + return new MasterKeyData(serialNo++, generateSecret()); + } finally { + this.writeLock.unlock(); + } + } + + public Token createAndGetAMRMToken( + ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId); + AMRMTokenIdentifier identifier = + new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey() + .getKeyId()); + byte[] password = this.createPassword(identifier); + appAttemptSet.add(appAttemptId); + return new Token(identifier.getBytes(), password, + identifier.getKind(), new Text()); + } finally { + this.writeLock.unlock(); + } + } + + // If nextMasterKey is not Null, then return nextMasterKey + // otherwise return currentMasterKey + @VisibleForTesting + public MasterKeyData getMasterKey() { + this.readLock.lock(); + try { + return nextMasterKey == null ? currentMasterKey : nextMasterKey; + } finally { + this.readLock.unlock(); } - byte[] password = createPassword(identifier.getBytes(), masterKey); - this.passwords.put(applicationAttemptId, password); - return password; } /** * Populate persisted password of AMRMToken back to AMRMTokenSecretManager. */ - public synchronized void - addPersistedPassword(Token token) throws IOException { - AMRMTokenIdentifier identifier = token.decodeIdentifier(); - if (LOG.isDebugEnabled()) { + public void addPersistedPassword(Token token) + throws IOException { + this.writeLock.lock(); + try { + AMRMTokenIdentifier identifier = token.decodeIdentifier(); LOG.debug("Adding password for " + identifier.getApplicationAttemptId()); + appAttemptSet.add(identifier.getApplicationAttemptId()); + } finally { + this.writeLock.unlock(); } - this.passwords.put(identifier.getApplicationAttemptId(), - token.getPassword()); } /** @@ -143,19 +215,35 @@ public class AMRMTokenSecretManager extends * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}. 
*/ @Override - public synchronized byte[] retrievePassword( - AMRMTokenIdentifier identifier) throws InvalidToken { - ApplicationAttemptId applicationAttemptId = - identifier.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to retrieve password for " + applicationAttemptId); + public byte[] retrievePassword(AMRMTokenIdentifier identifier) + throws InvalidToken { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to retrieve password for " + applicationAttemptId); + } + if (!appAttemptSet.contains(applicationAttemptId)) { + throw new InvalidToken("Password not found for ApplicationAttempt " + + applicationAttemptId); + } + if (identifier.getKeyId() == this.currentMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.currentMasterKey.getSecretKey()); + } else if (nextMasterKey != null + && identifier.getKeyId() == this.nextMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.nextMasterKey.getSecretKey()); + } + throw new InvalidToken("Given AMRMToken for application : " + + applicationAttemptId.toString() + + " seems to have been generated illegally."); + } finally { + this.readLock.unlock(); } - byte[] password = this.passwords.get(applicationAttemptId); - if (password == null) { - throw new InvalidToken("Password not found for ApplicationAttempt " - + applicationAttemptId); - } - return password; } /** @@ -167,4 +255,40 @@ public class AMRMTokenSecretManager extends return new AMRMTokenIdentifier(); } + @Private + @VisibleForTesting + public MasterKeyData getCurrnetMasterKeyData() { + this.readLock.lock(); + try { + return this.currentMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Private + @VisibleForTesting + public MasterKeyData getNextMasterKeyData() { + this.readLock.lock(); + try { + return this.nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Override + @Private + protected byte[] createPassword(AMRMTokenIdentifier identifier) { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + LOG.info("Creating password for " + applicationAttemptId); + return createPassword(identifier.getBytes(), getMasterKey() + .getSecretKey()); + } finally { + this.readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index dc3e9f18178..8966af7e2a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -1250,10 +1250,11 @@ public class TestRMRestart { .getEncoded()); // assert AMRMTokenSecretManager also knows about the AMRMToken password - Token amrmToken = loadedAttempt1.getAMRMToken(); - Assert.assertArrayEquals(amrmToken.getPassword(), - rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword( - amrmToken.decodeIdentifier())); + // TODO: fix this on YARN-2211 +// 
Token amrmToken = loadedAttempt1.getAMRMToken(); +// Assert.assertArrayEquals(amrmToken.getPassword(), +// rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword( +// amrmToken.decodeIdentifier())); rm1.stop(); rm2.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 6ac23a2ad57..04f034818c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import java.util.ArrayList; import java.util.HashMap; @@ -34,7 +35,6 @@ import java.util.Map; import javax.crypto.SecretKey; import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.util.ConverterUtils; public class RMStateStoreTestBase extends ClientBaseWithFixes{ @@ -175,8 +176,11 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ TestDispatcher dispatcher = new TestDispatcher(); store.setRMDispatcher(dispatcher); - AMRMTokenSecretManager appTokenMgr = - new AMRMTokenSecretManager(conf); + AMRMTokenSecretManager appTokenMgr = spy( + new AMRMTokenSecretManager(conf)); + MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey(); + when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData); + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM(); @@ -455,10 +459,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ private Token generateAMRMToken( ApplicationAttemptId attemptId, AMRMTokenSecretManager appTokenMgr) { - AMRMTokenIdentifier appTokenId = - new AMRMTokenIdentifier(attemptId); Token appToken = - new Token(appTokenId, appTokenMgr); + appTokenMgr.createAndGetAMRMToken(attemptId); appToken.setService(new Text("appToken service")); return appToken; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index c99987d7add..ca0fc3960a0 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -224,6 +225,8 @@ public class TestRMAppAttemptTransitions { amLivelinessMonitor = mock(AMLivelinessMonitor.class); amFinishingMonitor = mock(AMLivelinessMonitor.class); writer = mock(RMApplicationHistoryWriter.class); + MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey(); + when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData); rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index 64602bd888e..b11aadd7912 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -23,13 +23,12 @@ import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collection; -import javax.crypto.SecretKey; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -41,7 +40,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; @@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; 
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -65,6 +67,8 @@ public class TestAMRMTokens { private final Configuration conf; private static final int maxWaitAttempts = 50; + private static final int rolling_interval_sec = 13; + private static final long am_expire_ms = 4000; @Parameters public static Collection configs() { @@ -201,15 +205,22 @@ public class TestAMRMTokens { @Test public void testMasterKeyRollOver() throws Exception { + conf.setLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, + rolling_interval_sec); + conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms); MyContainerManager containerManager = new MyContainerManager(); final MockRMWithAMS rm = new MockRMWithAMS(conf, containerManager); rm.start(); - + Long startTime = System.currentTimeMillis(); final Configuration conf = rm.getConfig(); final YarnRPC rpc = YarnRPC.create(conf); ApplicationMasterProtocol rmClient = null; - + AMRMTokenSecretManager appTokenSecretManager = + rm.getRMContext().getAMRMTokenSecretManager(); + MasterKeyData oldKey = appTokenSecretManager.getMasterKey(); + Assert.assertNotNull(oldKey); try { MockNM nm1 = rm.registerNode("localhost:1234", 5120); @@ -218,7 +229,7 @@ public class TestAMRMTokens { nm1.nodeHeartbeat(true); int waitCount = 0; - while (containerManager.containerTokens == null && waitCount++ < 20) { + while (containerManager.containerTokens == null && waitCount++ < maxWaitAttempts) { LOG.info("Waiting for AM Launch to happen.."); Thread.sleep(1000); } @@ -250,21 +261,65 @@ public class TestAMRMTokens { Assert.assertTrue( rmClient.allocate(allocateRequest).getAMCommand() == null); - // Simulate a master-key-roll-over - AMRMTokenSecretManager appTokenSecretManager = - rm.getRMContext().getAMRMTokenSecretManager(); - SecretKey oldKey = appTokenSecretManager.getMasterKey(); - appTokenSecretManager.rollMasterKey(); - SecretKey newKey = appTokenSecretManager.getMasterKey(); + // Wait for enough time and make sure the roll_over happens + // At mean time, the old AMRMToken should continue to work + while(System.currentTimeMillis() - startTime < rolling_interval_sec*1000) { + rmClient.allocate(allocateRequest); + Thread.sleep(500); + } + + MasterKeyData newKey = appTokenSecretManager.getMasterKey(); + Assert.assertNotNull(newKey); Assert.assertFalse("Master key should have changed!", oldKey.equals(newKey)); + // Another allocate call with old AMRMToken. Should continue to work. 
+ rpc.stopProxy(rmClient, conf); // To avoid using cached client + rmClient = createRMClient(rm, conf, rpc, currentUser); + Assert + .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null); + + waitCount = 0; + while(waitCount++ <= maxWaitAttempts) { + if (appTokenSecretManager.getCurrnetMasterKeyData() != oldKey) { + break; + } + try { + rmClient.allocate(allocateRequest); + } catch (Exception ex) { + break; + } + Thread.sleep(200); + } + // active the nextMasterKey, and replace the currentMasterKey + Assert.assertTrue(appTokenSecretManager.getCurrnetMasterKeyData().equals(newKey)); + Assert.assertTrue(appTokenSecretManager.getMasterKey().equals(newKey)); + Assert.assertTrue(appTokenSecretManager.getNextMasterKeyData() == null); + + // Create a new Token + Token newToken = + appTokenSecretManager.createAndGetAMRMToken(applicationAttemptId); + SecurityUtil.setTokenService(newToken, rmBindAddress); + currentUser.addToken(newToken); // Another allocate call. Should continue to work. rpc.stopProxy(rmClient, conf); // To avoid using cached client rmClient = createRMClient(rm, conf, rpc, currentUser); allocateRequest = Records.newRecord(AllocateRequest.class); - Assert.assertTrue( - rmClient.allocate(allocateRequest).getAMCommand() == null); + Assert + .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null); + + // Should not work by using the old AMRMToken. + rpc.stopProxy(rmClient, conf); // To avoid using cached client + try { + currentUser.addToken(amRMToken); + rmClient = createRMClient(rm, conf, rpc, currentUser); + allocateRequest = Records.newRecord(AllocateRequest.class); + Assert + .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null); + Assert.fail("The old Token should not work"); + } catch (Exception ex) { + // expect exception + } } finally { rm.stop(); if (rmClient != null) { From 0a3ea6c486b43a798d487f9a20668d418f539b8b Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Fri, 18 Jul 2014 22:01:18 +0000 Subject: [PATCH 5/7] HADOOP-10750. KMSKeyProviderCache should be in hadoop-common. (asuresh via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611823 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../hadoop/crypto/key/CachingKeyProvider.java | 174 +++++++++++++++++ .../crypto/key/TestCachingKeyProvider.java} | 74 +++++--- .../key/kms/server/KMSCacheKeyProvider.java | 177 ------------------ .../key/kms/server/KMSConfiguration.java | 14 +- .../crypto/key/kms/server/KMSWebApp.java | 16 +- .../hadoop-kms/src/site/apt/index.apt.vm | 27 ++- 7 files changed, 275 insertions(+), 210 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java rename hadoop-common-project/{hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSCacheKeyProvider.java => hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java} (64%) delete mode 100644 hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 5805d005547..7bdfd17b5dd 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -183,6 +183,9 @@ Trunk (Unreleased) HADOOP-10842. CryptoExtension generateEncryptedKey method should receive the key name. (asuresh via tucu) + HADOOP-10750. 
KMSKeyProviderCache should be in hadoop-common. + (asuresh via tucu) + BUG FIXES HADOOP-9451. Fault single-layer config if node group topology is enabled. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java new file mode 100644 index 00000000000..057df33a61d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/CachingKeyProvider.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto.key; + +import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +/** + * A KeyProviderExtension implementation providing a short lived + * cache for KeyVersions and Metadatato avoid burst + * of requests to hit the underlying KeyProvider. 
+ */ +public class CachingKeyProvider extends + KeyProviderExtension { + + static class CacheExtension implements KeyProviderExtension.Extension { + private final KeyProvider provider; + private LoadingCache keyVersionCache; + private LoadingCache currentKeyCache; + private LoadingCache keyMetadataCache; + + CacheExtension(KeyProvider prov, long keyTimeoutMillis, + long currKeyTimeoutMillis) { + this.provider = prov; + keyVersionCache = + CacheBuilder.newBuilder().expireAfterAccess(keyTimeoutMillis, + TimeUnit.MILLISECONDS) + .build(new CacheLoader() { + @Override + public KeyVersion load(String key) throws Exception { + KeyVersion kv = provider.getKeyVersion(key); + if (kv == null) { + throw new KeyNotFoundException(); + } + return kv; + } + }); + keyMetadataCache = + CacheBuilder.newBuilder().expireAfterAccess(keyTimeoutMillis, + TimeUnit.MILLISECONDS) + .build(new CacheLoader() { + @Override + public Metadata load(String key) throws Exception { + Metadata meta = provider.getMetadata(key); + if (meta == null) { + throw new KeyNotFoundException(); + } + return meta; + } + }); + currentKeyCache = + CacheBuilder.newBuilder().expireAfterWrite(currKeyTimeoutMillis, + TimeUnit.MILLISECONDS) + .build(new CacheLoader() { + @Override + public KeyVersion load(String key) throws Exception { + KeyVersion kv = provider.getCurrentKey(key); + if (kv == null) { + throw new KeyNotFoundException(); + } + return kv; + } + }); + } + } + + @SuppressWarnings("serial") + private static class KeyNotFoundException extends Exception { } + + public CachingKeyProvider(KeyProvider keyProvider, long keyTimeoutMillis, + long currKeyTimeoutMillis) { + super(keyProvider, new CacheExtension(keyProvider, keyTimeoutMillis, + currKeyTimeoutMillis)); + } + + @Override + public KeyVersion getCurrentKey(String name) throws IOException { + try { + return getExtension().currentKeyCache.get(name); + } catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof KeyNotFoundException) { + return null; + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new IOException(cause); + } + } + } + + @Override + public KeyVersion getKeyVersion(String versionName) + throws IOException { + try { + return getExtension().keyVersionCache.get(versionName); + } catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof KeyNotFoundException) { + return null; + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new IOException(cause); + } + } + } + + @Override + public void deleteKey(String name) throws IOException { + getKeyProvider().deleteKey(name); + getExtension().currentKeyCache.invalidate(name); + getExtension().keyMetadataCache.invalidate(name); + // invalidating all key versions as we don't know + // which ones belonged to the deleted key + getExtension().keyVersionCache.invalidateAll(); + } + + @Override + public KeyVersion rollNewVersion(String name, byte[] material) + throws IOException { + KeyVersion key = getKeyProvider().rollNewVersion(name, material); + getExtension().currentKeyCache.invalidate(name); + getExtension().keyMetadataCache.invalidate(name); + return key; + } + + @Override + public KeyVersion rollNewVersion(String name) + throws NoSuchAlgorithmException, IOException { + KeyVersion key = getKeyProvider().rollNewVersion(name); + getExtension().currentKeyCache.invalidate(name); + getExtension().keyMetadataCache.invalidate(name); + return key; + } + + @Override + public Metadata 
getMetadata(String name) throws IOException { + try { + return getExtension().keyMetadataCache.get(name); + } catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof KeyNotFoundException) { + return null; + } else if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new IOException(cause); + } + } + } + +} diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSCacheKeyProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java similarity index 64% rename from hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSCacheKeyProvider.java rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java index 72b219191bc..2eff6991c3d 100644 --- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMSCacheKeyProvider.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestCachingKeyProvider.java @@ -15,17 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.crypto.key.kms.server; +package org.apache.hadoop.crypto.key; + +import java.util.Date; -import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.kms.KMSClientProvider; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import java.util.Date; - -public class TestKMSCacheKeyProvider { +public class TestCachingKeyProvider { @Test public void testCurrentKey() throws Exception { @@ -33,7 +32,7 @@ public class TestKMSCacheKeyProvider { KeyProvider mockProv = Mockito.mock(KeyProvider.class); Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey); Mockito.when(mockProv.getCurrentKey(Mockito.eq("k2"))).thenReturn(null); - KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100); + KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100); // asserting caching Assert.assertEquals(mockKey, cache.getCurrentKey("k1")); @@ -45,7 +44,7 @@ public class TestKMSCacheKeyProvider { Mockito.verify(mockProv, Mockito.times(2)).getCurrentKey(Mockito.eq("k1")); // asserting no caching when key is not known - cache = new KMSCacheKeyProvider(mockProv, 100); + cache = new CachingKeyProvider(mockProv, 100, 100); Assert.assertEquals(null, cache.getCurrentKey("k2")); Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k2")); Assert.assertEquals(null, cache.getCurrentKey("k2")); @@ -56,25 +55,56 @@ public class TestKMSCacheKeyProvider { public void testKeyVersion() throws Exception { KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class); KeyProvider mockProv = Mockito.mock(KeyProvider.class); - Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0"))).thenReturn(mockKey); + Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0"))) + .thenReturn(mockKey); Mockito.when(mockProv.getKeyVersion(Mockito.eq("k2@0"))).thenReturn(null); - KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100); + KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100); // asserting caching Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0")); - Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0")); + Mockito.verify(mockProv, Mockito.times(1)) + .getKeyVersion(Mockito.eq("k1@0")); Assert.assertEquals(mockKey, 
cache.getKeyVersion("k1@0")); - Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0")); + Mockito.verify(mockProv, Mockito.times(1)) + .getKeyVersion(Mockito.eq("k1@0")); Thread.sleep(200); Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0")); - Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k1@0")); + Mockito.verify(mockProv, Mockito.times(2)) + .getKeyVersion(Mockito.eq("k1@0")); // asserting no caching when key is not known - cache = new KMSCacheKeyProvider(mockProv, 100); + cache = new CachingKeyProvider(mockProv, 100, 100); Assert.assertEquals(null, cache.getKeyVersion("k2@0")); - Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k2@0")); + Mockito.verify(mockProv, Mockito.times(1)) + .getKeyVersion(Mockito.eq("k2@0")); Assert.assertEquals(null, cache.getKeyVersion("k2@0")); - Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k2@0")); + Mockito.verify(mockProv, Mockito.times(2)) + .getKeyVersion(Mockito.eq("k2@0")); + } + + @Test + public void testMetadata() throws Exception { + KeyProvider.Metadata mockMeta = Mockito.mock(KeyProvider.Metadata.class); + KeyProvider mockProv = Mockito.mock(KeyProvider.class); + Mockito.when(mockProv.getMetadata(Mockito.eq("k1"))).thenReturn(mockMeta); + Mockito.when(mockProv.getMetadata(Mockito.eq("k2"))).thenReturn(null); + KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100); + + // asserting caching + Assert.assertEquals(mockMeta, cache.getMetadata("k1")); + Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k1")); + Assert.assertEquals(mockMeta, cache.getMetadata("k1")); + Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k1")); + Thread.sleep(200); + Assert.assertEquals(mockMeta, cache.getMetadata("k1")); + Mockito.verify(mockProv, Mockito.times(2)).getMetadata(Mockito.eq("k1")); + + // asserting no caching when key is not known + cache = new CachingKeyProvider(mockProv, 100, 100); + Assert.assertEquals(null, cache.getMetadata("k2")); + Mockito.verify(mockProv, Mockito.times(1)).getMetadata(Mockito.eq("k2")); + Assert.assertEquals(null, cache.getMetadata("k2")); + Mockito.verify(mockProv, Mockito.times(2)).getMetadata(Mockito.eq("k2")); } @Test @@ -82,7 +112,7 @@ public class TestKMSCacheKeyProvider { KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class); KeyProvider mockProv = Mockito.mock(KeyProvider.class); Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey); - KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100); + KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100); Assert.assertEquals(mockKey, cache.getCurrentKey("k1")); Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k1")); cache.rollNewVersion("k1"); @@ -100,21 +130,23 @@ public class TestKMSCacheKeyProvider { KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class); KeyProvider mockProv = Mockito.mock(KeyProvider.class); Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey); - Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0"))).thenReturn(mockKey); + Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0"))) + .thenReturn(mockKey); Mockito.when(mockProv.getMetadata(Mockito.eq("k1"))).thenReturn( new KMSClientProvider.KMSMetadata("c", 0, "l", null, new Date(), 1)); - KeyProvider cache = new KMSCacheKeyProvider(mockProv, 100); + KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100); Assert.assertEquals(mockKey, 
cache.getCurrentKey("k1")); Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k1")); Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0")); - Mockito.verify(mockProv, Mockito.times(1)).getKeyVersion(Mockito.eq("k1@0")); + Mockito.verify(mockProv, Mockito.times(1)) + .getKeyVersion(Mockito.eq("k1@0")); cache.deleteKey("k1"); // asserting the cache is purged Assert.assertEquals(mockKey, cache.getCurrentKey("k1")); Mockito.verify(mockProv, Mockito.times(2)).getCurrentKey(Mockito.eq("k1")); Assert.assertEquals(mockKey, cache.getKeyVersion("k1@0")); - Mockito.verify(mockProv, Mockito.times(2)).getKeyVersion(Mockito.eq("k1@0")); + Mockito.verify(mockProv, Mockito.times(2)) + .getKeyVersion(Mockito.eq("k1@0")); } - } diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java deleted file mode 100644 index e453c16980d..00000000000 --- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSCacheKeyProvider.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.crypto.key.kms.server; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import org.apache.hadoop.crypto.key.KeyProvider; - -import java.io.IOException; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -/** - * A KeyProvider proxy implementation providing a short lived - * cache for KeyVersions to avoid burst of requests to hit the - * underlying KeyProvider. 
- */ -public class KMSCacheKeyProvider extends KeyProvider { - private final KeyProvider provider; - private LoadingCache keyVersionCache; - private LoadingCache currentKeyCache; - - private static class KeyNotFoundException extends Exception { - private static final long serialVersionUID = 1L; - } - - public KMSCacheKeyProvider(KeyProvider prov, long timeoutMillis) { - this.provider = prov; - keyVersionCache = CacheBuilder.newBuilder().expireAfterAccess(timeoutMillis, - TimeUnit.MILLISECONDS).build(new CacheLoader() { - @Override - public KeyVersion load(String key) throws Exception { - KeyVersion kv = provider.getKeyVersion(key); - if (kv == null) { - throw new KeyNotFoundException(); - } - return kv; - } - }); - // for current key we don't want to go stale for more than 1 sec - currentKeyCache = CacheBuilder.newBuilder().expireAfterWrite(1000, - TimeUnit.MILLISECONDS).build(new CacheLoader() { - @Override - public KeyVersion load(String key) throws Exception { - KeyVersion kv = provider.getCurrentKey(key); - if (kv == null) { - throw new KeyNotFoundException(); - } - return kv; - } - }); - } - - @Override - public KeyVersion getCurrentKey(String name) throws IOException { - try { - return currentKeyCache.get(name); - } catch (ExecutionException ex) { - Throwable cause = ex.getCause(); - if (cause instanceof KeyNotFoundException) { - return null; - } else if (cause instanceof IOException) { - throw (IOException) cause; - } else { - throw new IOException(cause); - } - } - } - - @Override - public KeyVersion getKeyVersion(String versionName) - throws IOException { - try { - return keyVersionCache.get(versionName); - } catch (ExecutionException ex) { - Throwable cause = ex.getCause(); - if (cause instanceof KeyNotFoundException) { - return null; - } else if (cause instanceof IOException) { - throw (IOException) cause; - } else { - throw new IOException(cause); - } - } - } - - @Override - public List getKeys() throws IOException { - return provider.getKeys(); - } - - @Override - public List getKeyVersions(String name) - throws IOException { - return provider.getKeyVersions(name); - } - - @Override - public Metadata getMetadata(String name) throws IOException { - return provider.getMetadata(name); - } - - @Override - public KeyVersion createKey(String name, byte[] material, - Options options) throws IOException { - return provider.createKey(name, material, options); - } - - @Override - public KeyVersion createKey(String name, - Options options) - throws NoSuchAlgorithmException, IOException { - return provider.createKey(name, options); - } - - @Override - public void deleteKey(String name) throws IOException { - provider.deleteKey(name); - currentKeyCache.invalidate(name); - // invalidating all key versions as we don't know which ones belonged to the - // deleted key - keyVersionCache.invalidateAll(); - } - - @Override - public KeyVersion rollNewVersion(String name, byte[] material) - throws IOException { - KeyVersion key = provider.rollNewVersion(name, material); - currentKeyCache.invalidate(name); - return key; - } - - @Override - public KeyVersion rollNewVersion(String name) - throws NoSuchAlgorithmException, IOException { - KeyVersion key = provider.rollNewVersion(name); - currentKeyCache.invalidate(name); - return key; - } - - @Override - public void flush() throws IOException { - provider.flush(); - } - - @Override - public Metadata[] getKeysMetadata(String ... 
keyNames)
-      throws IOException {
-    return provider.getKeysMetadata(keyNames);
-  }
-
-  @Override
-  public boolean isTransient() {
-    return provider.isTransient();
-  }
-
-}
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
index b2209d4cc77..e2b8fc4c093 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSConfiguration.java
@@ -34,9 +34,21 @@ public class KMSConfiguration {
 
   public static final String CONFIG_PREFIX = "hadoop.kms.";
 
+  // Property to Enable/Disable Caching
+  public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
+      "cache.enable";
+  // Timeout for the Key and Metadata Cache
   public static final String KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
       "cache.timeout.ms";
-  public static final long KEY_CACHE_TIMEOUT_DEFAULT = 10 * 1000; // 10 secs
+  // Timeout for the Current Key cache
+  public static final String CURR_KEY_CACHE_TIMEOUT_KEY = CONFIG_PREFIX +
+      "current.key.cache.timeout.ms";
+
+  public static final boolean KEY_CACHE_ENABLE_DEFAULT = true;
+  // 10 mins
+  public static final long KEY_CACHE_TIMEOUT_DEFAULT = 10 * 60 * 1000;
+  // 30 secs
+  public static final long CURR_KEY_CACHE_TIMEOUT_DEFAULT = 30 * 1000;
 
   static Configuration getConfiguration(boolean loadHadoopDefaults,
       String ... resources) {
diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
index 046753e55be..88ea8c4fa42 100644
--- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
+++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSWebApp.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.CachingKeyProvider;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.http.HttpServer2;
@@ -150,10 +151,17 @@ public class KMSWebApp implements ServletContextListener {
             kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH));
       }
       keyProvider = providers.get(0);
-      long timeOutMillis =
-          kmsConf.getLong(KMSConfiguration.KEY_CACHE_TIMEOUT_KEY,
-              KMSConfiguration.KEY_CACHE_TIMEOUT_DEFAULT);
-      keyProvider = new KMSCacheKeyProvider(keyProvider, timeOutMillis);
+      if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE,
+          KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) {
+        long keyTimeOutMillis =
+            kmsConf.getLong(KMSConfiguration.KEY_CACHE_TIMEOUT_KEY,
+                KMSConfiguration.KEY_CACHE_TIMEOUT_DEFAULT);
+        long currKeyTimeOutMillis =
+            kmsConf.getLong(KMSConfiguration.CURR_KEY_CACHE_TIMEOUT_KEY,
+                KMSConfiguration.CURR_KEY_CACHE_TIMEOUT_DEFAULT);
+        keyProvider = new CachingKeyProvider(keyProvider, keyTimeOutMillis,
+            currKeyTimeOutMillis);
+      }
 
       LOG.info("KMS Started");
     } catch (Throwable ex) {
diff --git a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
index 329fd516e30..297d0325d01 100644
--- a/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
+++ b/hadoop-common-project/hadoop-kms/src/site/apt/index.apt.vm
@@ -72,22 +72,35 @@ Hadoop Key Management Server (KMS) - Documentation Sets ${project.version}
 
   KMS caches keys for short period of time to avoid excessive hits to the
   underlying key provider.
 
-  The cache is used with the following 2 methods only, <<>>
-  and <<>>.
+  The cache is enabled by default (it can be disabled by setting the
+  <<>> boolean property to false).
+
+  The cache is used with the following 3 methods only: <<>>,
+  <<>> and <<>>.
 
   For the <<>> method, cached entries are kept for a maximum
-  of 1000 millisecond regardless the number of times the key is being access
+  of 30000 milliseconds regardless of the number of times the key is accessed
   (to avoid stale keys to be considered current).
 
   For the <<>> method, cached entries are kept with a default
-  inactivity timeout of 10000 milliseconds. This time out is configurable via
-  the following property in the <<>> configuration
-  file:
+  inactivity timeout of 600000 milliseconds (10 mins). This timeout is
+  configurable via the following properties in the <<>>
+  configuration file:
 
 +---+
+  <property>
+    <name>hadoop.kms.cache.enable</name>
+    <value>true</value>
+  </property>
+
   <property>
     <name>hadoop.kms.cache.timeout.ms</name>
-    <value>10000</value>
+    <value>600000</value>
   </property>
+
+  <property>
+    <name>hadoop.kms.current.key.cache.timeout.ms</name>
+    <value>30000</value>
+  </property>
 +---+
 

From 8871d8ed9fb1e4b21943477dcbaa13ef22ea7b8e Mon Sep 17 00:00:00 2001
From: Karthik Kambatla 
Date: Sat, 19 Jul 2014 00:12:05 +0000
Subject: [PATCH 6/7] YARN-2244. FairScheduler missing handling of containers for unknown application attempts. (Anubhav Dhoot via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611840 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-yarn-project/CHANGES.txt | 3 +
 .../scheduler/AbstractYarnScheduler.java | 17 +++
 .../scheduler/capacity/CapacityScheduler.java | 16 ---
 .../scheduler/fair/FairScheduler.java | 16 ---
 .../scheduler/fifo/FifoScheduler.java | 18 ---
 .../TestApplicationCleanup.java | 107 +++++++++++++-----
 6 files changed, 98 insertions(+), 79 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 99ecd9971f0..c1d351e0da6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -71,6 +71,9 @@ Release 2.6.0 - UNRELEASED
     after RM recovery but before scheduler learns about apps and
     app-attempts. (Jian He via vinodkv)
 
+    YARN-2244. FairScheduler missing handling of containers for unknown
+    application attempts.
(Anubhav Dhoot via kasha) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index b3e835a54d3..5764c8c7a65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -123,6 +123,23 @@ public abstract class AbstractYarnScheduler return maximumAllocation; } + protected void containerLaunchedOnNode(ContainerId containerId, + SchedulerNode node) { + // Get the application for the finished container + SchedulerApplicationAttempt application = getCurrentAttemptForContainer + (containerId); + if (application == null) { + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " launched container " + containerId + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; + } + + application.containerLaunchedOnNode(containerId, node.getNodeID()); + } + public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { SchedulerApplication app = applications.get(applicationAttemptId.getApplicationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6d26519ff98..26812388aaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -866,21 +865,6 @@ public class CapacityScheduler extends } - private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { - // Get the application for the finished container - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == 
null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - @Override public void handle(SchedulerEvent event) { switch(event.getType()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3a847ce7589..18ccf9d8a93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -928,22 +928,6 @@ public class FairScheduler extends } } - /** - * Process a container which has launched on a node, as reported by the node. - */ - private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { - // Get the application for the finished container - FSSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - /** * Process a heartbeat update from a node. 
*/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 571d0558c04..518a8d9d3b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -831,23 +830,6 @@ public class FifoScheduler extends } } - private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { - // Get the application for the finished container - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - // Some unknown container sneaked into the system. Kill it. 
- this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - @Lock(FifoScheduler.class) private synchronized void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 45ccd1c3016..e88ebd24f3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -232,20 +232,7 @@ public class TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - List contsToClean = resp.getContainersToCleanup(); - int cleanedConts = contsToClean.size(); - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts += contsToClean.size(); - } - LOG.info("Got cleanup for " + contsToClean.get(0)); - Assert.assertEquals(1, cleanedConts); + waitForContainerCleanup(dispatcher, nm1, resp); // Now to test the case when RM already gave cleanup, and NM suddenly // realizes that the container is running. @@ -258,26 +245,36 @@ public class TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); resp = nm1.nodeHeartbeat(containerStatuses, true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts = contsToClean.size(); // The cleanup list won't be instantaneous as it is given out by scheduler // and not RMNodeImpl. - waitCount = 0; - while (cleanedConts < 1 && waitCount++ < 200) { - LOG.info("Waiting to get cleanup events.. 
cleanedConts: " + cleanedConts); - Thread.sleep(100); - resp = nm1.nodeHeartbeat(true); - dispatcher.await(); - contsToClean = resp.getContainersToCleanup(); - cleanedConts += contsToClean.size(); - } - LOG.info("Got cleanup for " + contsToClean.get(0)); - Assert.assertEquals(1, cleanedConts); + waitForContainerCleanup(dispatcher, nm1, resp); rm.stop(); } - + + protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm, + NodeHeartbeatResponse resp) throws Exception { + int waitCount = 0, cleanedConts = 0; + List contsToClean; + do { + dispatcher.await(); + contsToClean = resp.getContainersToCleanup(); + cleanedConts += contsToClean.size(); + if (cleanedConts >= 1) { + break; + } + Thread.sleep(100); + resp = nm.nodeHeartbeat(true); + } while(waitCount++ < 200); + + if (contsToClean.isEmpty()) { + LOG.error("Failed to get any containers to cleanup"); + } else { + LOG.info("Got cleanup for " + contsToClean.get(0)); + } + Assert.assertEquals(1, cleanedConts); + } + private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) throws Exception { while (true) { @@ -400,6 +397,58 @@ public class TestApplicationCleanup { rm2.stop(); } + @SuppressWarnings("resource") + @Test (timeout = 60000) + public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws + Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING); + rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); + + // start new RM + final DrainDispatcher dispatcher2 = new DrainDispatcher(); + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher2; + } + }; + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); + + // Add unknown container for application unknown to scheduler + NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0 + .getApplicationAttemptId(), 2, ContainerState.RUNNING); + + waitForContainerCleanup(dispatcher2, nm1, response); + + rm1.stop(); + rm2.stop(); + } + public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup(); t.testAppCleanup(); From 7c71a3b876f18510fd10a0b08bf27f83ac3aa389 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Mon, 21 Jul 2014 05:00:27 +0000 Subject: [PATCH 7/7] YARN-2323. 
FairShareComparator creates too many Resource objects (Hong Zhiguo via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612187 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../scheduler/fair/policies/FairSharePolicy.java | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c1d351e0da6..e1fa4445d55 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -51,6 +51,9 @@ Release 2.6.0 - UNRELEASED YARN-2208. AMRMTokenManager need to have a way to roll over AMRMToken. (xgong) + YARN-2323. FairShareComparator creates too many Resource objects (Hong Zhiguo + via Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 5976cea5230..c51852fa9d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -65,6 +65,7 @@ public class FairSharePolicy extends SchedulingPolicy { private static class FairShareComparator implements Comparator, Serializable { private static final long serialVersionUID = 5564969375856699313L; + private static final Resource ONE = Resources.createResource(1); @Override public int compare(Schedulable s1, Schedulable s2) { @@ -78,11 +79,10 @@ public class FairSharePolicy extends SchedulingPolicy { s1.getResourceUsage(), minShare1); boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, s2.getResourceUsage(), minShare2); - Resource one = Resources.createResource(1); minShareRatio1 = (double) s1.getResourceUsage().getMemory() - / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory(); + / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory(); minShareRatio2 = (double) s2.getResourceUsage().getMemory() - / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory(); + / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory(); useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeights().getWeight(ResourceType.MEMORY); useToWeightRatio2 = s2.getResourceUsage().getMemory() /
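
The YARN-2323 hunk above applies a small but effective pattern: hoist an immutable helper object out of a Comparator so that compare(), which runs many times per scheduling pass, performs no per-call allocation. The Java sketch below is illustrative only and is not part of the patch; MemResource, App, and MinShareRatioComparator are hypothetical stand-ins for YARN's Resource and Schedulable types, assuming memory is the only dimension compared.

import java.util.Comparator;

// Illustrative sketch only: hypothetical stand-ins for YARN's Resource and
// Schedulable types, reduced to a single memory dimension.
final class MemResource {
  final long memory;
  MemResource(long memory) { this.memory = memory; }
}

final class App {
  final MemResource usage;     // current resource usage
  final MemResource minShare;  // configured minimum share
  App(MemResource usage, MemResource minShare) {
    this.usage = usage;
    this.minShare = minShare;
  }
}

final class MinShareRatioComparator implements Comparator<App> {
  // Created once and shared across all comparisons (safe because the object
  // is immutable), instead of allocating "new MemResource(1)" inside every
  // compare() call of a hot scheduling loop.
  private static final MemResource ONE = new MemResource(1);

  @Override
  public int compare(App a, App b) {
    // Divide by at least ONE.memory to guard against a zero min share,
    // analogous to Resources.max(RESOURCE_CALCULATOR, null, minShare, ONE).
    double ratioA = (double) a.usage.memory / Math.max(a.minShare.memory, ONE.memory);
    double ratioB = (double) b.usage.memory / Math.max(b.minShare.memory, ONE.memory);
    return Double.compare(ratioA, ratioB);
  }
}

For example, sorting a list of App objects with new MinShareRatioComparator() orders the apps that have consumed the smallest fraction of their minimum share first, without the comparator itself generating any garbage.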