YARN-1954. Added waitFor to AMRMClient(Async). Contributed by Tsuyoshi Ozawa.
svn merge --ignore-ancestry -c 1617002 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1617003 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a6611f2bd1
commit
af83e57e82
|
@ -85,6 +85,8 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2026. Fair scheduler: Consider only active queues for computing fairshare.
|
YARN-2026. Fair scheduler: Consider only active queues for computing fairshare.
|
||||||
(Ashwin Shankar via kasha)
|
(Ashwin Shankar via kasha)
|
||||||
|
|
||||||
|
YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -22,6 +22,8 @@ import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
@ -37,12 +39,14 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
AbstractService {
|
AbstractService {
|
||||||
|
private static final Log LOG = LogFactory.getLog(AMRMClient.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new instance of AMRMClient.
|
* Create a new instance of AMRMClient.
|
||||||
|
@ -336,4 +340,63 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
return nmTokenCache;
|
return nmTokenCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for <code>check</code> to return true for each 1000 ms.
|
||||||
|
* See also {@link #waitFor(com.google.common.base.Supplier, int)}
|
||||||
|
* and {@link #waitFor(com.google.common.base.Supplier, int, int)}
|
||||||
|
* @param check
|
||||||
|
*/
|
||||||
|
public void waitFor(Supplier<Boolean> check) throws InterruptedException {
|
||||||
|
waitFor(check, 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for <code>check</code> to return true for each
|
||||||
|
* <code>checkEveryMillis</code> ms.
|
||||||
|
* See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
|
||||||
|
* @param check user defined checker
|
||||||
|
* @param checkEveryMillis interval to call <code>check</code>
|
||||||
|
*/
|
||||||
|
public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
|
||||||
|
throws InterruptedException {
|
||||||
|
waitFor(check, checkEveryMillis, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for <code>check</code> to return true for each
|
||||||
|
* <code>checkEveryMillis</code> ms. In the main loop, this method will log
|
||||||
|
* the message "waiting in main loop" for each <code>logInterval</code> times
|
||||||
|
* iteration to confirm the thread is alive.
|
||||||
|
* @param check user defined checker
|
||||||
|
* @param checkEveryMillis interval to call <code>check</code>
|
||||||
|
* @param logInterval interval to log for each
|
||||||
|
*/
|
||||||
|
public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
|
||||||
|
int logInterval) throws InterruptedException {
|
||||||
|
Preconditions.checkNotNull(check, "check should not be null");
|
||||||
|
Preconditions.checkArgument(checkEveryMillis >= 0,
|
||||||
|
"checkEveryMillis should be positive value");
|
||||||
|
Preconditions.checkArgument(logInterval >= 0,
|
||||||
|
"logInterval should be positive value");
|
||||||
|
|
||||||
|
int loggingCounter = logInterval;
|
||||||
|
do {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Check the condition for main loop.");
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean result = check.get();
|
||||||
|
if (result) {
|
||||||
|
LOG.info("Exits the main loop.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (--loggingCounter <= 0) {
|
||||||
|
LOG.info("Waiting in main loop.");
|
||||||
|
loggingCounter = logInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(checkEveryMillis);
|
||||||
|
} while (true);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.client.api.async;
|
package org.apache.hadoop.yarn.client.api.async;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||||
|
@ -90,6 +94,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
@Stable
|
@Stable
|
||||||
public abstract class AMRMClientAsync<T extends ContainerRequest>
|
public abstract class AMRMClientAsync<T extends ContainerRequest>
|
||||||
extends AbstractService {
|
extends AbstractService {
|
||||||
|
private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
|
||||||
|
|
||||||
protected final AMRMClient<T> client;
|
protected final AMRMClient<T> client;
|
||||||
protected final CallbackHandler handler;
|
protected final CallbackHandler handler;
|
||||||
|
@ -189,6 +194,65 @@ extends AbstractService {
|
||||||
*/
|
*/
|
||||||
public abstract int getClusterNodeCount();
|
public abstract int getClusterNodeCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for <code>check</code> to return true for each 1000 ms.
|
||||||
|
* See also {@link #waitFor(com.google.common.base.Supplier, int)}
|
||||||
|
* and {@link #waitFor(com.google.common.base.Supplier, int, int)}
|
||||||
|
* @param check
|
||||||
|
*/
|
||||||
|
public void waitFor(Supplier<Boolean> check) throws InterruptedException {
|
||||||
|
waitFor(check, 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for <code>check</code> to return true for each
|
||||||
|
* <code>checkEveryMillis</code> ms.
|
||||||
|
* See also {@link #waitFor(com.google.common.base.Supplier, int, int)}
|
||||||
|
* @param check user defined checker
|
||||||
|
* @param checkEveryMillis interval to call <code>check</code>
|
||||||
|
*/
|
||||||
|
public void waitFor(Supplier<Boolean> check, int checkEveryMillis)
|
||||||
|
throws InterruptedException {
|
||||||
|
waitFor(check, checkEveryMillis, 1);
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for <code>check</code> to return true for each
|
||||||
|
* <code>checkEveryMillis</code> ms. In the main loop, this method will log
|
||||||
|
* the message "waiting in main loop" for each <code>logInterval</code> times
|
||||||
|
* iteration to confirm the thread is alive.
|
||||||
|
* @param check user defined checker
|
||||||
|
* @param checkEveryMillis interval to call <code>check</code>
|
||||||
|
* @param logInterval interval to log for each
|
||||||
|
*/
|
||||||
|
public void waitFor(Supplier<Boolean> check, int checkEveryMillis,
|
||||||
|
int logInterval) throws InterruptedException {
|
||||||
|
Preconditions.checkNotNull(check, "check should not be null");
|
||||||
|
Preconditions.checkArgument(checkEveryMillis >= 0,
|
||||||
|
"checkEveryMillis should be positive value");
|
||||||
|
Preconditions.checkArgument(logInterval >= 0,
|
||||||
|
"logInterval should be positive value");
|
||||||
|
|
||||||
|
int loggingCounter = logInterval;
|
||||||
|
do {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Check the condition for main loop.");
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean result = check.get();
|
||||||
|
if (result) {
|
||||||
|
LOG.info("Exits the main loop.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (--loggingCounter <= 0) {
|
||||||
|
LOG.info("Waiting in main loop.");
|
||||||
|
loggingCounter = logInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(checkEveryMillis);
|
||||||
|
} while (true);
|
||||||
|
}
|
||||||
|
|
||||||
public interface CallbackHandler {
|
public interface CallbackHandler {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.client.api.async.impl;
|
package org.apache.hadoop.yarn.client.api.async.impl;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import static org.mockito.Matchers.anyFloat;
|
import static org.mockito.Matchers.anyFloat;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
|
@ -180,7 +181,7 @@ public class TestAMRMClientAsync {
|
||||||
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||||
when(client.allocate(anyFloat())).thenThrow(ex);
|
when(client.allocate(anyFloat())).thenThrow(ex);
|
||||||
|
|
||||||
AMRMClientAsync<ContainerRequest> asyncClient =
|
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||||
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
|
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
|
||||||
asyncClient.init(conf);
|
asyncClient.init(conf);
|
||||||
asyncClient.start();
|
asyncClient.start();
|
||||||
|
@ -228,6 +229,41 @@ public class TestAMRMClientAsync {
|
||||||
asyncClient.stop();
|
asyncClient.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 10000)
|
||||||
|
public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
final TestCallbackHandler callbackHandler = new TestCallbackHandler();
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||||
|
|
||||||
|
final AllocateResponse shutDownResponse = createAllocateResponse(
|
||||||
|
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
|
||||||
|
shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
|
||||||
|
when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
|
||||||
|
|
||||||
|
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||||
|
AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
|
||||||
|
asyncClient.init(conf);
|
||||||
|
asyncClient.start();
|
||||||
|
|
||||||
|
Supplier<Boolean> checker = new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return callbackHandler.reboot;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
asyncClient.registerApplicationMaster("localhost", 1234, null);
|
||||||
|
asyncClient.waitFor(checker);
|
||||||
|
|
||||||
|
asyncClient.stop();
|
||||||
|
// stopping should have joined all threads and completed all callbacks
|
||||||
|
Assert.assertTrue(callbackHandler.callbackCount == 0);
|
||||||
|
|
||||||
|
verify(client, times(1)).allocate(anyFloat());
|
||||||
|
asyncClient.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 5000)
|
@Test (timeout = 5000)
|
||||||
public void testCallAMRMClientAsyncStopFromCallbackHandler()
|
public void testCallAMRMClientAsyncStopFromCallbackHandler()
|
||||||
throws YarnException, IOException, InterruptedException {
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
@ -262,6 +298,40 @@ public class TestAMRMClientAsync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 5000)
|
||||||
|
public void testCallAMRMClientAsyncStopFromCallbackHandlerWithWaitFor()
|
||||||
|
throws YarnException, IOException, InterruptedException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
final TestCallbackHandler2 callbackHandler = new TestCallbackHandler2();
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
|
||||||
|
|
||||||
|
List<ContainerStatus> completed = Arrays.asList(
|
||||||
|
ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
|
||||||
|
ContainerState.COMPLETE, "", 0));
|
||||||
|
final AllocateResponse response = createAllocateResponse(completed,
|
||||||
|
new ArrayList<Container>(), null);
|
||||||
|
|
||||||
|
when(client.allocate(anyFloat())).thenReturn(response);
|
||||||
|
|
||||||
|
AMRMClientAsync<ContainerRequest> asyncClient =
|
||||||
|
AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
|
||||||
|
callbackHandler.asynClient = asyncClient;
|
||||||
|
asyncClient.init(conf);
|
||||||
|
asyncClient.start();
|
||||||
|
|
||||||
|
Supplier<Boolean> checker = new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return callbackHandler.notify;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
asyncClient.registerApplicationMaster("localhost", 1234, null);
|
||||||
|
asyncClient.waitFor(checker);
|
||||||
|
Assert.assertTrue(checker.get());
|
||||||
|
}
|
||||||
|
|
||||||
void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws
|
void runCallBackThrowOutException(TestCallbackHandler2 callbackHandler) throws
|
||||||
InterruptedException, YarnException, IOException {
|
InterruptedException, YarnException, IOException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
@ -342,7 +412,7 @@ public class TestAMRMClientAsync {
|
||||||
private volatile List<ContainerStatus> completedContainers;
|
private volatile List<ContainerStatus> completedContainers;
|
||||||
private volatile List<Container> allocatedContainers;
|
private volatile List<Container> allocatedContainers;
|
||||||
Exception savedException = null;
|
Exception savedException = null;
|
||||||
boolean reboot = false;
|
volatile boolean reboot = false;
|
||||||
Object notifier = new Object();
|
Object notifier = new Object();
|
||||||
|
|
||||||
int callbackCount = 0;
|
int callbackCount = 0;
|
||||||
|
@ -432,7 +502,7 @@ public class TestAMRMClientAsync {
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
AMRMClientAsync asynClient;
|
AMRMClientAsync asynClient;
|
||||||
boolean stop = true;
|
boolean stop = true;
|
||||||
boolean notify = false;
|
volatile boolean notify = false;
|
||||||
boolean throwOutException = false;
|
boolean throwOutException = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.client.api.impl;
|
package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
@ -814,6 +815,40 @@ public class TestAMRMClient {
|
||||||
assertEquals(0, amClient.ask.size());
|
assertEquals(0, amClient.ask.size());
|
||||||
assertEquals(0, amClient.release.size());
|
assertEquals(0, amClient.release.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class CountDownSupplier implements Supplier<Boolean> {
|
||||||
|
int counter = 0;
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
counter++;
|
||||||
|
if (counter >= 3) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWaitFor() throws InterruptedException {
|
||||||
|
AMRMClientImpl<ContainerRequest> amClient = null;
|
||||||
|
CountDownSupplier countDownChecker = new CountDownSupplier();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// start am rm client
|
||||||
|
amClient =
|
||||||
|
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
||||||
|
.<ContainerRequest> createAMRMClient();
|
||||||
|
amClient.init(new YarnConfiguration());
|
||||||
|
amClient.start();
|
||||||
|
amClient.waitFor(countDownChecker, 1000);
|
||||||
|
assertEquals(3, countDownChecker.counter);
|
||||||
|
} finally {
|
||||||
|
if (amClient != null) {
|
||||||
|
amClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void sleep(int sleepTime) {
|
private void sleep(int sleepTime) {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue