From a9119eca19a8a427d2f99693bf98553d8bba8bb2 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Wed, 10 Oct 2007 19:53:07 +0000 Subject: [PATCH] Fix for NPE in duplex network connection git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@583595 13f79535-47bb-0310-9956-ffa450edef68 --- .../MapTransportConnectionStateRegister.java | 125 ++++++++++++++ ...ingleTransportConnectionStateRegister.java | 155 ++++++++++++++++++ .../activemq/broker/TransportConnection.java | 145 ++++------------ .../broker/TransportConnectionState.java | 74 +++++++++ .../TransportConnectionStateRegister.java | 60 +++++++ .../DemandForwardingBridgeSupport.java | 11 +- .../activemq/network/SimpleNetworkTest.java | 9 +- 7 files changed, 459 insertions(+), 120 deletions(-) create mode 100755 activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java create mode 100755 activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java create mode 100755 activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java create mode 100755 activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java b/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java new file mode 100755 index 0000000000..4965289c5a --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MapTransportConnectionStateRegister.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +/** + * @version $Revision: 1.8 $ + */ + +public class MapTransportConnectionStateRegister implements TransportConnectionStateRegister{ + + private Map connectionStates = new ConcurrentHashMap(); + + public TransportConnectionState registerConnectionState(ConnectionId connectionId, + TransportConnectionState state) { + TransportConnectionState rc = connectionStates.put(connectionId, state); + return rc; + } + + public TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { + TransportConnectionState rc = connectionStates.remove(connectionId); + return rc; + } + + public List listConnectionStates() { + + List rc = new ArrayList(); + rc.addAll(connectionStates.values()); + return rc; + } + + public TransportConnectionState lookupConnectionState(String connectionId) { + return connectionStates.get(new ConnectionId(connectionId)); + } + + public TransportConnectionState lookupConnectionState(ConsumerId id) { + TransportConnectionState cs = lookupConnectionState(id.getConnectionId()); + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a consumer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public TransportConnectionState lookupConnectionState(ProducerId id) { + TransportConnectionState cs = lookupConnectionState(id.getConnectionId()); + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a producer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public TransportConnectionState lookupConnectionState(SessionId id) { + TransportConnectionState cs = lookupConnectionState(id.getConnectionId()); + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a session from a connection that had not been registered: " + + id.getParentId()); + } + return cs; + } + + public TransportConnectionState lookupConnectionState(ConnectionId connectionId) { + TransportConnectionState cs = connectionStates.get(connectionId); + if (cs == null) { + throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + + connectionId); + } + return cs; + } + + + + public boolean doesHandleMultipleConnectionStates() { + return true; + } + + public boolean isEmpty() { + return connectionStates.isEmpty(); + } + + public void clear() { + connectionStates.clear(); + + } + + public void intialize(TransportConnectionStateRegister other) { + connectionStates.clear(); + connectionStates.putAll(other.mapStates()); + + } + + public Map mapStates() { + HashMap map = new HashMap(connectionStates); + return map; + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java b/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java new file mode 100755 index 0000000000..26ab5a8e8d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/SingleTransportConnectionStateRegister.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +/** + * @version $Revision: 1.8 $ + */ + +public class SingleTransportConnectionStateRegister implements TransportConnectionStateRegister{ + + private TransportConnectionState connectionState; + private ConnectionId connectionId; + + public TransportConnectionState registerConnectionState(ConnectionId connectionId, + TransportConnectionState state) { + TransportConnectionState rc = connectionState; + connectionState = state; + this.connectionId = connectionId; + return rc; + } + + public synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { + TransportConnectionState rc = null; + + + if (connectionId != null && connectionState != null && this.connectionId!=null){ + if (this.connectionId.equals(connectionId)){ + rc = connectionState; + connectionState = null; + connectionId = null; + } + } + return rc; + } + + public synchronized List listConnectionStates() { + List rc = new ArrayList(); + if (connectionState != null) { + rc.add(connectionState); + } + return rc; + } + + public synchronized TransportConnectionState lookupConnectionState(String connectionId) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a connectionId for a connection that had not been registered: " + + connectionId); + } + return cs; + } + + public synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a consumer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public synchronized TransportConnectionState lookupConnectionState(ProducerId id) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a producer from a connection that had not been registered: " + + id.getParentId().getParentId()); + } + return cs; + } + + public synchronized TransportConnectionState lookupConnectionState(SessionId id) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException( + "Cannot lookup a session from a connection that had not been registered: " + + id.getParentId()); + } + return cs; + } + + public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { + TransportConnectionState cs = connectionState; + if (cs == null) { + throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + + connectionId); + } + return cs; + } + + public synchronized boolean doesHandleMultipleConnectionStates() { + return false; + } + + public synchronized boolean isEmpty() { + return connectionState == null; + } + + public void intialize(TransportConnectionStateRegister other) { + + if (other.isEmpty()){ + clear(); + }else{ + Map map = other.mapStates(); + Iterator i = map.entrySet().iterator(); + Map.Entry entry = (Entry) i.next(); + connectionId = entry.getKey(); + connectionState =entry.getValue(); + } + + } + + public Map mapStates() { + Map map = new HashMap(); + if (!isEmpty()) { + map.put(connectionId, connectionState); + } + return map; + } + + public void clear() { + connectionState=null; + connectionId=null; + + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 318d1924b8..4a40bb7168 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -150,45 +150,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); private DemandForwardingBridge duplexBridge; private final TaskRunnerFactory taskRunnerFactory; - private TransportConnectionState connectionState; - - static class TransportConnectionState extends org.apache.activemq.state.ConnectionState { - - private ConnectionContext context; - private TransportConnection connection; - private final Object connectMutex = new Object(); - private AtomicInteger referenceCounter = new AtomicInteger(); - - public TransportConnectionState(ConnectionInfo info, TransportConnection transportConnection) { - super(info); - connection = transportConnection; - } - - public ConnectionContext getContext() { - return context; - } - - public TransportConnection getConnection() { - return connection; - } - - public void setContext(ConnectionContext context) { - this.context = context; - } - - public void setConnection(TransportConnection connection) { - this.connection = connection; - } - - public int incrementReference() { - return referenceCounter.incrementAndGet(); - } - - public int decrementReference() { - return referenceCounter.decrementAndGet(); - } - - } + private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); /** * @param connector @@ -555,7 +517,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi SessionState ss = cs.getSessionState(sessionId); if (ss == null) { throw new IllegalStateException( - "Cannot add a consumer to a session that had not been registered: " + broker.getBrokerName() + " Cannot add a consumer to a session that had not been registered: " + sessionId); } // Avoid replaying dup commands @@ -598,6 +560,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi try { cs.addSession(info); } catch (IllegalStateException e) { + e.printStackTrace(); broker.removeSession(cs.getContext(), info); } } @@ -659,7 +622,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi // If there are 2 concurrent connections for the same connection id, // then last one in wins, we need to sync here // to figure out the winner. - synchronized (state.connectMutex) { + synchronized (state.getConnectionMutex()) { if (state.getConnection() != this) { LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress()); state.getConnection().stop(); @@ -1306,90 +1269,44 @@ public class TransportConnection implements Service, Connection, Task, CommandVi return null; } - // ///////////////////////////////////////////////////////////////// - // - // The following methods handle the logical connection state. It is possible - // multiple logical connections multiplexed over a single physical - // connection. - // But have not yet exploited the feature from the clients, so for - // performance - // reasons (to avoid a hash lookup) this class only keeps track of 1 - // logical connection state. - // - // A sub class could override these methods to a full multiple logical - // connection - // support. - // - // ///////////////////////////////////////////////////////////////// - - protected TransportConnectionState registerConnectionState(ConnectionId connectionId, - TransportConnectionState state) { - TransportConnectionState rc = connectionState; - connectionState = state; - return rc; - } - - protected TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { - TransportConnectionState rc = connectionState; - connectionState = null; - return rc; - } - - protected List listConnectionStates() { - List rc = new ArrayList(); - if (connectionState != null) { - rc.add(connectionState); + protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,TransportConnectionState state) { + TransportConnectionState cs = null; + if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()){ + //swap implementations + TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); + newRegister.intialize(connectionStateRegister); + connectionStateRegister = newRegister; } - return rc; + cs= connectionStateRegister.registerConnectionState(connectionId, state); + return cs; } - protected TransportConnectionState lookupConnectionState(String connectionId) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException( - "Cannot lookup a connectionId for a connection that had not been registered: " - + connectionId); - } - return cs; + protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { + return connectionStateRegister.unregisterConnectionState(connectionId); } - protected TransportConnectionState lookupConnectionState(ConsumerId id) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException( - "Cannot lookup a consumer from a connection that had not been registered: " - + id.getParentId().getParentId()); - } - return cs; + protected synchronized List listConnectionStates() { + return connectionStateRegister.listConnectionStates(); } - protected TransportConnectionState lookupConnectionState(ProducerId id) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException( - "Cannot lookup a producer from a connection that had not been registered: " - + id.getParentId().getParentId()); - } - return cs; + protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { + return connectionStateRegister.lookupConnectionState(connectionId); } - protected TransportConnectionState lookupConnectionState(SessionId id) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException( - "Cannot lookup a session from a connection that had not been registered: " - + id.getParentId()); - } - return cs; + protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { + return connectionStateRegister.lookupConnectionState(id); } - protected TransportConnectionState lookupConnectionState(ConnectionId connectionId) { - TransportConnectionState cs = connectionState; - if (cs == null) { - throw new IllegalStateException("Cannot lookup a connection that had not been registered: " - + connectionId); - } - return cs; + protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { + return connectionStateRegister.lookupConnectionState(id); + } + + protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { + return connectionStateRegister.lookupConnectionState(id); + } + + protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { + return connectionStateRegister.lookupConnectionState(connectionId); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java new file mode 100755 index 0000000000..5f9c8c25f6 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionState.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.command.ConnectionInfo; + +/** + * @version $Revision: 1.8 $ + */ + +public class TransportConnectionState extends org.apache.activemq.state.ConnectionState { + + private ConnectionContext context; + private TransportConnection connection; + private AtomicInteger referenceCounter = new AtomicInteger(); + private final Object connectionMutex = new Object(); + + public TransportConnectionState(ConnectionInfo info, TransportConnection transportConnection) { + super(info); + connection = transportConnection; + } + + public ConnectionContext getContext() { + return context; + } + + public TransportConnection getConnection() { + return connection; + } + + public void setContext(ConnectionContext context) { + this.context = context; + } + + public void setConnection(TransportConnection connection) { + this.connection = connection; + } + + public int incrementReference() { + return referenceCounter.incrementAndGet(); + } + + public int decrementReference() { + return referenceCounter.decrementAndGet(); + } + + public AtomicInteger getReferenceCounter() { + return referenceCounter; + } + + public void setReferenceCounter(AtomicInteger referenceCounter) { + this.referenceCounter = referenceCounter; + } + + public Object getConnectionMutex() { + return connectionMutex; + } +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java new file mode 100755 index 0000000000..77a80ffae7 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnectionStateRegister.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.List; +import java.util.Map; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +/** + * @version $Revision: 1.8 $ + */ + +public interface TransportConnectionStateRegister{ + + TransportConnectionState registerConnectionState(ConnectionId connectionId, + TransportConnectionState state); + + TransportConnectionState unregisterConnectionState(ConnectionId connectionId); + + List listConnectionStates(); + + MapmapStates(); + + TransportConnectionState lookupConnectionState(String connectionId); + + TransportConnectionState lookupConnectionState(ConsumerId id); + + TransportConnectionState lookupConnectionState(ProducerId id); + + TransportConnectionState lookupConnectionState(SessionId id); + + TransportConnectionState lookupConnectionState(ConnectionId connectionId); + + boolean isEmpty(); + + boolean doesHandleMultipleConnectionStates(); + + void intialize(TransportConnectionStateRegister other); + + void clear(); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 38db40c46c..14ef4cce2b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -100,6 +100,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap(); protected final BrokerId localBrokerPath[] = new BrokerId[] {null}; protected CountDownLatch startedLatch = new CountDownLatch(2); + protected CountDownLatch localStartedLatch = new CountDownLatch(1); protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1); protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false); protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false); @@ -172,6 +173,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { localBridgeStarted.set(false); remoteBridgeStarted.set(false); startedLatch = new CountDownLatch(2); + localStartedLatch = new CountDownLatch(1); } } @@ -261,6 +263,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); startedLatch.countDown(); + localStartedLatch.countDown(); setupStaticDestinations(); } } @@ -339,6 +342,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { // stuck waiting for it to start up. startedLatch.countDown(); startedLatch.countDown(); + localStartedLatch.countDown(); ss.throwFirstException(); } } @@ -406,6 +410,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { localBroker.oneway(command); break; case ConsumerInfo.DATA_STRUCTURE_TYPE: + localStartedLatch.await(); if (!addConsumerInfo((ConsumerInfo)command)) { if (LOG.isDebugEnabled()) { LOG.debug("Ignoring ConsumerInfo: " + command); @@ -430,6 +435,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { } } } catch (Throwable e) { + e.printStackTrace(); serviceRemoteException(e); } } @@ -554,7 +560,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { try { if (command.isMessageDispatch()) { enqueueCounter.incrementAndGet(); - waitStarted(); + //localStartedLatch.await(); final MessageDispatch md = (MessageDispatch)command; DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null) { @@ -628,7 +634,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { LOG.warn("Unexpected local command: " + command); } } - } catch (Exception e) { + } catch (Throwable e) { + e.printStackTrace(); serviceLocalException(e); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java index 740b93c1c8..9686ec7995 100755 --- a/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java @@ -90,18 +90,19 @@ public class SimpleNetworkTest extends TestCase { } } - public void xtestFiltering() throws Exception { + public void testFiltering() throws Exception { MessageConsumer includedConsumer = remoteSession.createConsumer(included); MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded); MessageProducer includedProducer = localSession.createProducer(included); MessageProducer excludedProducer = localSession.createProducer(excluded); - Thread.sleep(1000); + // allow for consumer infos to perculate arround + Thread.sleep(2000); Message test = localSession.createTextMessage("test"); includedProducer.send(test); excludedProducer.send(test); - assertNull(excludedConsumer.receive(500)); - assertNotNull(includedConsumer.receive(500)); + assertNull(excludedConsumer.receive(1000)); + assertNotNull(includedConsumer.receive(1000)); } public void xtestConduitBridge() throws Exception {