YARN-8243. Flex down should remove instance with largest component instance ID first. Contributed by Gour Saha
This commit is contained in:
parent
dc912994a1
commit
ca612e353f
|
@ -237,12 +237,11 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
|
|||
* ServiceMaster.checkAndUpdateServiceState here to make it easy to fix
|
||||
* this in future.
|
||||
*/
|
||||
public void checkAndUpdateServiceState(boolean isIncrement) {
|
||||
public void checkAndUpdateServiceState() {
|
||||
writeLock.lock();
|
||||
try {
|
||||
if (!getState().equals(State.UPGRADING)) {
|
||||
ServiceMaster.checkAndUpdateServiceState(this.scheduler,
|
||||
isIncrement);
|
||||
ServiceMaster.checkAndUpdateServiceState(this.scheduler);
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
|
|
|
@ -264,30 +264,25 @@ public class ServiceMaster extends CompositeService {
|
|||
// This method should be called whenever there is an increment or decrement
|
||||
// of a READY state component of a service
|
||||
public static synchronized void checkAndUpdateServiceState(
|
||||
ServiceScheduler scheduler, boolean isIncrement) {
|
||||
ServiceScheduler scheduler) {
|
||||
ServiceState curState = scheduler.getApp().getState();
|
||||
if (!isIncrement) {
|
||||
// set it to STARTED every time a component moves out of STABLE state
|
||||
scheduler.getApp().setState(ServiceState.STARTED);
|
||||
} else {
|
||||
// otherwise check the state of all components
|
||||
boolean isStable = true;
|
||||
for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler
|
||||
.getApp().getComponents()) {
|
||||
if (comp.getState() !=
|
||||
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE) {
|
||||
isStable = false;
|
||||
break;
|
||||
}
|
||||
// Check the state of all components
|
||||
boolean isStable = true;
|
||||
for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler
|
||||
.getApp().getComponents()) {
|
||||
if (comp.getState() !=
|
||||
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE) {
|
||||
isStable = false;
|
||||
break;
|
||||
}
|
||||
if (isStable) {
|
||||
scheduler.getApp().setState(ServiceState.STABLE);
|
||||
} else {
|
||||
// mark new state as started only if current state is stable, otherwise
|
||||
// leave it as is
|
||||
if (curState == ServiceState.STABLE) {
|
||||
scheduler.getApp().setState(ServiceState.STARTED);
|
||||
}
|
||||
}
|
||||
if (isStable) {
|
||||
scheduler.getApp().setState(ServiceState.STABLE);
|
||||
} else {
|
||||
// mark new state as started only if current state is stable, otherwise
|
||||
// leave it as is
|
||||
if (curState == ServiceState.STABLE) {
|
||||
scheduler.getApp().setState(ServiceState.STARTED);
|
||||
}
|
||||
}
|
||||
if (curState != scheduler.getApp().getState()) {
|
||||
|
|
|
@ -323,7 +323,7 @@ public class Component implements EventHandler<ComponentEvent> {
|
|||
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
||||
component.getScheduler().getApp().setState(ServiceState.STARTED);
|
||||
return FLEXING;
|
||||
} else if (delta < 0){
|
||||
} else if (delta < 0) {
|
||||
delta = 0 - delta;
|
||||
// scale down
|
||||
LOG.info("[FLEX DOWN COMPONENT " + component.getName()
|
||||
|
@ -343,7 +343,9 @@ public class Component implements EventHandler<ComponentEvent> {
|
|||
instance.destroy();
|
||||
}
|
||||
checkAndUpdateComponentState(component, false);
|
||||
return STABLE;
|
||||
return component.componentSpec.getState()
|
||||
== org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE
|
||||
? STABLE : FLEXING;
|
||||
} else {
|
||||
LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " +
|
||||
event.getDesired() + " instances, ignoring");
|
||||
|
@ -440,7 +442,7 @@ public class Component implements EventHandler<ComponentEvent> {
|
|||
component.componentSpec.getState());
|
||||
}
|
||||
// component state change will trigger re-check of service state
|
||||
component.context.getServiceManager().checkAndUpdateServiceState(true);
|
||||
component.context.getServiceManager().checkAndUpdateServiceState();
|
||||
}
|
||||
} else {
|
||||
// container moving out of READY state could be because of FLEX down so
|
||||
|
@ -449,14 +451,18 @@ public class Component implements EventHandler<ComponentEvent> {
|
|||
.value() < component.componentMetrics.containersDesired.value()) {
|
||||
component.componentSpec.setState(
|
||||
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
||||
if (curState != component.componentSpec.getState()) {
|
||||
LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
||||
component.componentSpec.getName(), curState,
|
||||
component.componentSpec.getState());
|
||||
}
|
||||
// component state change will trigger re-check of service state
|
||||
component.context.getServiceManager().checkAndUpdateServiceState(false);
|
||||
} else if (component.componentMetrics.containersReady
|
||||
.value() == component.componentMetrics.containersDesired.value()) {
|
||||
component.componentSpec.setState(
|
||||
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
||||
}
|
||||
if (curState != component.componentSpec.getState()) {
|
||||
LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
||||
component.componentSpec.getName(), curState,
|
||||
component.componentSpec.getState());
|
||||
}
|
||||
// component state change will trigger re-check of service state
|
||||
component.context.getServiceManager().checkAndUpdateServiceState();
|
||||
}
|
||||
// when the service is stable then the state of component needs to
|
||||
// transition to stable
|
||||
|
|
|
@ -581,14 +581,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
|
||||
@Override
|
||||
public int compareTo(ComponentInstance to) {
|
||||
long delta = containerStartedTime - to.containerStartedTime;
|
||||
if (delta == 0) {
|
||||
return getCompInstanceId().compareTo(to.getCompInstanceId());
|
||||
} else if (delta < 0) {
|
||||
return -1;
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
return getCompInstanceId().compareTo(to.getCompInstanceId());
|
||||
}
|
||||
|
||||
@Override public boolean equals(Object o) {
|
||||
|
|
|
@ -484,8 +484,37 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
|||
}
|
||||
}
|
||||
|
||||
// Flex compa up to 4, which is more containers than the no of NMs
|
||||
// Flex compa up to 5, which is more containers than the no of NMs
|
||||
Map<String, Long> compCounts = new HashMap<>();
|
||||
compCounts.put("compa", 5L);
|
||||
exampleApp.getComponent("compa").setNumberOfContainers(5L);
|
||||
client.flexByRestService(exampleApp.getName(), compCounts);
|
||||
try {
|
||||
// 10 secs is enough for the container to be started. The down side of
|
||||
// this test is that it has to wait that long. Setting a higher wait time
|
||||
// will add to the total time taken by tests to run.
|
||||
waitForServiceToBeStable(client, exampleApp, 10000);
|
||||
Assert.fail("Service should not be in a stable state. It should throw "
|
||||
+ "a timeout exception.");
|
||||
} catch (Exception e) {
|
||||
// Check that service state is not STABLE and only 3 containers are
|
||||
// running and the fourth one should not get allocated.
|
||||
service = client.getStatus(exampleApp.getName());
|
||||
component = service.getComponent("compa");
|
||||
Assert.assertNotEquals("Service state should not be STABLE",
|
||||
ServiceState.STABLE, service.getState());
|
||||
Assert.assertEquals("Component state should be FLEXING",
|
||||
ComponentState.FLEXING, component.getState());
|
||||
Assert.assertEquals("3 containers are expected to be running", 3,
|
||||
component.getContainers().size());
|
||||
}
|
||||
|
||||
// Flex compa down to 4 now, which is still more containers than the no of
|
||||
// NMs. This tests the usecase that flex down does not kill any of the
|
||||
// currently running containers since the required number of containers are
|
||||
// still higher than the currently running number of containers. However,
|
||||
// component state will still be FLEXING and service state not STABLE.
|
||||
compCounts = new HashMap<>();
|
||||
compCounts.put("compa", 4L);
|
||||
exampleApp.getComponent("compa").setNumberOfContainers(4L);
|
||||
client.flexByRestService(exampleApp.getName(), compCounts);
|
||||
|
@ -509,6 +538,15 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
|||
component.getContainers().size());
|
||||
}
|
||||
|
||||
// Finally flex compa down to 3, which is exactly the number of containers
|
||||
// currently running. This will bring the component and service states to
|
||||
// STABLE.
|
||||
compCounts = new HashMap<>();
|
||||
compCounts.put("compa", 3L);
|
||||
exampleApp.getComponent("compa").setNumberOfContainers(3L);
|
||||
client.flexByRestService(exampleApp.getName(), compCounts);
|
||||
waitForServiceToBeStable(client, exampleApp);
|
||||
|
||||
LOG.info("Stop/destroy service {}", exampleApp);
|
||||
client.actionStop(exampleApp.getName(), true);
|
||||
client.actionDestroy(exampleApp.getName());
|
||||
|
|
Loading…
Reference in New Issue