mirror of https://github.com/apache/activemq.git
Got the stomp SUBSCRIBE test case working.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@358349 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bd60590273
commit
f71b60f16b
|
@ -3,17 +3,14 @@
|
||||||
*/
|
*/
|
||||||
package org.activemq.transport.stomp;
|
package org.activemq.transport.stomp;
|
||||||
|
|
||||||
import org.activemq.command.ActiveMQDestination;
|
|
||||||
import org.activemq.command.ActiveMQMessage;
|
|
||||||
import org.activemq.command.MessageAck;
|
|
||||||
import org.activemq.command.TransactionId;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ProtocolException;
|
import java.net.ProtocolException;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import org.activemq.command.MessageAck;
|
||||||
|
import org.activemq.command.TransactionId;
|
||||||
|
|
||||||
class Ack implements StompCommand {
|
class Ack implements StompCommand {
|
||||||
private final StompWireFormat format;
|
private final StompWireFormat format;
|
||||||
private static final HeaderParser parser = new HeaderParser();
|
private static final HeaderParser parser = new HeaderParser();
|
||||||
|
@ -28,41 +25,22 @@ class Ack implements StompCommand {
|
||||||
if (message_id == null)
|
if (message_id == null)
|
||||||
throw new ProtocolException("ACK received without a message-id to acknowledge!");
|
throw new ProtocolException("ACK received without a message-id to acknowledge!");
|
||||||
|
|
||||||
List listeners = format.getAckListeners();
|
Subscription sub = (Subscription) format.getDispachedMap().get(message_id);
|
||||||
for (int i = 0; i < listeners.size(); i++) {
|
if( sub ==null )
|
||||||
AckListener listener = (AckListener) listeners.get(i);
|
throw new ProtocolException("Unexpected ACK received for message-id [" + message_id + "]");
|
||||||
if (listener.handle(message_id)) {
|
|
||||||
listeners.remove(i);
|
MessageAck ack = sub.createMessageAck(message_id);
|
||||||
ActiveMQMessage msg = listener.getMessage();
|
|
||||||
MessageAck ack = new MessageAck();
|
if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
|
||||||
ack.setDestination((ActiveMQDestination) msg.getJMSDestination());
|
TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
|
||||||
ack.setConsumerId(listener.getConsumerId());
|
if (tx_id == null)
|
||||||
ack.setMessageID(msg.getMessageId());
|
throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id");
|
||||||
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
ack.setTransactionId(tx_id);
|
||||||
|
|
||||||
/*
|
|
||||||
* ack.setMessageRead(true);
|
|
||||||
* ack.setProducerKey(msg.getProducerKey());
|
|
||||||
* ack.setSequenceNumber(msg.getSequenceNumber());
|
|
||||||
* ack.setPersistent(msg.getJMSDeliveryMode() ==
|
|
||||||
* DeliveryMode.PERSISTENT);
|
|
||||||
* ack.setSessionId(format.getSessionId());
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
|
|
||||||
TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
|
|
||||||
if (tx_id == null)
|
|
||||||
throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id");
|
|
||||||
ack.setTransactionId(tx_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
while ((in.readByte()) != 0) {
|
|
||||||
}
|
|
||||||
return new CommandEnvelope(ack, headers);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((in.readByte()) != 0) {
|
while ((in.readByte()) != 0) {
|
||||||
}
|
}
|
||||||
throw new ProtocolException("Unexepected ACK received for message-id [" + message_id + "]");
|
|
||||||
|
return new CommandEnvelope(ack, headers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,35 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2005 Your Corporation. All Rights Reserved.
|
|
||||||
*/
|
|
||||||
package org.activemq.transport.stomp;
|
|
||||||
|
|
||||||
import org.activemq.command.ActiveMQMessage;
|
|
||||||
import org.activemq.command.ConsumerId;
|
|
||||||
|
|
||||||
class AckListener {
|
|
||||||
private final ActiveMQMessage msg;
|
|
||||||
private final ConsumerId consumerId;
|
|
||||||
private final String subscriptionId;
|
|
||||||
|
|
||||||
public AckListener(ActiveMQMessage msg, ConsumerId consumerId, String subscriptionId) {
|
|
||||||
this.msg = msg;
|
|
||||||
this.consumerId = consumerId;
|
|
||||||
this.subscriptionId = subscriptionId;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean handle(String messageId) {
|
|
||||||
return msg.getJMSMessageID().equals(messageId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ActiveMQMessage getMessage() {
|
|
||||||
return msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ConsumerId getConsumerId() {
|
|
||||||
return consumerId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getSubscriptionId() {
|
|
||||||
return subscriptionId;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -75,6 +75,7 @@ public interface Stomp {
|
||||||
|
|
||||||
public interface Unsubscribe {
|
public interface Unsubscribe {
|
||||||
String DESTINATION = "destination";
|
String DESTINATION = "destination";
|
||||||
|
String ID = "id";
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface Connect {
|
public interface Connect {
|
||||||
|
|
|
@ -11,6 +11,7 @@ import java.io.IOException;
|
||||||
import java.net.ProtocolException;
|
import java.net.ProtocolException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
|
@ -19,9 +20,7 @@ import org.activeio.Packet;
|
||||||
import org.activeio.adapter.PacketInputStream;
|
import org.activeio.adapter.PacketInputStream;
|
||||||
import org.activeio.command.WireFormat;
|
import org.activeio.command.WireFormat;
|
||||||
import org.activeio.packet.ByteArrayPacket;
|
import org.activeio.packet.ByteArrayPacket;
|
||||||
import org.activemq.command.ActiveMQBytesMessage;
|
|
||||||
import org.activemq.command.ActiveMQDestination;
|
import org.activemq.command.ActiveMQDestination;
|
||||||
import org.activemq.command.ActiveMQTextMessage;
|
|
||||||
import org.activemq.command.Command;
|
import org.activemq.command.Command;
|
||||||
import org.activemq.command.CommandTypes;
|
import org.activemq.command.CommandTypes;
|
||||||
import org.activemq.command.ConnectionId;
|
import org.activemq.command.ConnectionId;
|
||||||
|
@ -29,11 +28,14 @@ import org.activemq.command.ConnectionInfo;
|
||||||
import org.activemq.command.ConsumerId;
|
import org.activemq.command.ConsumerId;
|
||||||
import org.activemq.command.FlushCommand;
|
import org.activemq.command.FlushCommand;
|
||||||
import org.activemq.command.LocalTransactionId;
|
import org.activemq.command.LocalTransactionId;
|
||||||
|
import org.activemq.command.Message;
|
||||||
|
import org.activemq.command.MessageDispatch;
|
||||||
import org.activemq.command.MessageId;
|
import org.activemq.command.MessageId;
|
||||||
import org.activemq.command.ProducerId;
|
import org.activemq.command.ProducerId;
|
||||||
import org.activemq.command.Response;
|
import org.activemq.command.Response;
|
||||||
import org.activemq.command.SessionId;
|
import org.activemq.command.SessionId;
|
||||||
import org.activemq.command.TransactionId;
|
import org.activemq.command.TransactionId;
|
||||||
|
import org.activemq.filter.DestinationMap;
|
||||||
import org.activemq.util.IOExceptionSupport;
|
import org.activemq.util.IOExceptionSupport;
|
||||||
import org.activemq.util.IdGenerator;
|
import org.activemq.util.IdGenerator;
|
||||||
import org.activemq.util.LongSequenceGenerator;
|
import org.activemq.util.LongSequenceGenerator;
|
||||||
|
@ -53,15 +55,17 @@ public class StompWireFormat implements WireFormat {
|
||||||
private static int transactionIdCounter;
|
private static int transactionIdCounter;
|
||||||
|
|
||||||
private int version = 1;
|
private int version = 1;
|
||||||
private CommandParser commandParser = new CommandParser(this);
|
private final CommandParser commandParser = new CommandParser(this);
|
||||||
private HeaderParser headerParser = new HeaderParser();
|
private final HeaderParser headerParser = new HeaderParser();
|
||||||
|
|
||||||
private BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
|
private final BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
|
||||||
private BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
|
private final BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
|
||||||
private List receiptListeners = new CopyOnWriteArrayList();
|
private final List receiptListeners = new CopyOnWriteArrayList();
|
||||||
private Map subscriptions = new ConcurrentHashMap();
|
private final Map subscriptionsByConsumerId = new ConcurrentHashMap();
|
||||||
private List ackListeners = new CopyOnWriteArrayList();
|
private final Map subscriptionsByName = new ConcurrentHashMap();
|
||||||
|
private final DestinationMap subscriptionsByDestination = new DestinationMap();
|
||||||
private final Map transactions = new ConcurrentHashMap();
|
private final Map transactions = new ConcurrentHashMap();
|
||||||
|
private final Map dispachedMap = new ConcurrentHashMap();
|
||||||
private short lastCommandId;
|
private short lastCommandId;
|
||||||
|
|
||||||
private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
|
private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
|
||||||
|
@ -119,18 +123,11 @@ public class StompWireFormat implements WireFormat {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if( packet.isMessageDispatch() ) {
|
||||||
if (packet.getDataStructureType() == CommandTypes.ACTIVEMQ_TEXT_MESSAGE) {
|
MessageDispatch md = (MessageDispatch)packet;
|
||||||
assert (packet instanceof ActiveMQTextMessage);
|
Message message = md.getMessage();
|
||||||
ActiveMQTextMessage msg = (ActiveMQTextMessage) packet;
|
Subscription sub = (Subscription) subscriptionsByConsumerId.get(md.getConsumerId());
|
||||||
Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
|
sub.receive(md, out);
|
||||||
sub.receive(msg, out);
|
|
||||||
}
|
|
||||||
else if (packet.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
|
|
||||||
assert (packet instanceof ActiveMQBytesMessage);
|
|
||||||
ActiveMQBytesMessage msg = (ActiveMQBytesMessage) packet;
|
|
||||||
Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
|
|
||||||
sub.receive(msg, out);
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -184,17 +181,35 @@ public class StompWireFormat implements WireFormat {
|
||||||
public ProducerId getProducerId() {
|
public ProducerId getProducerId() {
|
||||||
return producerId;
|
return producerId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Subscription getSubcription(ConsumerId consumerId) {
|
||||||
|
return (Subscription) subscriptionsByConsumerId.get(consumerId);
|
||||||
|
}
|
||||||
|
public Set getSubcriptions(ActiveMQDestination destination) {
|
||||||
|
return subscriptionsByDestination.get(destination);
|
||||||
|
}
|
||||||
|
public Subscription getSubcription(String name) {
|
||||||
|
return (Subscription) subscriptionsByName.get(name);
|
||||||
|
}
|
||||||
|
|
||||||
public void addSubscription(Subscription s) {
|
public void addSubscription(Subscription s) {
|
||||||
if (subscriptions.containsKey(s.getDestination())) {
|
if (s.getSubscriptionId()!=null && subscriptionsByName.containsKey(s.getSubscriptionId())) {
|
||||||
Subscription old = (Subscription) subscriptions.get(s.getDestination());
|
Subscription old = (Subscription) subscriptionsByName.get(s.getSubscriptionId());
|
||||||
Command p = old.close();
|
removeSubscription(old);
|
||||||
enqueueCommand(p);
|
enqueueCommand(old.close());
|
||||||
subscriptions.put(s.getDestination(), s);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
subscriptions.put(s.getDestination(), s);
|
|
||||||
}
|
}
|
||||||
|
if( s.getSubscriptionId()!=null )
|
||||||
|
subscriptionsByName.put(s.getSubscriptionId(), s);
|
||||||
|
subscriptionsByConsumerId.put(s.getConsumerInfo().getConsumerId(), s);
|
||||||
|
subscriptionsByDestination.put(s.getConsumerInfo().getDestination(), s);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeSubscription(Subscription s) {
|
||||||
|
if( s.getSubscriptionId()!=null )
|
||||||
|
subscriptionsByName.remove(s.getSubscriptionId());
|
||||||
|
subscriptionsByConsumerId.remove(s.getConsumerInfo().getConsumerId());
|
||||||
|
subscriptionsByDestination.remove(s.getConsumerInfo().getDestination(), s);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void enqueueCommand(final Command ack) {
|
public void enqueueCommand(final Command ack) {
|
||||||
|
@ -205,18 +220,6 @@ public class StompWireFormat implements WireFormat {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public Subscription getSubscriptionFor(ActiveMQDestination destination) {
|
|
||||||
return (Subscription) subscriptions.get(destination);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addAckListener(AckListener listener) {
|
|
||||||
this.ackListeners.add(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List getAckListeners() {
|
|
||||||
return ackListeners;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TransactionId getTransactionId(String key) {
|
public TransactionId getTransactionId(String key) {
|
||||||
return (TransactionId) transactions.get(key);
|
return (TransactionId) transactions.get(key);
|
||||||
}
|
}
|
||||||
|
@ -293,4 +296,8 @@ public class StompWireFormat implements WireFormat {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map getDispachedMap() {
|
||||||
|
return dispachedMap;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,14 +3,14 @@
|
||||||
*/
|
*/
|
||||||
package org.activemq.transport.stomp;
|
package org.activemq.transport.stomp;
|
||||||
|
|
||||||
import org.activemq.command.ActiveMQDestination;
|
|
||||||
import org.activemq.command.ConsumerId;
|
|
||||||
import org.activemq.command.ConsumerInfo;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import org.activemq.command.ActiveMQDestination;
|
||||||
|
import org.activemq.command.ConsumerInfo;
|
||||||
|
import org.activemq.util.IntrospectionSupport;
|
||||||
|
|
||||||
class Subscribe implements StompCommand {
|
class Subscribe implements StompCommand {
|
||||||
private HeaderParser headerParser = new HeaderParser();
|
private HeaderParser headerParser = new HeaderParser();
|
||||||
private StompWireFormat format;
|
private StompWireFormat format;
|
||||||
|
@ -20,19 +20,24 @@ class Subscribe implements StompCommand {
|
||||||
}
|
}
|
||||||
|
|
||||||
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
|
||||||
ConsumerInfo ci = new ConsumerInfo();
|
|
||||||
Properties headers = headerParser.parse(in);
|
Properties headers = headerParser.parse(in);
|
||||||
|
|
||||||
|
String subscriptionId = headers.getProperty(Stomp.Headers.Subscribe.ID);
|
||||||
String destination = headers.getProperty(Stomp.Headers.Subscribe.DESTINATION);
|
String destination = headers.getProperty(Stomp.Headers.Subscribe.DESTINATION);
|
||||||
|
|
||||||
ActiveMQDestination actual_dest = DestinationNamer.convert(destination);
|
ActiveMQDestination actual_dest = DestinationNamer.convert(destination);
|
||||||
|
ConsumerInfo ci = new ConsumerInfo(format.createConsumerId());
|
||||||
|
ci.setPrefetchSize(1000);
|
||||||
|
ci.setDispatchAsync(true);
|
||||||
|
|
||||||
|
IntrospectionSupport.setProperties(ci, headers, "activemq:");
|
||||||
|
|
||||||
ci.setDestination(DestinationNamer.convert(destination));
|
ci.setDestination(DestinationNamer.convert(destination));
|
||||||
ConsumerId consumerId = format.createConsumerId();
|
|
||||||
ci.setConsumerId(consumerId);
|
|
||||||
ci.setResponseRequired(true);
|
|
||||||
// ci.setSessionId(format.getSessionId());
|
|
||||||
while (in.readByte() != 0) {
|
while (in.readByte() != 0) {
|
||||||
}
|
}
|
||||||
String subscriptionId = headers.getProperty(Stomp.Headers.Subscribe.ID, Subscription.NO_ID);
|
|
||||||
Subscription s = new Subscription(format, consumerId, subscriptionId);
|
Subscription s = new Subscription(format, subscriptionId, ci);
|
||||||
s.setDestination(actual_dest);
|
s.setDestination(actual_dest);
|
||||||
String ack_mode_key = headers.getProperty(Stomp.Headers.Subscribe.ACK_MODE);
|
String ack_mode_key = headers.getProperty(Stomp.Headers.Subscribe.ACK_MODE);
|
||||||
if (ack_mode_key != null && ack_mode_key.equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT)) {
|
if (ack_mode_key != null && ack_mode_key.equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT)) {
|
||||||
|
|
|
@ -3,80 +3,79 @@
|
||||||
*/
|
*/
|
||||||
package org.activemq.transport.stomp;
|
package org.activemq.transport.stomp;
|
||||||
|
|
||||||
import org.activemq.command.ActiveMQBytesMessage;
|
import java.io.DataOutput;
|
||||||
import org.activemq.command.ActiveMQDestination;
|
import java.io.IOException;
|
||||||
import org.activemq.command.ActiveMQTextMessage;
|
import java.util.Iterator;
|
||||||
import org.activemq.command.ConsumerId;
|
import java.util.LinkedList;
|
||||||
import org.activemq.command.MessageAck;
|
|
||||||
import org.activemq.command.RemoveInfo;
|
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
import java.io.DataOutput;
|
import org.activemq.command.ActiveMQBytesMessage;
|
||||||
import java.io.IOException;
|
import org.activemq.command.ActiveMQDestination;
|
||||||
|
import org.activemq.command.ActiveMQMessage;
|
||||||
|
import org.activemq.command.ActiveMQTextMessage;
|
||||||
|
import org.activemq.command.ConsumerInfo;
|
||||||
|
import org.activemq.command.MessageAck;
|
||||||
|
import org.activemq.command.MessageDispatch;
|
||||||
|
import org.activemq.command.RemoveInfo;
|
||||||
|
|
||||||
public class Subscription {
|
public class Subscription {
|
||||||
|
|
||||||
private ActiveMQDestination destination;
|
private ActiveMQDestination destination;
|
||||||
private int ackMode = 1;
|
private int ackMode = 1;
|
||||||
private StompWireFormat format;
|
private StompWireFormat format;
|
||||||
private final ConsumerId consumerId;
|
|
||||||
private final String subscriptionId;
|
private final String subscriptionId;
|
||||||
public static final String NO_ID = "~~ NO SUCH THING ~~%%@#!Q";
|
public static final String NO_ID = "~~ NO SUCH THING ~~%%@#!Q";
|
||||||
|
private final ConsumerInfo consumerInfo;
|
||||||
public Subscription(StompWireFormat format, ConsumerId consumerId, String subscriptionId) {
|
private final LinkedList dispatchedMessages = new LinkedList();
|
||||||
|
|
||||||
|
public Subscription(StompWireFormat format, String subscriptionId, ConsumerInfo consumerInfo) {
|
||||||
this.format = format;
|
this.format = format;
|
||||||
this.consumerId = consumerId;
|
|
||||||
this.subscriptionId = subscriptionId;
|
this.subscriptionId = subscriptionId;
|
||||||
|
this.consumerInfo = consumerInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setDestination(ActiveMQDestination actual_dest) {
|
void setDestination(ActiveMQDestination actual_dest) {
|
||||||
this.destination = actual_dest;
|
this.destination = actual_dest;
|
||||||
}
|
}
|
||||||
|
|
||||||
void receive(ActiveMQTextMessage msg, DataOutput out) throws IOException, JMSException {
|
void receive(MessageDispatch md, DataOutput out) throws IOException, JMSException {
|
||||||
|
|
||||||
|
ActiveMQMessage m = (ActiveMQMessage) md.getMessage();
|
||||||
|
|
||||||
if (ackMode == CLIENT_ACK) {
|
if (ackMode == CLIENT_ACK) {
|
||||||
AckListener listener = new AckListener(msg, consumerId, subscriptionId);
|
Subscription sub = format.getSubcription(md.getConsumerId());
|
||||||
format.addAckListener(listener);
|
sub.addMessageDispatch(md);
|
||||||
|
format.getDispachedMap().put(m.getJMSMessageID(), sub);
|
||||||
}
|
}
|
||||||
else if (ackMode == AUTO_ACK) {
|
else if (ackMode == AUTO_ACK) {
|
||||||
MessageAck ack = new MessageAck();
|
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
|
||||||
// if (format.isInTransaction())
|
|
||||||
// ack.setTransactionIDString(format.getTransactionId());
|
|
||||||
ack.setDestination(msg.getDestination());
|
|
||||||
ack.setConsumerId(consumerId);
|
|
||||||
ack.setMessageID(msg.getMessageId());
|
|
||||||
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
|
||||||
format.enqueueCommand(ack);
|
format.enqueueCommand(ack);
|
||||||
}
|
}
|
||||||
FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE).addHeaders(msg).setBody(msg.getText().getBytes());
|
|
||||||
if (!subscriptionId.equals(NO_ID)) {
|
|
||||||
|
FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE);
|
||||||
|
builder.addHeaders(m);
|
||||||
|
|
||||||
|
if( m.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
|
||||||
|
builder.setBody(((ActiveMQTextMessage)m).getText().getBytes("UTF-8"));
|
||||||
|
} else if( m.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
|
||||||
|
ActiveMQBytesMessage msg = (ActiveMQBytesMessage)m;
|
||||||
|
byte data[] = new byte[(int) msg.getBodyLength()];
|
||||||
|
msg.readBytes(data);
|
||||||
|
builder.setBody(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (subscriptionId!=null) {
|
||||||
builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
|
builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
out.write(builder.toFrame());
|
out.write(builder.toFrame());
|
||||||
}
|
}
|
||||||
|
|
||||||
void receive(ActiveMQBytesMessage msg, DataOutput out) throws IOException, JMSException {
|
private void addMessageDispatch(MessageDispatch md) {
|
||||||
// @todo refactor this and the other receive form to remoce duplication
|
dispatchedMessages.addLast(md);
|
||||||
// -bmc
|
|
||||||
if (ackMode == CLIENT_ACK) {
|
|
||||||
AckListener listener = new AckListener(msg, consumerId, subscriptionId);
|
|
||||||
format.addAckListener(listener);
|
|
||||||
}
|
|
||||||
else if (ackMode == AUTO_ACK) {
|
|
||||||
MessageAck ack = new MessageAck();
|
|
||||||
// if (format.isInTransaction())
|
|
||||||
// ack.setTransactionIDString(format.getTransactionId());
|
|
||||||
ack.setDestination(msg.getDestination());
|
|
||||||
ack.setConsumerId(consumerId);
|
|
||||||
ack.setMessageID(msg.getMessageId());
|
|
||||||
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
|
||||||
format.enqueueCommand(ack);
|
|
||||||
}
|
|
||||||
FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE).addHeaders(msg).setBody(msg.getContent().getData());
|
|
||||||
if (!subscriptionId.equals(NO_ID)) {
|
|
||||||
builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
|
|
||||||
}
|
|
||||||
out.write(builder.toFrame());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ActiveMQDestination getDestination() {
|
ActiveMQDestination getDestination() {
|
||||||
|
@ -91,8 +90,39 @@ public class Subscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
public RemoveInfo close() {
|
public RemoveInfo close() {
|
||||||
RemoveInfo unsub = new RemoveInfo();
|
return new RemoveInfo(consumerInfo.getConsumerId());
|
||||||
unsub.setObjectId(consumerId);
|
}
|
||||||
return unsub;
|
|
||||||
|
public ConsumerInfo getConsumerInfo() {
|
||||||
|
return consumerInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSubscriptionId() {
|
||||||
|
return subscriptionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageAck createMessageAck(String message_id) {
|
||||||
|
MessageAck ack = new MessageAck();
|
||||||
|
ack.setDestination(consumerInfo.getDestination());
|
||||||
|
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
|
||||||
|
ack.setConsumerId(consumerInfo.getConsumerId());
|
||||||
|
|
||||||
|
int count=0;
|
||||||
|
for (Iterator iter = dispatchedMessages.iterator(); iter.hasNext();) {
|
||||||
|
|
||||||
|
MessageDispatch md = (MessageDispatch) iter.next();
|
||||||
|
String id = ((ActiveMQMessage)md.getMessage()).getJMSMessageID();
|
||||||
|
if( ack.getFirstMessageId()==null )
|
||||||
|
ack.setFirstMessageId(md.getMessage().getMessageId());
|
||||||
|
|
||||||
|
format.getDispachedMap().remove(id);
|
||||||
|
iter.remove();
|
||||||
|
count++;
|
||||||
|
if( id.equals(message_id) ) {
|
||||||
|
ack.setLastMessageId(md.getMessage().getMessageId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ack.setMessageCount(count);
|
||||||
|
return ack;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,10 @@ import org.activemq.command.ActiveMQDestination;
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.ProtocolException;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
public class Unsubscribe implements StompCommand {
|
public class Unsubscribe implements StompCommand {
|
||||||
private static final HeaderParser parser = new HeaderParser();
|
private static final HeaderParser parser = new HeaderParser();
|
||||||
|
@ -22,10 +25,25 @@ public class Unsubscribe implements StompCommand {
|
||||||
while (in.readByte() == 0) {
|
while (in.readByte() == 0) {
|
||||||
}
|
}
|
||||||
|
|
||||||
String dest_name = headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION);
|
String subscriptionId = headers.getProperty(Stomp.Headers.Unsubscribe.ID);
|
||||||
ActiveMQDestination destination = DestinationNamer.convert(dest_name);
|
String destination = headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION);
|
||||||
|
|
||||||
|
|
||||||
|
if( subscriptionId!=null ) {
|
||||||
|
Subscription s = format.getSubcription(subscriptionId);
|
||||||
|
format.removeSubscription(s);
|
||||||
|
return new CommandEnvelope(s.close(), headers);
|
||||||
|
}
|
||||||
|
|
||||||
|
ActiveMQDestination d = DestinationNamer.convert(destination);
|
||||||
|
Set subs = format.getSubcriptions(d);
|
||||||
|
for (Iterator iter = subs.iterator(); iter.hasNext();) {
|
||||||
|
Subscription s = (Subscription) iter.next();
|
||||||
|
format.removeSubscription(s);
|
||||||
|
return new CommandEnvelope(s.close(), headers);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new ProtocolException("Unexpected UNSUBSCRIBE received.");
|
||||||
|
|
||||||
Subscription s = format.getSubscriptionFor(destination);
|
|
||||||
return new CommandEnvelope(s.close(), headers);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,6 @@ import java.net.Socket;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Message;
|
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
@ -18,7 +17,6 @@ import org.activemq.ActiveMQConnectionFactory;
|
||||||
import org.activemq.CombinationTestSupport;
|
import org.activemq.CombinationTestSupport;
|
||||||
import org.activemq.broker.BrokerService;
|
import org.activemq.broker.BrokerService;
|
||||||
import org.activemq.broker.TransportConnector;
|
import org.activemq.broker.TransportConnector;
|
||||||
import org.activemq.command.ActiveMQDestination;
|
|
||||||
import org.activemq.command.ActiveMQQueue;
|
import org.activemq.command.ActiveMQQueue;
|
||||||
|
|
||||||
public class StompTest extends CombinationTestSupport {
|
public class StompTest extends CombinationTestSupport {
|
||||||
|
@ -118,4 +116,32 @@ public class StompTest extends CombinationTestSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSubscribeWithAutoAck() throws Exception {
|
||||||
|
|
||||||
|
String frame =
|
||||||
|
"CONNECT\n" +
|
||||||
|
"login: brianm\n" +
|
||||||
|
"passcode: wombats\n\n"+
|
||||||
|
Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
frame = receiveFrame(10000000);
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame =
|
||||||
|
"SUBSCRIBE\n" +
|
||||||
|
"destination:/queue/TEST\n" +
|
||||||
|
"ack:auto\n\n" +
|
||||||
|
Stomp.NULL;
|
||||||
|
sendFrame(frame);
|
||||||
|
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
TextMessage message = session.createTextMessage(getName());
|
||||||
|
producer.send(message);
|
||||||
|
|
||||||
|
frame = receiveFrame(10000);
|
||||||
|
assertTrue(frame.startsWith("MESSAGE"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
#
|
#
|
||||||
# The logging properties used during tests..
|
# The logging properties used during tests..
|
||||||
#
|
#
|
||||||
log4j.rootLogger=INFO, out
|
log4j.rootLogger=DEBUG, stdout
|
||||||
|
|
||||||
log4j.logger.org.activemq.spring=WARN
|
log4j.logger.org.activemq.spring=WARN
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue