diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index de70d29346..97fb81494c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -268,6 +268,7 @@ public class BrokerService implements Service { private boolean rollbackOnlyOnAsyncException = true; private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; + private List preShutdownHooks = new CopyOnWriteArrayList<>(); static { @@ -478,6 +479,16 @@ public class BrokerService implements Service { return connector; } + /** + * Adds a {@link Runnable} hook that will be invoked before the + * broker is stopped. This allows performing cleanup actions + * before the broker is stopped. The hook should not throw + * exceptions or block. + */ + public final void addPreShutdownHook(final Runnable hook) { + preShutdownHooks.add(hook); + } + public JmsConnector removeJmsConnector(JmsConnector connector) { if (jmsConnectors.remove(connector)) { return connector; @@ -788,6 +799,16 @@ public class BrokerService implements Service { */ @Override public void stop() throws Exception { + final ServiceStopper stopper = new ServiceStopper(); + + for (Runnable hook : preShutdownHooks) { + try { + hook.run(); + } catch (Throwable e) { + stopper.onException(hook, e); + } + } + if (!stopping.compareAndSet(false, true)) { LOG.trace("Broker already stopping/stopped"); return; @@ -812,7 +833,6 @@ public class BrokerService implements Service { this.scheduler.stop(); this.scheduler = null; } - ServiceStopper stopper = new ServiceStopper(); if (services != null) { for (Service service : services) { stopper.stop(service); @@ -2846,6 +2866,10 @@ public class BrokerService implements Service { this.regionBroker = regionBroker; } + public final void removePreShutdownHook(final Runnable hook) { + preShutdownHooks.remove(hook); + } + public void addShutdownHook(Runnable hook) { synchronized (shutdownHooks) { shutdownHooks.add(hook); diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java new file mode 100644 index 0000000000..e4fcb090f0 --- /dev/null +++ b/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class BrokerServiceTest { + + static class Hook implements Runnable { + + boolean invoked = false; + + @Override + public void run() { + invoked = true; + } + } + + @Test + public void removedPreShutdownHooksShouldNotBeInvokedWhenStopping() throws Exception { + final BrokerService brokerService = new BrokerService(); + + final Hook hook = new Hook(); + brokerService.addPreShutdownHook(hook); + brokerService.removePreShutdownHook(hook); + + brokerService.stop(); + + assertFalse("Removed pre-shutdown hook should not have been invoked", hook.invoked); + } + + @Test + public void shouldInvokePreShutdownHooksBeforeStopping() throws Exception { + final BrokerService brokerService = new BrokerService(); + + final Hook hook = new Hook(); + brokerService.addPreShutdownHook(hook); + + brokerService.stop(); + + assertTrue("Pre-shutdown hook should have been invoked", hook.invoked); + } +} diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/CamelShutdownHook.java b/activemq-camel/src/main/java/org/apache/activemq/camel/CamelShutdownHook.java new file mode 100644 index 0000000000..217a1a63c9 --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/CamelShutdownHook.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel; + +import org.apache.activemq.broker.BrokerService; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * A shutdown hook that can be used to shutdown {@link CamelContext} before the + * ActiveMQ broker is shut down. This is sometimes important as if the broker is + * shutdown before Camel there could be a loss of data due to inflight exchanges + * not yet completed. + *

+ * This hook can be added to ActiveMQ configuration ({@code activemq.xml}) as in + * the following example: + *

+ * + * <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.camel.CamelShutdownHook" /> + * + */ +public final class CamelShutdownHook implements Runnable, CamelContextAware { + + private static final Logger LOG = LoggerFactory.getLogger(CamelShutdownHook.class); + + private CamelContext camelContext; + + @Autowired + public CamelShutdownHook(final BrokerService brokerService) { + brokerService.addPreShutdownHook(this); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void run() { + if (camelContext != null) { + try { + camelContext.stop(); + } catch (final Exception e) { + LOG.warn("Unable to stop CamelContext", e); + } + } else { + LOG.warn("Unable to stop CamelContext, no CamelContext was set!"); + } + } + + @Override + public void setCamelContext(final CamelContext camelContext) { + this.camelContext = camelContext; + } + +} diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/BrokerPreShutdownHookTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/BrokerPreShutdownHookTest.java new file mode 100644 index 0000000000..4b6122df93 --- /dev/null +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/BrokerPreShutdownHookTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.camel.component.ActiveMQComponent; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class BrokerPreShutdownHookTest { + + static class TestProcessor implements Processor { + + boolean messageReceived; + + @Override + public void process(final Exchange exchange) throws Exception { + messageReceived = true; + } + } + + @Test + public void testShouldCleanlyShutdownCamelBeforeStoppingBroker() throws Exception { + final BrokerService broker = new BrokerService(); + broker.setBrokerName("testBroker"); + broker.setUseJmx(true); + broker.setPersistent(false); + broker.addConnector("vm://testBroker"); + + final DefaultCamelContext camel = new DefaultCamelContext(); + camel.setName("test-camel"); + + final CamelShutdownHook hook = new CamelShutdownHook(broker); + hook.setCamelContext(camel); + + broker.start(); + + camel.addComponent("testq", ActiveMQComponent.activeMQComponent("vm://testBroker?create=false")); + + final TestProcessor processor = new TestProcessor(); + camel.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("testq:test.in").delay(200).process(processor); + } + }); + camel.start(); + + final ProducerTemplate producer = camel.createProducerTemplate(); + producer.sendBody("testq:test.in", "Hi!"); + producer.stop(); + + broker.stop(); + + assertTrue("Message should be received", processor.messageReceived); + assertTrue("Camel context should be stopped", camel.isStopped()); + assertTrue("Broker should be stopped", broker.isStopped()); + + } +}