This commit is contained in:
Dejan Bosanac 2014-05-15 15:13:01 +02:00
parent 0c0fadcdce
commit ff64b14bc7

View File

@ -553,6 +553,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final ProducerId producerId;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination;
private boolean closed;
public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
this.producerId = producerId;
@ -561,70 +562,79 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
@Override
protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
current = null;
if (!closed) {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
current = null;
if (destination != null) {
message.setJMSDestination(destination);
}
message.setProducerId(producerId);
// Always override the AMQP client's MessageId with our own. Preserve the
// original in the TextView property for later Ack.
MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
MessageId amqpMessageId = message.getMessageId();
if (amqpMessageId != null) {
if (amqpMessageId.getTextView() != null) {
messageId.setTextView(amqpMessageId.getTextView());
} else {
messageId.setTextView(amqpMessageId.toString());
if (destination != null) {
message.setJMSDestination(destination);
}
}
message.setProducerId(producerId);
message.setMessageId(messageId);
// Always override the AMQP client's MessageId with our own. Preserve the
// original in the TextView property for later Ack.
MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
message.setTransactionId(new LocalTransactionId(connectionId, txid));
}
// Lets handle the case where the expiration was set, but the timestamp
// was not set by the client. Lets assign the timestamp now, and adjust the
// expiration.
if (message.getExpiration() != 0) {
if (message.getTimestamp() == 0) {
message.setTimestamp(System.currentTimeMillis());
message.setExpiration(message.getTimestamp() + message.getExpiration());
}
}
message.onSend();
sendToActiveMQ(message, new ResponseHandler() {
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (!delivery.remotelySettled()) {
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription(er.getException().getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
}
MessageId amqpMessageId = message.getMessageId();
if (amqpMessageId != null) {
if (amqpMessageId.getTextView() != null) {
messageId.setTextView(amqpMessageId.getTextView());
} else {
messageId.setTextView(amqpMessageId.toString());
}
receiver.flow(1);
delivery.disposition(Accepted.getInstance());
delivery.settle();
pumpProtonToSocket();
}
});
message.setMessageId(messageId);
LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
message.setTransactionId(new LocalTransactionId(connectionId, txid));
}
// Lets handle the case where the expiration was set, but the timestamp
// was not set by the client. Lets assign the timestamp now, and adjust the
// expiration.
if (message.getExpiration() != 0) {
if (message.getTimestamp() == 0) {
message.setTimestamp(System.currentTimeMillis());
message.setExpiration(message.getTimestamp() + message.getExpiration());
}
}
message.onSend();
sendToActiveMQ(message, new ResponseHandler() {
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
if (!delivery.remotelySettled()) {
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));
condition.setDescription(er.getException().getMessage());
rejected.setError(condition);
delivery.disposition(rejected);
}
}
receiver.flow(1);
delivery.disposition(Accepted.getInstance());
delivery.settle();
pumpProtonToSocket();
}
});
}
}
@Override
public void onClose() throws Exception {
if (!closed) {
sendToActiveMQ(new RemoveInfo(producerId), null);
}
}
}