More AMQP impl changes. More tests pass, initial cut of transaction support is in.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1400116 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-19 14:28:02 +00:00
parent 5d9695c5ef
commit 2db73e2b7e
9 changed files with 563 additions and 120 deletions

View File

@ -24,7 +24,18 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.messaging.*;
import org.apache.qpid.proton.type.messaging.Modified;
import org.apache.qpid.proton.type.messaging.Rejected;
import org.apache.qpid.proton.type.messaging.Released;
import org.apache.qpid.proton.type.transaction.*;
import org.apache.qpid.proton.type.transport.DeliveryState;
import org.apache.qpid.proton.type.transport.SenderSettleMode;
import org.apache.qpid.proton.type.transport.Source;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger;
@ -43,8 +54,9 @@ class AmqpProtocolConverter {
public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
static final public byte[] EMPTY_BYTE_ARRAY = new byte[]{};
private final AmqpTransport amqpTransport;
public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
@ -78,6 +90,18 @@ class AmqpProtocolConverter {
{
this.protonTransport.bind(this.protonConnection);
this.protonTransport.setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
System.out.println(String.format("RECV: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
}
@Override
public void sentFrame(TransportFrame transportFrame) {
System.out.println(String.format("SENT: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
}
});
}
void pumpProtonToSocket() {
@ -88,7 +112,8 @@ class AmqpProtocolConverter {
while (!done) {
int count = protonTransport.output(data, 0, size);
if (count > 0) {
final Buffer buffer = new Buffer(data, 0, count);
final Buffer buffer;
buffer = new Buffer(data, 0, count);
// System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(buffer);
} else {
@ -107,7 +132,7 @@ class AmqpProtocolConverter {
long nextConsumerId = 0;
public AmqpSessionContext(ConnectionId connectionId, long id) {
sessionId = new SessionId(connectionId, -1);
sessionId = new SessionId(connectionId, id);
}
}
@ -163,6 +188,13 @@ class AmqpProtocolConverter {
link = link.next(ACTIVE_STATE, CLOSED_STATE);
}
link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
while (link != null) {
((AmqpDeliveryListener)link.getContext()).drainCheck();
link = link.next(ACTIVE_STATE, CLOSED_STATE);
}
session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
while (session != null) {
//TODO - close links?
@ -170,8 +202,7 @@ class AmqpProtocolConverter {
session = session.next(ACTIVE_STATE, CLOSED_STATE);
}
if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
// listener.onConnectionClose(protonConnection);
protonConnection.close();
doClose();
}
} catch (Throwable e) {
@ -181,6 +212,35 @@ class AmqpProtocolConverter {
pumpProtonToSocket();
}
boolean closing = false;
boolean closedSocket = false;
private void doClose() {
if( !closing ) {
closing = true;
sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
protonConnection.close();
if( !closedSocket) {
pumpProtonToSocket();
}
}
});
}
}
public void onAMQPException(IOException error) {
closedSocket = true;
if( !closing) {
System.out.println("AMQP client disconnected");
error.printStackTrace();
} else {
doClose();
}
}
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {
Response response = (Response) command;
@ -221,6 +281,7 @@ class AmqpProtocolConverter {
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
public void onClose() throws Exception {}
public void drainCheck() {}
}
private void onConnectionOpen() throws AmqpProtocolException {
@ -278,13 +339,17 @@ class AmqpProtocolConverter {
private void onSessionClose(Session session) {
AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext();
sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
if( sessionContext!=null ) {
System.out.println(sessionContext.sessionId);
sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
session.setContext(null);
}
session.close();
}
private void onLinkOpen(Link link) {
link.setLocalSourceAddress(link.getRemoteSourceAddress());
link.setLocalTargetAddress(link.getRemoteTargetAddress());
link.setSource(link.getRemoteSource());
link.setTarget(link.getRemoteTarget());
AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext();
if (link instanceof Receiver) {
@ -296,18 +361,10 @@ class AmqpProtocolConverter {
InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
class ProducerContext extends AmqpDeliveryListener {
private final ProducerId producerId;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination;
abstract class BaseProducerContext extends AmqpDeliveryListener {
ByteArrayOutputStream current = new ByteArrayOutputStream();
public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
this.producerId = producerId;
this.destination = destination;
}
@Override
public void onDelivery(Delivery delivery) throws Exception {
Receiver receiver = ((Receiver)delivery.getLink());
@ -336,7 +393,26 @@ class AmqpProtocolConverter {
receiver.advance();
delivery.settle();
final Buffer buffer = current.toBuffer();
Buffer buffer = current.toBuffer();
current = null;
onMessage(receiver, delivery, buffer);
}
abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
}
class ProducerContext extends BaseProducerContext {
private final ProducerId producerId;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination;
public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
this.producerId = producerId;
this.destination = destination;
}
@Override
protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
current = null;
@ -348,6 +424,14 @@ class AmqpProtocolConverter {
if( message.getMessageId()==null ) {
message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
}
DeliveryState remoteState = delivery.getRemoteState();
if( remoteState!=null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
message.setTransactionId(new LocalTransactionId(connectionId, txid));
}
message.onSend();
// sendToActiveMQ(message, createResponseHandler(command));
sendToActiveMQ(message, null);
@ -355,37 +439,166 @@ class AmqpProtocolConverter {
}
long nextTransactionId = 0;
class Transaction {
}
HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
public byte[] toBytes(long value) {
Buffer buffer = new Buffer(8);
buffer.bigEndianEditor().writeLong(value);
return buffer.data;
}
private long toLong(Binary value) {
Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
return buffer.bigEndianEditor().readLong();
}
AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
@Override
protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
org.apache.qpid.proton.message.Message msg = new org.apache.qpid.proton.message.Message();
int offset = buffer.offset;
int len = buffer.length;
while( len > 0 ) {
final int decoded = msg.decode(buffer.data, offset, len);
assert decoded > 0: "Make progress decoding the message";
offset += decoded;
len -= decoded;
}
Object action = ((AmqpValue)msg.getBody()).getValue();
System.out.println("COORDINATOR received: "+action+", ["+buffer+"]");
if( action instanceof Declare ) {
Declare declare = (Declare) action;
if( declare.getGlobalId()!=null ) {
throw new Exception("don't know how to handle a declare /w a set GlobalId");
}
long txid = nextTransactionId++;
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
sendToActiveMQ(txinfo, null);
System.out.println("started transaction "+txid);
Declared declared = new Declared();
declared.setTxnId(new Binary(toBytes(txid)));
delivery.disposition(declared);
delivery.settle();
} else if( action instanceof Discharge) {
Discharge discharge = (Discharge) action;
long txid = toLong(discharge.getTxnId());
byte operation;
if( discharge.getFail() ) {
System.out.println("rollback transaction "+txid);
operation = TransactionInfo.ROLLBACK ;
} else {
System.out.println("commit transaction "+txid);
operation = TransactionInfo.COMMIT_ONE_PHASE;
}
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
sendToActiveMQ(txinfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
if( response.isException() ) {
ExceptionResponse er = (ExceptionResponse)response;
Rejected rejected = new Rejected();
ArrayList errors = new ArrayList();
errors.add(er.getException().getMessage());
rejected.setError(errors);
delivery.disposition(rejected);
}
delivery.settle();
pumpProtonToSocket();
}
});
receiver.advance();
} else {
throw new Exception("Expected coordinator message type: "+action.getClass());
}
}
};
void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
// Client is producing to this receiver object
org.apache.qpid.proton.type.transport.Target remoteTarget = receiver.getRemoteTarget();
if( remoteTarget instanceof Coordinator ) {
pumpProtonToSocket();
receiver.setContext(coordinatorContext);
receiver.flow(1024 * 64);
receiver.open();
pumpProtonToSocket();
} else {
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest = createDestination(remoteTarget);
ProducerContext producerContext = new ProducerContext(producerId, dest);
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
ProducerContext producerContext = new ProducerContext(producerId, dest);
receiver.setContext(producerContext);
receiver.flow(1024 * 64);
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
receiver.open();
if (response.isException()) {
// If the connection attempt fails we close the socket.
Throwable exception = ((ExceptionResponse) response).getException();
receiver.close();
receiver.setContext(producerContext);
receiver.flow(1024 * 64);
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
receiver.open();
if (response.isException()) {
// If the connection attempt fails we close the socket.
Throwable exception = ((ExceptionResponse) response).getException();
receiver.close();
}
pumpProtonToSocket();
}
pumpProtonToSocket();
}
});
});
}
}
private ActiveMQDestination createDestination(Object terminus) {
if( terminus == null ) {
return null;
} else if( terminus instanceof org.apache.qpid.proton.type.messaging.Source) {
org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)terminus;
return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else if( terminus instanceof org.apache.qpid.proton.type.messaging.Target) {
org.apache.qpid.proton.type.messaging.Target target = (org.apache.qpid.proton.type.messaging.Target)terminus;
return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else if( terminus instanceof Coordinator ) {
Coordinator target = (Coordinator)terminus;
return null;
} else {
throw new RuntimeException("Unexpected terminus type: "+terminus);
}
}
private Source createSource(ActiveMQDestination dest) {
org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
rc.setAddress(inboundTransformer.getVendor().toAddress(dest));
return rc;
}
OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
class ConsumerContext extends AmqpDeliveryListener {
private final ConsumerId consumerId;
private final Sender sender;
private boolean presettle;
public ConsumerContext(ConsumerId consumerId, Sender sender) {
this.consumerId = consumerId;
this.sender = sender;
this.presettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
}
long nextTagId = 0;
HashSet<byte[]> tagCache = new HashSet<byte[]>();
@ -406,11 +619,13 @@ class AmqpProtocolConverter {
return rc;
}
public ConsumerContext(ConsumerId consumerId, Sender sender) {
this.consumerId = consumerId;
this.sender = sender;
void checkinTag(byte[] data) {
if( tagCache.size() < 1024 ) {
tagCache.add(data);
}
}
@Override
public void onClose() throws Exception {
sendToActiveMQ(new RemoveInfo(consumerId), null);
@ -428,7 +643,7 @@ class AmqpProtocolConverter {
Buffer currentBuffer;
Delivery currentDelivery;
public void pumpOutbound() {
public void pumpOutbound() throws Exception {
while(true) {
while( currentBuffer !=null ) {
@ -436,8 +651,11 @@ class AmqpProtocolConverter {
if( sent > 0 ) {
currentBuffer.moveHead(sent);
if( currentBuffer.length == 0 ) {
currentDelivery.settle();
sender.advance();
if( presettle ) {
settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else {
sender.advance();
}
currentBuffer = null;
currentDelivery = null;
}
@ -453,12 +671,17 @@ class AmqpProtocolConverter {
final MessageDispatch md = outbound.removeFirst();
try {
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
jms.setRedeliveryCounter(md.getRedeliveryCounter());
final EncodedMessage amqp = outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) {
currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
final byte[] tag = nextTag();
currentDelivery = sender.delivery(tag, 0, tag.length);
if( presettle ) {
currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
} else {
final byte[] tag = nextTag();
currentDelivery = sender.delivery(tag, 0, tag.length);
}
currentDelivery.setContext(md);
} else {
@ -471,12 +694,81 @@ class AmqpProtocolConverter {
}
}
@Override
public void onDelivery(Delivery delivery) throws JMSException {
if( delivery.remotelySettled() ) {
MessageDispatch md = (MessageDispatch) delivery.getContext();
pumpOutbound();
private void settle(final Delivery delivery, int ackType) throws Exception {
byte[] tag = delivery.getTag();
if( tag !=null && tag.length>0 ) {
checkinTag(tag);
}
if( ackType == -1) {
// we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
delivery.settle();
onMessageDispatch((MessageDispatch) delivery.getContext());
} else {
MessageDispatch md = (MessageDispatch) delivery.getContext();
MessageAck ack = new MessageAck();
ack.setConsumerId(consumerId);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setLastMessageId(md.getMessage().getMessageId());
ack.setMessageCount(1);
ack.setAckType((byte)ackType);
DeliveryState remoteState = delivery.getRemoteState();
if( remoteState!=null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
ack.setTransactionId(new LocalTransactionId(connectionId, txid));
}
sendToActiveMQ(ack, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
delivery.settle();
pumpProtonToSocket();
}
});
}
}
@Override
public void drainCheck() {
if( outbound.isEmpty() ) {
sender.drained();
}
}
@Override
public void onDelivery(Delivery delivery) throws Exception {
MessageDispatch md = (MessageDispatch) delivery.getContext();
final DeliveryState state = delivery.getRemoteState();
if( state instanceof Accepted ) {
if( !delivery.remotelySettled() ) {
delivery.disposition(new Accepted());
}
settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
} else if( state instanceof Rejected) {
// re-deliver /w incremented delivery counter.
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
settle(delivery, -1);
} else if( state instanceof Released) {
// re-deliver && don't increment the counter.
settle(delivery, -1);
} else if( state instanceof Modified) {
Modified modified = (Modified) state;
if ( modified.getDeliveryFailed() ) {
// increment delivery counter..
md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
}
byte ackType = -1;
Boolean undeliverableHere = modified.getUndeliverableHere();
if( undeliverableHere !=null && undeliverableHere ) {
// receiver does not want the message..
// perhaps we should DLQ it?
ackType = MessageAck.POSION_ACK_TYPE;
}
settle(delivery, ackType);
}
pumpOutbound();
}
}
@ -485,14 +777,16 @@ class AmqpProtocolConverter {
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
// sender.get
ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
subscriptionsByConsumerId.put(id, consumerContext);
ActiveMQDestination dest;
if( sender.getRemoteSourceAddress() != null ) {
dest = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
final Source remoteSource = sender.getRemoteSource();
if( remoteSource != null ) {
dest = createDestination(remoteSource);
} else {
// lets create a temp dest.
// if (topic) {
@ -507,7 +801,7 @@ class AmqpProtocolConverter {
info.setDestination(dest);
sendToActiveMQ(info, null);
tempDestinations.put(sender, dest);
sender.setLocalSourceAddress(inboundTransformer.getVendor().toAddress(dest));
sender.setSource(createSource(dest));
}

View File

@ -70,6 +70,20 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
}
@Override
public void onException(IOException error) {
try {
protocolConverter.lock.lock();
try {
protocolConverter.onAMQPException(error);
} finally {
protocolConverter.lock.unlock();
}
} finally {
super.onException(error);
}
}
public void onCommand(Object command) {
try {
if (trace) {
@ -140,6 +154,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
return this.wireFormat;
}
public void handleException(IOException e) {
super.onException(e);
}

View File

@ -16,10 +16,15 @@
*/
package org.apache.activemq.transport.amqp.transform;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.type.UnsignedInteger;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageFormatException;
import java.nio.ByteBuffer;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@ -54,8 +59,43 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
return null;
}
byte data[] = new byte[(int) msg.getBodyLength()];
int dataSize = data.length;
msg.readBytes(data);
return new EncodedMessage(messageFormat, data, 0, data.length);
msg.reset();
try {
int count = msg.getIntProperty("JMSXDeliveryCount");
if( count > 1 ) {
// decode...
org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
int offset = 0;
int len = data.length;
while( len > 0 ) {
final int decoded = amqp.decode(data, offset, len);
assert decoded > 0: "Make progress decoding the message";
offset += decoded;
len -= decoded;
}
// Update the DeliveryCount header...
amqp.getHeader().setDeliveryCount(new UnsignedInteger(count));
// Re-encode...
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
if( overflow.position() > 0 ) {
buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
}
data = buffer.array();
dataSize = c;
}
} catch (JMSException e) {
}
return new EncodedMessage(messageFormat, data, 0, dataSize);
}
}

View File

@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory
class=org.apache.activemq.transport.amqp.AMQPSslTransportFactory

View File

@ -19,12 +19,15 @@ package org.apache.activemq.transport.amqp;
import junit.framework.TestCase;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.spring.SpringSslContext;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Vector;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
@ -40,21 +43,51 @@ public class AmqpTestSupport {
protected int numberOfMessages;
AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
protected int port;
protected int sslPort;
public static void main(String[] args) throws Exception {
final AmqpTestSupport s = new AmqpTestSupport();
s.sslPort = 5671;
s.port = 5672;
s.startBroker();
while(true) {
Thread.sleep(100000);
}
}
@Before
public void startBroker() throws Exception {
public void setUp() throws Exception {
autoFailTestSupport.startAutoFailThread();
exceptions.clear();
startBroker();
}
public void startBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
// Setup SSL context...
final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
File keystore = new File(classesDir, "../../src/test/resources/keystore");
final SpringSslContext sslContext = new SpringSslContext();
sslContext.setKeyStore(keystore.getCanonicalPath());
sslContext.setKeyStorePassword("password");
sslContext.setTrustStore(keystore.getCanonicalPath());
sslContext.setTrustStorePassword("password");
sslContext.afterPropertiesSet();
brokerService.setSslContext(sslContext);
addAMQPConnector();
brokerService.start();
this.numberOfMessages = 2000;
}
protected void addAMQPConnector() throws Exception {
final TransportConnector connector = brokerService.addConnector("amqp://localhost:0");
TransportConnector connector =brokerService.addConnector("amqp+ssl://0.0.0.0:"+sslPort);
sslPort = connector.getConnectUri().getPort();
connector = brokerService.addConnector("amqp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.Test;
@ -30,59 +31,114 @@ import static org.junit.Assert.assertEquals;
public class JMSClientTest extends AmqpTestSupport {
@Test
public void testSendReceive() throws Exception {
public void testTransactions() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
QueueImpl queue = new QueueImpl("queue://testqueue");
int nMsgs = 100;
final String dataFormat = "%010240d";
Connection connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(queue);
p.send(session.createTextMessage("Hello World"));
// session.commit();
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);
MessageConsumer c = session.createConsumer(queue);
Message msg = c.receive();
System.out.println("first:"+msg);
System.out.println(msg.getJMSRedelivered());
try {
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = session.createProducer(queue);
for (int i = 0; i < nMsgs; i++) {
System.out.println("Sending " + i);
p.send(session.createTextMessage(String.format(dataFormat, i)));
}
p.close();
session.close();
}
System.out.println("=======================================================================================");
System.out.println(" receiving ");
System.out.println("=======================================================================================");
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c = session.createConsumer(queue);
// Receive messages non-transacted
int i = 0;
while ( i < nMsgs) {
TextMessage msg = (TextMessage) c.receive();
if( msg!=null ) {
String s = msg.getText();
assertEquals(String.format(dataFormat, i), s);
System.out.println("Received: " + i);
i++;
}
}
c.close();
session.close();
}
connection.close();
} catch (Exception e) {
e.printStackTrace();
// session.rollback();
//
// msg = c.receive();
// System.out.println("second:"+msg);
// System.out.println(msg.getJMSRedelivered());
}
connection.close();
}
// @Test
// public void testSendReceive() throws Exception {
// ActiveMQAdmin.enableJMSFrameTracing();
// QueueImpl queue = new QueueImpl("queue://testqueue");
// int nMsgs = 1;
// final String dataFormat = "%01024d";
//
//
// try {
// Connection connection = createConnection();
// {
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// MessageProducer p = session.createProducer(queue);
// for (int i = 0; i < nMsgs; i++) {
// System.out.println("Sending " + i);
// p.send(session.createTextMessage(String.format(dataFormat, i)));
// }
// }
// connection.close();
//
// System.out.println("=======================================================================================");
// System.out.println(" failing a receive ");
// System.out.println("=======================================================================================");
// connection = createConnection();
// {
// Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// MessageConsumer c = session.createConsumer(queue);
//
// // Receive messages non-transacted
// int i = 0;
// while ( i < 1) {
// TextMessage msg = (TextMessage) c.receive();
// if( msg!=null ) {
// String s = msg.getText();
// assertEquals(String.format(dataFormat, i), s);
// System.out.println("Received: " + i);
// i++;
// }
// }
// }
// connection.close();
//
//
// System.out.println("=======================================================================================");
// System.out.println(" receiving ");
// System.out.println("=======================================================================================");
// connection = createConnection();
// {
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// MessageConsumer c = session.createConsumer(queue);
//
// // Receive messages non-transacted
// int i = 0;
// while ( i < nMsgs) {
// TextMessage msg = (TextMessage) c.receive();
// if( msg!=null ) {
// String s = msg.getText();
// assertEquals(String.format(dataFormat, i), s);
// System.out.println("Received: " + i);
// i++;
// }
// }
// }
// connection.close();
//
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// }
private Connection createConnection() throws JMSException {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
}
}

View File

@ -29,6 +29,9 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.net.URI;
import java.util.Hashtable;
import java.util.logging.*;
@ -53,17 +56,18 @@ public class ActiveMQAdmin implements Admin {
}
}
private void enableJMSFrameTracing() {
static public void enableJMSFrameTracing() throws FileNotFoundException {
final SimpleFormatter formatter = new SimpleFormatter();
final PrintStream out = new PrintStream(new FileOutputStream(new File("/tmp/amqp-trace.txt")));
Handler handler = new Handler() {
@Override
public void publish(LogRecord r) {
System.out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
}
@Override
public void flush() {
System.out.flush();
out.flush();
}
@Override
@ -71,7 +75,7 @@ public class ActiveMQAdmin implements Admin {
}
};
Logger log = Logger.getLogger("RAW");
Logger log = Logger.getLogger("FRM");
log.addHandler(handler);
log.setLevel(Level.FINEST);
}

View File

@ -47,6 +47,8 @@ public class JoramJmsTest extends TestCase {
TestSuite suite = new TestSuite();
// Passing tests
suite.addTestSuite(ConnectionTest.class);
suite.addTestSuite(SessionTest.class);
suite.addTestSuite(JMSXPropertyTest.class);
suite.addTestSuite(MessageBodyTest.class);
suite.addTestSuite(MessageDefaultTest.class);
@ -54,13 +56,7 @@ public class JoramJmsTest extends TestCase {
suite.addTestSuite(MessagePropertyTest.class);
if (false ) {
// TODO: Fails due to JMS client impl error.
suite.addTestSuite(UnifiedSessionTest.class);
// TODO: Fails due to https://issues.apache.org/jira/browse/PROTON-62: ClassCastException when processing an Attach frame
suite.addTestSuite(QueueSessionTest.class);
suite.addTestSuite(SessionTest.class);
// TODO: Fails due to inconsistent ObjectMessage mapping in the JMS client.
suite.addTestSuite(MessageTypeTest.class);
// TODO: Fails due to temp destinations not being supported yet.
suite.addTestSuite(MessageHeaderTest.class);
suite.addTestSuite(TemporaryQueueTest.class);
@ -68,13 +64,17 @@ public class JoramJmsTest extends TestCase {
// TODO: Fails due to selectors not being implemented yet.
suite.addTestSuite(SelectorSyntaxTest.class);
suite.addTestSuite(SelectorTest.class);
suite.addTestSuite(QueueSessionTest.class);
// TODO: Browsers not yet supported.
suite.addTestSuite(QueueBrowserTest.class);
// TODO: Fails due to JMS client impl error.
suite.addTestSuite(UnifiedSessionTest.class);
// TODO: Fails due to inconsistent ObjectMessage mapping in the JMS client.
suite.addTestSuite(MessageTypeTest.class);
// TODO: Fails due to: javax.jms.IllegalStateException: Cannot set client-id to "publisherConnection"; client-id must be set on connection creation
suite.addTestSuite(TopicConnectionTest.class);
suite.addTestSuite(TopicSessionTest.class);
// TODO: figure out why the following tests fail..
// TODO: figure out why the following tests hang..
suite.addTestSuite(ConnectionTest.class);
suite.addTestSuite(QueueBrowserTest.class);
}
return suite;

Binary file not shown.