ARTEMIS-2912 Server start exception before activation can cause a zombie broker

This commit is contained in:
franz1981 2020-09-23 23:37:11 +02:00 committed by Clebert Suconic
parent f5dfe6812d
commit 207c2265bf
6 changed files with 105 additions and 23 deletions

View File

@ -88,7 +88,7 @@ public class Run extends LockAbstract {
AtomicBoolean serverActivationFailed = new AtomicBoolean(false);
server.getServer().registerActivationFailureListener(exception -> serverActivationFailed.set(true));
server.start();
server.getServer().addExternalComponent(managementContext);
server.getServer().addExternalComponent(managementContext, false);
if (broker.web != null) {
broker.components.add(broker.web);
@ -98,8 +98,8 @@ public class Run extends LockAbstract {
Class clazz = this.getClass().getClassLoader().loadClass(componentDTO.componentClassName);
ExternalComponent component = (ExternalComponent) clazz.newInstance();
component.configure(componentDTO, getBrokerInstance(), getBrokerHome());
server.getServer().addExternalComponent(component);
component.start();
server.getServer().addExternalComponent(component, true);
assert component.isStarted();
}
if (serverActivationFailed.get()) {

View File

@ -833,7 +833,16 @@ public interface ActiveMQServer extends ServiceComponent {
void setMBeanServer(MBeanServer mBeanServer);
void addExternalComponent(ActiveMQComponent externalComponent);
/**
* Adding external components is allowed only if the state
* isn't {@link SERVER_STATE#STOPPED} or {@link SERVER_STATE#STOPPING}.<br>
* It atomically starts the {@code externalComponent} while being added if {@code start == true}.<br>
* This atomicity is necessary to prevent {@link #stop()} to stop the component right after adding it, but before
* starting it.
*
* @throw IllegalStateException if the state is {@link SERVER_STATE#STOPPED} or {@link SERVER_STATE#STOPPING}
*/
void addExternalComponent(ActiveMQComponent externalComponent, boolean start) throws Exception;
List<ActiveMQComponent> getExternalComponents();

View File

@ -837,9 +837,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.mbeanServer = mbeanServer;
}
private void validateAddExternalComponent(ActiveMQComponent externalComponent) {
final SERVER_STATE state = this.state;
if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) {
throw new IllegalStateException("cannot add " + externalComponent.getClass().getSimpleName() +
" if state is " + state);
}
}
@Override
public void addExternalComponent(ActiveMQComponent externalComponent) {
externalComponents.add(externalComponent);
public void addExternalComponent(ActiveMQComponent externalComponent, boolean start) throws Exception {
synchronized (externalComponents) {
validateAddExternalComponent(externalComponent);
externalComponents.add(externalComponent);
if (start) {
externalComponent.start();
}
}
}
@Override
@ -1261,17 +1275,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
connectedClientIds.clear();
for (ActiveMQComponent externalComponent : externalComponents) {
try {
if (externalComponent instanceof ServiceComponent) {
((ServiceComponent)externalComponent).stop(isShutdown || criticalIOError);
} else {
externalComponent.stop();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(e, externalComponent.getClass().getName());
}
}
stopExternalComponents(isShutdown || criticalIOError);
try {
this.analyzer.clear();
@ -4065,7 +4069,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public List<ActiveMQComponent> getExternalComponents() {
return externalComponents;
synchronized (externalComponents) {
return new ArrayList<>(externalComponents);
}
}
private void stopExternalComponents(boolean shutdown) {
synchronized (externalComponents) {
for (ActiveMQComponent externalComponent : externalComponents) {
try {
if (externalComponent instanceof ServiceComponent) {
((ServiceComponent) externalComponent).stop(shutdown);
} else {
externalComponent.stop();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(e, externalComponent.getClass().getName());
}
}
}
}
}

View File

@ -26,8 +26,59 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.collection.IsEmptyCollection.empty;
public class ActiveMQServerImplTest extends ActiveMQTestBase {
@Test
public void testAddingAndStartingExternalComponent() throws Exception {
ActiveMQServer server = createServer(false);
server.start();
EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent();
server.addExternalComponent(component, true);
Assert.assertTrue(component.isStarted());
Assert.assertThat(server.getExternalComponents(), hasItem(component));
}
@Test
public void testAddingWithoutStartingExternalComponent() throws Exception {
ActiveMQServer server = createServer(false);
server.start();
EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent();
server.addExternalComponent(component, false);
Assert.assertFalse(component.isStarted());
Assert.assertThat(server.getExternalComponents(), hasItem(component));
}
@Test
public void testCannotAddExternalComponentsIfNotStarting() throws Exception {
ActiveMQServer server = createServer(false);
EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent();
try {
server.addExternalComponent(component, false);
Assert.fail();
} catch (IllegalStateException ex) {
Assert.assertFalse(component.isStarted());
Assert.assertThat(server.getExternalComponents(), empty());
}
}
@Test
public void testCannotAddExternalComponentsIfStopped() throws Exception {
ActiveMQServer server = createServer(false);
server.start();
server.stop();
EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent();
try {
server.addExternalComponent(component, false);
Assert.fail();
} catch (IllegalStateException ex) {
Assert.assertFalse(component.isStarted());
Assert.assertThat(server.getExternalComponents(), empty());
}
}
@Test
public void testScheduledPoolGC() throws Exception {
ActiveMQServer server = createServer(false);

View File

@ -78,8 +78,8 @@ public class EmbeddedServerTest {
FakeExternalComponent normalComponent = new FakeExternalComponent();
FakeExternalServiceComponent serviceComponent = new FakeExternalServiceComponent();
server.addExternalComponent(normalComponent);
server.addExternalComponent(serviceComponent);
server.addExternalComponent(normalComponent, false);
server.addExternalComponent(serviceComponent, false);
server.stop(false);
assertTrue(normalComponent.stopCalled);
@ -98,7 +98,7 @@ public class EmbeddedServerTest {
assertTrue(serviceComponent.exitCalled);
}
private class FakeExternalComponent implements ActiveMQComponent {
public static class FakeExternalComponent implements ActiveMQComponent {
volatile boolean startCalled;
volatile boolean stopCalled;
@ -124,7 +124,7 @@ public class EmbeddedServerTest {
}
}
private class FakeExternalServiceComponent extends FakeExternalComponent implements ServiceComponent {
public static class FakeExternalServiceComponent extends FakeExternalComponent implements ServiceComponent {
volatile boolean exitCalled;

View File

@ -176,7 +176,7 @@ public class ReplicatedFailoverTest extends FailoverTest {
backupServer.getServer().getNetworkHealthCheck().parseURIList("http://localhost:8787");
Assert.assertTrue(backupServer.getServer().getNetworkHealthCheck().isStarted());
backupServer.getServer().addExternalComponent(webServerComponent);
backupServer.getServer().addExternalComponent(webServerComponent, false);
// this is called when backup servers go from live back to backup
backupServer.getServer().fail(true);
Assert.assertTrue(backupServer.getServer().getNetworkHealthCheck().isStarted());