From 272b846b0cc0e865aba1731a27e8eef5ea744639 Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Thu, 12 Sep 2013 11:16:18 -0400 Subject: [PATCH] Fixes AMQ-4723: HTTP Discovery agent should only poll for broker URLs while attempting to connect a transport. --- .../discovery/DiscoveryTransport.java | 24 ++++++++++++ .../org/apache/activemq/util/Suspendable.java | 27 +++++++++++++ .../discovery/http/HTTPDiscoveryAgent.java | 39 ++++++++++++++++++- 3 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 activemq-client/src/main/java/org/apache/activemq/util/Suspendable.java diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java index 48cba828d9..40bca475ca 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.Suspendable; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,4 +104,27 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList this.parameters = parameters; } + @Override + public void transportResumed() { + if( discoveryAgent instanceof Suspendable ) { + try { + ((Suspendable)discoveryAgent).suspend(); + } catch (Exception e) { + e.printStackTrace(); + } + } + super.transportResumed(); + } + + @Override + public void transportInterupted() { + if( discoveryAgent instanceof Suspendable ) { + try { + ((Suspendable)discoveryAgent).resume(); + } catch (Exception e) { + e.printStackTrace(); + } + } + super.transportInterupted(); + } } diff --git a/activemq-client/src/main/java/org/apache/activemq/util/Suspendable.java b/activemq-client/src/main/java/org/apache/activemq/util/Suspendable.java new file mode 100644 index 0000000000..aa2b9fd64f --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/util/Suspendable.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +/** + * Interface implemented by services that can be suspended and resumed. + */ +public interface Suspendable { + + public void suspend() throws Exception; + public void resume() throws Exception; + +} diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java index 9e8d4bdba3..cbce8363de 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java @@ -31,6 +31,7 @@ import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryListener; import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.Suspendable; import org.apache.http.client.HttpClient; import org.apache.http.client.ResponseHandler; import org.apache.http.client.methods.HttpDelete; @@ -41,7 +42,13 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HTTPDiscoveryAgent implements DiscoveryAgent { +public class HTTPDiscoveryAgent implements DiscoveryAgent, Suspendable { + + static enum UpdateState { + SUSPENDED, + RESUMING, + RESUMED + } private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class); @@ -65,6 +72,8 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent { private boolean useExponentialBackOff = true; private int maxReconnectAttempts; private final Object sleepMutex = new Object(); + private final Object updateMutex = new Object(); + private UpdateState updateState = UpdateState.RESUMED; private long minConnectTime = 5000; class SimpleDiscoveryEvent extends DiscoveryEvent { @@ -237,7 +246,15 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent { while (running.get()) { try { update(); - Thread.sleep(updateInterval); + synchronized (updateMutex) { + do { + if( updateState == UpdateState.RESUMING ) { + updateState = UpdateState.RESUMED; + } else { + updateMutex.wait(updateInterval); + } + } while( updateState==UpdateState.SUSPENDED && running.get()); + } } catch (InterruptedException e) { return; } @@ -305,6 +322,7 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent { public void stop() throws Exception { if (startCounter.decrementAndGet() == 0) { + resume(); running.set(false); if (thread != null) { thread.join(updateInterval * 3); @@ -340,4 +358,21 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent { public void setStartEmbeddRegistry(boolean startEmbeddRegistry) { this.startEmbeddRegistry = startEmbeddRegistry; } + + + @Override + public void suspend() throws Exception { + synchronized (updateMutex) { + updateState = UpdateState.SUSPENDED; + } + } + + @Override + public void resume() throws Exception { + synchronized (updateMutex) { + updateState = UpdateState.RESUMING; + updateMutex.notify(); + } + } + }