mirror of https://github.com/apache/activemq.git
PolicyMap not initialized by XBean/Spring when Broker constructed for Transports.
Chanhed so PolicyMap is lazily fetched from the BrokerService by the RegionBroker git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383486 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6892f8ca04
commit
6cb66e4865
|
@ -893,11 +893,10 @@ public class BrokerService implements Service {
|
|||
if (isUseJmx()) {
|
||||
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
|
||||
regionBroker = new ManagedRegionBroker(this,mbeanServer, getBrokerObjectName(),
|
||||
getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(), getDestinationPolicy());
|
||||
getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter());
|
||||
}
|
||||
else {
|
||||
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(),
|
||||
getDestinationPolicy());
|
||||
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter());
|
||||
}
|
||||
regionBroker.setBrokerName(getBrokerName());
|
||||
return regionBroker;
|
||||
|
|
|
@ -34,8 +34,8 @@ public class ManagedQueueRegion extends QueueRegion {
|
|||
|
||||
private final ManagedRegionBroker regionBroker;
|
||||
|
||||
public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
|
||||
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
|
||||
public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
|
||||
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
|
||||
regionBroker = broker;
|
||||
}
|
||||
|
||||
|
|
|
@ -83,9 +83,8 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
private Broker contextBroker;
|
||||
|
||||
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
|
||||
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter,
|
||||
PolicyMap policyMap) throws IOException{
|
||||
super(brokerService,taskRunnerFactory,memoryManager,adapter,policyMap);
|
||||
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter) throws IOException{
|
||||
super(brokerService,taskRunnerFactory,memoryManager,adapter);
|
||||
this.mbeanServer=mbeanServer;
|
||||
this.brokerObjectName=brokerObjectName;
|
||||
}
|
||||
|
@ -98,8 +97,8 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
}
|
||||
|
||||
protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
|
||||
PersistenceAdapter adapter,PolicyMap policyMap){
|
||||
return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter,policyMap);
|
||||
PersistenceAdapter adapter){
|
||||
return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter);
|
||||
}
|
||||
|
||||
protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
|
||||
|
@ -111,8 +110,8 @@ public class ManagedRegionBroker extends RegionBroker{
|
|||
}
|
||||
|
||||
protected Region createTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
|
||||
PersistenceAdapter adapter,PolicyMap policyMap){
|
||||
return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter,policyMap);
|
||||
PersistenceAdapter adapter){
|
||||
return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter);
|
||||
}
|
||||
|
||||
public void register(ActiveMQDestination destName,Destination destination){
|
||||
|
|
|
@ -34,8 +34,8 @@ public class ManagedTopicRegion extends TopicRegion {
|
|||
|
||||
private final ManagedRegionBroker regionBroker;
|
||||
|
||||
public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
|
||||
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
|
||||
public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
|
||||
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
|
||||
regionBroker = broker;
|
||||
}
|
||||
|
||||
|
|
|
@ -52,12 +52,12 @@ abstract public class AbstractRegion implements Region {
|
|||
protected final UsageManager memoryManager;
|
||||
protected final PersistenceAdapter persistenceAdapter;
|
||||
protected final DestinationStatistics destinationStatistics;
|
||||
protected final Broker broker;
|
||||
protected final RegionBroker broker;
|
||||
protected boolean autoCreateDestinations=true;
|
||||
protected final TaskRunnerFactory taskRunnerFactory;
|
||||
protected final Object destinationsMutex = new Object();
|
||||
|
||||
public AbstractRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
|
||||
public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
|
||||
this.broker = broker;
|
||||
this.destinationStatistics = destinationStatistics;
|
||||
this.memoryManager = memoryManager;
|
||||
|
|
|
@ -39,12 +39,11 @@ import java.util.Set;
|
|||
*/
|
||||
public class QueueRegion extends AbstractRegion {
|
||||
|
||||
private final PolicyMap policyMap;
|
||||
|
||||
|
||||
public QueueRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
|
||||
PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
|
||||
public QueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
|
||||
PersistenceAdapter persistenceAdapter) {
|
||||
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
|
||||
this.policyMap = policyMap;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
|
@ -62,8 +61,8 @@ public class QueueRegion extends AbstractRegion {
|
|||
}
|
||||
|
||||
protected void configureQueue(Queue queue, ActiveMQDestination destination) {
|
||||
if (policyMap != null) {
|
||||
PolicyEntry entry = policyMap.getEntryFor(destination);
|
||||
if (broker.getDestinationPolicy() != null) {
|
||||
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
|
||||
if (entry != null) {
|
||||
entry.configure(queue);
|
||||
}
|
||||
|
|
|
@ -83,16 +83,13 @@ public class RegionBroker implements Broker {
|
|||
private Map clientIdSet = new HashMap(); // we will synchronize access
|
||||
protected PersistenceAdapter adaptor;
|
||||
|
||||
|
||||
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter) throws IOException {
|
||||
this(brokerService,taskRunnerFactory, memoryManager, createDefaultPersistenceAdapter(memoryManager), null);
|
||||
}
|
||||
|
||||
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
|
||||
this.brokerService = brokerService;
|
||||
this.sequenceGenerator.setLastSequenceId( adapter.getLastMessageBrokerSequenceId() );
|
||||
this.adaptor = adapter;//weird - both are valid spellings ...
|
||||
queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, adapter, policyMap);
|
||||
topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, adapter, policyMap);
|
||||
queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, adapter);
|
||||
topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, adapter);
|
||||
|
||||
tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory);
|
||||
tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory);
|
||||
|
@ -113,12 +110,12 @@ public class RegionBroker implements Broker {
|
|||
return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
|
||||
}
|
||||
|
||||
protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) {
|
||||
return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
|
||||
protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter) {
|
||||
return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter);
|
||||
}
|
||||
|
||||
protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter, PolicyMap policyMap) {
|
||||
return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter, policyMap);
|
||||
protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter) {
|
||||
return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter);
|
||||
}
|
||||
|
||||
private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException {
|
||||
|
@ -142,6 +139,10 @@ public class RegionBroker implements Broker {
|
|||
ss.stop(tempTopicRegion);
|
||||
ss.throwFirstException();
|
||||
}
|
||||
|
||||
public PolicyMap getDestinationPolicy(){
|
||||
return brokerService != null ? brokerService.getDestinationPolicy() : null;
|
||||
}
|
||||
|
||||
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
|
||||
String clientId = info.getClientId();
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.activemq.thread.TaskRunnerFactory;
|
|||
*/
|
||||
public class TempQueueRegion extends AbstractRegion {
|
||||
|
||||
public TempQueueRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
|
||||
public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
|
||||
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
|
||||
setAutoCreateDestinations(false);
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.activemq.thread.TaskRunnerFactory;
|
|||
*/
|
||||
public class TempTopicRegion extends AbstractRegion {
|
||||
|
||||
public TempTopicRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
|
||||
public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
|
||||
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
|
||||
setAutoCreateDestinations(false);
|
||||
}
|
||||
|
|
|
@ -49,12 +49,12 @@ public class TopicRegion extends AbstractRegion {
|
|||
private static final Log log = LogFactory.getLog(TopicRegion.class);
|
||||
|
||||
protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
|
||||
private final PolicyMap policyMap;
|
||||
|
||||
|
||||
public TopicRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
|
||||
PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
|
||||
public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
|
||||
PersistenceAdapter persistenceAdapter) {
|
||||
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
|
||||
this.policyMap = policyMap;
|
||||
|
||||
}
|
||||
|
||||
public void addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
|
@ -182,8 +182,8 @@ public class TopicRegion extends AbstractRegion {
|
|||
}
|
||||
|
||||
protected void configureTopic(Topic topic, ActiveMQDestination destination) {
|
||||
if (policyMap != null) {
|
||||
PolicyEntry entry = policyMap.getEntryFor(destination);
|
||||
if (broker.getDestinationPolicy() != null) {
|
||||
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
|
||||
if (entry != null) {
|
||||
entry.configure(topic);
|
||||
}
|
||||
|
@ -208,8 +208,8 @@ public class TopicRegion extends AbstractRegion {
|
|||
|
||||
// lets configure the subscription depending on the destination
|
||||
ActiveMQDestination destination = info.getDestination();
|
||||
if (destination != null && policyMap != null) {
|
||||
PolicyEntry entry = policyMap.getEntryFor(destination);
|
||||
if (destination != null && broker.getDestinationPolicy() != null) {
|
||||
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
|
||||
if (entry != null) {
|
||||
entry.configure(answer);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue