Fixes AMQ-4723: HTTP Discovery agent should only poll for broker URLs while attempting to connect a transport.

This commit is contained in:
Hiram Chirino 2013-09-12 11:16:18 -04:00
parent 8f8aa0f5e2
commit 272b846b0c
3 changed files with 88 additions and 2 deletions

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.Suspendable;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -103,4 +104,27 @@ public class DiscoveryTransport extends TransportFilter implements DiscoveryList
this.parameters = parameters; this.parameters = parameters;
} }
@Override
public void transportResumed() {
if( discoveryAgent instanceof Suspendable ) {
try {
((Suspendable)discoveryAgent).suspend();
} catch (Exception e) {
e.printStackTrace();
}
}
super.transportResumed();
}
@Override
public void transportInterupted() {
if( discoveryAgent instanceof Suspendable ) {
try {
((Suspendable)discoveryAgent).resume();
} catch (Exception e) {
e.printStackTrace();
}
}
super.transportInterupted();
}
} }

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
/**
* Interface implemented by services that can be suspended and resumed.
*/
public interface Suspendable {
public void suspend() throws Exception;
public void resume() throws Exception;
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener; import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.Suspendable;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler; import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpDelete;
@ -41,7 +42,13 @@ import org.apache.http.impl.client.DefaultHttpClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class HTTPDiscoveryAgent implements DiscoveryAgent { public class HTTPDiscoveryAgent implements DiscoveryAgent, Suspendable {
static enum UpdateState {
SUSPENDED,
RESUMING,
RESUMED
}
private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class); private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class);
@ -65,6 +72,8 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
private boolean useExponentialBackOff = true; private boolean useExponentialBackOff = true;
private int maxReconnectAttempts; private int maxReconnectAttempts;
private final Object sleepMutex = new Object(); private final Object sleepMutex = new Object();
private final Object updateMutex = new Object();
private UpdateState updateState = UpdateState.RESUMED;
private long minConnectTime = 5000; private long minConnectTime = 5000;
class SimpleDiscoveryEvent extends DiscoveryEvent { class SimpleDiscoveryEvent extends DiscoveryEvent {
@ -237,7 +246,15 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
while (running.get()) { while (running.get()) {
try { try {
update(); update();
Thread.sleep(updateInterval); synchronized (updateMutex) {
do {
if( updateState == UpdateState.RESUMING ) {
updateState = UpdateState.RESUMED;
} else {
updateMutex.wait(updateInterval);
}
} while( updateState==UpdateState.SUSPENDED && running.get());
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
return; return;
} }
@ -305,6 +322,7 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
public void stop() throws Exception { public void stop() throws Exception {
if (startCounter.decrementAndGet() == 0) { if (startCounter.decrementAndGet() == 0) {
resume();
running.set(false); running.set(false);
if (thread != null) { if (thread != null) {
thread.join(updateInterval * 3); thread.join(updateInterval * 3);
@ -340,4 +358,21 @@ public class HTTPDiscoveryAgent implements DiscoveryAgent {
public void setStartEmbeddRegistry(boolean startEmbeddRegistry) { public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
this.startEmbeddRegistry = startEmbeddRegistry; this.startEmbeddRegistry = startEmbeddRegistry;
} }
@Override
public void suspend() throws Exception {
synchronized (updateMutex) {
updateState = UpdateState.SUSPENDED;
}
}
@Override
public void resume() throws Exception {
synchronized (updateMutex) {
updateState = UpdateState.RESUMING;
updateMutex.notify();
}
}
} }