mirror of https://github.com/apache/activemq.git
Allow the region broker implemenation to get changed by subclasses of BrokerService.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@636609 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
93ccb9ca3b
commit
1096564117
|
@ -126,7 +126,7 @@ public class BrokerService implements Service {
|
||||||
private SystemUsage consumerSystemUsaage;
|
private SystemUsage consumerSystemUsaage;
|
||||||
private PersistenceAdapter persistenceAdapter;
|
private PersistenceAdapter persistenceAdapter;
|
||||||
private PersistenceAdapterFactory persistenceFactory;
|
private PersistenceAdapterFactory persistenceFactory;
|
||||||
private DestinationFactory destinationFactory;
|
protected DestinationFactory destinationFactory;
|
||||||
private MessageAuthorizationPolicy messageAuthorizationPolicy;
|
private MessageAuthorizationPolicy messageAuthorizationPolicy;
|
||||||
private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
|
private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
|
||||||
private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
|
private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
|
||||||
|
@ -1370,11 +1370,11 @@ public class BrokerService implements Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
|
protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
|
||||||
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
|
// MBeanServer mbeanServer = getManagementContext().getMBeanServer();
|
||||||
if (mbeanServer != null) {
|
// if (mbeanServer != null) {
|
||||||
|
//
|
||||||
|
//
|
||||||
}
|
// }
|
||||||
return adaptor;
|
return adaptor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1487,17 +1487,16 @@ public class BrokerService implements Service {
|
||||||
// Add a filter that will stop access to the broker once stopped
|
// Add a filter that will stop access to the broker once stopped
|
||||||
broker = new MutableBrokerFilter(broker) {
|
broker = new MutableBrokerFilter(broker) {
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
setNext(new ErrorBroker("Broker has been stopped: " + this) {
|
Broker old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
|
||||||
// Just ignore additional stop actions.
|
// Just ignore additional stop actions.
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
super.stop();
|
old.stop();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
RegionBroker rBroker = (RegionBroker)regionBroker;
|
// RegionBroker rBroker = (RegionBroker)regionBroker;
|
||||||
rBroker.getDestinationStatistics().setEnabled(enableStatistics);
|
|
||||||
|
|
||||||
if (isUseJmx()) {
|
if (isUseJmx()) {
|
||||||
ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
|
ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
|
||||||
|
@ -1537,10 +1536,14 @@ public class BrokerService implements Service {
|
||||||
configureServices(destinationInterceptors);
|
configureServices(destinationInterceptors);
|
||||||
|
|
||||||
DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
|
DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
|
||||||
RegionBroker regionBroker = null;
|
|
||||||
if (destinationFactory == null) {
|
if (destinationFactory == null) {
|
||||||
destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
|
destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
|
||||||
}
|
}
|
||||||
|
return createRegionBroker(destinationInterceptor);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
|
||||||
|
RegionBroker regionBroker;
|
||||||
if (isUseJmx()) {
|
if (isUseJmx()) {
|
||||||
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
|
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
|
||||||
regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
|
regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
|
||||||
|
@ -1552,8 +1555,10 @@ public class BrokerService implements Service {
|
||||||
|
|
||||||
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
|
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
|
||||||
regionBroker.setBrokerName(getBrokerName());
|
regionBroker.setBrokerName(getBrokerName());
|
||||||
return regionBroker;
|
regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
|
||||||
}
|
|
||||||
|
return regionBroker;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the default destination interceptor
|
* Create the default destination interceptor
|
||||||
|
|
|
@ -51,7 +51,7 @@ import org.apache.activemq.kaha.Store;
|
||||||
*/
|
*/
|
||||||
public class MutableBrokerFilter implements Broker {
|
public class MutableBrokerFilter implements Broker {
|
||||||
|
|
||||||
private AtomicReference<Broker> next = new AtomicReference<Broker>();
|
protected AtomicReference<Broker> next = new AtomicReference<Broker>();
|
||||||
|
|
||||||
public MutableBrokerFilter(Broker next) {
|
public MutableBrokerFilter(Broker next) {
|
||||||
this.next.set(next);
|
this.next.set(next);
|
||||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.activemq.broker.TransportConnector;
|
||||||
|
|
||||||
public abstract class TransportBrokerTestSupport extends BrokerTest {
|
public abstract class TransportBrokerTestSupport extends BrokerTest {
|
||||||
|
|
||||||
private TransportConnector connector;
|
protected TransportConnector connector;
|
||||||
private ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
|
private ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
|
@ -49,7 +49,9 @@ public abstract class TransportBrokerTestSupport extends BrokerTest {
|
||||||
connection.stop();
|
connection.stop();
|
||||||
iter.remove();
|
iter.remove();
|
||||||
}
|
}
|
||||||
connector.stop();
|
if( connector!=null ) {
|
||||||
|
connector.stop();
|
||||||
|
}
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue