YARN-8243. Flex down should remove instance with largest component instance ID first. Contributed by Gour Saha
(cherry picked from commit ca612e353f
)
This commit is contained in:
parent
f6d6a2ab04
commit
355ff085e6
|
@ -237,12 +237,11 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
|
||||||
* ServiceMaster.checkAndUpdateServiceState here to make it easy to fix
|
* ServiceMaster.checkAndUpdateServiceState here to make it easy to fix
|
||||||
* this in future.
|
* this in future.
|
||||||
*/
|
*/
|
||||||
public void checkAndUpdateServiceState(boolean isIncrement) {
|
public void checkAndUpdateServiceState() {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
if (!getState().equals(State.UPGRADING)) {
|
if (!getState().equals(State.UPGRADING)) {
|
||||||
ServiceMaster.checkAndUpdateServiceState(this.scheduler,
|
ServiceMaster.checkAndUpdateServiceState(this.scheduler);
|
||||||
isIncrement);
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
|
|
|
@ -264,13 +264,9 @@ public class ServiceMaster extends CompositeService {
|
||||||
// This method should be called whenever there is an increment or decrement
|
// This method should be called whenever there is an increment or decrement
|
||||||
// of a READY state component of a service
|
// of a READY state component of a service
|
||||||
public static synchronized void checkAndUpdateServiceState(
|
public static synchronized void checkAndUpdateServiceState(
|
||||||
ServiceScheduler scheduler, boolean isIncrement) {
|
ServiceScheduler scheduler) {
|
||||||
ServiceState curState = scheduler.getApp().getState();
|
ServiceState curState = scheduler.getApp().getState();
|
||||||
if (!isIncrement) {
|
// Check the state of all components
|
||||||
// set it to STARTED every time a component moves out of STABLE state
|
|
||||||
scheduler.getApp().setState(ServiceState.STARTED);
|
|
||||||
} else {
|
|
||||||
// otherwise check the state of all components
|
|
||||||
boolean isStable = true;
|
boolean isStable = true;
|
||||||
for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler
|
for (org.apache.hadoop.yarn.service.api.records.Component comp : scheduler
|
||||||
.getApp().getComponents()) {
|
.getApp().getComponents()) {
|
||||||
|
@ -289,7 +285,6 @@ public class ServiceMaster extends CompositeService {
|
||||||
scheduler.getApp().setState(ServiceState.STARTED);
|
scheduler.getApp().setState(ServiceState.STARTED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (curState != scheduler.getApp().getState()) {
|
if (curState != scheduler.getApp().getState()) {
|
||||||
LOG.info("Service state changed from {} -> {}", curState,
|
LOG.info("Service state changed from {} -> {}", curState,
|
||||||
scheduler.getApp().getState());
|
scheduler.getApp().getState());
|
||||||
|
|
|
@ -323,7 +323,7 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
||||||
component.getScheduler().getApp().setState(ServiceState.STARTED);
|
component.getScheduler().getApp().setState(ServiceState.STARTED);
|
||||||
return FLEXING;
|
return FLEXING;
|
||||||
} else if (delta < 0){
|
} else if (delta < 0) {
|
||||||
delta = 0 - delta;
|
delta = 0 - delta;
|
||||||
// scale down
|
// scale down
|
||||||
LOG.info("[FLEX DOWN COMPONENT " + component.getName()
|
LOG.info("[FLEX DOWN COMPONENT " + component.getName()
|
||||||
|
@ -343,7 +343,9 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
instance.destroy();
|
instance.destroy();
|
||||||
}
|
}
|
||||||
checkAndUpdateComponentState(component, false);
|
checkAndUpdateComponentState(component, false);
|
||||||
return STABLE;
|
return component.componentSpec.getState()
|
||||||
|
== org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE
|
||||||
|
? STABLE : FLEXING;
|
||||||
} else {
|
} else {
|
||||||
LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " +
|
LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " +
|
||||||
event.getDesired() + " instances, ignoring");
|
event.getDesired() + " instances, ignoring");
|
||||||
|
@ -440,7 +442,7 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
component.componentSpec.getState());
|
component.componentSpec.getState());
|
||||||
}
|
}
|
||||||
// component state change will trigger re-check of service state
|
// component state change will trigger re-check of service state
|
||||||
component.context.getServiceManager().checkAndUpdateServiceState(true);
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// container moving out of READY state could be because of FLEX down so
|
// container moving out of READY state could be because of FLEX down so
|
||||||
|
@ -449,14 +451,18 @@ public class Component implements EventHandler<ComponentEvent> {
|
||||||
.value() < component.componentMetrics.containersDesired.value()) {
|
.value() < component.componentMetrics.containersDesired.value()) {
|
||||||
component.componentSpec.setState(
|
component.componentSpec.setState(
|
||||||
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
|
||||||
|
} else if (component.componentMetrics.containersReady
|
||||||
|
.value() == component.componentMetrics.containersDesired.value()) {
|
||||||
|
component.componentSpec.setState(
|
||||||
|
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
|
||||||
|
}
|
||||||
if (curState != component.componentSpec.getState()) {
|
if (curState != component.componentSpec.getState()) {
|
||||||
LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
LOG.info("[COMPONENT {}] state changed from {} -> {}",
|
||||||
component.componentSpec.getName(), curState,
|
component.componentSpec.getName(), curState,
|
||||||
component.componentSpec.getState());
|
component.componentSpec.getState());
|
||||||
}
|
}
|
||||||
// component state change will trigger re-check of service state
|
// component state change will trigger re-check of service state
|
||||||
component.context.getServiceManager().checkAndUpdateServiceState(false);
|
component.context.getServiceManager().checkAndUpdateServiceState();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// when the service is stable then the state of component needs to
|
// when the service is stable then the state of component needs to
|
||||||
// transition to stable
|
// transition to stable
|
||||||
|
|
|
@ -581,14 +581,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(ComponentInstance to) {
|
public int compareTo(ComponentInstance to) {
|
||||||
long delta = containerStartedTime - to.containerStartedTime;
|
|
||||||
if (delta == 0) {
|
|
||||||
return getCompInstanceId().compareTo(to.getCompInstanceId());
|
return getCompInstanceId().compareTo(to.getCompInstanceId());
|
||||||
} else if (delta < 0) {
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public boolean equals(Object o) {
|
@Override public boolean equals(Object o) {
|
||||||
|
|
|
@ -484,8 +484,37 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flex compa up to 4, which is more containers than the no of NMs
|
// Flex compa up to 5, which is more containers than the no of NMs
|
||||||
Map<String, Long> compCounts = new HashMap<>();
|
Map<String, Long> compCounts = new HashMap<>();
|
||||||
|
compCounts.put("compa", 5L);
|
||||||
|
exampleApp.getComponent("compa").setNumberOfContainers(5L);
|
||||||
|
client.flexByRestService(exampleApp.getName(), compCounts);
|
||||||
|
try {
|
||||||
|
// 10 secs is enough for the container to be started. The down side of
|
||||||
|
// this test is that it has to wait that long. Setting a higher wait time
|
||||||
|
// will add to the total time taken by tests to run.
|
||||||
|
waitForServiceToBeStable(client, exampleApp, 10000);
|
||||||
|
Assert.fail("Service should not be in a stable state. It should throw "
|
||||||
|
+ "a timeout exception.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Check that service state is not STABLE and only 3 containers are
|
||||||
|
// running and the fourth one should not get allocated.
|
||||||
|
service = client.getStatus(exampleApp.getName());
|
||||||
|
component = service.getComponent("compa");
|
||||||
|
Assert.assertNotEquals("Service state should not be STABLE",
|
||||||
|
ServiceState.STABLE, service.getState());
|
||||||
|
Assert.assertEquals("Component state should be FLEXING",
|
||||||
|
ComponentState.FLEXING, component.getState());
|
||||||
|
Assert.assertEquals("3 containers are expected to be running", 3,
|
||||||
|
component.getContainers().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flex compa down to 4 now, which is still more containers than the no of
|
||||||
|
// NMs. This tests the usecase that flex down does not kill any of the
|
||||||
|
// currently running containers since the required number of containers are
|
||||||
|
// still higher than the currently running number of containers. However,
|
||||||
|
// component state will still be FLEXING and service state not STABLE.
|
||||||
|
compCounts = new HashMap<>();
|
||||||
compCounts.put("compa", 4L);
|
compCounts.put("compa", 4L);
|
||||||
exampleApp.getComponent("compa").setNumberOfContainers(4L);
|
exampleApp.getComponent("compa").setNumberOfContainers(4L);
|
||||||
client.flexByRestService(exampleApp.getName(), compCounts);
|
client.flexByRestService(exampleApp.getName(), compCounts);
|
||||||
|
@ -509,6 +538,15 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
||||||
component.getContainers().size());
|
component.getContainers().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finally flex compa down to 3, which is exactly the number of containers
|
||||||
|
// currently running. This will bring the component and service states to
|
||||||
|
// STABLE.
|
||||||
|
compCounts = new HashMap<>();
|
||||||
|
compCounts.put("compa", 3L);
|
||||||
|
exampleApp.getComponent("compa").setNumberOfContainers(3L);
|
||||||
|
client.flexByRestService(exampleApp.getName(), compCounts);
|
||||||
|
waitForServiceToBeStable(client, exampleApp);
|
||||||
|
|
||||||
LOG.info("Stop/destroy service {}", exampleApp);
|
LOG.info("Stop/destroy service {}", exampleApp);
|
||||||
client.actionStop(exampleApp.getName(), true);
|
client.actionStop(exampleApp.getName(), true);
|
||||||
client.actionDestroy(exampleApp.getName());
|
client.actionDestroy(exampleApp.getName());
|
||||||
|
|
Loading…
Reference in New Issue