AMQP impl: A simple send and receive is now working

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1393790 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2012-10-03 22:01:08 +00:00
parent 7fe30bc0cc
commit e7137b7eae
3 changed files with 25 additions and 25 deletions

View File

@ -86,7 +86,7 @@ class AmqpProtocolConverter {
int count = protonTransport.output(data, 0, size); int count = protonTransport.output(data, 0, size);
if (count > 0) { if (count > 0) {
final Buffer buffer = new Buffer(data, 0, count); final Buffer buffer = new Buffer(data, 0, count);
System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 ")); // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(buffer); amqpTransport.sendToAmqp(buffer);
} else { } else {
done = true; done = true;
@ -116,7 +116,7 @@ class AmqpProtocolConverter {
try { try {
System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 ")); // System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
protonTransport.input(frame.data, frame.offset, frame.length); protonTransport.input(frame.data, frame.offset, frame.length);
} catch (Throwable e) { } catch (Throwable e) {
handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e)); handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));

View File

@ -45,7 +45,7 @@ public class AMQPNativeInboundTransformer extends InboundTransformer {
} }
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat); rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
rc.setBooleanProperty(prefixVendor + "NATIVE", false); rc.setBooleanProperty(prefixVendor + "NATIVE", true);
return rc; return rc;
} }
} }

View File

@ -23,6 +23,7 @@ import com.swiftmq.amqp.v100.messaging.AMQPMessage;
import com.swiftmq.amqp.v100.types.AMQPString; import com.swiftmq.amqp.v100.types.AMQPString;
import com.swiftmq.amqp.v100.types.AMQPType; import com.swiftmq.amqp.v100.types.AMQPType;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
/** /**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a> * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@ -50,39 +51,38 @@ public class SwiftMQClientTest extends AmqpTestSupport {
}); });
connection.connect(); connection.connect();
{ {
String data = String.format("%010d", 0);
Session session = connection.createSession(10, 10); Session session = connection.createSession(10, 10);
Producer p = session.createProducer(queue, qos); Producer p = session.createProducer(queue, qos);
for (int i = 0; i < nMsgs; i++) { for (int i = 0; i < nMsgs; i++) {
AMQPMessage msg = new AMQPMessage(); AMQPMessage msg = new AMQPMessage();
String s = "Message #" + (i + 1); System.out.println("Sending " + i);
System.out.println("Sending " + s); msg.setAmqpValue(new AmqpValue(new AMQPString(String.format("%010d", i))));
msg.setAmqpValue(new AmqpValue(new AMQPString(s + ", data: " + data)));
p.send(msg); p.send(msg);
} }
p.close(); p.close();
session.close(); session.close();
} }
// { {
// Session session = connection.createSession(10, 10); Session session = connection.createSession(10, 10);
// Consumer c = session.createConsumer(queue, 100, qos, true, null); Consumer c = session.createConsumer(queue, 100, qos, true, null);
//
// // Receive messages non-transacted // Receive messages non-transacted
// for (int i = 0; i < nMsgs; i++) { for (int i = 0; i < nMsgs; i++) {
// AMQPMessage msg = c.receive(); AMQPMessage msg = c.receive();
// final AMQPType value = msg.getAmqpValue().getValue(); final AMQPType value = msg.getAmqpValue().getValue();
// if (value instanceof AMQPString) { if (value instanceof AMQPString) {
// AMQPString s = (AMQPString) value; String s = ((AMQPString) value).getValue();
// System.out.println("Received: " + s.getValue()); assertEquals(String.format("%010d", i), s);
// } System.out.println("Received: " + i);
// if (!msg.isSettled()) }
// msg.accept(); if (!msg.isSettled())
// } msg.accept();
// c.close(); }
// session.close(); c.close();
// } session.close();
}
connection.close(); connection.close();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();