mirror of https://github.com/apache/activemq.git
Fixed the test cases in FanoutTransportBrokerTest also added a fanOutQueues property to enabled fanning out on
Queues. See issue: https://issues.apache.org/activemq/browse/AMQ-1464 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@587188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
565588c13d
commit
328229b42b
|
@ -76,6 +76,7 @@ public class FanoutTransport implements CompositeTransport {
|
|||
private int maxReconnectAttempts;
|
||||
private Exception connectionFailure;
|
||||
private FanoutTransportHandler primary;
|
||||
private boolean fanOutQueues;
|
||||
|
||||
static class RequestCounter {
|
||||
|
||||
|
@ -210,13 +211,18 @@ public class FanoutTransport implements CompositeTransport {
|
|||
primary = fanoutHandler;
|
||||
}
|
||||
t.setTransportListener(fanoutHandler);
|
||||
connectedCount++;
|
||||
if (started) {
|
||||
restoreTransport(fanoutHandler);
|
||||
}
|
||||
connectedCount++;
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
|
||||
|
||||
if( fanoutHandler.transport !=null ) {
|
||||
ServiceSupport.dispose(fanoutHandler.transport);
|
||||
fanoutHandler.transport=null;
|
||||
}
|
||||
|
||||
if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
|
||||
LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
|
||||
connectionFailure = e;
|
||||
|
@ -418,6 +424,9 @@ public class FanoutTransport implements CompositeTransport {
|
|||
*/
|
||||
private boolean isFanoutCommand(Command command) {
|
||||
if (command.isMessage()) {
|
||||
if( fanOutQueues ) {
|
||||
return true;
|
||||
}
|
||||
return ((Message)command).getDestination().isTopic();
|
||||
}
|
||||
if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
|
||||
|
@ -552,4 +561,12 @@ public class FanoutTransport implements CompositeTransport {
|
|||
return true;
|
||||
}
|
||||
|
||||
public boolean isFanOutQueues() {
|
||||
return fanOutQueues;
|
||||
}
|
||||
|
||||
public void setFanOutQueues(boolean fanOutQueues) {
|
||||
this.fanOutQueues = fanOutQueues;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -151,7 +151,7 @@ public class NetworkTestSupport extends BrokerTestSupport {
|
|||
remoteBroker = createRemoteBroker(remotePersistenceAdapter);
|
||||
remoteBroker.start();
|
||||
String brokerId = remoteBroker.getBrokerName();
|
||||
remoteConnector = new TransportConnector(broker.getBroker(), TransportFactory.bind(brokerId, new URI(getRemoteURI())));
|
||||
remoteConnector = new TransportConnector(remoteBroker.getBroker(), TransportFactory.bind(brokerId, new URI(getRemoteURI())));
|
||||
remoteConnector.start();
|
||||
BrokerRegistry.getInstance().bind("remotehost", remoteBroker);
|
||||
}
|
||||
|
|
|
@ -47,8 +47,6 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
|
|||
public ActiveMQDestination destination;
|
||||
public int deliveryMode;
|
||||
|
||||
private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
|
||||
|
||||
public static Test suite() {
|
||||
return suite(FanoutTransportBrokerTest.class);
|
||||
}
|
||||
|
@ -59,10 +57,10 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
|
|||
|
||||
public void initCombosForTestPublisherFansout() {
|
||||
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
|
||||
addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")});
|
||||
addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST")});
|
||||
}
|
||||
|
||||
public void xtestPublisherFansout() throws Exception {
|
||||
public void testPublisherFansout() throws Exception {
|
||||
|
||||
// Start a normal consumer on the local broker
|
||||
StubConnection connection1 = createConnection();
|
||||
|
@ -105,7 +103,7 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
|
|||
|
||||
public void initCombosForTestPublisherWaitsForServerToBeUp() {
|
||||
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
|
||||
addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")});
|
||||
addCombinationValues("destination", new Object[] {new ActiveMQTopic("TEST")});
|
||||
}
|
||||
|
||||
public void testPublisherWaitsForServerToBeUp() throws Exception {
|
||||
|
@ -177,20 +175,21 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
|
|||
|
||||
// Restart the remote server. State should be re-played and the publish
|
||||
// should continue.
|
||||
remoteURI = remoteConnector.getServer().getConnectURI().toString();
|
||||
LOG.info("Restarting Broker");
|
||||
restartRemoteBroker();
|
||||
LOG.info("Broker Restarted");
|
||||
|
||||
// This should reconnect, and resend
|
||||
assertTrue(publishDone.await(10, TimeUnit.SECONDS));
|
||||
assertTrue(publishDone.await(20, TimeUnit.SECONDS));
|
||||
|
||||
}
|
||||
|
||||
protected String getLocalURI() {
|
||||
return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
|
||||
return "tcp://localhost:61616";
|
||||
}
|
||||
|
||||
protected String getRemoteURI() {
|
||||
return remoteURI;
|
||||
return "tcp://localhost:61617";
|
||||
}
|
||||
|
||||
protected StubConnection createFanoutConnection() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue