Trap VirtualTopic names that leak from messages sent across a broker
network.
This commit is contained in:
Timothy Bish 2014-10-20 15:05:08 -04:00
parent 4881a848dc
commit 6885ff0a62
2 changed files with 10 additions and 9 deletions

View File

@ -159,11 +159,12 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
@Override
public String onSend(ActiveMQDestination destination) {
String amqTopicName = destination.getPhysicalName();
if (amqTopicName.startsWith(VIRTUALTOPIC_PREFIX)) {
amqTopicName = amqTopicName.substring(VIRTUALTOPIC_PREFIX.length());
String destinationName = destination.getPhysicalName();
int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX);
if (position >= 0) {
destinationName = destinationName.substring(position+VIRTUALTOPIC_PREFIX.length()).substring(0);
}
return amqTopicName;
return destinationName;
}
@Override
@ -178,6 +179,7 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
private void deleteDurableQueues(List<ActiveMQQueue> queues) {
try {
for (ActiveMQQueue queue : queues) {
LOG.debug("Removing subscription for {} ",queue.getPhysicalName());
DestinationInfo removeAction = new DestinationInfo();
removeAction.setConnectionId(protocol.getConnectionId());
removeAction.setDestination(queue);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
package org.apache.activemq.network;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
@ -29,13 +29,11 @@ import javax.jms.MessageListener;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.commons.lang.ArrayUtils;
import org.fusesource.hawtdispatch.Dispatch;
@ -124,11 +122,12 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
// now we should see that message on the local broker because the subscription
// should have been properly networked... we'll give a sec of grace for the
// networking and forwarding to have happened properly
org.fusesource.mqtt.client.Message msg = localConn.receive(1, TimeUnit.SECONDS);
org.fusesource.mqtt.client.Message msg = localConn.receive(100, TimeUnit.SECONDS);
assertNotNull(msg);
msg.ack();
String response = new String(msg.getPayload());
assertEquals("Hello, World!", response);
assertEquals("foo/bar", msg.getTopic());
// Now... we SHOULD NOT see a message on the remote broker because we already
// consumed it on the local broker... having the same message on the remote broker