diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a1e86bbdd06..74bc315ae0c 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -345,6 +345,9 @@ Release 2.5.0 - UNRELEASED IMPROVEMENTS + HADOOP-10278. Refactor to make CallQueue pluggable. (Chris Li via + Arpit Agarwal) + OPTIMIZATIONS BUG FIXES @@ -403,6 +406,9 @@ Release 2.4.0 - UNRELEASED HADOOP-10355. Fix TestLoadGenerator#testLoadGenerator. (Haohui Mai via jing9) + HADOOP-10070. RPC client doesn't use per-connection conf to determine + server's expected Kerberos principal name. (atm) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index b57b3f2ce5a..e2d4fbcb9d3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -82,6 +82,14 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Default value for IPC_SERVER_HANDLER_QUEUE_SIZE_KEY */ public static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100; + /** + * CallQueue related settings. These are not used directly, but rather + * combined with a namespace and port. For instance: + * IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY + */ + public static final String IPC_CALLQUEUE_NAMESPACE = "ipc"; + public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl"; + /** Internal buffer size for Lzo compressor/decompressors */ public static final String IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY = "io.compression.codec.lzo.buffersize"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java new file mode 100644 index 00000000000..dae7ace4bd8 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; + +/** + * Abstracts queue operations for different blocking queues. + */ +public class CallQueueManager { + public static final Log LOG = LogFactory.getLog(CallQueueManager.class); + + // Atomic refs point to active callQueue + // We have two so we can better control swapping + private final AtomicReference> putRef; + private final AtomicReference> takeRef; + + public CallQueueManager(Class backingClass, int maxQueueSize, + String namespace, Configuration conf) { + BlockingQueue bq = createCallQueueInstance(backingClass, + maxQueueSize, namespace, conf); + this.putRef = new AtomicReference>(bq); + this.takeRef = new AtomicReference>(bq); + LOG.info("Using callQueue " + backingClass); + } + + @SuppressWarnings("unchecked") + private BlockingQueue createCallQueueInstance(Class theClass, int maxLen, + String ns, Configuration conf) { + + // Used for custom, configurable callqueues + try { + Constructor ctor = theClass.getDeclaredConstructor(int.class, String.class, + Configuration.class); + return (BlockingQueue)ctor.newInstance(maxLen, ns, conf); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + } + + // Used for LinkedBlockingQueue, ArrayBlockingQueue, etc + try { + Constructor ctor = theClass.getDeclaredConstructor(int.class); + return (BlockingQueue)ctor.newInstance(maxLen); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + } + + // Last attempt + try { + Constructor ctor = theClass.getDeclaredConstructor(); + return (BlockingQueue)ctor.newInstance(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + } + + // Nothing worked + throw new RuntimeException(theClass.getName() + + " could not be constructed."); + } + + /** + * Insert e into the backing queue or block until we can. + * If we block and the queue changes on us, we will insert while the + * queue is drained. + */ + public void put(E e) throws InterruptedException { + putRef.get().put(e); + } + + /** + * Retrieve an E from the backing queue or block until we can. + * Guaranteed to return an element from the current queue. + */ + public E take() throws InterruptedException { + E e = null; + + while (e == null) { + e = takeRef.get().poll(1000L, TimeUnit.MILLISECONDS); + } + + return e; + } + + public int size() { + return takeRef.get().size(); + } + + /** + * Replaces active queue with the newly requested one and transfers + * all calls to the newQ before returning. + */ + public synchronized void swapQueue(Class queueClassToUse, int maxSize, + String ns, Configuration conf) { + BlockingQueue newQ = createCallQueueInstance(queueClassToUse, maxSize, + ns, conf); + + // Our current queue becomes the old queue + BlockingQueue oldQ = putRef.get(); + + // Swap putRef first: allow blocked puts() to be unblocked + putRef.set(newQ); + + // Wait for handlers to drain the oldQ + while (!queueIsReallyEmpty(oldQ)) {} + + // Swap takeRef to handle new calls + takeRef.set(newQ); + + LOG.info("Old Queue: " + stringRepr(oldQ) + ", " + + "Replacement: " + stringRepr(newQ)); + } + + /** + * Checks if queue is empty by checking at two points in time. + * This doesn't mean the queue might not fill up at some point later, but + * it should decrease the probability that we lose a call this way. + */ + private boolean queueIsReallyEmpty(BlockingQueue q) { + boolean wasEmpty = q.isEmpty(); + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + return false; + } + return q.isEmpty() && wasEmpty; + } + + private String stringRepr(Object o) { + return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode()); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index b3cc04c4eb0..520269760d5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -542,8 +542,11 @@ public class Client { private synchronized AuthMethod setupSaslConnection(final InputStream in2, final OutputStream out2) throws IOException, InterruptedException { + // Do not use Client.conf here! We must use ConnectionId.conf, since the + // Client object is cached and shared between all RPC clients, even those + // for separate services. saslRpcClient = new SaslRpcClient(remoteId.getTicket(), - remoteId.getProtocol(), remoteId.getAddress(), conf); + remoteId.getProtocol(), remoteId.getAddress(), remoteId.conf); return saslRpcClient.saslConnect(in2, out2); } @@ -1480,21 +1483,31 @@ public class Client { private final boolean doPing; //do we need to send ping message private final int pingInterval; // how often sends ping to the server in msecs private String saslQop; // here for testing + private final Configuration conf; // used to get the expected kerberos principal name ConnectionId(InetSocketAddress address, Class protocol, - UserGroupInformation ticket, int rpcTimeout, int maxIdleTime, - RetryPolicy connectionRetryPolicy, int maxRetriesOnSocketTimeouts, - boolean tcpNoDelay, boolean doPing, int pingInterval) { + UserGroupInformation ticket, int rpcTimeout, + RetryPolicy connectionRetryPolicy, Configuration conf) { this.protocol = protocol; this.address = address; this.ticket = ticket; this.rpcTimeout = rpcTimeout; - this.maxIdleTime = maxIdleTime; this.connectionRetryPolicy = connectionRetryPolicy; - this.maxRetriesOnSocketTimeouts = maxRetriesOnSocketTimeouts; - this.tcpNoDelay = tcpNoDelay; - this.doPing = doPing; - this.pingInterval = pingInterval; + + this.maxIdleTime = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT); + this.maxRetriesOnSocketTimeouts = conf.getInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT); + this.tcpNoDelay = conf.getBoolean( + CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, + CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT); + this.doPing = conf.getBoolean( + CommonConfigurationKeys.IPC_CLIENT_PING_KEY, + CommonConfigurationKeys.IPC_CLIENT_PING_DEFAULT); + this.pingInterval = (doPing ? Client.getPingInterval(conf) : 0); + this.conf = conf; } InetSocketAddress getAddress() { @@ -1572,19 +1585,8 @@ public class Client { max, retryInterval, TimeUnit.MILLISECONDS); } - boolean doPing = - conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true); return new ConnectionId(addr, protocol, ticket, rpcTimeout, - conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT), - connectionRetryPolicy, - conf.getInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT), - conf.getBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY, - CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT), - doPing, - (doPing ? Client.getPingInterval(conf) : 0)); + connectionRetryPolicy, conf); } static boolean isEqual(Object a, Object b) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java index dccd15dffb3..d0fb8fd1145 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java @@ -59,6 +59,9 @@ public class ClientCache { } else { client.incCount(); } + if (Client.LOG.isDebugEnabled()) { + Client.LOG.debug("getting client out of cache: " + client); + } return client; } @@ -90,13 +93,23 @@ public class ClientCache { * A RPC client is closed only when its reference count becomes zero. */ public void stopClient(Client client) { + if (Client.LOG.isDebugEnabled()) { + Client.LOG.debug("stopping client from cache: " + client); + } synchronized (this) { client.decCount(); if (client.isZeroReference()) { + if (Client.LOG.isDebugEnabled()) { + Client.LOG.debug("removing client from cache: " + client); + } clients.remove(client.getSocketFactory()); } } if (client.isZeroReference()) { + if (Client.LOG.isDebugEnabled()) { + Client.LOG.debug("stopping actual client because no more references remain: " + + client); + } client.stop(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 9871a3d138a..92d8bbaf266 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -365,7 +365,7 @@ public abstract class Server { private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm volatile private boolean running = true; // true while server runs - private BlockingQueue callQueue; // queued calls + private CallQueueManager callQueue; // maintains the set of client connections and handles idle timeouts private ConnectionManager connectionManager; @@ -469,6 +469,19 @@ public abstract class Server { return serviceAuthorizationManager; } + /* + * Refresh the call queue + */ + public synchronized void refreshCallQueue(Configuration conf) { + // Create the next queue + String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + + this.port; + Class queueClassToUse = conf.getClass(prefix + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class); + + callQueue.swapQueue(queueClassToUse, maxQueueSize, prefix, conf); + } + /** A call queued for handling. */ public static class Call { private final int callId; // the client's call id @@ -2193,7 +2206,15 @@ public abstract class Server { this.readerPendingConnectionQueue = conf.getInt( CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); - this.callQueue = new LinkedBlockingQueue(maxQueueSize); + + // Setup appropriate callqueue + String prefix = CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + + this.port; + Class queueClassToUse = conf.getClass(prefix + "." + + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY, LinkedBlockingQueue.class); + this.callQueue = new CallQueueManager(queueClassToUse, maxQueueSize, + prefix, conf); + this.secretManager = (SecretManager) secretManager; this.authorize = conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index a37616abd56..5343737ec34 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -309,6 +309,10 @@ public class SaslRpcClient { // check that the server advertised principal matches our conf String confPrincipal = SecurityUtil.getServerPrincipal( conf.get(serverKey), serverAddr.getAddress()); + if (LOG.isDebugEnabled()) { + LOG.debug("getting serverKey: " + serverKey + " conf value: " + conf.get(serverKey) + + " principal: " + confPrincipal); + } if (confPrincipal == null || confPrincipal.isEmpty()) { throw new IllegalArgumentException( "Failed to specify server's Kerberos principal name"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java new file mode 100644 index 00000000000..3e519af8d2d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.HashMap; +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.junit.Before; +import org.junit.After; + +public class TestCallQueueManager { + private CallQueueManager manager; + + public class FakeCall { + public final int tag; // Can be used for unique identification + + public FakeCall(int tag) { + this.tag = tag; + } + } + + /** + * Putter produces FakeCalls + */ + public class Putter implements Runnable { + private final CallQueueManager cq; + + public final int tag; + public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted + private final int maxCalls; + + private boolean isRunning = true; + + public Putter(CallQueueManager aCq, int maxCalls, int tag) { + this.maxCalls = maxCalls; + this.cq = aCq; + this.tag = tag; + } + + public void run() { + try { + // Fill up to max (which is infinite if maxCalls < 0) + while (isRunning && (callsAdded < maxCalls || maxCalls < 0)) { + cq.put(new FakeCall(this.tag)); + callsAdded++; + } + } catch (InterruptedException e) { + return; + } + } + + public void stop() { + this.isRunning = false; + } + } + + /** + * Taker consumes FakeCalls + */ + public class Taker implements Runnable { + private final CallQueueManager cq; + + public final int tag; // if >= 0 means we will only take the matching tag, and put back + // anything else + public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted + public volatile FakeCall lastResult = null; // the last thing we took + private final int maxCalls; // maximum calls to take + + public Taker(CallQueueManager aCq, int maxCalls, int tag) { + this.maxCalls = maxCalls; + this.cq = aCq; + this.tag = tag; + } + + public void run() { + try { + // Take while we don't exceed maxCalls, or if maxCalls is undefined (< 0) + while (callsTaken < maxCalls || maxCalls < 0) { + FakeCall res = cq.take(); + + if (tag >= 0 && res.tag != this.tag) { + // This call does not match our tag, we should put it back and try again + cq.put(res); + } else { + callsTaken++; + lastResult = res; + } + } + } catch (InterruptedException e) { + return; + } + } + } + + // Assert we can take exactly the numberOfTakes + public void assertCanTake(CallQueueManager cq, int numberOfTakes, + int takeAttempts) throws InterruptedException { + + Taker taker = new Taker(cq, takeAttempts, -1); + Thread t = new Thread(taker); + t.start(); + t.join(100); + + assertEquals(taker.callsTaken, numberOfTakes); + t.interrupt(); + } + + // Assert we can put exactly the numberOfPuts + public void assertCanPut(CallQueueManager cq, int numberOfPuts, + int putAttempts) throws InterruptedException { + + Putter putter = new Putter(cq, putAttempts, -1); + Thread t = new Thread(putter); + t.start(); + t.join(100); + + assertEquals(putter.callsAdded, numberOfPuts); + t.interrupt(); + } + + + @Test + public void testCallQueueCapacity() throws InterruptedException { + manager = new CallQueueManager(LinkedBlockingQueue.class, 10, "", null); + + assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity + } + + @Test + public void testEmptyConsume() throws InterruptedException { + manager = new CallQueueManager(LinkedBlockingQueue.class, 10, "", null); + + assertCanTake(manager, 0, 1); // Fails since it's empty + } + + @Test(timeout=60000) + public void testSwapUnderContention() throws InterruptedException { + manager = new CallQueueManager(LinkedBlockingQueue.class, 5000, "", null); + + ArrayList producers = new ArrayList(); + ArrayList consumers = new ArrayList(); + + HashMap threads = new HashMap(); + + // Create putters and takers + for (int i=0; i < 50; i++) { + Putter p = new Putter(manager, -1, -1); + Thread pt = new Thread(p); + producers.add(p); + threads.put(p, pt); + + pt.start(); + } + + for (int i=0; i < 20; i++) { + Taker t = new Taker(manager, -1, -1); + Thread tt = new Thread(t); + consumers.add(t); + threads.put(t, tt); + + tt.start(); + } + + Thread.sleep(10); + + assertTrue(manager.size() > 0); + + for (int i=0; i < 5; i++) { + manager.swapQueue(LinkedBlockingQueue.class, 5000, "", null); + } + + // Stop the producers + for (Putter p : producers) { + p.stop(); + } + + // Wait for consumers to wake up, then consume + Thread.sleep(2000); + assertEquals(0, manager.size()); + + // Ensure no calls were dropped + long totalCallsCreated = 0; + long totalCallsConsumed = 0; + + for (Putter p : producers) { + totalCallsCreated += p.callsAdded; + threads.get(p).interrupt(); + } + for (Taker t : consumers) { + totalCallsConsumed += t.callsTaken; + threads.get(t).interrupt(); + } + + assertEquals(totalCallsConsumed, totalCallsCreated); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bfd854e9c6b..0aa03845c54 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -215,6 +215,9 @@ Release 2.4.0 - UNRELEASED YARN-1171. Add default queue properties to Fair Scheduler documentation (Naren Koneru via Sandy Ryza) + YARN-1470. Add audience annotations to MiniYARNCluster. (Anubhav Dhoot + via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 7fcd882ea5c..ff2e995ae5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; @@ -84,6 +85,8 @@ import com.google.common.annotations.VisibleForTesting; * the hostname:port of the namenodes. In such case, the AM must * do resource request using hostname:port as the location. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public class MiniYARNCluster extends CompositeService { private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class);