diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 7f0568f5333..36949a2238b 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -120,6 +120,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10337 ConcurrentModificationException from MetricsDynamicMBeanBase.createMBeanInfo() (Liang Xie via stack) + HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package. + (szetszwo) + BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS HADOOP-10185. FileSystem API for ACLs. (cnauroth) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java index dae7ace4bd8..27949d00713 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -18,15 +18,13 @@ package org.apache.hadoop.ipc; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.TimeUnit; - import java.lang.reflect.Constructor; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configuration; /** @@ -35,13 +33,19 @@ import org.apache.hadoop.conf.Configuration; public class CallQueueManager { public static final Log LOG = LogFactory.getLog(CallQueueManager.class); + @SuppressWarnings("unchecked") + static Class> convertQueueClass( + Class queneClass, Class elementClass) { + return (Class>)queneClass; + } + // Atomic refs point to active callQueue // We have two so we can better control swapping private final AtomicReference> putRef; private final AtomicReference> takeRef; - public CallQueueManager(Class backingClass, int maxQueueSize, - String namespace, Configuration conf) { + public CallQueueManager(Class> backingClass, + int maxQueueSize, String namespace, Configuration conf) { BlockingQueue bq = createCallQueueInstance(backingClass, maxQueueSize, namespace, conf); this.putRef = new AtomicReference>(bq); @@ -49,15 +53,14 @@ public class CallQueueManager { LOG.info("Using callQueue " + backingClass); } - @SuppressWarnings("unchecked") - private BlockingQueue createCallQueueInstance(Class theClass, int maxLen, - String ns, Configuration conf) { + private > T createCallQueueInstance( + Class theClass, int maxLen, String ns, Configuration conf) { // Used for custom, configurable callqueues try { - Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class, + Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class, Configuration.class); - return (BlockingQueue)ctor.newInstance(maxLen, ns, conf); + return ctor.newInstance(maxLen, ns, conf); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -65,8 +68,8 @@ public class CallQueueManager { // Used for LinkedBlockingQueue, ArrayBlockingQueue, etc try { - Constructor ctor = theClass.getDeclaredConstructor(int.class); - return (BlockingQueue)ctor.newInstance(maxLen); + Constructor ctor = theClass.getDeclaredConstructor(int.class); + return ctor.newInstance(maxLen); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -74,8 +77,8 @@ public class CallQueueManager { // Last attempt try { - Constructor ctor = theClass.getDeclaredConstructor(); - return (BlockingQueue)ctor.newInstance(); + Constructor ctor = theClass.getDeclaredConstructor(); + return ctor.newInstance(); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -117,8 +120,9 @@ public class CallQueueManager { * Replaces active queue with the newly requested one and transfers * all calls to the newQ before returning. */ - public synchronized void swapQueue(Class queueClassToUse, int maxSize, - String ns, Configuration conf) { + public synchronized void swapQueue( + Class> queueClassToUse, int maxSize, + String ns, Configuration conf) { BlockingQueue newQ = createCallQueueInstance(queueClassToUse, maxSize, ns, conf); @@ -143,7 +147,7 @@ public class CallQueueManager { * This doesn't mean the queue might not fill up at some point later, but * it should decrease the probability that we lose a call this way. */ - private boolean queueIsReallyEmpty(BlockingQueue q) { + private boolean queueIsReallyEmpty(BlockingQueue q) { boolean wasEmpty = q.isEmpty(); try { Thread.sleep(10); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index d7cadbe4303..c36c0e4e112 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -541,7 +541,7 @@ public class Client { } private synchronized AuthMethod setupSaslConnection(final InputStream in2, - final OutputStream out2) throws IOException, InterruptedException { + final OutputStream out2) throws IOException { // Do not use Client.conf here! We must use ConnectionId.conf, since the // Client object is cached and shared between all RPC clients, even those // for separate services. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 2fe248724eb..95c8c975740 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -18,6 +18,11 @@ package org.apache.hadoop.ipc; +import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID; +import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID; +import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION; +import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -75,8 +80,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import static org.apache.hadoop.ipc.RpcConstants.*; - import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.RPC.RpcInvoker; @@ -467,17 +470,24 @@ public abstract class Server { return serviceAuthorizationManager; } + static Class> getQueueClass( + String prefix, Configuration conf) { + String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY; + Class queueClass = conf.getClass(name, LinkedBlockingQueue.class); + return CallQueueManager.convertQueueClass(queueClass, Call.class); + } + + private String getQueueClassPrefix() { + return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port; + } + /* * Refresh the call queue */ public synchronized void refreshCallQueue(Configuration conf) { // Create the next queue - String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + - this.port; - Class queueClassToUse = conf.getClass(prefix + "." + - CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class); - - callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf); + String prefix = getQueueClassPrefix(); + callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf); } /** A call queued for handling. */ @@ -1225,9 +1235,9 @@ public abstract class Server { Throwable cause = e; while (cause != null) { if (cause instanceof RetriableException) { - return (RetriableException) cause; + return cause; } else if (cause instanceof StandbyException) { - return (StandbyException) cause; + return cause; } else if (cause instanceof InvalidToken) { // FIXME: hadoop method signatures are restricting the SASL // callbacks to only returning InvalidToken, but some services @@ -1297,7 +1307,7 @@ public abstract class Server { private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage) throws IOException, InterruptedException { - RpcSaslProto saslResponse = null; + final RpcSaslProto saslResponse; final SaslState state = saslMessage.getState(); // required switch (state) { case NEGOTIATE: { @@ -1333,27 +1343,18 @@ public abstract class Server { // SIMPLE is a legit option above. we will send no response if (authMethod == AuthMethod.SIMPLE) { switchToSimple(); + saslResponse = null; break; } // sasl server for tokens may already be instantiated if (saslServer == null || authMethod != AuthMethod.TOKEN) { saslServer = createSaslServer(authMethod); } - // fallthru to process sasl token + saslResponse = processSaslToken(saslMessage); + break; } case RESPONSE: { - if (!saslMessage.hasToken()) { - throw new SaslException("Client did not send a token"); - } - byte[] saslToken = saslMessage.getToken().toByteArray(); - if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length - + " for processing by saslServer.evaluateResponse()"); - } - saslToken = saslServer.evaluateResponse(saslToken); - saslResponse = buildSaslResponse( - saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE, - saslToken); + saslResponse = processSaslToken(saslMessage); break; } default: @@ -1362,6 +1363,22 @@ public abstract class Server { return saslResponse; } + private RpcSaslProto processSaslToken(RpcSaslProto saslMessage) + throws SaslException { + if (!saslMessage.hasToken()) { + throw new SaslException("Client did not send a token"); + } + byte[] saslToken = saslMessage.getToken().toByteArray(); + if (LOG.isDebugEnabled()) { + LOG.debug("Have read input token of size " + saslToken.length + + " for processing by saslServer.evaluateResponse()"); + } + saslToken = saslServer.evaluateResponse(saslToken); + return buildSaslResponse( + saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE, + saslToken); + } + private void switchToSimple() { // disable SASL and blank out any SASL server authProtocol = AuthProtocol.NONE; @@ -2123,12 +2140,9 @@ public abstract class Server { CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); // Setup appropriate callqueue - String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + - this.port; - Class queueClassToUse = conf.getClass(prefix + "." + - CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class); - this.callQueue = new CallQueueManager(queueClassToUse, maxQueueSize, - prefix, conf); + final String prefix = getQueueClassPrefix(); + this.callQueue = new CallQueueManager(getQueueClass(prefix, conf), + maxQueueSize, prefix, conf); this.secretManager = (SecretManager) secretManager; this.authorize = diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java index 46435f4a675..446edfb2f8a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -18,22 +18,15 @@ package org.apache.hadoop.ipc; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.HashMap; -import java.util.ArrayList; - import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import org.junit.Assert; -import org.junit.Assume; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import org.junit.Test; -import org.junit.Before; -import org.junit.After; public class TestCallQueueManager { private CallQueueManager manager; @@ -146,23 +139,26 @@ public class TestCallQueueManager { } + private static final Class> queueClass + = CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class); + @Test public void testCallQueueCapacity() throws InterruptedException { - manager = new CallQueueManager(LinkedBlockingQueue.class, 10, "", null); + manager = new CallQueueManager(queueClass, 10, "", null); assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity } @Test public void testEmptyConsume() throws InterruptedException { - manager = new CallQueueManager(LinkedBlockingQueue.class, 10, "", null); + manager = new CallQueueManager(queueClass, 10, "", null); assertCanTake(manager, 0, 1); // Fails since it's empty } @Test(timeout=60000) public void testSwapUnderContention() throws InterruptedException { - manager = new CallQueueManager(LinkedBlockingQueue.class, 5000, "", null); + manager = new CallQueueManager(queueClass, 5000, "", null); ArrayList producers = new ArrayList(); ArrayList consumers = new ArrayList(); @@ -191,7 +187,7 @@ public class TestCallQueueManager { Thread.sleep(10); for (int i=0; i < 5; i++) { - manager.swapQueue(LinkedBlockingQueue.class, 5000, "", null); + manager.swapQueue(queueClass, 5000, "", null); } // Stop the producers