diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 62a05b9cdb..7e249575b2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -159,16 +160,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { void pumpProtonToSocket() { try { - int size = 1024 * 64; - byte data[] = new byte[size]; boolean done = false; while (!done) { - int count = protonTransport.output(data, 0, size); - if (count > 0) { - final Buffer buffer; - buffer = new Buffer(data, 0, count); - // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 ")); - amqpTransport.sendToAmqp(buffer); + ByteBuffer toWrite = protonTransport.getOutputBuffer(); + if (toWrite != null && toWrite.hasRemaining()) { +// // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 ")); + amqpTransport.sendToAmqp(toWrite); + protonTransport.outputConsumed(); } else { done = true; } @@ -248,10 +246,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { sasl.done(Sasl.SaslOutcome.PN_SASL_OK); amqpTransport.getWireFormat().magicRead = false; sasl = null; + LOG.debug("SASL [PLAIN] Handshake complete."); } else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) { sasl.done(Sasl.SaslOutcome.PN_SASL_OK); amqpTransport.getWireFormat().magicRead = false; sasl = null; + LOG.debug("SASL [ANONYMOUS] Handshake complete."); } } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 4a11374b40..13a264a66e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -21,6 +21,10 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayOutputStream; @@ -53,8 +57,21 @@ public class AmqpWireFormat implements WireFormat { @Override public void marshal(Object command, DataOutput dataOut) throws IOException { - Buffer frame = (Buffer) command; - frame.writeTo(dataOut); + if (command instanceof ByteBuffer) { + ByteBuffer buffer = (ByteBuffer) command; + + if (dataOut instanceof OutputStream) { + WritableByteChannel channel = Channels.newChannel((OutputStream) dataOut); + channel.write(buffer); + } else { + while (buffer.hasRemaining()) { + dataOut.writeByte(buffer.get()); + } + } + } else { + Buffer frame = (Buffer) command; + frame.writeTo(dataOut); + } } boolean magicRead = false;