Updated the test case some more, shows that its a problem with the MessageId handling when we receive Messages from the AMQP JMS client, when we use our own JMS client to send the the IDs are correct and we can properly ack them

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1487950 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-30 18:07:54 +00:00
parent 6fa059837f
commit 03a2c5e81e
2 changed files with 191 additions and 146 deletions

View File

@ -109,7 +109,7 @@ class AmqpProtocolConverter {
public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
static final public byte[] EMPTY_BYTE_ARRAY = new byte[]{};
static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {};
private final AmqpTransport amqpTransport;
private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
@ -126,7 +126,7 @@ class AmqpProtocolConverter {
public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) {
this.amqpTransport = transport;
this.protonTransport.bind(this.protonConnection);
if( transport.isTrace() ) {
if (transport.isTrace()) {
this.protonTransport.setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
@ -151,13 +151,13 @@ class AmqpProtocolConverter {
if (count > 0) {
final Buffer buffer;
buffer = new Buffer(data, 0, count);
// System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
// System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(buffer);
} else {
done = true;
}
}
// System.out.println("write done");
// System.out.println("write done");
} catch (IOException e) {
amqpTransport.onException(e);
}
@ -170,7 +170,6 @@ class AmqpProtocolConverter {
public AmqpSessionContext(ConnectionId connectionId, long id) {
sessionId = new SessionId(connectionId, id);
}
}
@ -181,29 +180,29 @@ class AmqpProtocolConverter {
*/
public void onAMQPData(Object command) throws Exception {
Buffer frame;
if( command.getClass() == AmqpHeader.class ) {
AmqpHeader header = (AmqpHeader)command;
switch( header.getProtocolId() ) {
if (command.getClass() == AmqpHeader.class) {
AmqpHeader header = (AmqpHeader) command;
switch (header.getProtocolId()) {
case 0:
// amqpTransport.sendToAmqp(new AmqpHeader());
break; // nothing to do..
case 3: // Client will be using SASL for auth..
sasl = protonTransport.sasl();
sasl.setMechanisms(new String[]{"ANONYMOUS", "PLAIN"});
sasl.setMechanisms(new String[] { "ANONYMOUS", "PLAIN" });
sasl.server();
break;
default:
}
frame = header.getBuffer();
} else {
frame = (Buffer)command;
frame = (Buffer) command;
}
onFrame(frame);
}
public void onFrame(Buffer frame) throws Exception {
// System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
while( frame.length > 0 ) {
// System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
frame.moveHead(count);
@ -211,26 +210,26 @@ class AmqpProtocolConverter {
handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));
return;
}
try {
if( sasl!=null ) {
try {
if (sasl != null) {
// Lets try to complete the sasl handshake.
if( sasl.getRemoteMechanisms().length > 0 ) {
if( "PLAIN".equals(sasl.getRemoteMechanisms()[0]) ) {
if (sasl.getRemoteMechanisms().length > 0) {
if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) {
byte[] data = new byte[sasl.pending()];
sasl.recv(data, 0, data.length);
Buffer[] parts = new Buffer(data).split((byte) 0);
if( parts.length > 0 ) {
if (parts.length > 0) {
connectionInfo.setUserName(parts[0].utf8().toString());
}
if( parts.length > 1 ) {
if (parts.length > 1) {
connectionInfo.setPassword(parts[1].utf8().toString());
}
// We can't really auth at this point since we don't know the client id yet.. :(
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
amqpTransport.getWireFormat().magicRead = false;
sasl = null;
} else if( "ANONYMOUS".equals(sasl.getRemoteMechanisms()[0]) ) {
} else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
amqpTransport.getWireFormat().magicRead = false;
sasl = null;
@ -246,12 +245,10 @@ class AmqpProtocolConverter {
// Lets map amqp sessions to openwire sessions..
Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
while (session != null) {
onSessionOpen(session);
session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET);
}
Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET);
while (link != null) {
onLinkOpen(link);
@ -269,21 +266,20 @@ class AmqpProtocolConverter {
link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE);
while (link != null) {
((AmqpDeliveryListener)link.getContext()).onClose();
((AmqpDeliveryListener) link.getContext()).onClose();
link.close();
link = link.next(ACTIVE_STATE, CLOSED_STATE);
}
link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
while (link != null) {
((AmqpDeliveryListener)link.getContext()).drainCheck();
((AmqpDeliveryListener) link.getContext()).drainCheck();
link = link.next(ACTIVE_STATE, ALL_STATES);
}
session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
while (session != null) {
//TODO - close links?
// TODO - close links?
onSessionClose(session);
session = session.next(ACTIVE_STATE, CLOSED_STATE);
}
@ -303,13 +299,13 @@ class AmqpProtocolConverter {
boolean closedSocket = false;
private void doClose() {
if( !closing ) {
if (!closing) {
closing = true;
sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.close();
if( !closedSocket) {
if (!closedSocket) {
pumpProtonToSocket();
}
}
@ -317,10 +313,9 @@ class AmqpProtocolConverter {
}
}
public void onAMQPException(IOException error) {
closedSocket = true;
if( !closing ) {
if (!closing) {
amqpTransport.sendToActiveMQ(error);
} else {
try {
@ -347,6 +342,9 @@ class AmqpProtocolConverter {
MessageDispatch md = (MessageDispatch) command;
ConsumerContext consumerContext = subscriptionsByConsumerId.get(md.getConsumerId());
if (consumerContext != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Dispatching MessageId:{} to consumer", md.getMessage().getMessageId());
}
consumerContext.onMessageDispatch(md);
}
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
@ -354,7 +352,7 @@ class AmqpProtocolConverter {
Throwable exception = ((ConnectionError) command).getException();
handleException(exception);
} else if (command.isBrokerInfo()) {
//ignore
// ignore
} else {
LOG.debug("Do not know how to process ActiveMQ Command " + command);
}
@ -368,15 +366,19 @@ class AmqpProtocolConverter {
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
public void onClose() throws Exception {}
public void drainCheck() {}
public void onClose() throws Exception {
}
public void drainCheck() {
}
}
private void onConnectionOpen() throws AmqpProtocolException {
connectionInfo.setResponseRequired(true);
connectionInfo.setConnectionId(connectionId);
// configureInactivityMonitor(connect.keepAlive());
// configureInactivityMonitor(connect.keepAlive());
String clientId = protonConnection.getRemoteContainer();
if (clientId != null && !clientId.isEmpty()) {
@ -393,8 +395,9 @@ class AmqpProtocolConverter {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
// TODO: figure out how to close /w an error.
// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
// TODO: figure out how to close /w an error.
// protonConnection.setLocalError(new EndpointError(exception.getClass().getName(),
// exception.getMessage()));
protonConnection.close();
pumpProtonToSocket();
amqpTransport.onException(IOExceptionSupport.create(exception));
@ -413,8 +416,8 @@ class AmqpProtocolConverter {
}
private void onSessionClose(Session session) {
AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext();
if( sessionContext!=null ) {
AmqpSessionContext sessionContext = (AmqpSessionContext) session.getContext();
if (sessionContext != null) {
System.out.println(sessionContext.sessionId);
sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
session.setContext(null);
@ -436,7 +439,7 @@ class AmqpProtocolConverter {
InboundTransformer inboundTransformer;
protected InboundTransformer getInboundTransformer() {
protected InboundTransformer getInboundTransformer() {
if (inboundTransformer == null) {
String transformer = amqpTransport.getTransformer();
if (transformer.equals(InboundTransformer.TRANSFORMER_JMS)) {
@ -459,24 +462,24 @@ class AmqpProtocolConverter {
@Override
public void onDelivery(Delivery delivery) throws Exception {
Receiver receiver = ((Receiver)delivery.getLink());
if( !delivery.isReadable() ) {
Receiver receiver = ((Receiver) delivery.getLink());
if (!delivery.isReadable()) {
System.out.println("it was not readable!");
return;
}
if( current==null ) {
if (current == null) {
current = new ByteArrayOutputStream();
}
int count;
byte data[] = new byte[1024*4];
while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
byte data[] = new byte[1024 * 4];
while ((count = receiver.recv(data, 0, data.length)) > 0) {
current.write(data, 0, count);
}
// Expecting more deliveries..
if( count == 0 ) {
if (count == 0) {
return;
}
@ -505,16 +508,16 @@ class AmqpProtocolConverter {
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
current = null;
if( destination!=null ) {
if (destination != null) {
message.setJMSDestination(destination);
}
message.setProducerId(producerId);
if( message.getMessageId()==null ) {
if (message.getMessageId() == null) {
message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
}
DeliveryState remoteState = delivery.getRemoteState();
if( remoteState!=null && remoteState instanceof TransactionalState) {
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
message.setTransactionId(new LocalTransactionId(connectionId, txid));
@ -524,9 +527,9 @@ class AmqpProtocolConverter {
sendToActiveMQ(message, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if( !delivery.remotelySettled() ) {
if( response.isException() ) {
ExceptionResponse er = (ExceptionResponse)response;
if (!delivery.remotelySettled()) {
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));
@ -541,13 +544,13 @@ class AmqpProtocolConverter {
}
});
}
}
long nextTransactionId = 0;
class Transaction {
class Transaction {
}
HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
public byte[] toBytes(long value) {
@ -561,7 +564,6 @@ class AmqpProtocolConverter {
return buffer.bigEndianEditor().readLong();
}
AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
@Override
protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
@ -569,49 +571,48 @@ class AmqpProtocolConverter {
MessageImpl msg = new MessageImpl();
int offset = buffer.offset;
int len = buffer.length;
while( len > 0 ) {
while (len > 0) {
final int decoded = msg.decode(buffer.data, offset, len);
assert decoded > 0: "Make progress decoding the message";
assert decoded > 0 : "Make progress decoding the message";
offset += decoded;
len -= decoded;
}
Object action = ((AmqpValue)msg.getBody()).getValue();
System.out.println("COORDINATOR received: "+action+", ["+buffer+"]");
if( action instanceof Declare ) {
Object action = ((AmqpValue) msg.getBody()).getValue();
System.out.println("COORDINATOR received: " + action + ", [" + buffer + "]");
if (action instanceof Declare) {
Declare declare = (Declare) action;
if( declare.getGlobalId()!=null ) {
if (declare.getGlobalId() != null) {
throw new Exception("don't know how to handle a declare /w a set GlobalId");
}
long txid = nextTransactionId++;
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
sendToActiveMQ(txinfo, null);
System.out.println("started transaction "+txid);
System.out.println("started transaction " + txid);
Declared declared = new Declared();
declared.setTxnId(new Binary(toBytes(txid)));
delivery.disposition(declared);
delivery.settle();
} else if( action instanceof Discharge) {
} else if (action instanceof Discharge) {
Discharge discharge = (Discharge) action;
long txid = toLong(discharge.getTxnId());
byte operation;
if( discharge.getFail() ) {
System.out.println("rollback transaction "+txid);
operation = TransactionInfo.ROLLBACK ;
if (discharge.getFail()) {
System.out.println("rollback transaction " + txid);
operation = TransactionInfo.ROLLBACK;
} else {
System.out.println("commit transaction "+txid);
System.out.println("commit transaction " + txid);
operation = TransactionInfo.COMMIT_ONE_PHASE;
}
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
sendToActiveMQ(txinfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if( response.isException() ) {
ExceptionResponse er = (ExceptionResponse)response;
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected();
rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
delivery.disposition(rejected);
@ -620,20 +621,17 @@ class AmqpProtocolConverter {
pumpProtonToSocket();
}
});
} else {
throw new Exception("Expected coordinator message type: "+action.getClass());
throw new Exception("Expected coordinator message type: " + action.getClass());
}
}
};
void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
// Client is producing to this receiver object
org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
try {
if( remoteTarget instanceof Coordinator ) {
if (remoteTarget instanceof Coordinator) {
pumpProtonToSocket();
receiver.setContext(coordinatorContext);
receiver.flow(prefetch);
@ -643,7 +641,7 @@ class AmqpProtocolConverter {
Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest;
if( target.getDynamic() ) {
if (target.getDynamic()) {
dest = createTempQueue();
Target actualTarget = new Target();
actualTarget.setAddress(dest.getQualifiedName());
@ -665,7 +663,7 @@ class AmqpProtocolConverter {
if (response.isException()) {
receiver.setTarget(null);
Throwable exception = ((ExceptionResponse) response).getException();
((LinkImpl)receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
((LinkImpl) receiver).setLocalError(new EndpointError(exception.getClass().getName(), exception.getMessage()));
receiver.close();
} else {
receiver.open();
@ -676,37 +674,35 @@ class AmqpProtocolConverter {
}
} catch (AmqpProtocolException exception) {
receiver.setTarget(null);
((LinkImpl)receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage()));
((LinkImpl) receiver).setLocalError(new EndpointError(exception.getSymbolicName(), exception.getMessage()));
receiver.close();
}
}
private ActiveMQDestination createDestination(Object terminus) throws AmqpProtocolException {
if( terminus == null ) {
if (terminus == null) {
return null;
} else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)terminus;
if( source.getAddress() == null || source.getAddress().length()==0) {
} else if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) terminus;
if (source.getAddress() == null || source.getAddress().length() == 0) {
throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
}
return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else if( terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target)terminus;
if( target.getAddress() == null || target.getAddress().length()==0) {
} else if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) terminus;
if (target.getAddress() == null || target.getAddress().length() == 0) {
throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
}
return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else if( terminus instanceof Coordinator ) {
Coordinator target = (Coordinator)terminus;
} else if (terminus instanceof Coordinator) {
return null;
} else {
throw new RuntimeException("Unexpected terminus type: "+terminus);
throw new RuntimeException("Unexpected terminus type: " + terminus);
}
}
OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
class ConsumerContext extends AmqpDeliveryListener {
private final ConsumerId consumerId;
private final Sender sender;
@ -739,15 +735,14 @@ class AmqpProtocolConverter {
}
void checkinTag(byte[] data) {
if( tagCache.size() < 1024 ) {
if (tagCache.size() < 1024) {
tagCache.add(data);
}
}
@Override
public void onClose() throws Exception {
if( !closed ) {
if (!closed) {
closed = true;
sendToActiveMQ(new RemoveInfo(consumerId), null);
}
@ -757,7 +752,7 @@ class AmqpProtocolConverter {
// called when the connection receives a JMS message from ActiveMQ
public void onMessageDispatch(MessageDispatch md) throws Exception {
if( !closed ) {
if (!closed) {
outbound.addLast(md);
pumpOutbound();
pumpProtonToSocket();
@ -768,14 +763,14 @@ class AmqpProtocolConverter {
Delivery currentDelivery;
public void pumpOutbound() throws Exception {
while(!closed) {
while (!closed) {
while( currentBuffer !=null ) {
while (currentBuffer != null) {
int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
if( sent > 0 ) {
if (sent > 0) {
currentBuffer.moveHead(sent);
if( currentBuffer.length == 0 ) {
if( presettle ) {
if (currentBuffer.length == 0) {
if (presettle) {
settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else {
sender.advance();
@ -788,34 +783,31 @@ class AmqpProtocolConverter {
}
}
if( outbound.isEmpty() ) {
if (outbound.isEmpty()) {
return;
}
final MessageDispatch md = outbound.removeFirst();
try {
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
if( jms==null ) {
if (jms == null) {
// It's the end of browse signal.
sender.drained();
} else {
jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true);
final EncodedMessage amqp = outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) {
if (amqp != null && amqp.getLength() > 0) {
currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
if( presettle ) {
if (presettle) {
currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
final byte[] tag = nextTag();
currentDelivery = sender.delivery(tag, 0, tag.length);
}
currentDelivery.setContext(md);
} else {
// TODO: message could not be generated what now?
}
}
} catch (Exception e) {
@ -826,11 +818,11 @@ class AmqpProtocolConverter {
private void settle(final Delivery delivery, int ackType) throws Exception {
byte[] tag = delivery.getTag();
if( tag !=null && tag.length>0 ) {
if (tag != null && tag.length > 0) {
checkinTag(tag);
}
if( ackType == -1) {
if (ackType == -1) {
// we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
delivery.settle();
onMessageDispatch((MessageDispatch) delivery.getContext());
@ -841,20 +833,24 @@ class AmqpProtocolConverter {
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setLastMessageId(md.getMessage().getMessageId());
ack.setMessageCount(1);
ack.setAckType((byte)ackType);
ack.setAckType((byte) ackType);
ack.setDestination(md.getDestination());
DeliveryState remoteState = delivery.getRemoteState();
if( remoteState!=null && remoteState instanceof TransactionalState) {
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
ack.setTransactionId(new LocalTransactionId(connectionId, txid));
}
if (LOG.isTraceEnabled()) {
LOG.trace("Sending Ack for MessageId:{} to ActiveMQ", ack.getLastMessageId());
}
sendToActiveMQ(ack, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if( response.isException() ) {
if (response.isException()) {
if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException();
exception.printStackTrace();
@ -871,7 +867,7 @@ class AmqpProtocolConverter {
@Override
public void drainCheck() {
if( outbound.isEmpty() ) {
if (outbound.isEmpty()) {
sender.drained();
}
}
@ -880,27 +876,27 @@ class AmqpProtocolConverter {
public void onDelivery(Delivery delivery) throws Exception {
MessageDispatch md = (MessageDispatch) delivery.getContext();
final DeliveryState state = delivery.getRemoteState();
if( state instanceof Accepted ) {
if( !delivery.remotelySettled() ) {
if (state instanceof Accepted) {
if (!delivery.remotelySettled()) {
delivery.disposition(new Accepted());
}
settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else if( state instanceof Rejected) {
} else if (state instanceof Rejected) {
// re-deliver /w incremented delivery counter.
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
settle(delivery, -1);
} else if( state instanceof Released) {
} else if (state instanceof Released) {
// re-deliver && don't increment the counter.
settle(delivery, -1);
} else if( state instanceof Modified) {
} else if (state instanceof Modified) {
Modified modified = (Modified) state;
if ( modified.getDeliveryFailed() ) {
// increment delivery counter..
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
if (modified.getDeliveryFailed()) {
// increment delivery counter..
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
}
byte ackType = -1;
Boolean undeliverableHere = modified.getUndeliverableHere();
if( undeliverableHere !=null && undeliverableHere ) {
if (undeliverableHere != null && undeliverableHere) {
// receiver does not want the message..
// perhaps we should DLQ it?
ackType = MessageAck.POSION_ACK_TYPE;
@ -909,13 +905,12 @@ class AmqpProtocolConverter {
}
pumpOutbound();
}
}
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source)sender.getRemoteSource();
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
try {
final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
@ -923,18 +918,18 @@ class AmqpProtocolConverter {
sender.setContext(consumerContext);
String selector = null;
if( source!=null ) {
if (source != null) {
Map filter = source.getFilter();
if (filter != null) {
DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
if( value!=null ) {
DescribedType value = (DescribedType) filter.get(JMS_SELECTOR);
if (value != null) {
selector = value.getDescribed().toString();
// Validate the Selector.
try {
SelectorParser.parse(selector);
} catch (InvalidSelectorException e) {
sender.setSource(null);
((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
((LinkImpl) sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
sender.close();
consumerContext.closed = true;
return;
@ -944,7 +939,7 @@ class AmqpProtocolConverter {
}
ActiveMQDestination dest;
if( source == null ) {
if (source == null) {
source = new org.apache.qpid.proton.amqp.messaging.Source();
source.setAddress("");
@ -957,7 +952,7 @@ class AmqpProtocolConverter {
rsi.setSubscriptionName(sender.getName());
rsi.setClientId(connectionInfo.getClientId());
consumerContext.closed=true;
consumerContext.closed = true;
sendToActiveMQ(rsi, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
@ -965,19 +960,19 @@ class AmqpProtocolConverter {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
String name = exception.getClass().getName();
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage()));
}
sender.open();
pumpProtonToSocket();
}
});
return;
} else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
consumerContext.closed=true;
} else if (contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED)) {
consumerContext.closed = true;
sender.close();
pumpProtonToSocket();
return;
} else if( source.getDynamic() ) {
} else if (source.getDynamic()) {
// lets create a temp dest.
dest = createTempQueue();
source = new org.apache.qpid.proton.amqp.messaging.Source();
@ -995,17 +990,17 @@ class AmqpProtocolConverter {
consumerInfo.setDestination(dest);
consumerInfo.setPrefetchSize(100);
consumerInfo.setDispatchAsync(true);
if( source.getDistributionMode() == COPY && dest.isQueue() ) {
if (source.getDistributionMode() == COPY && dest.isQueue()) {
consumerInfo.setBrowser(true);
}
if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
if (DURABLE.equals(source.getDurable()) && dest.isTopic()) {
consumerInfo.setSubscriptionName(sender.getName());
}
Map filter = source.getFilter();
if (filter != null) {
DescribedType value = (DescribedType)filter.get(NO_LOCAL);
if( value!=null ) {
DescribedType value = (DescribedType) filter.get(NO_LOCAL);
if (value != null) {
consumerInfo.setNoLocal(true);
}
}
@ -1017,10 +1012,10 @@ class AmqpProtocolConverter {
sender.setSource(null);
Throwable exception = ((ExceptionResponse) response).getException();
String name = exception.getClass().getName();
if( exception instanceof InvalidSelectorException ) {
if (exception instanceof InvalidSelectorException) {
name = "amqp:invalid-field";
}
((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
((LinkImpl) sender).setLocalError(new EndpointError(name, exception.getMessage()));
subscriptionsByConsumerId.remove(id);
sender.close();
} else {
@ -1031,15 +1026,15 @@ class AmqpProtocolConverter {
});
} catch (AmqpProtocolException e) {
sender.setSource(null);
((LinkImpl)sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage()));
((LinkImpl) sender).setLocalError(new EndpointError(e.getSymbolicName(), e.getMessage()));
sender.close();
}
}
static private boolean contains(Symbol[] haystack, Symbol needle) {
if( haystack!=null ) {
if (haystack != null) {
for (Symbol capability : haystack) {
if( capability == needle) {
if (capability == needle) {
return true;
}
}
@ -1058,11 +1053,11 @@ class AmqpProtocolConverter {
return rc;
}
////////////////////////////////////////////////////////////////////////////
// //////////////////////////////////////////////////////////////////////////
//
// Implementation methods
//
////////////////////////////////////////////////////////////////////////////
// //////////////////////////////////////////////////////////////////////////
private final Object commnadIdMutex = new Object();
private int lastCommandId;
@ -1106,5 +1101,4 @@ class AmqpProtocolConverter {
condition.setDescription(description);
return condition;
}
}

View File

@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
@ -31,6 +33,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.store.kahadb.KahaDBStore;
@ -43,14 +46,47 @@ public class AMQ4563Test extends AmqpTestSupport {
public static final String KAHADB_DIRECTORY = "target/activemq-data/kahadb-amq4563";
private String openwireUri;
@Test(timeout = 60000)
public void testTransactions() throws Exception {
public void testMessagesAreAckedAMQProducer() throws Exception {
int messagesSent = 3;
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
assertTrue(brokerService.isPersistent());
Connection connection = createConnection();
Connection connection = createAMQConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("txqueue");
MessageProducer p = session.createProducer(destination);
TextMessage message = null;
for (int i=0; i < messagesSent; i++) {
message = session.createTextMessage();
String messageText = "Hello " + i + " sent at " + new java.util.Date().toString();
message.setText(messageText);
LOG.debug(">>>> Sent [" + messageText + "]");
p.send(message);
}
// After the first restart we should get all messages sent above
restartBroker(connection, session);
int messagesReceived = readAllMessages(queue);
assertEquals(messagesSent, messagesReceived);
// This time there should be no messages on this queue
restartBroker(connection, session);
messagesReceived = readAllMessages(queue);
assertEquals(0, messagesReceived);
}
@Test(timeout = 60000)
public void testMessagesAreAckedAMQPProducer() throws Exception {
int messagesSent = 3;
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
assertTrue(brokerService.isPersistent());
Connection connection = createAMQPConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(queue);
TextMessage message = null;
@ -74,7 +110,7 @@ public class AMQ4563Test extends AmqpTestSupport {
}
private int readAllMessages(QueueImpl queue) throws JMSException {
Connection connection = createConnection();
Connection connection = createAMQPConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
int messagesReceived = 0;
@ -104,7 +140,7 @@ public class AMQ4563Test extends AmqpTestSupport {
createBroker(false);
}
private Connection createConnection() throws JMSException {
private Connection createAMQPConnection() throws JMSException {
LOG.debug(">>> In createConnection using port " + port);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
final Connection connection = factory.createConnection();
@ -118,6 +154,20 @@ public class AMQ4563Test extends AmqpTestSupport {
return connection;
}
private Connection createAMQConnection() throws JMSException {
LOG.debug(">>> In createConnection using port " + port);
final ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "password", openwireUri);
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
}
@Override
public void startBroker() throws Exception {
createBroker(true);
@ -136,6 +186,7 @@ public class AMQ4563Test extends AmqpTestSupport {
brokerService.setPersistenceAdapter(kaha);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
openwireUri = brokerService.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
// Setup SSL context...
final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());