diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ac795326ffa..1389f48734e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -96,6 +96,9 @@ Release 2.5.0 - UNRELEASED HADOOP-10540. Datanode upgrade in Windows fails with hardlink error. (Chris Nauroth and Arpit Agarwal) + HADOOP-10508. RefreshCallQueue fails when authorization is enabled. + (Chris Li via wheat9) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index bf3b6cae40d..676ab814c98 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -139,6 +139,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_USER_MAPPINGS = "security.refresh.user.mappings.protocol.acl"; + public static final String + HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE = + "security.refresh.callqueue.protocol.acl"; public static final String SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl"; public static final String diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java index 7571128bd00..d648a6f2325 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java @@ -32,6 +32,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; /** * {@link PolicyProvider} for HDFS protocols. @@ -64,7 +65,10 @@ public class HDFSPolicyProvider extends PolicyProvider { RefreshUserMappingsProtocol.class), new Service( CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GET_USER_MAPPINGS, - GetUserMappingsProtocol.class) + GetUserMappingsProtocol.class), + new Service( + CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE, + RefreshCallQueueProtocol.class) }; @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 4a8bff371fa..04180224cea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -1056,7 +1056,7 @@ public class DFSAdmin extends FsShell { NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf), RefreshCallQueueProtocol.class).getProxy(); - // Refresh the user-to-groups mappings + // Refresh the call queue refreshProtocol.refreshCallQueue(); return 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java new file mode 100644 index 00000000000..9b0acbc8ca6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.tools.DFSAdmin; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestRefreshCallQueue { + private MiniDFSCluster cluster; + private Configuration config; + private FileSystem fs; + static int mockQueueConstructions; + static int mockQueuePuts; + private static final int NNPort = 54222; + private static String CALLQUEUE_CONFIG_KEY = "ipc." + NNPort + ".callqueue.impl"; + + @Before + public void setUp() throws Exception { + // We want to count additional events, so we reset here + mockQueueConstructions = 0; + mockQueuePuts = 0; + + config = new Configuration(); + config.setClass(CALLQUEUE_CONFIG_KEY, + MockCallQueue.class, BlockingQueue.class); + config.set("hadoop.security.authorization", "true"); + + FileSystem.setDefaultUri(config, "hdfs://localhost:" + NNPort); + fs = FileSystem.get(config); + cluster = new MiniDFSCluster.Builder(config).nameNodePort(NNPort).build(); + cluster.waitActive(); + } + + @After + public void tearDown() throws Exception { + if(cluster!=null) { + cluster.shutdown(); + } + } + + @SuppressWarnings("serial") + public static class MockCallQueue extends LinkedBlockingQueue { + public MockCallQueue(int cap, String ns, Configuration conf) { + super(cap); + mockQueueConstructions++; + } + + public void put(E e) throws InterruptedException { + super.put(e); + mockQueuePuts++; + } + } + + // Returns true if mock queue was used for put + public boolean canPutInMockQueue() throws IOException { + int putsBefore = mockQueuePuts; + fs.exists(new Path("/")); // Make an RPC call + return mockQueuePuts > putsBefore; + } + + @Test + public void testRefresh() throws Exception { + assertTrue("Mock queue should have been constructed", mockQueueConstructions > 0); + assertTrue("Puts are routed through MockQueue", canPutInMockQueue()); + int lastMockQueueConstructions = mockQueueConstructions; + + // Replace queue with the queue specified in core-site.xml, which would be the LinkedBlockingQueue + DFSAdmin admin = new DFSAdmin(config); + String [] args = new String[]{"-refreshCallQueue"}; + int exitCode = admin.run(args); + assertEquals("DFSAdmin should return 0", 0, exitCode); + + assertEquals("Mock queue should have no additional constructions", lastMockQueueConstructions, mockQueueConstructions); + try { + assertFalse("Puts are routed through LBQ instead of MockQueue", canPutInMockQueue()); + } catch (IOException ioe){ + fail("Could not put into queue at all"); + } + } + +} \ No newline at end of file