svn merge -c 1577710 from trunk for HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1577720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-03-14 21:19:29 +00:00
parent c7a6775d94
commit a537b9745d
5 changed files with 83 additions and 66 deletions

View File

@ -120,6 +120,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10337 ConcurrentModificationException from HADOOP-10337 ConcurrentModificationException from
MetricsDynamicMBeanBase.createMBeanInfo() (Liang Xie via stack) MetricsDynamicMBeanBase.createMBeanInfo() (Liang Xie via stack)
HADOOP-10407. Fix the javac warnings in org.apache.hadoop.ipc package.
(szetszwo)
BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HADOOP-10184 SUBTASKS AND RELATED JIRAS
HADOOP-10185. FileSystem API for ACLs. (cnauroth) HADOOP-10185. FileSystem API for ACLs. (cnauroth)

View File

@ -18,15 +18,13 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
/** /**
@ -35,13 +33,19 @@ import org.apache.hadoop.conf.Configuration;
public class CallQueueManager<E> { public class CallQueueManager<E> {
public static final Log LOG = LogFactory.getLog(CallQueueManager.class); public static final Log LOG = LogFactory.getLog(CallQueueManager.class);
@SuppressWarnings("unchecked")
static <E> Class<? extends BlockingQueue<E>> convertQueueClass(
Class<?> queneClass, Class<E> elementClass) {
return (Class<? extends BlockingQueue<E>>)queneClass;
}
// Atomic refs point to active callQueue // Atomic refs point to active callQueue
// We have two so we can better control swapping // We have two so we can better control swapping
private final AtomicReference<BlockingQueue<E>> putRef; private final AtomicReference<BlockingQueue<E>> putRef;
private final AtomicReference<BlockingQueue<E>> takeRef; private final AtomicReference<BlockingQueue<E>> takeRef;
public CallQueueManager(Class backingClass, int maxQueueSize, public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
String namespace, Configuration conf) { int maxQueueSize, String namespace, Configuration conf) {
BlockingQueue<E> bq = createCallQueueInstance(backingClass, BlockingQueue<E> bq = createCallQueueInstance(backingClass,
maxQueueSize, namespace, conf); maxQueueSize, namespace, conf);
this.putRef = new AtomicReference<BlockingQueue<E>>(bq); this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
@ -49,15 +53,14 @@ public class CallQueueManager<E> {
LOG.info("Using callQueue " + backingClass); LOG.info("Using callQueue " + backingClass);
} }
@SuppressWarnings("unchecked") private <T extends BlockingQueue<E>> T createCallQueueInstance(
private BlockingQueue<E> createCallQueueInstance(Class theClass, int maxLen, Class<T> theClass, int maxLen, String ns, Configuration conf) {
String ns, Configuration conf) {
// Used for custom, configurable callqueues // Used for custom, configurable callqueues
try { try {
Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class, Constructor<T> ctor = theClass.getDeclaredConstructor(int.class, String.class,
Configuration.class); Configuration.class);
return (BlockingQueue<E>)ctor.newInstance(maxLen, ns, conf); return ctor.newInstance(maxLen, ns, conf);
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -65,8 +68,8 @@ public class CallQueueManager<E> {
// Used for LinkedBlockingQueue, ArrayBlockingQueue, etc // Used for LinkedBlockingQueue, ArrayBlockingQueue, etc
try { try {
Constructor ctor = theClass.getDeclaredConstructor(int.class); Constructor<T> ctor = theClass.getDeclaredConstructor(int.class);
return (BlockingQueue<E>)ctor.newInstance(maxLen); return ctor.newInstance(maxLen);
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -74,8 +77,8 @@ public class CallQueueManager<E> {
// Last attempt // Last attempt
try { try {
Constructor ctor = theClass.getDeclaredConstructor(); Constructor<T> ctor = theClass.getDeclaredConstructor();
return (BlockingQueue<E>)ctor.newInstance(); return ctor.newInstance();
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
@ -117,8 +120,9 @@ public class CallQueueManager<E> {
* Replaces active queue with the newly requested one and transfers * Replaces active queue with the newly requested one and transfers
* all calls to the newQ before returning. * all calls to the newQ before returning.
*/ */
public synchronized void swapQueue(Class queueClassToUse, int maxSize, public synchronized void swapQueue(
String ns, Configuration conf) { Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
String ns, Configuration conf) {
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, maxSize, BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, maxSize,
ns, conf); ns, conf);
@ -143,7 +147,7 @@ public class CallQueueManager<E> {
* This doesn't mean the queue might not fill up at some point later, but * This doesn't mean the queue might not fill up at some point later, but
* it should decrease the probability that we lose a call this way. * it should decrease the probability that we lose a call this way.
*/ */
private boolean queueIsReallyEmpty(BlockingQueue q) { private boolean queueIsReallyEmpty(BlockingQueue<?> q) {
boolean wasEmpty = q.isEmpty(); boolean wasEmpty = q.isEmpty();
try { try {
Thread.sleep(10); Thread.sleep(10);

View File

@ -541,7 +541,7 @@ public class Client {
} }
private synchronized AuthMethod setupSaslConnection(final InputStream in2, private synchronized AuthMethod setupSaslConnection(final InputStream in2,
final OutputStream out2) throws IOException, InterruptedException { final OutputStream out2) throws IOException {
// Do not use Client.conf here! We must use ConnectionId.conf, since the // Do not use Client.conf here! We must use ConnectionId.conf, since the
// Client object is cached and shared between all RPC clients, even those // Client object is cached and shared between all RPC clients, even those
// for separate services. // for separate services.

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import static org.apache.hadoop.ipc.RpcConstants.AUTHORIZATION_FAILED_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -75,8 +80,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import static org.apache.hadoop.ipc.RpcConstants.*;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
@ -467,17 +470,24 @@ public abstract class Server {
return serviceAuthorizationManager; return serviceAuthorizationManager;
} }
static Class<? extends BlockingQueue<Call>> getQueueClass(
String prefix, Configuration conf) {
String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
Class<?> queueClass = conf.getClass(name, LinkedBlockingQueue.class);
return CallQueueManager.convertQueueClass(queueClass, Call.class);
}
private String getQueueClassPrefix() {
return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port;
}
/* /*
* Refresh the call queue * Refresh the call queue
*/ */
public synchronized void refreshCallQueue(Configuration conf) { public synchronized void refreshCallQueue(Configuration conf) {
// Create the next queue // Create the next queue
String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + String prefix = getQueueClassPrefix();
this.port; callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
Class queueClassToUse = conf.getClass(prefix + "." +
CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf);
} }
/** A call queued for handling. */ /** A call queued for handling. */
@ -1225,9 +1235,9 @@ public abstract class Server {
Throwable cause = e; Throwable cause = e;
while (cause != null) { while (cause != null) {
if (cause instanceof RetriableException) { if (cause instanceof RetriableException) {
return (RetriableException) cause; return cause;
} else if (cause instanceof StandbyException) { } else if (cause instanceof StandbyException) {
return (StandbyException) cause; return cause;
} else if (cause instanceof InvalidToken) { } else if (cause instanceof InvalidToken) {
// FIXME: hadoop method signatures are restricting the SASL // FIXME: hadoop method signatures are restricting the SASL
// callbacks to only returning InvalidToken, but some services // callbacks to only returning InvalidToken, but some services
@ -1297,7 +1307,7 @@ public abstract class Server {
private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage) private RpcSaslProto processSaslMessage(RpcSaslProto saslMessage)
throws IOException, InterruptedException { throws IOException, InterruptedException {
RpcSaslProto saslResponse = null; final RpcSaslProto saslResponse;
final SaslState state = saslMessage.getState(); // required final SaslState state = saslMessage.getState(); // required
switch (state) { switch (state) {
case NEGOTIATE: { case NEGOTIATE: {
@ -1333,27 +1343,18 @@ public abstract class Server {
// SIMPLE is a legit option above. we will send no response // SIMPLE is a legit option above. we will send no response
if (authMethod == AuthMethod.SIMPLE) { if (authMethod == AuthMethod.SIMPLE) {
switchToSimple(); switchToSimple();
saslResponse = null;
break; break;
} }
// sasl server for tokens may already be instantiated // sasl server for tokens may already be instantiated
if (saslServer == null || authMethod != AuthMethod.TOKEN) { if (saslServer == null || authMethod != AuthMethod.TOKEN) {
saslServer = createSaslServer(authMethod); saslServer = createSaslServer(authMethod);
} }
// fallthru to process sasl token saslResponse = processSaslToken(saslMessage);
break;
} }
case RESPONSE: { case RESPONSE: {
if (!saslMessage.hasToken()) { saslResponse = processSaslToken(saslMessage);
throw new SaslException("Client did not send a token");
}
byte[] saslToken = saslMessage.getToken().toByteArray();
if (LOG.isDebugEnabled()) {
LOG.debug("Have read input token of size " + saslToken.length
+ " for processing by saslServer.evaluateResponse()");
}
saslToken = saslServer.evaluateResponse(saslToken);
saslResponse = buildSaslResponse(
saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
saslToken);
break; break;
} }
default: default:
@ -1362,6 +1363,22 @@ public abstract class Server {
return saslResponse; return saslResponse;
} }
private RpcSaslProto processSaslToken(RpcSaslProto saslMessage)
throws SaslException {
if (!saslMessage.hasToken()) {
throw new SaslException("Client did not send a token");
}
byte[] saslToken = saslMessage.getToken().toByteArray();
if (LOG.isDebugEnabled()) {
LOG.debug("Have read input token of size " + saslToken.length
+ " for processing by saslServer.evaluateResponse()");
}
saslToken = saslServer.evaluateResponse(saslToken);
return buildSaslResponse(
saslServer.isComplete() ? SaslState.SUCCESS : SaslState.CHALLENGE,
saslToken);
}
private void switchToSimple() { private void switchToSimple() {
// disable SASL and blank out any SASL server // disable SASL and blank out any SASL server
authProtocol = AuthProtocol.NONE; authProtocol = AuthProtocol.NONE;
@ -2123,12 +2140,9 @@ public abstract class Server {
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
// Setup appropriate callqueue // Setup appropriate callqueue
String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + final String prefix = getQueueClassPrefix();
this.port; this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
Class queueClassToUse = conf.getClass(prefix + "." + maxQueueSize, prefix, conf);
CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class);
this.callQueue = new CallQueueManager<Call>(queueClassToUse, maxQueueSize,
prefix, conf);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager; this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize = this.authorize =

View File

@ -18,22 +18,15 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.HashMap;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Assert; import java.util.ArrayList;
import org.junit.Assume; import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.Test; import org.junit.Test;
import org.junit.Before;
import org.junit.After;
public class TestCallQueueManager { public class TestCallQueueManager {
private CallQueueManager<FakeCall> manager; private CallQueueManager<FakeCall> manager;
@ -146,23 +139,26 @@ public class TestCallQueueManager {
} }
private static final Class<? extends BlockingQueue<FakeCall>> queueClass
= CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class);
@Test @Test
public void testCallQueueCapacity() throws InterruptedException { public void testCallQueueCapacity() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 10, "", null); manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
} }
@Test @Test
public void testEmptyConsume() throws InterruptedException { public void testEmptyConsume() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 10, "", null); manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
assertCanTake(manager, 0, 1); // Fails since it's empty assertCanTake(manager, 0, 1); // Fails since it's empty
} }
@Test(timeout=60000) @Test(timeout=60000)
public void testSwapUnderContention() throws InterruptedException { public void testSwapUnderContention() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(LinkedBlockingQueue.class, 5000, "", null); manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null);
ArrayList<Putter> producers = new ArrayList<Putter>(); ArrayList<Putter> producers = new ArrayList<Putter>();
ArrayList<Taker> consumers = new ArrayList<Taker>(); ArrayList<Taker> consumers = new ArrayList<Taker>();
@ -191,7 +187,7 @@ public class TestCallQueueManager {
Thread.sleep(10); Thread.sleep(10);
for (int i=0; i < 5; i++) { for (int i=0; i < 5; i++) {
manager.swapQueue(LinkedBlockingQueue.class, 5000, "", null); manager.swapQueue(queueClass, 5000, "", null);
} }
// Stop the producers // Stop the producers