mirror of https://github.com/apache/activemq.git
[AMQ-6640] either broker dispatch to bridge or bridge dispatch to broker needs to be async - dispatchAsync network option is the trigger for the vm transport to be sync in line with the current defaults. original BacklogNetworkCrossTalkTest scenario exposed this. upshot is dispatchAsync=false is not compatible with duplicate subscription suppression which is fair.
This commit is contained in:
parent
5ac9657c12
commit
4ef1fc74cf
|
@ -1428,7 +1428,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||
}
|
||||
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
|
||||
}
|
||||
Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker.getVmConnectorURI());
|
||||
Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
|
||||
Transport remoteBridgeTransport = transport;
|
||||
if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
|
||||
// the vm transport case is already wrapped
|
||||
|
|
|
@ -58,8 +58,9 @@ public final class NetworkBridgeFactory {
|
|||
return result;
|
||||
}
|
||||
|
||||
public static Transport createLocalTransport(URI uri) throws Exception {
|
||||
return createLocalTransport(uri, false);
|
||||
public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception {
|
||||
// one end of the localbroker<->bridge transport needs to be async to allow concurrent forwards and acks
|
||||
return createLocalTransport(uri, !configuration.isDispatchAsync());
|
||||
}
|
||||
|
||||
public static Transport createLocalAsyncTransport(URI uri) throws Exception {
|
||||
|
|
|
@ -140,7 +140,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
|
|||
}
|
||||
|
||||
protected Transport createLocalTransport() throws Exception {
|
||||
return NetworkBridgeFactory.createLocalTransport(localURI);
|
||||
return NetworkBridgeFactory.createLocalTransport(this, localURI);
|
||||
}
|
||||
|
||||
public static ActiveMQDestination[] getDurableTopicDestinations(final Set<ActiveMQDestination> durableDestinations) {
|
||||
|
|
|
@ -52,7 +52,7 @@ public class BacklogNetworkCrossTalkTest extends JmsMultipleBrokersTestSupport {
|
|||
|
||||
waitForBridgeFormation();
|
||||
|
||||
final int numMessages = 2000;
|
||||
final int numMessages = 1000;
|
||||
// Create queue
|
||||
ActiveMQDestination destA = createDestination("AAA", false);
|
||||
sendMessages("A", destA, numMessages);
|
||||
|
@ -88,7 +88,7 @@ public class BacklogNetworkCrossTalkTest extends JmsMultipleBrokersTestSupport {
|
|||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
messageSize = 500;
|
||||
messageSize = 5000;
|
||||
super.setMaxTestTime(10*60*1000);
|
||||
super.setAutoFail(true);
|
||||
super.setUp();
|
||||
|
|
Loading…
Reference in New Issue