From 3cd8da80a687676167681f8e424e9648db79eb66 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Mon, 7 Jan 2013 16:19:35 +0000 Subject: [PATCH] fixes: https://issues.apache.org/jira/browse/AMQ-4246 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1429878 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 120 ++++++++---------- 1 file changed, 53 insertions(+), 67 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index d350dfd4dd..a2bab1c8f8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -224,8 +224,8 @@ public class BrokerService implements Service { private boolean allowTempAutoCreationOnSend; private JobSchedulerStore jobSchedulerStore; - private int offlineDurableSubscriberTimeout = -1; - private int offlineDurableSubscriberTaskSchedule = 300000; + private long offlineDurableSubscriberTimeout = -1; + private long offlineDurableSubscriberTaskSchedule = 300000; private DestinationFilter virtualConsumerDestinationFilter; private final Object persistenceAdapterLock = new Object(); @@ -812,8 +812,7 @@ public class BrokerService implements Service { * @param pollInterval * @throws Exception */ - public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) - throws Exception { + public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { if (isUseJmx()) { if (connectorName == null || queueName == null || timeout <= 0) { throw new Exception( @@ -1349,14 +1348,13 @@ public class BrokerService implements Service { * nestedType="org.apache.activemq.broker.TransportConnector" */ public void setTransportConnectors(List transportConnectors) throws Exception { - for (Iterator iter = transportConnectors.iterator(); iter.hasNext();) { - TransportConnector connector = iter.next(); + for (TransportConnector connector : transportConnectors) { addConnector(connector); } } public TransportConnector getTransportConnectorByName(String name){ - for (TransportConnector transportConnector:transportConnectors){ + for (TransportConnector transportConnector : transportConnectors){ if (name.equals(transportConnector.getName())){ return transportConnector; } @@ -1365,7 +1363,7 @@ public class BrokerService implements Service { } public TransportConnector getTransportConnectorByScheme(String scheme){ - for (TransportConnector transportConnector:transportConnectors){ + for (TransportConnector transportConnector : transportConnectors){ if (scheme.equals(transportConnector.getUri().getScheme())){ return transportConnector; } @@ -1388,10 +1386,9 @@ public class BrokerService implements Service { * @org.apache.xbean.Property * nestedType="org.apache.activemq.network.NetworkConnector" */ - public void setNetworkConnectors(List networkConnectors) throws Exception { - for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) { - NetworkConnector connector = (NetworkConnector) iter.next(); - addNetworkConnector(connector); + public void setNetworkConnectors(List networkConnectors) throws Exception { + for (Object connector : networkConnectors) { + addNetworkConnector((NetworkConnector) connector); } } @@ -1399,10 +1396,9 @@ public class BrokerService implements Service { * Sets the network connectors which this broker will use to connect to * other brokers in a federated network */ - public void setProxyConnectors(List proxyConnectors) throws Exception { - for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) { - ProxyConnector connector = (ProxyConnector) iter.next(); - addProxyConnector(connector); + public void setProxyConnectors(List proxyConnectors) throws Exception { + for (Object connector : proxyConnectors) { + addProxyConnector((ProxyConnector) connector); } } @@ -1480,33 +1476,32 @@ public class BrokerService implements Service { } public String getDefaultSocketURIString() { - - if (started.get()) { - if (this.defaultSocketURIString == null) { - for (TransportConnector tc:this.transportConnectors) { - String result = null; - try { - result = tc.getPublishableConnectString(); - } catch (Exception e) { - LOG.warn("Failed to get the ConnectURI for "+tc,e); - } - if (result != null) { - // find first publishable uri - if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { + if (started.get()) { + if (this.defaultSocketURIString == null) { + for (TransportConnector tc:this.transportConnectors) { + String result = null; + try { + result = tc.getPublishableConnectString(); + } catch (Exception e) { + LOG.warn("Failed to get the ConnectURI for "+tc,e); + } + if (result != null) { + // find first publishable uri + if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { + this.defaultSocketURIString = result; + break; + } else { + // or use the first defined + if (this.defaultSocketURIString == null) { this.defaultSocketURIString = result; - break; - } else { - // or use the first defined - if (this.defaultSocketURIString == null) { - this.defaultSocketURIString = result; - } } } } - } - return this.defaultSocketURIString; + } + return this.defaultSocketURIString; + } return null; } @@ -1815,7 +1810,6 @@ public class BrokerService implements Service { * @throws Exception */ protected void processHelperProperties() throws Exception { - boolean masterServiceExists = false; if (transportConnectorURIs != null) { for (int i = 0; i < transportConnectorURIs.length; i++) { String uri = transportConnectorURIs[i]; @@ -1994,8 +1988,7 @@ public class BrokerService implements Service { } protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { - if (isUseJmx()) { - } + if (isUseJmx()) {} } private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { @@ -2015,16 +2008,13 @@ public class BrokerService implements Service { } } - protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) - throws MalformedObjectNameException { + protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException { String objectNameStr = getBrokerObjectName().toString(); objectNameStr += ",connector=networkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(connector.getName()); return new ObjectName(objectNameStr); } - - public ObjectName createDuplexNetworkConnectorObjectName(String transport) - throws MalformedObjectNameException { + public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException { String objectNameStr = getBrokerObjectName().toString(); objectNameStr += ",connector=duplexNetworkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(transport); return new ObjectName(objectNameStr); @@ -2053,8 +2043,6 @@ public class BrokerService implements Service { } } - - protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { JmsConnectorView view = new JmsConnectorView(connector); try { @@ -2125,9 +2113,9 @@ public class BrokerService implements Service { RegionBroker regionBroker; if (isUseJmx()) { try { - regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), + regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); - }catch(MalformedObjectNameException me){ + } catch(MalformedObjectNameException me){ LOG.error("Couldn't create ManagedRegionBroker",me); throw new IOException(me); } @@ -2179,7 +2167,6 @@ public class BrokerService implements Service { if (isUseJmx()) { JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); try { - String objectNameStr = getBrokerObjectName().toString(); objectNameStr += ",service=JobScheduler,name=JMS"; ObjectName objectName = new ObjectName(objectNameStr); @@ -2189,7 +2176,6 @@ public class BrokerService implements Service { throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " + e.getMessage(), e); } - } broker = sb; } @@ -2262,7 +2248,7 @@ public class BrokerService implements Service { /** * Extracts the port from the options */ - protected Object getPort(Map options) { + protected Object getPort(Map options) { Object port = options.get("port"); if (port == null) { port = DEFAULT_PORT; @@ -2395,16 +2381,16 @@ public class BrokerService implements Service { if (isNetworkConnectorStartAsync()) { // spin up as many threads as needed networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, - 10, TimeUnit.SECONDS, new SynchronousQueue(), - new ThreadFactory() { - int count=0; - @Override - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); - thread.setDaemon(true); - return thread; - } - }); + 10, TimeUnit.SECONDS, new SynchronousQueue(), + new ThreadFactory() { + int count=0; + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); + thread.setDaemon(true); + return thread; + } + }); } for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) { @@ -2819,25 +2805,25 @@ public class BrokerService implements Service { this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; } - public int getOfflineDurableSubscriberTimeout() { + public long getOfflineDurableSubscriberTimeout() { return offlineDurableSubscriberTimeout; } - public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) { + public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) { this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; } - public int getOfflineDurableSubscriberTaskSchedule() { + public long getOfflineDurableSubscriberTaskSchedule() { return offlineDurableSubscriberTaskSchedule; } - public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) { + public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) { this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; } public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { return isUseVirtualTopics() && destination.isQueue() && - getVirtualTopicConsumerDestinationFilter().matches(destination); + getVirtualTopicConsumerDestinationFilter().matches(destination); } public Throwable getStartException() {