This commit is contained in:
Clebert Suconic 2020-09-24 10:26:50 -04:00
commit 1ded600bac
6 changed files with 135 additions and 34 deletions

View File

@ -88,7 +88,7 @@ public class Run extends LockAbstract {
AtomicBoolean serverActivationFailed = new AtomicBoolean(false);
server.getServer().registerActivationFailureListener(exception -> serverActivationFailed.set(true));
server.start();
server.getServer().addExternalComponent(managementContext);
server.getServer().addExternalComponent(managementContext, false);
if (broker.web != null) {
broker.components.add(broker.web);
@ -98,8 +98,8 @@ public class Run extends LockAbstract {
Class clazz = this.getClass().getClassLoader().loadClass(componentDTO.componentClassName);
ExternalComponent component = (ExternalComponent) clazz.newInstance();
component.configure(componentDTO, getBrokerInstance(), getBrokerHome());
server.getServer().addExternalComponent(component);
component.start();
server.getServer().addExternalComponent(component, true);
assert component.isStarted();
}
if (serverActivationFailed.get()) {

View File

@ -833,7 +833,16 @@ public interface ActiveMQServer extends ServiceComponent {
void setMBeanServer(MBeanServer mBeanServer);
void addExternalComponent(ActiveMQComponent externalComponent);
/**
* Adding external components is allowed only if the state
* isn't {@link SERVER_STATE#STOPPED} or {@link SERVER_STATE#STOPPING}.<br>
* It atomically starts the {@code externalComponent} while being added if {@code start == true}.<br>
* This atomicity is necessary to prevent {@link #stop()} to stop the component right after adding it, but before
* starting it.
*
* @throw IllegalStateException if the state is {@link SERVER_STATE#STOPPED} or {@link SERVER_STATE#STOPPING}
*/
void addExternalComponent(ActiveMQComponent externalComponent, boolean start) throws Exception;
List<ActiveMQComponent> getExternalComponents();

View File

@ -837,9 +837,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.mbeanServer = mbeanServer;
}
private void validateAddExternalComponent(ActiveMQComponent externalComponent) {
final SERVER_STATE state = this.state;
if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) {
throw new IllegalStateException("cannot add " + externalComponent.getClass().getSimpleName() +
" if state is " + state);
}
}
@Override
public void addExternalComponent(ActiveMQComponent externalComponent) {
externalComponents.add(externalComponent);
public void addExternalComponent(ActiveMQComponent externalComponent, boolean start) throws Exception {
synchronized (externalComponents) {
validateAddExternalComponent(externalComponent);
externalComponents.add(externalComponent);
if (start) {
externalComponent.start();
}
}
}
@Override
@ -1075,17 +1089,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
state = SERVER_STATE.STOPPING;
if (criticalIOError) {
// notifications trigger disk IO so we don't want to send any on a critical IO error
managementService.enableNotifications(false);
final ManagementService managementService = this.managementService;
if (managementService != null) {
// notifications trigger disk IO so we don't want to send any on a critical IO error
managementService.enableNotifications(false);
}
}
final FileStoreMonitor fileStoreMonitor = this.fileStoreMonitor;
if (fileStoreMonitor != null) {
fileStoreMonitor.stop();
fileStoreMonitor = null;
this.fileStoreMonitor = null;
}
if (failoverOnServerShutdown) {
activation.sendLiveIsStopping();
final Activation activation = this.activation;
if (activation != null) {
activation.sendLiveIsStopping();
}
}
stopComponent(connectorsService);
@ -1099,6 +1120,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
stopComponent(federationManager);
stopComponent(clusterManager);
final RemotingService remotingService = this.remotingService;
if (remotingService != null) {
remotingService.pauseAcceptors();
}
@ -1120,7 +1142,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
freezeConnections();
}
activation.postConnectionFreeze();
final Activation activation = this.activation;
if (activation != null) {
activation.postConnectionFreeze();
}
closeAllServerSessions(criticalIOError);
@ -1133,6 +1158,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
//
// *************************************************************************************************************
final StorageManager storageManager = this.storageManager;
if (storageManager != null)
storageManager.clearContext();
@ -1141,10 +1167,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
stopComponent(backupManager);
try {
activation.preStorageClose();
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, activation.getClass().getName());
if (activation != null) {
try {
activation.preStorageClose();
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(t, activation.getClass().getName());
}
}
stopComponent(pagingManager);
@ -1158,6 +1186,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// We stop remotingService before otherwise we may lock the system in case of a critical IO
// error shutdown
final RemotingService remotingService = this.remotingService;
if (remotingService != null)
try {
remotingService.stop(criticalIOError);
@ -1166,6 +1195,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
// Stop the management service after the remoting service to ensure all acceptors are deregistered with JMX
final ManagementService managementService = this.managementService;
if (managementService != null)
try {
managementService.unregisterServer();
@ -1218,7 +1248,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
messagingServerControl = null;
memoryManager = null;
backupManager = null;
storageManager = null;
this.storageManager = null;
sessions.clear();
@ -1261,17 +1291,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
connectedClientIds.clear();
for (ActiveMQComponent externalComponent : externalComponents) {
try {
if (externalComponent instanceof ServiceComponent) {
((ServiceComponent)externalComponent).stop(isShutdown || criticalIOError);
} else {
externalComponent.stop();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(e, externalComponent.getClass().getName());
}
}
stopExternalComponents(isShutdown || criticalIOError);
try {
this.analyzer.clear();
@ -1326,7 +1346,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* {@link #stop(boolean, boolean, boolean)}.
*/
private void freezeConnections() {
activation.freezeConnections(remotingService);
Activation activation = this.activation;
if (activation != null) {
activation.freezeConnections(remotingService);
}
// after disconnecting all the clients close all the server sessions so any messages in delivery will be cancelled back to the queue
for (ServerSession serverSession : sessions.values()) {
@ -4065,7 +4088,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public List<ActiveMQComponent> getExternalComponents() {
return externalComponents;
synchronized (externalComponents) {
return new ArrayList<>(externalComponents);
}
}
private void stopExternalComponents(boolean shutdown) {
synchronized (externalComponents) {
for (ActiveMQComponent externalComponent : externalComponents) {
try {
if (externalComponent instanceof ServiceComponent) {
((ServiceComponent) externalComponent).stop(shutdown);
} else {
externalComponent.stop();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorStoppingComponent(e, externalComponent.getClass().getName());
}
}
}
}
}

View File

@ -26,8 +26,59 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.collection.IsEmptyCollection.empty;
public class ActiveMQServerImplTest extends ActiveMQTestBase {
@Test
public void testAddingAndStartingExternalComponent() throws Exception {
ActiveMQServer server = createServer(false);
server.start();
EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent();
server.addExternalComponent(component, true);
Assert.assertTrue(component.isStarted());
Assert.assertThat(server.getExternalComponents(), hasItem(component));
}
@Test
public void testAddingWithoutStartingExternalComponent() throws Exception {
ActiveMQServer server = createServer(false);
server.start();
EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent();
server.addExternalComponent(component, false);
Assert.assertFalse(component.isStarted());
Assert.assertThat(server.getExternalComponents(), hasItem(component));
}
@Test
public void testCannotAddExternalComponentsIfNotStarting() throws Exception {
ActiveMQServer server = createServer(false);
EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent();
try {
server.addExternalComponent(component, false);
Assert.fail();
} catch (IllegalStateException ex) {
Assert.assertFalse(component.isStarted());
Assert.assertThat(server.getExternalComponents(), empty());
}
}
@Test
public void testCannotAddExternalComponentsIfStopped() throws Exception {
ActiveMQServer server = createServer(false);
server.start();
server.stop();
EmbeddedServerTest.FakeExternalComponent component = new EmbeddedServerTest.FakeExternalComponent();
try {
server.addExternalComponent(component, false);
Assert.fail();
} catch (IllegalStateException ex) {
Assert.assertFalse(component.isStarted());
Assert.assertThat(server.getExternalComponents(), empty());
}
}
@Test
public void testScheduledPoolGC() throws Exception {
ActiveMQServer server = createServer(false);

View File

@ -78,8 +78,8 @@ public class EmbeddedServerTest {
FakeExternalComponent normalComponent = new FakeExternalComponent();
FakeExternalServiceComponent serviceComponent = new FakeExternalServiceComponent();
server.addExternalComponent(normalComponent);
server.addExternalComponent(serviceComponent);
server.addExternalComponent(normalComponent, false);
server.addExternalComponent(serviceComponent, false);
server.stop(false);
assertTrue(normalComponent.stopCalled);
@ -98,7 +98,7 @@ public class EmbeddedServerTest {
assertTrue(serviceComponent.exitCalled);
}
private class FakeExternalComponent implements ActiveMQComponent {
public static class FakeExternalComponent implements ActiveMQComponent {
volatile boolean startCalled;
volatile boolean stopCalled;
@ -124,7 +124,7 @@ public class EmbeddedServerTest {
}
}
private class FakeExternalServiceComponent extends FakeExternalComponent implements ServiceComponent {
public static class FakeExternalServiceComponent extends FakeExternalComponent implements ServiceComponent {
volatile boolean exitCalled;

View File

@ -176,7 +176,7 @@ public class ReplicatedFailoverTest extends FailoverTest {
backupServer.getServer().getNetworkHealthCheck().parseURIList("http://localhost:8787");
Assert.assertTrue(backupServer.getServer().getNetworkHealthCheck().isStarted());
backupServer.getServer().addExternalComponent(webServerComponent);
backupServer.getServer().addExternalComponent(webServerComponent, false);
// this is called when backup servers go from live back to backup
backupServer.getServer().fail(true);
Assert.assertTrue(backupServer.getServer().getNetworkHealthCheck().isStarted());