mirror of https://github.com/apache/activemq.git
fix for https://issues.apache.org/activemq/browse/AMQ-2245 - broker restart
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@773569 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
206df1bd57
commit
62e1abfb90
|
@ -31,12 +31,13 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionMetaData;
|
||||
import org.apache.activemq.Service;
|
||||
import org.apache.activemq.selector.SelectorParser;
|
||||
import org.apache.activemq.advisory.AdvisoryBroker;
|
||||
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
|
||||
import org.apache.activemq.broker.ft.MasterConnector;
|
||||
|
@ -72,6 +73,7 @@ import org.apache.activemq.network.jms.JmsConnector;
|
|||
import org.apache.activemq.proxy.ProxyConnector;
|
||||
import org.apache.activemq.security.MessageAuthorizationPolicy;
|
||||
import org.apache.activemq.security.SecurityContext;
|
||||
import org.apache.activemq.selector.SelectorParser;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.PersistenceAdapterFactory;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
|
||||
|
@ -178,6 +180,8 @@ public class BrokerService implements Service {
|
|||
private int systemExitOnShutdownExitCode;
|
||||
private SslContext sslContext;
|
||||
|
||||
private boolean forceStart = false;
|
||||
|
||||
static {
|
||||
String localHostName = "localhost";
|
||||
try {
|
||||
|
@ -418,6 +422,11 @@ public class BrokerService implements Service {
|
|||
return started.get();
|
||||
}
|
||||
|
||||
public void start(boolean force) throws Exception {
|
||||
forceStart = force;
|
||||
start();
|
||||
}
|
||||
|
||||
// Service interface
|
||||
// -------------------------------------------------------------------------
|
||||
public void start() throws Exception {
|
||||
|
@ -456,12 +465,22 @@ public class BrokerService implements Service {
|
|||
startDestinations();
|
||||
|
||||
addShutdownHook();
|
||||
|
||||
if (isUseJmx()) {
|
||||
getManagementContext().start();
|
||||
}
|
||||
|
||||
getBroker().start();
|
||||
|
||||
if (isUseJmx()) {
|
||||
getManagementContext().start();
|
||||
ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
|
||||
managedBroker.setContextBroker(broker);
|
||||
adminView = new BrokerView(this, managedBroker);
|
||||
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
|
||||
if (mbeanServer != null) {
|
||||
ObjectName objectName = getBrokerObjectName();
|
||||
mbeanServer.registerMBean(adminView, objectName);
|
||||
registeredMBeanNames.add(objectName);
|
||||
}
|
||||
}
|
||||
|
||||
BrokerRegistry.getInstance().bind(getBrokerName(), this);
|
||||
|
||||
// see if there is a MasterBroker service and if so, configure
|
||||
|
@ -532,6 +551,7 @@ public class BrokerService implements Service {
|
|||
}
|
||||
}
|
||||
}
|
||||
registeredMBeanNames.clear();
|
||||
stopper.stop(getManagementContext());
|
||||
}
|
||||
// Clear SelectorParser cache to free memory
|
||||
|
@ -1585,30 +1605,27 @@ public class BrokerService implements Service {
|
|||
|
||||
// Add a filter that will stop access to the broker once stopped
|
||||
broker = new MutableBrokerFilter(broker) {
|
||||
public void stop() throws Exception {
|
||||
Broker old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
|
||||
Broker old;
|
||||
|
||||
public void stop() throws Exception {
|
||||
old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
|
||||
// Just ignore additional stop actions.
|
||||
public void stop() throws Exception {
|
||||
}
|
||||
|
||||
});
|
||||
old.stop();
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
if (forceStart && old != null) {
|
||||
this.next.set(old);
|
||||
}
|
||||
getNext().start();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
// RegionBroker rBroker = (RegionBroker)regionBroker;
|
||||
|
||||
if (isUseJmx()) {
|
||||
ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
|
||||
managedBroker.setContextBroker(broker);
|
||||
adminView = new BrokerView(this, managedBroker);
|
||||
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
|
||||
if (mbeanServer != null) {
|
||||
ObjectName objectName = getBrokerObjectName();
|
||||
mbeanServer.registerMBean(adminView, objectName);
|
||||
registeredMBeanNames.add(objectName);
|
||||
}
|
||||
}
|
||||
|
||||
return broker;
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.io.IOException;
|
|||
import java.lang.reflect.Method;
|
||||
import java.net.MalformedURLException;
|
||||
import java.rmi.registry.LocateRegistry;
|
||||
import java.rmi.registry.Registry;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -62,6 +63,7 @@ public class ManagementContext implements Service {
|
|||
private AtomicBoolean started = new AtomicBoolean(false);
|
||||
private JMXConnectorServer connectorServer;
|
||||
private ObjectName namingServiceObjectName;
|
||||
private Registry registry;
|
||||
|
||||
public ManagementContext() {
|
||||
this(null);
|
||||
|
@ -121,6 +123,7 @@ public class ManagementContext implements Service {
|
|||
MBeanServerFactory.releaseMBeanServer(beanServer);
|
||||
}
|
||||
}
|
||||
beanServer = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -361,7 +364,9 @@ public class ManagementContext implements Service {
|
|||
private void createConnector(MBeanServer mbeanServer) throws MalformedObjectNameException, MalformedURLException, IOException {
|
||||
// Create the NamingService, needed by JSR 160
|
||||
try {
|
||||
LocateRegistry.createRegistry(connectorPort);
|
||||
if (registry == null) {
|
||||
registry = LocateRegistry.createRegistry(connectorPort);
|
||||
}
|
||||
namingServiceObjectName = ObjectName.getInstance("naming:type=rmiregistry");
|
||||
// Do not use the createMBean as the mx4j jar may not be in the
|
||||
// same class loader than the server
|
||||
|
|
|
@ -184,6 +184,11 @@ public class RegionBroker extends EmptyBroker {
|
|||
ServiceStopper ss = new ServiceStopper();
|
||||
doStop(ss);
|
||||
ss.throwFirstException();
|
||||
// clear the state
|
||||
clientIdSet.clear();
|
||||
connections.clear();
|
||||
destinations.clear();
|
||||
brokerInfos.clear();
|
||||
}
|
||||
|
||||
public PolicyMap getDestinationPolicy() {
|
||||
|
|
|
@ -90,7 +90,7 @@ public class AsyncDataManager {
|
|||
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
|
||||
|
||||
protected DataFileAppender appender;
|
||||
protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
|
||||
protected DataFileAccessorPool accessorPool;
|
||||
|
||||
protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
|
||||
protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
|
||||
|
@ -120,6 +120,7 @@ public class AsyncDataManager {
|
|||
preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
|
||||
lock();
|
||||
|
||||
accessorPool = new DataFileAccessorPool(this);
|
||||
ByteSequence sequence = controlFile.load();
|
||||
if (sequence != null && sequence.getLength() > 0) {
|
||||
unmarshallState(sequence);
|
||||
|
@ -197,7 +198,7 @@ public class AsyncDataManager {
|
|||
|
||||
public void lock() throws IOException {
|
||||
synchronized (this) {
|
||||
if (controlFile == null) {
|
||||
if (controlFile == null || controlFile.isDisposed()) {
|
||||
IOHelper.mkdirs(directory);
|
||||
controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
|
||||
}
|
||||
|
|
|
@ -179,4 +179,8 @@ public final class ControlFile {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isDisposed() {
|
||||
return disposed;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -203,7 +203,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
|||
asyncDataManager.lock();
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
|
||||
LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
|
||||
try {
|
||||
Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
|
||||
} catch (InterruptedException e1) {
|
||||
|
@ -325,6 +325,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
|||
topics.clear();
|
||||
IOException firstException = null;
|
||||
referenceStoreAdapter.stop();
|
||||
referenceStoreAdapter = null;
|
||||
try {
|
||||
LOG.debug("Journal close");
|
||||
asyncDataManager.close();
|
||||
|
|
|
@ -19,8 +19,16 @@ package org.apache.activemq.xbean;
|
|||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
|
@ -52,6 +60,39 @@ public class ConnectorXBeanConfigTest extends TestCase {
|
|||
assertEquals(new ActiveMQTopic("include.test.bar"), dynamicallyIncludedDestinations.get(1));
|
||||
|
||||
}
|
||||
|
||||
public void testBrokerRestartFails() throws Exception {
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
|
||||
try {
|
||||
brokerService.start();
|
||||
} catch (Exception e) {
|
||||
return;
|
||||
}
|
||||
fail("Error broker should have prevented us from starting it again");
|
||||
}
|
||||
|
||||
public void testForceBrokerRestart() throws Exception {
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
|
||||
brokerService.start(true); // force restart
|
||||
brokerService.waitUntilStarted();
|
||||
|
||||
//send and receive a message from a restarted broker
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61636");
|
||||
Connection conn = factory.createConnection();
|
||||
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
conn.start();
|
||||
Destination dest = new ActiveMQQueue("test");
|
||||
MessageProducer producer = sess.createProducer(dest);
|
||||
MessageConsumer consumer = sess.createConsumer(dest);
|
||||
producer.send(sess.createTextMessage("test"));
|
||||
TextMessage msg = (TextMessage)consumer.receive(1000);
|
||||
assertEquals("test", msg.getText());
|
||||
}
|
||||
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
brokerService = createBroker();
|
||||
|
|
Loading…
Reference in New Issue