mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
11ddb747d7
commit
633428c6e3
|
@ -81,8 +81,8 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
/**
|
||||
* A useful base class for implementing demand forwarding bridges.
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*/
|
||||
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
|
||||
|
@ -231,7 +231,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
} catch (IOException e) {
|
||||
LOG.warn("Caught exception from remote start", e);
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
LOG.warn ("Bridge was disposed before the start() method was fully executed.");
|
||||
throw new TransportDisposedIOException();
|
||||
}
|
||||
|
@ -339,6 +339,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
IntrospectionSupport.getProperties(configuration, props, null);
|
||||
String str = MarshallingSupport.propertiesToString(props);
|
||||
brokerInfo.setNetworkProperties(str);
|
||||
localBrokerIdKnownLatch.await();
|
||||
brokerInfo.setBrokerId(this.localBrokerId);
|
||||
remoteBroker.oneway(brokerInfo);
|
||||
}
|
||||
|
@ -487,7 +488,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
if (isDuplex()) {
|
||||
if (command.isMessage()) {
|
||||
ActiveMQMessage message = (ActiveMQMessage) command;
|
||||
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
|
||||
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
|
||||
|| AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
|
||||
serviceRemoteConsumerAdvisory(message.getDataStructure());
|
||||
} else {
|
||||
|
@ -708,7 +709,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
final MessageDispatch md = (MessageDispatch) command;
|
||||
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
||||
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
|
||||
|
||||
|
||||
if (suppressMessageDispatch(md, sub)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
|
||||
|
@ -721,14 +722,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
Message message = configureMessage(md);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + message.getMessageId() + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
|
||||
}
|
||||
|
||||
|
||||
if (!message.isResponseRequired()) {
|
||||
|
||||
|
||||
// If the message was originally sent using async
|
||||
// send, we will preserve that QOS
|
||||
// by bridging it using an async send (small chance
|
||||
|
@ -740,9 +741,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
} finally {
|
||||
sub.decrementOutstandingResponses();
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
|
||||
|
||||
// The message was not sent using async send, so we
|
||||
// should only ack the local
|
||||
// broker when we get confirmation that the remote
|
||||
|
@ -757,7 +758,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
} else {
|
||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||
dequeueCounter.incrementAndGet();
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
serviceLocalException(e);
|
||||
} finally {
|
||||
|
@ -765,9 +766,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
remoteBroker.asyncRequest(message, callback);
|
||||
|
||||
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -1042,7 +1043,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
}
|
||||
|
||||
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
|
||||
Collection<Subscription> currentSubs =
|
||||
Collection<Subscription> currentSubs =
|
||||
getRegionSubscriptions(consumerInfo.getDestination().isTopic());
|
||||
for (Subscription sub : currentSubs) {
|
||||
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
|
||||
|
@ -1070,7 +1071,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
|
||||
+ ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
|
||||
+ ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
|
||||
+ existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
|
||||
}
|
||||
suppress = true;
|
||||
|
@ -1082,7 +1083,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
|
||||
+ " with sub from " + remoteBrokerName
|
||||
+ ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
|
||||
+ ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
|
||||
+ candidateInfo.getNetworkConsumerIds());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
@ -1113,7 +1114,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
|
||||
private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
|
||||
RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
|
||||
AbstractRegion abstractRegion = (AbstractRegion)
|
||||
AbstractRegion abstractRegion = (AbstractRegion)
|
||||
(isTopic ? region.getTopicRegion() : region.getQueueRegion());
|
||||
return abstractRegion.getSubscriptions().values();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,195 @@
|
|||
package org.apache.activemq.bugs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.Connection;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||
import org.apache.activemq.thread.Task;
|
||||
import org.apache.activemq.thread.TaskRunner;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFactory;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
import org.apache.activemq.transport.TransportServer;
|
||||
import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This test involves the creation of a local and remote broker, both of which
|
||||
* communicate over VM and TCP. The local broker establishes a bridge to the
|
||||
* remote broker for the purposes of verifying that broker info is only
|
||||
* transfered once the local broker's ID is known to the bridge support.
|
||||
*/
|
||||
public class AMQ3014Test {
|
||||
// Change this URL to be an unused port.
|
||||
private static final String REMOTE_BROKER_URL = "tcp://localhost:50000";
|
||||
|
||||
private List<BrokerInfo> remoteBrokerInfos = Collections
|
||||
.synchronizedList(new ArrayList<BrokerInfo>());
|
||||
|
||||
private BrokerService localBroker = new BrokerService();
|
||||
|
||||
// Override the "remote" broker so that it records all (remote) BrokerInfos
|
||||
// that it receives.
|
||||
private BrokerService remoteBroker = new BrokerService() {
|
||||
@Override
|
||||
protected TransportConnector createTransportConnector(URI brokerURI)
|
||||
throws Exception {
|
||||
TransportServer transport = TransportFactory.bind(this, brokerURI);
|
||||
return new TransportConnector(transport) {
|
||||
@Override
|
||||
protected Connection createConnection(Transport transport)
|
||||
throws IOException {
|
||||
Connection connection = super.createConnection(transport);
|
||||
final TransportListener proxiedListener = transport
|
||||
.getTransportListener();
|
||||
transport.setTransportListener(new TransportListener() {
|
||||
|
||||
@Override
|
||||
public void onCommand(Object command) {
|
||||
if (command instanceof BrokerInfo) {
|
||||
remoteBrokerInfos.add((BrokerInfo) command);
|
||||
}
|
||||
proxiedListener.onCommand(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(IOException error) {
|
||||
proxiedListener.onException(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transportInterupted() {
|
||||
proxiedListener.transportInterupted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transportResumed() {
|
||||
proxiedListener.transportResumed();
|
||||
}
|
||||
});
|
||||
return connection;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
@Before
|
||||
public void init() throws Exception {
|
||||
localBroker.setBrokerName("localBroker");
|
||||
localBroker.setPersistent(false);
|
||||
localBroker.setUseJmx(false);
|
||||
localBroker.setSchedulerSupport(false);
|
||||
|
||||
remoteBroker.setBrokerName("remoteBroker");
|
||||
remoteBroker.setPersistent(false);
|
||||
remoteBroker.setUseJmx(false);
|
||||
remoteBroker.addConnector(REMOTE_BROKER_URL);
|
||||
remoteBroker.setSchedulerSupport(false);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
try {
|
||||
localBroker.stop();
|
||||
} finally {
|
||||
remoteBroker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test verifies that the local broker's ID is typically known by the
|
||||
* bridge support before the local broker's BrokerInfo is sent to the remote
|
||||
* broker.
|
||||
*/
|
||||
@Test
|
||||
public void NormalCaseTest() throws Exception {
|
||||
runTest(0, 3000);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test verifies that timing can arise under which the local broker's
|
||||
* ID is not known by the bridge support before the local broker's
|
||||
* BrokerInfo is sent to the remote broker.
|
||||
*/
|
||||
@Test
|
||||
public void DelayedCaseTest() throws Exception {
|
||||
runTest(500, 3000);
|
||||
}
|
||||
|
||||
private void runTest(final long taskRunnerDelay, long timeout)
|
||||
throws Exception {
|
||||
// Add a network connector to the local broker that will create a bridge
|
||||
// to the remote broker.
|
||||
DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
|
||||
SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
|
||||
da.setServices(REMOTE_BROKER_URL);
|
||||
dnc.setDiscoveryAgent(da);
|
||||
localBroker.addNetworkConnector(dnc);
|
||||
|
||||
// Before starting the local broker, intercept the task runner factory
|
||||
// so that the
|
||||
// local VMTransport dispatcher is artificially delayed.
|
||||
final TaskRunnerFactory realTaskRunnerFactory = localBroker
|
||||
.getTaskRunnerFactory();
|
||||
localBroker.setTaskRunnerFactory(new TaskRunnerFactory() {
|
||||
public TaskRunner createTaskRunner(Task task, String name) {
|
||||
final TaskRunner realTaskRunner = realTaskRunnerFactory
|
||||
.createTaskRunner(task, name);
|
||||
if (name.startsWith("ActiveMQ Connection Dispatcher: ")) {
|
||||
return new TaskRunner() {
|
||||
@Override
|
||||
public void shutdown() throws InterruptedException {
|
||||
realTaskRunner.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown(long timeout)
|
||||
throws InterruptedException {
|
||||
realTaskRunner.shutdown(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void wakeup() throws InterruptedException {
|
||||
Thread.sleep(taskRunnerDelay);
|
||||
realTaskRunner.wakeup();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return realTaskRunnerFactory.createTaskRunner(task, name);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Start the brokers and wait for the bridge to be created; the remote
|
||||
// broker is started first to ensure it is available for the local
|
||||
// broker to connect to.
|
||||
remoteBroker.start();
|
||||
localBroker.start();
|
||||
|
||||
// Wait for the remote broker to receive the local broker's BrokerInfo
|
||||
// and then verify the local broker's ID is known.
|
||||
long startTimeMillis = System.currentTimeMillis();
|
||||
while (remoteBrokerInfos.isEmpty()
|
||||
&& (System.currentTimeMillis() - startTimeMillis) < timeout) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
Assert.assertFalse("Timed out waiting for bridge to form.",
|
||||
remoteBrokerInfos.isEmpty());
|
||||
;
|
||||
Assert.assertNotNull("Local broker ID is null.", remoteBrokerInfos.get(
|
||||
0).getBrokerId());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue