This commit is contained in:
rajdavies 2013-12-10 13:35:39 +00:00
parent fe36820b86
commit 6683eb652f
13 changed files with 208 additions and 47 deletions

View File

@ -188,12 +188,12 @@ public class ProducerView implements ProducerViewMBean {
@Override
public void resetStatistics() {
if (info != null){
info.getSentCount().reset();
info.resetSentCount();
}
}
@Override
public long getSentCount() {
return info != null ? info.getSentCount().getCount() :0;
return info != null ? info.getSentCount() :0;
}
}

View File

@ -102,7 +102,7 @@ public interface ProducerViewMBean {
@MBeanInfo("Resets statistics.")
void resetStatistics();
@MBeanInfo("Messages consumed")
@MBeanInfo("Messages dispatched by Producer")
long getSentCount();
}

View File

@ -421,12 +421,12 @@ public class SubscriptionView implements SubscriptionViewMBean {
@Override
public void resetStatistics() {
if (subscription != null){
subscription.getConsumedCount().reset();
subscription.resetConsumedCount();
}
}
@Override
public long getConsumedCount() {
return subscription != null ? subscription.getConsumedCount().getCount() : 0;
return subscription != null ? subscription.getConsumedCount() : 0;
}
}

View File

@ -392,8 +392,9 @@ public abstract class AbstractRegion implements Region {
}
producerExchange.getRegionDestination().send(producerExchange, messageSend);
if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
producerExchange.getProducerState().getInfo().getSentCount().increment();
producerExchange.getProducerState().getInfo().incrementSentCount();
}
}

View File

@ -20,11 +20,11 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -36,7 +36,6 @@ import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.LogicExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NoLocalExpression;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.selector.SelectorParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,7 +53,7 @@ public abstract class AbstractSubscription implements Subscription {
private int cursorMemoryHighWaterMark = 70;
private boolean slowConsumer;
private long lastAckTime;
private CountStatisticImpl consumedCount = new CountStatisticImpl("consumed","The number of messages consumed");
private AtomicLong consumedCount = new AtomicLong();
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
@ -90,7 +89,7 @@ public abstract class AbstractSubscription implements Subscription {
@Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
this.lastAckTime = System.currentTimeMillis();
this.consumedCount.increment();
this.consumedCount.incrementAndGet();
}
@Override
@ -280,7 +279,15 @@ public abstract class AbstractSubscription implements Subscription {
this.lastAckTime = value;
}
public CountStatisticImpl getConsumedCount(){
return consumedCount;
public long getConsumedCount(){
return consumedCount.get();
}
public void incrementConsumedCount(){
consumedCount.incrementAndGet();
}
public void resetConsumedCount(){
consumedCount.set(0);
}
}

View File

@ -21,7 +21,6 @@ import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.management.ObjectName;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
@ -30,7 +29,6 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.management.CountStatisticImpl;
/**
*
@ -48,7 +46,6 @@ public interface Subscription extends SubscriptionRecovery {
/**
* Used when client acknowledge receipt of dispatched message.
* @param node
* @throws IOException
* @throws Exception
*/
@ -70,7 +67,7 @@ public interface Subscription extends SubscriptionRecovery {
/**
* Is the subscription interested in messages in the destination?
* @param context
* @param destination
* @return
*/
boolean matches(ActiveMQDestination destination);
@ -93,7 +90,6 @@ public interface Subscription extends SubscriptionRecovery {
/**
* The ConsumerInfo object that created the subscription.
* @param destination
*/
ConsumerInfo getConsumerInfo();
@ -200,7 +196,7 @@ public interface Subscription extends SubscriptionRecovery {
/**
* Informs the Broker if the subscription needs to intervention to recover it's state
* e.g. DurableTopicSubscriber may do
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
boolean isRecoveryRequired();
@ -235,6 +231,10 @@ public interface Subscription extends SubscriptionRecovery {
*/
long getTimeOfLastMessageAck();
CountStatisticImpl getConsumedCount();
long getConsumedCount();
void incrementConsumedCount();
void resetConsumedCount();
}

View File

@ -16,7 +16,8 @@
*/
package org.apache.activemq.command;
import org.apache.activemq.management.CountStatisticImpl;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.state.CommandVisitor;
/**
@ -33,7 +34,7 @@ public class ProducerInfo extends BaseCommand {
protected BrokerId[] brokerPath;
protected boolean dispatchAsync;
protected int windowSize;
protected CountStatisticImpl sentCount = new CountStatisticImpl("sentCount","number of messages sent to a broker");
protected AtomicLong sentCount = new AtomicLong();
public ProducerInfo() {
}
@ -137,8 +138,16 @@ public class ProducerInfo extends BaseCommand {
this.windowSize = windowSize;
}
public CountStatisticImpl getSentCount(){
return sentCount;
public long getSentCount(){
return sentCount.get();
}
public void incrementSentCount(){
sentCount.incrementAndGet();
}
public void resetSentCount(){
sentCount.set(0);
}
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -28,8 +27,6 @@ import java.util.zip.Inflater;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.*;
import org.apache.activemq.store.PersistenceAdapterSupport;
@ -44,21 +41,7 @@ import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.codec.CONNACK;
import org.fusesource.mqtt.codec.CONNECT;
import org.fusesource.mqtt.codec.DISCONNECT;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PINGREQ;
import org.fusesource.mqtt.codec.PINGRESP;
import org.fusesource.mqtt.codec.PUBACK;
import org.fusesource.mqtt.codec.PUBCOMP;
import org.fusesource.mqtt.codec.PUBLISH;
import org.fusesource.mqtt.codec.PUBREC;
import org.fusesource.mqtt.codec.PUBREL;
import org.fusesource.mqtt.codec.SUBACK;
import org.fusesource.mqtt.codec.SUBSCRIBE;
import org.fusesource.mqtt.codec.UNSUBACK;
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
import org.fusesource.mqtt.codec.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -96,10 +79,12 @@ public class MQTTProtocolConverter {
private long defaultKeepAlive;
private int activeMQSubscriptionPrefetch=1;
private final String QOS_PROPERTY_NAME = "QoSPropertyName";
private final MQTTRetainedMessages retainedMessages;
public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
this.mqttTransport = mqttTransport;
this.brokerService = brokerService;
this.retainedMessages = MQTTRetainedMessages.getMQTTRetainedMessages(brokerService);
this.defaultKeepAlive = 0;
}
@ -319,6 +304,23 @@ public class MQTTProtocolConverter {
} else {
LOG.warn("No topics defined for Subscription " + command);
}
//check retained messages
if (topics != null){
for (Topic topic:topics){
Buffer buffer = retainedMessages.getMessage(topic.name().toString());
if (buffer != null){
PUBLISH msg = new PUBLISH();
msg.payload(buffer);
msg.topicName(topic.name());
try {
getMQTTTransport().sendToMQTT(msg.encode());
} catch (IOException e) {
LOG.warn("Couldn't send retained message " + msg, e);
}
}
}
}
}
QoS onSubscribe(Topic topic) throws MQTTProtocolException {
@ -415,6 +417,9 @@ public class MQTTProtocolConverter {
void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
checkConnected();
if (command.retain()){
retainedMessages.addMessage(command.topicName().toString(),command.payload());
}
ActiveMQMessage message = convertMessage(command);
message.setProducerId(producerId);
message.onSend();

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MQTTRetainedMessages extends ServiceSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class);
private static final Object LOCK = new Object();
private LRUCache<String,Buffer> cache = new LRUCache<String, Buffer>(10000);
private MQTTRetainedMessages(){
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
cache.clear();
}
@Override
protected void doStart() throws Exception {
}
public void addMessage(String destination,Buffer payload){
cache.put(destination,payload);
}
public Buffer getMessage(String destination){
return cache.get(destination);
}
public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){
MQTTRetainedMessages result = null;
if (broker != null){
synchronized (LOCK){
Service[] services = broker.getServices();
if (services != null){
for (Service service:services){
if (service instanceof MQTTRetainedMessages){
return (MQTTRetainedMessages) service;
}
}
}
result = new MQTTRetainedMessages();
broker.addService(result);
if (broker != null && broker.isStarted()){
try {
result.start();
} catch (Exception e) {
LOG.warn("Couldn't start MQTTRetainedMessages");
}
}
}
}
return result;
}
}

View File

@ -48,7 +48,12 @@ class FuseMQQTTClientProvider implements MQTTClientProvider {
@Override
public void publish(String topic, byte[] payload, int qos) throws Exception {
connection.publish(topic,payload, QoS.values()[qos],false);
publish(topic,payload,qos,false);
}
@Override
public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception {
connection.publish(topic,payload, QoS.values()[qos],retained);
}
@Override

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt;
public interface MQTTClientProvider {
void connect(String host) throws Exception;
void disconnect() throws Exception;
public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception;
void publish(String topic,byte[] payload,int qos) throws Exception;
void subscribe(String topic,int qos) throws Exception;
void unsubscribe(String topic) throws Exception;

View File

@ -16,9 +16,18 @@
*/
package org.apache.activemq.transport.mqtt;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
@ -35,9 +44,6 @@ import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import static org.junit.Assert.assertArrayEquals;
public class MQTTTest extends AbstractMQTTTest {
@ -280,6 +286,45 @@ public class MQTTTest extends AbstractMQTTTest {
publisher.disconnect();
}
@Test(timeout=60 * 1000)
public void testSendAndReceiveRetainedMessages() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider publisher = getMQTTClientProvider();
initializeConnection(publisher);
final MQTTClientProvider subscriber = getMQTTClientProvider();
initializeConnection(subscriber);
String RETAINED = "retained";
publisher.publish("foo",RETAINED.getBytes(),AT_LEAST_ONCE,true);
List<String> messages = new ArrayList<String>();
for (int i = 0; i < 10; i++){
messages.add("TEST MESSAGE:" + i);
}
subscriber.subscribe("foo",AT_LEAST_ONCE);
for (int i = 0; i < 10; i++) {
publisher.publish("foo", messages.get(i).getBytes(), AT_LEAST_ONCE);
}
byte[] msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(RETAINED,new String(msg));
for (int i =0; i < 10; i++){
msg = subscriber.receive(5000);
assertNotNull(msg);
assertEquals(messages.get(i),new String(msg));
}
subscriber.disconnect();
publisher.disconnect();
}
@Test(timeout=60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector();

View File

@ -342,8 +342,16 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
}
@Override
public CountStatisticImpl getConsumedCount() {
return null;
public long getConsumedCount() {
return 0;
}
public void incrementConsumedCount(){
}
public void resetConsumedCount(){
}
};