Use non-deprecated output methods for proton to allow for faster bulk

sends of outbound amqp frames.
This commit is contained in:
Timothy Bish 2014-02-04 12:47:42 -05:00
parent 9eb7fb9062
commit e102e64e9d
2 changed files with 27 additions and 10 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -159,16 +160,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
void pumpProtonToSocket() {
try {
int size = 1024 * 64;
byte data[] = new byte[size];
boolean done = false;
while (!done) {
int count = protonTransport.output(data, 0, size);
if (count > 0) {
final Buffer buffer;
buffer = new Buffer(data, 0, count);
// System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(buffer);
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
// // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(toWrite);
protonTransport.outputConsumed();
} else {
done = true;
}
@ -248,10 +246,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
amqpTransport.getWireFormat().magicRead = false;
sasl = null;
LOG.debug("SASL [PLAIN] Handshake complete.");
} else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) {
sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
amqpTransport.getWireFormat().magicRead = false;
sasl = null;
LOG.debug("SASL [ANONYMOUS] Handshake complete.");
}
}
}

View File

@ -21,6 +21,10 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
@ -53,8 +57,21 @@ public class AmqpWireFormat implements WireFormat {
@Override
public void marshal(Object command, DataOutput dataOut) throws IOException {
Buffer frame = (Buffer) command;
frame.writeTo(dataOut);
if (command instanceof ByteBuffer) {
ByteBuffer buffer = (ByteBuffer) command;
if (dataOut instanceof OutputStream) {
WritableByteChannel channel = Channels.newChannel((OutputStream) dataOut);
channel.write(buffer);
} else {
while (buffer.hasRemaining()) {
dataOut.writeByte(buffer.get());
}
}
} else {
Buffer frame = (Buffer) command;
frame.writeTo(dataOut);
}
}
boolean magicRead = false;