HADOOP-10085. CompositeService should allow adding services while being inited. (Steve Loughran via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1563696 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ad680d119c
commit
ef4cbe7c15
|
@ -21,6 +21,9 @@ Release 2.4.0 - UNRELEASED
|
|||
HADOOP-10320. Javadoc in InterfaceStability.java lacks final </ul>.
|
||||
(René Nyffenegger via cnauroth)
|
||||
|
||||
HADOOP-10085. CompositeService should allow adding services while being
|
||||
inited. (Steve Loughran via kasha)
|
||||
|
||||
Release 2.3.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -54,13 +53,13 @@ public class CompositeService extends AbstractService {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get an unmodifiable list of services
|
||||
* Get a cloned list of services
|
||||
* @return a list of child services at the time of invocation -
|
||||
* added services will not be picked up.
|
||||
*/
|
||||
public List<Service> getServices() {
|
||||
synchronized (serviceList) {
|
||||
return Collections.unmodifiableList(serviceList);
|
||||
return new ArrayList<Service>(serviceList);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,26 +16,20 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.util;
|
||||
package org.apache.hadoop.service;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.service.BreakableService;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.service.ServiceStateException;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestCompositeService {
|
||||
|
||||
private static final int NUM_OF_SERVICES = 5;
|
||||
|
@ -156,7 +150,7 @@ public class TestCompositeService {
|
|||
try {
|
||||
serviceManager.start();
|
||||
fail("Exception should have been thrown due to startup failure of last service");
|
||||
} catch (YarnRuntimeException e) {
|
||||
} catch (ServiceTestRuntimeException e) {
|
||||
for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
|
||||
if (i >= FAILED_SERVICE_SEQ_NUMBER && STOP_ONLY_STARTED_SERVICES) {
|
||||
// Failed service state should be INITED
|
||||
|
@ -197,7 +191,7 @@ public class TestCompositeService {
|
|||
// Stop the composite service
|
||||
try {
|
||||
serviceManager.stop();
|
||||
} catch (YarnRuntimeException e) {
|
||||
} catch (ServiceTestRuntimeException e) {
|
||||
}
|
||||
assertInState(STATE.STOPPED, services);
|
||||
}
|
||||
|
@ -338,6 +332,40 @@ public class TestCompositeService {
|
|||
1, testService.getServices().size());
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testAddInitedSiblingInInit() throws Throwable {
|
||||
CompositeService parent = new CompositeService("parent");
|
||||
BreakableService sibling = new BreakableService();
|
||||
sibling.init(new Configuration());
|
||||
parent.addService(new AddSiblingService(parent,
|
||||
sibling,
|
||||
STATE.INITED));
|
||||
parent.init(new Configuration());
|
||||
parent.start();
|
||||
parent.stop();
|
||||
assertEquals("Incorrect number of services",
|
||||
2, parent.getServices().size());
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testAddUninitedSiblingInInit() throws Throwable {
|
||||
CompositeService parent = new CompositeService("parent");
|
||||
BreakableService sibling = new BreakableService();
|
||||
parent.addService(new AddSiblingService(parent,
|
||||
sibling,
|
||||
STATE.INITED));
|
||||
parent.init(new Configuration());
|
||||
try {
|
||||
parent.start();
|
||||
fail("Expected an exception, got " + parent);
|
||||
} catch (ServiceStateException e) {
|
||||
//expected
|
||||
}
|
||||
parent.stop();
|
||||
assertEquals("Incorrect number of services",
|
||||
2, parent.getServices().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveService() {
|
||||
CompositeService testService = new CompositeService("TestService") {
|
||||
|
@ -365,6 +393,105 @@ public class TestCompositeService {
|
|||
2, testService.getServices().size());
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testAddStartedChildBeforeInit() throws Throwable {
|
||||
CompositeService parent = new CompositeService("parent");
|
||||
BreakableService child = new BreakableService();
|
||||
child.init(new Configuration());
|
||||
child.start();
|
||||
AddSiblingService.addChildToService(parent, child);
|
||||
try {
|
||||
parent.init(new Configuration());
|
||||
fail("Expected an exception, got " + parent);
|
||||
} catch (ServiceStateException e) {
|
||||
//expected
|
||||
}
|
||||
parent.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testAddStoppedChildBeforeInit() throws Throwable {
|
||||
CompositeService parent = new CompositeService("parent");
|
||||
BreakableService child = new BreakableService();
|
||||
child.init(new Configuration());
|
||||
child.start();
|
||||
child.stop();
|
||||
AddSiblingService.addChildToService(parent, child);
|
||||
try {
|
||||
parent.init(new Configuration());
|
||||
fail("Expected an exception, got " + parent);
|
||||
} catch (ServiceStateException e) {
|
||||
//expected
|
||||
}
|
||||
parent.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testAddStartedSiblingInStart() throws Throwable {
|
||||
CompositeService parent = new CompositeService("parent");
|
||||
BreakableService sibling = new BreakableService();
|
||||
sibling.init(new Configuration());
|
||||
sibling.start();
|
||||
parent.addService(new AddSiblingService(parent,
|
||||
sibling,
|
||||
STATE.STARTED));
|
||||
parent.init(new Configuration());
|
||||
parent.start();
|
||||
parent.stop();
|
||||
assertEquals("Incorrect number of services",
|
||||
2, parent.getServices().size());
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testAddUninitedSiblingInStart() throws Throwable {
|
||||
CompositeService parent = new CompositeService("parent");
|
||||
BreakableService sibling = new BreakableService();
|
||||
parent.addService(new AddSiblingService(parent,
|
||||
sibling,
|
||||
STATE.STARTED));
|
||||
parent.init(new Configuration());
|
||||
assertInState(STATE.NOTINITED, sibling);
|
||||
parent.start();
|
||||
parent.stop();
|
||||
assertEquals("Incorrect number of services",
|
||||
2, parent.getServices().size());
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testAddStartedSiblingInInit() throws Throwable {
|
||||
CompositeService parent = new CompositeService("parent");
|
||||
BreakableService sibling = new BreakableService();
|
||||
sibling.init(new Configuration());
|
||||
sibling.start();
|
||||
parent.addService(new AddSiblingService(parent,
|
||||
sibling,
|
||||
STATE.INITED));
|
||||
parent.init(new Configuration());
|
||||
assertInState(STATE.STARTED, sibling);
|
||||
parent.start();
|
||||
assertInState(STATE.STARTED, sibling);
|
||||
parent.stop();
|
||||
assertEquals("Incorrect number of services",
|
||||
2, parent.getServices().size());
|
||||
assertInState(STATE.STOPPED, sibling);
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testAddStartedSiblingInStop() throws Throwable {
|
||||
CompositeService parent = new CompositeService("parent");
|
||||
BreakableService sibling = new BreakableService();
|
||||
sibling.init(new Configuration());
|
||||
sibling.start();
|
||||
parent.addService(new AddSiblingService(parent,
|
||||
sibling,
|
||||
STATE.STOPPED));
|
||||
parent.init(new Configuration());
|
||||
parent.start();
|
||||
parent.stop();
|
||||
assertEquals("Incorrect number of services",
|
||||
2, parent.getServices().size());
|
||||
}
|
||||
|
||||
public static class CompositeServiceAddingAChild extends CompositeService{
|
||||
Service child;
|
||||
|
||||
|
@ -380,6 +507,17 @@ public class TestCompositeService {
|
|||
}
|
||||
}
|
||||
|
||||
public static class ServiceTestRuntimeException extends RuntimeException {
|
||||
public ServiceTestRuntimeException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a composite service that keeps a count of the number of lifecycle
|
||||
* events called, and can be set to throw a {@link ServiceTestRuntimeException }
|
||||
* during service start or stop
|
||||
*/
|
||||
public static class CompositeServiceImpl extends CompositeService {
|
||||
|
||||
public static boolean isPolicyToStopOnlyStartedServices() {
|
||||
|
@ -408,7 +546,7 @@ public class TestCompositeService {
|
|||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
if (throwExceptionOnStart) {
|
||||
throw new YarnRuntimeException("Fake service start exception");
|
||||
throw new ServiceTestRuntimeException("Fake service start exception");
|
||||
}
|
||||
counter++;
|
||||
callSequenceNumber = counter;
|
||||
|
@ -420,7 +558,7 @@ public class TestCompositeService {
|
|||
counter++;
|
||||
callSequenceNumber = counter;
|
||||
if (throwExceptionOnStop) {
|
||||
throw new YarnRuntimeException("Fake service stop exception");
|
||||
throw new ServiceTestRuntimeException("Fake service stop exception");
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
@ -457,6 +595,9 @@ public class TestCompositeService {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Composite service that makes the addService method public to all
|
||||
*/
|
||||
public static class ServiceManager extends CompositeService {
|
||||
|
||||
public void addTestService(CompositeService service) {
|
||||
|
@ -468,4 +609,55 @@ public class TestCompositeService {
|
|||
}
|
||||
}
|
||||
|
||||
public static class AddSiblingService extends CompositeService {
|
||||
private final CompositeService parent;
|
||||
private final Service serviceToAdd;
|
||||
private STATE triggerState;
|
||||
|
||||
public AddSiblingService(CompositeService parent,
|
||||
Service serviceToAdd,
|
||||
STATE triggerState) {
|
||||
super("ParentStateManipulatorService");
|
||||
this.parent = parent;
|
||||
this.serviceToAdd = serviceToAdd;
|
||||
this.triggerState = triggerState;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the serviceToAdd to the parent if this service
|
||||
* is in the state requested
|
||||
*/
|
||||
private void maybeAddSibling() {
|
||||
if (getServiceState() == triggerState) {
|
||||
parent.addService(serviceToAdd);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
maybeAddSibling();
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
maybeAddSibling();
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
maybeAddSibling();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Expose addService method
|
||||
* @param parent parent service
|
||||
* @param child child to add
|
||||
*/
|
||||
public static void addChildToService(CompositeService parent, Service child) {
|
||||
parent.addService(child);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue