rpcRequest
, to the IPC server defined by
* remoteId
, returning the rpc respond.
+ *
+ * @param rpcKind
+ * @param rpcRequest - contains serialized method and method parameters
+ * @param remoteId - the target rpc server
+ * @param fallbackToSimpleAuth - set to true or false during this method to
+ * indicate if a secure client falls back to simple auth
+ * @returns the rpc response
+ * Throws exceptions if there are network problems or if the remote code
+ * threw an exception.
+ */
+ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+ ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
+ return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
+ fallbackToSimpleAuth);
+ }
+
+ /**
+ * Make a call, passing rpcRequest
, to the IPC server defined by
+ * remoteId
, returning the rpc response.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
@@ -1386,8 +1414,29 @@ public class Client {
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass) throws IOException {
+ return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
+ }
+
+ /**
+ * Make a call, passing rpcRequest
, to the IPC server defined by
+ * remoteId
, returning the rpc response.
+ *
+ * @param rpcKind
+ * @param rpcRequest - contains serialized method and method parameters
+ * @param remoteId - the target rpc server
+ * @param serviceClass - service class for RPC
+ * @param fallbackToSimpleAuth - set to true or false during this method to
+ * indicate if a secure client falls back to simple auth
+ * @returns the rpc response
+ * Throws exceptions if there are network problems or if the remote code
+ * threw an exception.
+ */
+ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+ ConnectionId remoteId, int serviceClass,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
- Connection connection = getConnection(remoteId, call, serviceClass);
+ Connection connection = getConnection(remoteId, call, serviceClass,
+ fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
@@ -1444,7 +1493,8 @@ public class Client {
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
- Call call, int serviceClass) throws IOException {
+ Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
+ throws IOException {
if (!running.get()) {
// the client is stopped
throw new IOException("The client is stopped");
@@ -1468,7 +1518,7 @@ public class Client {
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
- connection.setupIOstreams();
+ connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 0ccdb71d0ee..124d835ab15 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@@ -84,14 +85,23 @@ public class ProtobufRpcEngine implements RpcEngine {
}
@Override
- @SuppressWarnings("unchecked")
public * {@link ResourceManager} uses this class to write the information of @@ -71,8 +73,10 @@ public class RMApplicationHistoryWriter extends CompositeService { .getLog(RMApplicationHistoryWriter.class); private Dispatcher dispatcher; - private ApplicationHistoryWriter writer; - private boolean historyServiceEnabled; + @VisibleForTesting + ApplicationHistoryWriter writer; + @VisibleForTesting + boolean historyServiceEnabled; public RMApplicationHistoryWriter() { super(RMApplicationHistoryWriter.class.getName()); @@ -80,13 +84,18 @@ public class RMApplicationHistoryWriter extends CompositeService { @Override protected synchronized void serviceInit(Configuration conf) throws Exception { - historyServiceEnabled = conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED); + if (conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE) == null || + conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).length() == 0 || + conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).equals( + NullApplicationHistoryStore.class.getName())) { + historyServiceEnabled = false; + } - // Only create the services when the history service is enabled, preventing - // wasting the system resources. + // Only create the services when the history service is enabled and not + // using the null store, preventing wasting the system resources. if (historyServiceEnabled) { writer = createApplicationHistoryStore(conf); addIfService(writer); @@ -112,25 +121,19 @@ public class RMApplicationHistoryWriter extends CompositeService { protected ApplicationHistoryStore createApplicationHistoryStore( Configuration conf) { - // If the history writer is not enabled, a dummy store will be used to - // write nothing - if (historyServiceEnabled) { - try { - Class extends ApplicationHistoryStore> storeClass = - conf.getClass(YarnConfiguration.APPLICATION_HISTORY_STORE, - FileSystemApplicationHistoryStore.class, + try { + Class extends ApplicationHistoryStore> storeClass = + conf.getClass(YarnConfiguration.APPLICATION_HISTORY_STORE, + NullApplicationHistoryStore.class, ApplicationHistoryStore.class); - return storeClass.newInstance(); - } catch (Exception e) { - String msg = - "Could not instantiate ApplicationHistoryWriter: " - + conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE, - FileSystemApplicationHistoryStore.class.getName()); - LOG.error(msg, e); - throw new YarnRuntimeException(msg, e); - } - } else { - return new NullApplicationHistoryStore(); + return storeClass.newInstance(); + } catch (Exception e) { + String msg = + "Could not instantiate ApplicationHistoryWriter: " + + conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE, + NullApplicationHistoryStore.class.getName()); + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java index e83a6b9cc48..1c90ad213dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/TestRMApplicationHistoryWriter.java @@ -78,6 +78,8 @@ public class TestRMApplicationHistoryWriter { store = new MemoryApplicationHistoryStore(); Configuration conf = new Configuration(); conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true); + conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE, + MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class); writer = new RMApplicationHistoryWriter() { @Override @@ -174,6 +176,22 @@ public class TestRMApplicationHistoryWriter { return container; } + @Test + public void testDefaultStoreSetup() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true); + RMApplicationHistoryWriter writer = new RMApplicationHistoryWriter(); + writer.init(conf); + writer.start(); + try { + Assert.assertFalse(writer.historyServiceEnabled); + Assert.assertNull(writer.writer); + } finally { + writer.stop(); + writer.close(); + } + } + @Test public void testWriteApplication() throws Exception { RMApp app = createRMApp(ApplicationId.newInstance(0, 1));