git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1429878 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-07 16:19:35 +00:00
parent f79f7ad426
commit 3cd8da80a6
1 changed files with 53 additions and 67 deletions

View File

@ -224,8 +224,8 @@ public class BrokerService implements Service {
private boolean allowTempAutoCreationOnSend; private boolean allowTempAutoCreationOnSend;
private JobSchedulerStore jobSchedulerStore; private JobSchedulerStore jobSchedulerStore;
private int offlineDurableSubscriberTimeout = -1; private long offlineDurableSubscriberTimeout = -1;
private int offlineDurableSubscriberTaskSchedule = 300000; private long offlineDurableSubscriberTaskSchedule = 300000;
private DestinationFilter virtualConsumerDestinationFilter; private DestinationFilter virtualConsumerDestinationFilter;
private final Object persistenceAdapterLock = new Object(); private final Object persistenceAdapterLock = new Object();
@ -812,8 +812,7 @@ public class BrokerService implements Service {
* @param pollInterval * @param pollInterval
* @throws Exception * @throws Exception
*/ */
public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
throws Exception {
if (isUseJmx()) { if (isUseJmx()) {
if (connectorName == null || queueName == null || timeout <= 0) { if (connectorName == null || queueName == null || timeout <= 0) {
throw new Exception( throw new Exception(
@ -1349,14 +1348,13 @@ public class BrokerService implements Service {
* nestedType="org.apache.activemq.broker.TransportConnector" * nestedType="org.apache.activemq.broker.TransportConnector"
*/ */
public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception { public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) { for (TransportConnector connector : transportConnectors) {
TransportConnector connector = iter.next();
addConnector(connector); addConnector(connector);
} }
} }
public TransportConnector getTransportConnectorByName(String name){ public TransportConnector getTransportConnectorByName(String name){
for (TransportConnector transportConnector:transportConnectors){ for (TransportConnector transportConnector : transportConnectors){
if (name.equals(transportConnector.getName())){ if (name.equals(transportConnector.getName())){
return transportConnector; return transportConnector;
} }
@ -1365,7 +1363,7 @@ public class BrokerService implements Service {
} }
public TransportConnector getTransportConnectorByScheme(String scheme){ public TransportConnector getTransportConnectorByScheme(String scheme){
for (TransportConnector transportConnector:transportConnectors){ for (TransportConnector transportConnector : transportConnectors){
if (scheme.equals(transportConnector.getUri().getScheme())){ if (scheme.equals(transportConnector.getUri().getScheme())){
return transportConnector; return transportConnector;
} }
@ -1388,10 +1386,9 @@ public class BrokerService implements Service {
* @org.apache.xbean.Property * @org.apache.xbean.Property
* nestedType="org.apache.activemq.network.NetworkConnector" * nestedType="org.apache.activemq.network.NetworkConnector"
*/ */
public void setNetworkConnectors(List networkConnectors) throws Exception { public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) { for (Object connector : networkConnectors) {
NetworkConnector connector = (NetworkConnector) iter.next(); addNetworkConnector((NetworkConnector) connector);
addNetworkConnector(connector);
} }
} }
@ -1399,10 +1396,9 @@ public class BrokerService implements Service {
* Sets the network connectors which this broker will use to connect to * Sets the network connectors which this broker will use to connect to
* other brokers in a federated network * other brokers in a federated network
*/ */
public void setProxyConnectors(List proxyConnectors) throws Exception { public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) { for (Object connector : proxyConnectors) {
ProxyConnector connector = (ProxyConnector) iter.next(); addProxyConnector((ProxyConnector) connector);
addProxyConnector(connector);
} }
} }
@ -1480,33 +1476,32 @@ public class BrokerService implements Service {
} }
public String getDefaultSocketURIString() { public String getDefaultSocketURIString() {
if (started.get()) {
if (started.get()) { if (this.defaultSocketURIString == null) {
if (this.defaultSocketURIString == null) { for (TransportConnector tc:this.transportConnectors) {
for (TransportConnector tc:this.transportConnectors) { String result = null;
String result = null; try {
try { result = tc.getPublishableConnectString();
result = tc.getPublishableConnectString(); } catch (Exception e) {
} catch (Exception e) { LOG.warn("Failed to get the ConnectURI for "+tc,e);
LOG.warn("Failed to get the ConnectURI for "+tc,e); }
} if (result != null) {
if (result != null) { // find first publishable uri
// find first publishable uri if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) { this.defaultSocketURIString = result;
break;
} else {
// or use the first defined
if (this.defaultSocketURIString == null) {
this.defaultSocketURIString = result; this.defaultSocketURIString = result;
break;
} else {
// or use the first defined
if (this.defaultSocketURIString == null) {
this.defaultSocketURIString = result;
}
} }
} }
} }
} }
return this.defaultSocketURIString;
} }
return this.defaultSocketURIString;
}
return null; return null;
} }
@ -1815,7 +1810,6 @@ public class BrokerService implements Service {
* @throws Exception * @throws Exception
*/ */
protected void processHelperProperties() throws Exception { protected void processHelperProperties() throws Exception {
boolean masterServiceExists = false;
if (transportConnectorURIs != null) { if (transportConnectorURIs != null) {
for (int i = 0; i < transportConnectorURIs.length; i++) { for (int i = 0; i < transportConnectorURIs.length; i++) {
String uri = transportConnectorURIs[i]; String uri = transportConnectorURIs[i];
@ -1994,8 +1988,7 @@ public class BrokerService implements Service {
} }
protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException { protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
if (isUseJmx()) { if (isUseJmx()) {}
}
} }
private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException { private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
@ -2015,16 +2008,13 @@ public class BrokerService implements Service {
} }
} }
protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
throws MalformedObjectNameException {
String objectNameStr = getBrokerObjectName().toString(); String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=networkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(connector.getName()); objectNameStr += ",connector=networkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(connector.getName());
return new ObjectName(objectNameStr); return new ObjectName(objectNameStr);
} }
public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
public ObjectName createDuplexNetworkConnectorObjectName(String transport)
throws MalformedObjectNameException {
String objectNameStr = getBrokerObjectName().toString(); String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=duplexNetworkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(transport); objectNameStr += ",connector=duplexNetworkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(transport);
return new ObjectName(objectNameStr); return new ObjectName(objectNameStr);
@ -2053,8 +2043,6 @@ public class BrokerService implements Service {
} }
} }
protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException { protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
JmsConnectorView view = new JmsConnectorView(connector); JmsConnectorView view = new JmsConnectorView(connector);
try { try {
@ -2125,9 +2113,9 @@ public class BrokerService implements Service {
RegionBroker regionBroker; RegionBroker regionBroker;
if (isUseJmx()) { if (isUseJmx()) {
try { try {
regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
}catch(MalformedObjectNameException me){ } catch(MalformedObjectNameException me){
LOG.error("Couldn't create ManagedRegionBroker",me); LOG.error("Couldn't create ManagedRegionBroker",me);
throw new IOException(me); throw new IOException(me);
} }
@ -2179,7 +2167,6 @@ public class BrokerService implements Service {
if (isUseJmx()) { if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try { try {
String objectNameStr = getBrokerObjectName().toString(); String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",service=JobScheduler,name=JMS"; objectNameStr += ",service=JobScheduler,name=JMS";
ObjectName objectName = new ObjectName(objectNameStr); ObjectName objectName = new ObjectName(objectNameStr);
@ -2189,7 +2176,6 @@ public class BrokerService implements Service {
throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
+ e.getMessage(), e); + e.getMessage(), e);
} }
} }
broker = sb; broker = sb;
} }
@ -2262,7 +2248,7 @@ public class BrokerService implements Service {
/** /**
* Extracts the port from the options * Extracts the port from the options
*/ */
protected Object getPort(Map options) { protected Object getPort(Map<?,?> options) {
Object port = options.get("port"); Object port = options.get("port");
if (port == null) { if (port == null) {
port = DEFAULT_PORT; port = DEFAULT_PORT;
@ -2395,16 +2381,16 @@ public class BrokerService implements Service {
if (isNetworkConnectorStartAsync()) { if (isNetworkConnectorStartAsync()) {
// spin up as many threads as needed // spin up as many threads as needed
networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new ThreadFactory() { new ThreadFactory() {
int count=0; int count=0;
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++)); Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
thread.setDaemon(true); thread.setDaemon(true);
return thread; return thread;
} }
}); });
} }
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) { for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
@ -2819,25 +2805,25 @@ public class BrokerService implements Service {
this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend; this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
} }
public int getOfflineDurableSubscriberTimeout() { public long getOfflineDurableSubscriberTimeout() {
return offlineDurableSubscriberTimeout; return offlineDurableSubscriberTimeout;
} }
public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) { public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout; this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
} }
public int getOfflineDurableSubscriberTaskSchedule() { public long getOfflineDurableSubscriberTaskSchedule() {
return offlineDurableSubscriberTaskSchedule; return offlineDurableSubscriberTaskSchedule;
} }
public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) { public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
} }
public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
return isUseVirtualTopics() && destination.isQueue() && return isUseVirtualTopics() && destination.isQueue() &&
getVirtualTopicConsumerDestinationFilter().matches(destination); getVirtualTopicConsumerDestinationFilter().matches(destination);
} }
public Throwable getStartException() { public Throwable getStartException() {