Merge PR for AMQ-6706

This closes #254
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-06-20 10:54:59 -04:00
commit 3593afeebe
4 changed files with 238 additions and 1 deletions

View File

@ -268,6 +268,7 @@ public class BrokerService implements Service {
private boolean rollbackOnlyOnAsyncException = true;
private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
private List<Runnable> preShutdownHooks = new CopyOnWriteArrayList<>();
static {
@ -478,6 +479,16 @@ public class BrokerService implements Service {
return connector;
}
/**
* Adds a {@link Runnable} hook that will be invoked before the
* broker is stopped. This allows performing cleanup actions
* before the broker is stopped. The hook should not throw
* exceptions or block.
*/
public final void addPreShutdownHook(final Runnable hook) {
preShutdownHooks.add(hook);
}
public JmsConnector removeJmsConnector(JmsConnector connector) {
if (jmsConnectors.remove(connector)) {
return connector;
@ -788,6 +799,16 @@ public class BrokerService implements Service {
*/
@Override
public void stop() throws Exception {
final ServiceStopper stopper = new ServiceStopper();
for (Runnable hook : preShutdownHooks) {
try {
hook.run();
} catch (Throwable e) {
stopper.onException(hook, e);
}
}
if (!stopping.compareAndSet(false, true)) {
LOG.trace("Broker already stopping/stopped");
return;
@ -812,7 +833,6 @@ public class BrokerService implements Service {
this.scheduler.stop();
this.scheduler = null;
}
ServiceStopper stopper = new ServiceStopper();
if (services != null) {
for (Service service : services) {
stopper.stop(service);
@ -2846,6 +2866,10 @@ public class BrokerService implements Service {
this.regionBroker = regionBroker;
}
public final void removePreShutdownHook(final Runnable hook) {
preShutdownHooks.remove(hook);
}
public void addShutdownHook(Runnable hook) {
synchronized (shutdownHooks) {
shutdownHooks.add(hook);

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class BrokerServiceTest {
static class Hook implements Runnable {
boolean invoked = false;
@Override
public void run() {
invoked = true;
}
}
@Test
public void removedPreShutdownHooksShouldNotBeInvokedWhenStopping() throws Exception {
final BrokerService brokerService = new BrokerService();
final Hook hook = new Hook();
brokerService.addPreShutdownHook(hook);
brokerService.removePreShutdownHook(hook);
brokerService.stop();
assertFalse("Removed pre-shutdown hook should not have been invoked", hook.invoked);
}
@Test
public void shouldInvokePreShutdownHooksBeforeStopping() throws Exception {
final BrokerService brokerService = new BrokerService();
final Hook hook = new Hook();
brokerService.addPreShutdownHook(hook);
brokerService.stop();
assertTrue("Pre-shutdown hook should have been invoked", hook.invoked);
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.broker.BrokerService;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
* A shutdown hook that can be used to shutdown {@link CamelContext} before the
* ActiveMQ broker is shut down. This is sometimes important as if the broker is
* shutdown before Camel there could be a loss of data due to inflight exchanges
* not yet completed.
* <p>
* This hook can be added to ActiveMQ configuration ({@code activemq.xml}) as in
* the following example:
* <p>
* <code>
* &lt;bean xmlns=&quot;http://www.springframework.org/schema/beans&quot; class=&quot;org.apache.activemq.camel.CamelShutdownHook&quot; /&gt;
* </code>
*/
public final class CamelShutdownHook implements Runnable, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(CamelShutdownHook.class);
private CamelContext camelContext;
@Autowired
public CamelShutdownHook(final BrokerService brokerService) {
brokerService.addPreShutdownHook(this);
}
@Override
public CamelContext getCamelContext() {
return camelContext;
}
@Override
public void run() {
if (camelContext != null) {
try {
camelContext.stop();
} catch (final Exception e) {
LOG.warn("Unable to stop CamelContext", e);
}
} else {
LOG.warn("Unable to stop CamelContext, no CamelContext was set!");
}
}
@Override
public void setCamelContext(final CamelContext camelContext) {
this.camelContext = camelContext;
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class BrokerPreShutdownHookTest {
static class TestProcessor implements Processor {
boolean messageReceived;
@Override
public void process(final Exchange exchange) throws Exception {
messageReceived = true;
}
}
@Test
public void testShouldCleanlyShutdownCamelBeforeStoppingBroker() throws Exception {
final BrokerService broker = new BrokerService();
broker.setBrokerName("testBroker");
broker.setUseJmx(true);
broker.setPersistent(false);
broker.addConnector("vm://testBroker");
final DefaultCamelContext camel = new DefaultCamelContext();
camel.setName("test-camel");
final CamelShutdownHook hook = new CamelShutdownHook(broker);
hook.setCamelContext(camel);
broker.start();
camel.addComponent("testq", ActiveMQComponent.activeMQComponent("vm://testBroker?create=false"));
final TestProcessor processor = new TestProcessor();
camel.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("testq:test.in").delay(200).process(processor);
}
});
camel.start();
final ProducerTemplate producer = camel.createProducerTemplate();
producer.sendBody("testq:test.in", "Hi!");
producer.stop();
broker.stop();
assertTrue("Message should be received", processor.messageReceived);
assertTrue("Camel context should be stopped", camel.isStopped());
assertTrue("Broker should be stopped", broker.isStopped());
}
}