ARTEMIS-204 Improvements on OpenWire
https://issues.apache.org/jira/browse/ARTEMIS-204 by consequence this will also fix any possible issues with AMQP
This commit is contained in:
parent
a4498d46d1
commit
1dae99746b
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.api.core;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -29,7 +30,7 @@ import io.netty.buffer.ByteBuf;
|
|||
*
|
||||
* @see ActiveMQBuffers
|
||||
*/
|
||||
public interface ActiveMQBuffer {
|
||||
public interface ActiveMQBuffer extends DataInput {
|
||||
|
||||
/**
|
||||
* Returns the underlying Netty's ByteBuf
|
||||
|
@ -642,7 +643,7 @@ public interface ActiveMQBuffer {
|
|||
* @return an unsigned byte at the current {@code readerIndex}
|
||||
* @throws IndexOutOfBoundsException if {@code this.readableBytes} is less than {@code 1}
|
||||
*/
|
||||
short readUnsignedByte();
|
||||
int readUnsignedByte();
|
||||
|
||||
/**
|
||||
* Gets a 16-bit short integer at the current {@code readerIndex}
|
||||
|
@ -874,7 +875,7 @@ public interface ActiveMQBuffer {
|
|||
* @param length The number of bytes to skip
|
||||
* @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.readableBytes}
|
||||
*/
|
||||
void skipBytes(int length);
|
||||
int skipBytes(int length);
|
||||
|
||||
/**
|
||||
* Sets the specified byte at the current {@code writerIndex}
|
||||
|
|
|
@ -16,12 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.buffers.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.UTF8Util;
|
||||
|
||||
|
@ -350,7 +352,7 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
|
|||
return new ChannelBufferWrapper(buffer.readSlice(length), releasable);
|
||||
}
|
||||
|
||||
public short readUnsignedByte() {
|
||||
public int readUnsignedByte() {
|
||||
return buffer.readUnsignedByte();
|
||||
}
|
||||
|
||||
|
@ -426,8 +428,9 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
|
|||
buffer.setShort(index, value);
|
||||
}
|
||||
|
||||
public void skipBytes(final int length) {
|
||||
public int skipBytes(final int length) {
|
||||
buffer.skipBytes(length);
|
||||
return length;
|
||||
}
|
||||
|
||||
public ActiveMQBuffer slice() {
|
||||
|
@ -510,4 +513,23 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
|
|||
buffer.writeShort(value);
|
||||
}
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public void readFully(byte[] b) throws IOException {
|
||||
readBytes(b);
|
||||
}
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public void readFully(byte[] b, int off, int len) throws IOException {
|
||||
readBytes(b, off, len);
|
||||
}
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public String readLine() throws IOException {
|
||||
return ByteUtil.readLine(this);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.utils;
|
|||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
|
||||
public class ByteUtil {
|
||||
|
||||
|
@ -67,4 +68,15 @@ public class ByteUtil {
|
|||
return buffer.array();
|
||||
}
|
||||
|
||||
|
||||
public static String readLine(ActiveMQBuffer buffer) {
|
||||
StringBuilder sb = new StringBuilder("");
|
||||
char c = buffer.readChar();
|
||||
while (c != '\n') {
|
||||
sb.append(c);
|
||||
c = buffer.readChar();
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.artemis.core.client.impl;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -27,8 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.InflaterReader;
|
||||
import org.apache.activemq.artemis.utils.InflaterWriter;
|
||||
import org.apache.activemq.artemis.utils.UTF8Util;
|
||||
|
@ -302,9 +303,9 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
|
|||
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
|
||||
}
|
||||
|
||||
public short readUnsignedByte() {
|
||||
public int readUnsignedByte() {
|
||||
try {
|
||||
return (short) getStream().readUnsignedByte();
|
||||
return getStream().readUnsignedByte();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException(e.getMessage(), e);
|
||||
|
@ -391,18 +392,39 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
|
|||
dst.put(bytesToGet);
|
||||
}
|
||||
|
||||
public void skipBytes(final int length) {
|
||||
public int skipBytes(final int length) {
|
||||
|
||||
try {
|
||||
for (int i = 0; i < length; i++) {
|
||||
getStream().read();
|
||||
}
|
||||
return length;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public void readFully(byte[] b) throws IOException {
|
||||
readBytes(b);
|
||||
}
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public void readFully(byte[] b, int off, int len) throws IOException {
|
||||
readBytes(b, off, len);
|
||||
}
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public String readLine() throws IOException {
|
||||
return getStream().readLine();
|
||||
}
|
||||
|
||||
|
||||
public void writeByte(final byte value) {
|
||||
throw new IllegalAccessError(OPERATION_NOT_SUPPORTED);
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.UTF8Util;
|
||||
|
||||
|
@ -666,7 +667,7 @@ public class LargeMessageControllerImpl implements LargeMessageController {
|
|||
throw new IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
|
||||
}
|
||||
|
||||
public short readUnsignedByte() {
|
||||
public int readUnsignedByte() {
|
||||
return (short) (readByte() & 0xFF);
|
||||
}
|
||||
|
||||
|
@ -758,11 +759,12 @@ public class LargeMessageControllerImpl implements LargeMessageController {
|
|||
readerIndex += length;
|
||||
}
|
||||
|
||||
public void skipBytes(final int length) {
|
||||
public int skipBytes(final int length) {
|
||||
|
||||
long newReaderIndex = readerIndex + length;
|
||||
checkForPacket(newReaderIndex);
|
||||
readerIndex = newReaderIndex;
|
||||
return length;
|
||||
}
|
||||
|
||||
public void writeByte(final byte value) {
|
||||
|
@ -1176,7 +1178,24 @@ public class LargeMessageControllerImpl implements LargeMessageController {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public void readFully(byte[] b) throws IOException {
|
||||
readBytes(b);
|
||||
}
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public void readFully(byte[] b, int off, int len) throws IOException {
|
||||
readBytes(b, off, len);
|
||||
}
|
||||
|
||||
/** from {@link java.io.DataInput} interface */
|
||||
@Override
|
||||
public String readLine() throws IOException {
|
||||
return ByteUtil.readLine(this);
|
||||
}
|
||||
|
||||
public ByteBuf byteBuf() {
|
||||
|
|
|
@ -16,115 +16,115 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.reader;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
|
||||
public class BytesMessageUtil extends MessageUtil {
|
||||
|
||||
public static boolean bytesReadBoolean(Message message) {
|
||||
return getBodyBuffer(message).readBoolean();
|
||||
public static boolean bytesReadBoolean(ActiveMQBuffer message) {
|
||||
return message.readBoolean();
|
||||
}
|
||||
|
||||
public static byte bytesReadByte(Message message) {
|
||||
return getBodyBuffer(message).readByte();
|
||||
public static byte bytesReadByte(ActiveMQBuffer message) {
|
||||
return message.readByte();
|
||||
}
|
||||
|
||||
public static int bytesReadUnsignedByte(Message message) {
|
||||
return getBodyBuffer(message).readUnsignedByte();
|
||||
public static int bytesReadUnsignedByte(ActiveMQBuffer message) {
|
||||
return message.readUnsignedByte();
|
||||
}
|
||||
|
||||
public static short bytesReadShort(Message message) {
|
||||
return getBodyBuffer(message).readShort();
|
||||
public static short bytesReadShort(ActiveMQBuffer message) {
|
||||
return message.readShort();
|
||||
}
|
||||
|
||||
public static int bytesReadUnsignedShort(Message message) {
|
||||
return getBodyBuffer(message).readUnsignedShort();
|
||||
public static int bytesReadUnsignedShort(ActiveMQBuffer message) {
|
||||
return message.readUnsignedShort();
|
||||
}
|
||||
|
||||
public static char bytesReadChar(Message message) {
|
||||
return (char) getBodyBuffer(message).readShort();
|
||||
public static char bytesReadChar(ActiveMQBuffer message) {
|
||||
return (char) message.readShort();
|
||||
}
|
||||
|
||||
public static int bytesReadInt(Message message) {
|
||||
return getBodyBuffer(message).readInt();
|
||||
public static int bytesReadInt(ActiveMQBuffer message) {
|
||||
return message.readInt();
|
||||
}
|
||||
|
||||
public static long bytesReadLong(Message message) {
|
||||
return getBodyBuffer(message).readLong();
|
||||
public static long bytesReadLong(ActiveMQBuffer message) {
|
||||
return message.readLong();
|
||||
}
|
||||
|
||||
public static float bytesReadFloat(Message message) {
|
||||
return Float.intBitsToFloat(getBodyBuffer(message).readInt());
|
||||
public static float bytesReadFloat(ActiveMQBuffer message) {
|
||||
return Float.intBitsToFloat(message.readInt());
|
||||
}
|
||||
|
||||
public static double bytesReadDouble(Message message) {
|
||||
return Double.longBitsToDouble(getBodyBuffer(message).readLong());
|
||||
public static double bytesReadDouble(ActiveMQBuffer message) {
|
||||
return Double.longBitsToDouble(message.readLong());
|
||||
}
|
||||
|
||||
public static String bytesReadUTF(Message message) {
|
||||
return getBodyBuffer(message).readUTF();
|
||||
public static String bytesReadUTF(ActiveMQBuffer message) {
|
||||
return message.readUTF();
|
||||
}
|
||||
|
||||
public static int bytesReadBytes(Message message, final byte[] value) {
|
||||
public static int bytesReadBytes(ActiveMQBuffer message, final byte[] value) {
|
||||
return bytesReadBytes(message, value, value.length);
|
||||
}
|
||||
|
||||
public static int bytesReadBytes(Message message, final byte[] value, final int length) {
|
||||
if (!getBodyBuffer(message).readable()) {
|
||||
public static int bytesReadBytes(ActiveMQBuffer message, final byte[] value, final int length) {
|
||||
if (!message.readable()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int read = Math.min(length, getBodyBuffer(message).readableBytes());
|
||||
int read = Math.min(length, message.readableBytes());
|
||||
|
||||
if (read != 0) {
|
||||
getBodyBuffer(message).readBytes(value, 0, read);
|
||||
message.readBytes(value, 0, read);
|
||||
}
|
||||
|
||||
return read;
|
||||
|
||||
}
|
||||
|
||||
public static void bytesWriteBoolean(Message message, boolean value) {
|
||||
getBodyBuffer(message).writeBoolean(value);
|
||||
public static void bytesWriteBoolean(ActiveMQBuffer message, boolean value) {
|
||||
message.writeBoolean(value);
|
||||
}
|
||||
|
||||
public static void bytesWriteByte(Message message, byte value) {
|
||||
getBodyBuffer(message).writeByte(value);
|
||||
public static void bytesWriteByte(ActiveMQBuffer message, byte value) {
|
||||
message.writeByte(value);
|
||||
}
|
||||
|
||||
public static void bytesWriteShort(Message message, short value) {
|
||||
getBodyBuffer(message).writeShort(value);
|
||||
public static void bytesWriteShort(ActiveMQBuffer message, short value) {
|
||||
message.writeShort(value);
|
||||
}
|
||||
|
||||
public static void bytesWriteChar(Message message, char value) {
|
||||
getBodyBuffer(message).writeShort((short) value);
|
||||
public static void bytesWriteChar(ActiveMQBuffer message, char value) {
|
||||
message.writeShort((short) value);
|
||||
}
|
||||
|
||||
public static void bytesWriteInt(Message message, int value) {
|
||||
getBodyBuffer(message).writeInt(value);
|
||||
public static void bytesWriteInt(ActiveMQBuffer message, int value) {
|
||||
message.writeInt(value);
|
||||
}
|
||||
|
||||
public static void bytesWriteLong(Message message, long value) {
|
||||
getBodyBuffer(message).writeLong(value);
|
||||
public static void bytesWriteLong(ActiveMQBuffer message, long value) {
|
||||
message.writeLong(value);
|
||||
}
|
||||
|
||||
public static void bytesWriteFloat(Message message, float value) {
|
||||
getBodyBuffer(message).writeInt(Float.floatToIntBits(value));
|
||||
public static void bytesWriteFloat(ActiveMQBuffer message, float value) {
|
||||
message.writeInt(Float.floatToIntBits(value));
|
||||
}
|
||||
|
||||
public static void bytesWriteDouble(Message message, double value) {
|
||||
getBodyBuffer(message).writeLong(Double.doubleToLongBits(value));
|
||||
public static void bytesWriteDouble(ActiveMQBuffer message, double value) {
|
||||
message.writeLong(Double.doubleToLongBits(value));
|
||||
}
|
||||
|
||||
public static void bytesWriteUTF(Message message, String value) {
|
||||
getBodyBuffer(message).writeUTF(value);
|
||||
public static void bytesWriteUTF(ActiveMQBuffer message, String value) {
|
||||
message.writeUTF(value);
|
||||
}
|
||||
|
||||
public static void bytesWriteBytes(Message message, byte[] value) {
|
||||
getBodyBuffer(message).writeBytes(value);
|
||||
public static void bytesWriteBytes(ActiveMQBuffer message, byte[] value) {
|
||||
message.writeBytes(value);
|
||||
}
|
||||
|
||||
public static void bytesWriteBytes(Message message, final byte[] value, final int offset, final int length) {
|
||||
getBodyBuffer(message).writeBytes(value, offset, length);
|
||||
public static void bytesWriteBytes(ActiveMQBuffer message, final byte[] value, final int offset, final int length) {
|
||||
message.writeBytes(value, offset, length);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -134,7 +134,7 @@ public class BytesMessageUtil extends MessageUtil {
|
|||
* @param value
|
||||
* @return
|
||||
*/
|
||||
public static boolean bytesWriteObject(Message message, Object value) {
|
||||
public static boolean bytesWriteObject(ActiveMQBuffer message, Object value) {
|
||||
if (value == null) {
|
||||
throw new NullPointerException("Attempt to write a null value");
|
||||
}
|
||||
|
@ -175,8 +175,8 @@ public class BytesMessageUtil extends MessageUtil {
|
|||
return true;
|
||||
}
|
||||
|
||||
public static void bytesMessageReset(Message message) {
|
||||
getBodyBuffer(message).resetReaderIndex();
|
||||
public static void bytesMessageReset(ActiveMQBuffer message) {
|
||||
message.resetReaderIndex();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.artemis.reader;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||
|
||||
public class MapMessageUtil extends MessageUtil {
|
||||
|
@ -25,16 +24,15 @@ public class MapMessageUtil extends MessageUtil {
|
|||
/**
|
||||
* Utility method to set the map on a message body
|
||||
*/
|
||||
public static void writeBodyMap(Message message, TypedProperties properties) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
buff.resetWriterIndex();
|
||||
properties.encode(buff);
|
||||
public static void writeBodyMap(ActiveMQBuffer message, TypedProperties properties) {
|
||||
message.resetWriterIndex();
|
||||
properties.encode(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to set the map on a message body
|
||||
*/
|
||||
public static TypedProperties readBodyMap(Message message) {
|
||||
public static TypedProperties readBodyMap(ActiveMQBuffer message) {
|
||||
TypedProperties map = new TypedProperties();
|
||||
readBodyMap(message, map);
|
||||
return map;
|
||||
|
@ -43,10 +41,9 @@ public class MapMessageUtil extends MessageUtil {
|
|||
/**
|
||||
* Utility method to set the map on a message body
|
||||
*/
|
||||
public static void readBodyMap(Message message, TypedProperties map) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
buff.resetReaderIndex();
|
||||
map.decode(buff);
|
||||
public static void readBodyMap(ActiveMQBuffer message, TypedProperties map) {
|
||||
message.resetReaderIndex();
|
||||
map.decode(message);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
|
@ -52,9 +51,9 @@ public class MessageUtil {
|
|||
|
||||
public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID");
|
||||
|
||||
public static ActiveMQBuffer getBodyBuffer(Message message) {
|
||||
return message.getBodyBuffer();
|
||||
}
|
||||
// public static ActiveMQBuffer getBodyBuffer(Message message) {
|
||||
// return message.getBodyBuffer();
|
||||
// }
|
||||
|
||||
public static byte[] getJMSCorrelationIDAsBytes(Message message) {
|
||||
Object obj = message.getObjectProperty(CORRELATIONID_HEADER_NAME);
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.artemis.reader;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
|
@ -27,11 +26,10 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
* Method to read boolean values out of the Stream protocol existent on JMS Stream Messages
|
||||
* Throws IllegalStateException if the type was invalid
|
||||
*
|
||||
* @param message
|
||||
* @param buff
|
||||
* @return
|
||||
*/
|
||||
public static boolean streamReadBoolean(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static boolean streamReadBoolean(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
|
||||
switch (type) {
|
||||
|
@ -46,8 +44,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
|
||||
}
|
||||
|
||||
public static byte streamReadByte(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static byte streamReadByte(ActiveMQBuffer buff) {
|
||||
int index = buff.readerIndex();
|
||||
try {
|
||||
byte type = buff.readByte();
|
||||
|
@ -68,8 +65,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
|
||||
}
|
||||
|
||||
public static short streamReadShort(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static short streamReadShort(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
switch (type) {
|
||||
case DataConstants.BYTE:
|
||||
|
@ -84,8 +80,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static char streamReadChar(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static char streamReadChar(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
switch (type) {
|
||||
case DataConstants.CHAR:
|
||||
|
@ -104,8 +99,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
|
||||
}
|
||||
|
||||
public static int streamReadInteger(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static int streamReadInteger(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
switch (type) {
|
||||
case DataConstants.BYTE:
|
||||
|
@ -122,8 +116,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static long streamReadLong(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static long streamReadLong(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
switch (type) {
|
||||
case DataConstants.BYTE:
|
||||
|
@ -142,8 +135,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static float streamReadFloat(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static float streamReadFloat(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
switch (type) {
|
||||
case DataConstants.FLOAT:
|
||||
|
@ -156,8 +148,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static double streamReadDouble(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static double streamReadDouble(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
switch (type) {
|
||||
case DataConstants.FLOAT:
|
||||
|
@ -172,8 +163,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
}
|
||||
}
|
||||
|
||||
public static String streamReadString(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static String streamReadString(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
switch (type) {
|
||||
case DataConstants.BOOLEAN:
|
||||
|
@ -204,12 +194,10 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
* It will return remainingBytes, bytesRead
|
||||
*
|
||||
* @param remainingBytes remaining Bytes from previous read. Send it to 0 if it was the first call for the message
|
||||
* @param message
|
||||
* @param buff
|
||||
* @return a pair of remaining bytes and bytes read
|
||||
*/
|
||||
public static Pair<Integer, Integer> streamReadBytes(Message message, int remainingBytes, byte[] value) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
|
||||
public static Pair<Integer, Integer> streamReadBytes(ActiveMQBuffer buff, int remainingBytes, byte[] value) {
|
||||
if (remainingBytes == -1) {
|
||||
return new Pair<>(0, -1);
|
||||
}
|
||||
|
@ -230,9 +218,7 @@ public class StreamMessageUtil extends MessageUtil {
|
|||
|
||||
}
|
||||
|
||||
public static Object streamReadObject(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
|
||||
public static Object streamReadObject(ActiveMQBuffer buff) {
|
||||
byte type = buff.readByte();
|
||||
switch (type) {
|
||||
case DataConstants.BOOLEAN:
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.artemis.reader;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
||||
public class TextMessageUtil extends MessageUtil {
|
||||
|
@ -25,8 +24,7 @@ public class TextMessageUtil extends MessageUtil {
|
|||
/**
|
||||
* Utility method to set the Text message on a message body
|
||||
*/
|
||||
public static void writeBodyText(Message message, SimpleString text) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static void writeBodyText(ActiveMQBuffer buff, SimpleString text) {
|
||||
buff.clear();
|
||||
buff.writeNullableSimpleString(text);
|
||||
}
|
||||
|
@ -34,8 +32,7 @@ public class TextMessageUtil extends MessageUtil {
|
|||
/**
|
||||
* Utility method to set the Text message on a message body
|
||||
*/
|
||||
public static SimpleString readBodyText(Message message) {
|
||||
ActiveMQBuffer buff = getBodyBuffer(message);
|
||||
public static SimpleString readBodyText(ActiveMQBuffer buff) {
|
||||
buff.resetReaderIndex();
|
||||
return buff.readNullableSimpleString();
|
||||
}
|
||||
|
|
|
@ -102,7 +102,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public boolean readBoolean() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadBoolean(message);
|
||||
return bytesReadBoolean(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -112,7 +112,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public byte readByte() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadByte(message);
|
||||
return bytesReadByte(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -122,7 +122,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public int readUnsignedByte() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadUnsignedByte(message);
|
||||
return bytesReadUnsignedByte(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -132,7 +132,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public short readShort() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadShort(message);
|
||||
return bytesReadShort(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -142,7 +142,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public int readUnsignedShort() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadUnsignedShort(message);
|
||||
return bytesReadUnsignedShort(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -152,7 +152,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public char readChar() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadChar(message);
|
||||
return bytesReadChar(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -162,7 +162,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public int readInt() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadInt(message);
|
||||
return bytesReadInt(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -172,7 +172,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public long readLong() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadLong(message);
|
||||
return bytesReadLong(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -182,7 +182,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public float readFloat() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadFloat(message);
|
||||
return bytesReadFloat(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -192,7 +192,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public double readDouble() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadDouble(message);
|
||||
return bytesReadDouble(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -202,7 +202,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
public String readUTF() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return bytesReadUTF(message);
|
||||
return bytesReadUTF(message.getBodyBuffer());
|
||||
}
|
||||
catch (IndexOutOfBoundsException e) {
|
||||
throw new MessageEOFException("");
|
||||
|
@ -217,59 +217,59 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
|
||||
public int readBytes(final byte[] value) throws JMSException {
|
||||
checkRead();
|
||||
return bytesReadBytes(message, value);
|
||||
return bytesReadBytes(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public int readBytes(final byte[] value, final int length) throws JMSException {
|
||||
checkRead();
|
||||
return bytesReadBytes(message, value, length);
|
||||
return bytesReadBytes(message.getBodyBuffer(), value, length);
|
||||
|
||||
}
|
||||
|
||||
public void writeBoolean(final boolean value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteBoolean(message, value);
|
||||
bytesWriteBoolean(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeByte(final byte value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteByte(message, value);
|
||||
bytesWriteByte(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeShort(final short value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteShort(message, value);
|
||||
bytesWriteShort(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeChar(final char value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteChar(message, value);
|
||||
bytesWriteChar(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeInt(final int value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteInt(message, value);
|
||||
bytesWriteInt(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeLong(final long value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteLong(message, value);
|
||||
bytesWriteLong(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeFloat(final float value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteFloat(message, value);
|
||||
bytesWriteFloat(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeDouble(final double value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteDouble(message, value);
|
||||
bytesWriteDouble(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeUTF(final String value) throws JMSException {
|
||||
checkWrite();
|
||||
try {
|
||||
bytesWriteUTF(message, value);
|
||||
bytesWriteUTF(message.getBodyBuffer(), value);
|
||||
}
|
||||
catch (Exception e) {
|
||||
JMSException je = new JMSException("Failed to write UTF");
|
||||
|
@ -282,17 +282,17 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
|
||||
public void writeBytes(final byte[] value) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteBytes(message, value);
|
||||
bytesWriteBytes(message.getBodyBuffer(), value);
|
||||
}
|
||||
|
||||
public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
|
||||
checkWrite();
|
||||
bytesWriteBytes(message, value, offset, length);
|
||||
bytesWriteBytes(message.getBodyBuffer(), value, offset, length);
|
||||
}
|
||||
|
||||
public void writeObject(final Object value) throws JMSException {
|
||||
checkWrite();
|
||||
if (!bytesWriteObject(message, value)) {
|
||||
if (!bytesWriteObject(message.getBodyBuffer(), value)) {
|
||||
throw new MessageFormatException("Invalid object for properties");
|
||||
}
|
||||
}
|
||||
|
@ -304,7 +304,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
|
|||
bodyLength = message.getBodySize();
|
||||
}
|
||||
|
||||
bytesMessageReset(message);
|
||||
bytesMessageReset(message.getBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -317,7 +317,7 @@ public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMess
|
|||
@Override
|
||||
public void doBeforeSend() throws Exception {
|
||||
if (invalid) {
|
||||
writeBodyMap(message, map);
|
||||
writeBodyMap(message.getBodyBuffer(), map);
|
||||
invalid = false;
|
||||
}
|
||||
|
||||
|
@ -328,7 +328,7 @@ public final class ActiveMQMapMessage extends ActiveMQMessage implements MapMess
|
|||
public void doBeforeReceive() throws ActiveMQException {
|
||||
super.doBeforeReceive();
|
||||
|
||||
readBodyMap(message, map);
|
||||
readBodyMap(message.getBodyBuffer(), map);
|
||||
}
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
|
|
@ -89,7 +89,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public boolean readBoolean() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadBoolean(message);
|
||||
return streamReadBoolean(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -103,7 +103,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
checkRead();
|
||||
|
||||
try {
|
||||
return streamReadByte(message);
|
||||
return streamReadByte(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -116,7 +116,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public short readShort() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadShort(message);
|
||||
return streamReadShort(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -129,7 +129,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public char readChar() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadChar(message);
|
||||
return streamReadChar(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -142,7 +142,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public int readInt() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadInteger(message);
|
||||
return streamReadInteger(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -155,7 +155,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public long readLong() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadLong(message);
|
||||
return streamReadLong(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -168,7 +168,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public float readFloat() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadFloat(message);
|
||||
return streamReadFloat(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -181,7 +181,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public double readDouble() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadDouble(message);
|
||||
return streamReadDouble(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -194,7 +194,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public String readString() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadString(message);
|
||||
return streamReadString(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -212,7 +212,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public int readBytes(final byte[] value) throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
|
||||
Pair<Integer, Integer> pairRead = streamReadBytes(message.getBodyBuffer(), len, value);
|
||||
|
||||
len = pairRead.getA();
|
||||
return pairRead.getB();
|
||||
|
@ -228,7 +228,7 @@ public final class ActiveMQStreamMessage extends ActiveMQMessage implements Stre
|
|||
public Object readObject() throws JMSException {
|
||||
checkRead();
|
||||
try {
|
||||
return streamReadObject(message);
|
||||
return streamReadObject(message.getBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
|
|
@ -84,7 +84,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
|
|||
this.text = null;
|
||||
}
|
||||
|
||||
writeBodyText(message, this.text);
|
||||
writeBodyText(message.getBodyBuffer(), this.text);
|
||||
}
|
||||
|
||||
public String getText() {
|
||||
|
@ -109,7 +109,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
|
|||
public void doBeforeReceive() throws ActiveMQException {
|
||||
super.doBeforeReceive();
|
||||
|
||||
text = readBodyText(message);
|
||||
text = readBodyText(message.getBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -60,128 +60,128 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
|
|||
|
||||
@Override
|
||||
public boolean readBoolean() throws JMSException {
|
||||
return bytesReadBoolean(message);
|
||||
return bytesReadBoolean(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() throws JMSException {
|
||||
return bytesReadByte(message);
|
||||
return bytesReadByte(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readUnsignedByte() throws JMSException {
|
||||
return bytesReadUnsignedByte(message);
|
||||
return bytesReadUnsignedByte(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public short readShort() throws JMSException {
|
||||
return bytesReadShort(message);
|
||||
return bytesReadShort(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readUnsignedShort() throws JMSException {
|
||||
return bytesReadUnsignedShort(message);
|
||||
return bytesReadUnsignedShort(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public char readChar() throws JMSException {
|
||||
return bytesReadChar(message);
|
||||
return bytesReadChar(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readInt() throws JMSException {
|
||||
return bytesReadInt(message);
|
||||
return bytesReadInt(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long readLong() throws JMSException {
|
||||
return bytesReadLong(message);
|
||||
return bytesReadLong(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public float readFloat() throws JMSException {
|
||||
return bytesReadFloat(message);
|
||||
return bytesReadFloat(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public double readDouble() throws JMSException {
|
||||
return bytesReadDouble(message);
|
||||
return bytesReadDouble(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readUTF() throws JMSException {
|
||||
return bytesReadUTF(message);
|
||||
return bytesReadUTF(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readBytes(byte[] value) throws JMSException {
|
||||
return bytesReadBytes(message, value);
|
||||
return bytesReadBytes(getReadBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readBytes(byte[] value, int length) throws JMSException {
|
||||
return bytesReadBytes(message, value, length);
|
||||
return bytesReadBytes(getReadBodyBuffer(), value, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBoolean(boolean value) throws JMSException {
|
||||
bytesWriteBoolean(message, value);
|
||||
bytesWriteBoolean(getWriteBodyBuffer(), value);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(byte value) throws JMSException {
|
||||
bytesWriteByte(message, value);
|
||||
bytesWriteByte(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeShort(short value) throws JMSException {
|
||||
bytesWriteShort(message, value);
|
||||
bytesWriteShort(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeChar(char value) throws JMSException {
|
||||
bytesWriteChar(message, value);
|
||||
bytesWriteChar(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeInt(int value) throws JMSException {
|
||||
bytesWriteInt(message, value);
|
||||
bytesWriteInt(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeLong(long value) throws JMSException {
|
||||
bytesWriteLong(message, value);
|
||||
bytesWriteLong(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFloat(float value) throws JMSException {
|
||||
bytesWriteFloat(message, value);
|
||||
bytesWriteFloat(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDouble(double value) throws JMSException {
|
||||
bytesWriteDouble(message, value);
|
||||
bytesWriteDouble(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeUTF(String value) throws JMSException {
|
||||
bytesWriteUTF(message, value);
|
||||
bytesWriteUTF(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(byte[] value) throws JMSException {
|
||||
bytesWriteBytes(message, value);
|
||||
bytesWriteBytes(getWriteBodyBuffer(), value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(byte[] value, int offset, int length) throws JMSException {
|
||||
bytesWriteBytes(message, value, offset, length);
|
||||
bytesWriteBytes(getWriteBodyBuffer(), value, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeObject(Object value) throws JMSException {
|
||||
if (!bytesWriteObject(message, value)) {
|
||||
if (!bytesWriteObject(getWriteBodyBuffer(), value)) {
|
||||
throw new JMSException("Can't make conversion of " + value + " to any known type");
|
||||
}
|
||||
}
|
||||
|
@ -199,7 +199,8 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
|
|||
|
||||
@Override
|
||||
public void reset() throws JMSException {
|
||||
bytesMessageReset(message);
|
||||
bytesMessageReset(getReadBodyBuffer());
|
||||
bytesMessageReset(getWriteBodyBuffer());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -247,12 +247,12 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe
|
|||
|
||||
public void encode() throws Exception {
|
||||
super.encode();
|
||||
writeBodyMap(message, map);
|
||||
writeBodyMap(getWriteBodyBuffer(), map);
|
||||
}
|
||||
|
||||
public void decode() throws Exception {
|
||||
super.decode();
|
||||
readBodyMap(message, map);
|
||||
readBodyMap(getReadBodyBuffer(), map);
|
||||
}
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
|
|
@ -23,6 +23,7 @@ import javax.jms.Message;
|
|||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
|
||||
|
@ -45,6 +46,24 @@ public class ServerJMSMessage implements Message {
|
|||
this.deliveryCount = deliveryCount;
|
||||
}
|
||||
|
||||
private ActiveMQBuffer readBodyBuffer;
|
||||
|
||||
/** When reading we use a protected copy so multi-threads can work fine */
|
||||
protected ActiveMQBuffer getReadBodyBuffer() {
|
||||
if (readBodyBuffer == null) {
|
||||
// to avoid clashes between multiple threads
|
||||
readBodyBuffer = message.getBodyBufferCopy();
|
||||
}
|
||||
return readBodyBuffer;
|
||||
}
|
||||
|
||||
/** When writing on the conversion we use the buffer directly */
|
||||
protected ActiveMQBuffer getWriteBodyBuffer() {
|
||||
readBodyBuffer = null; // it invalidates this buffer if anything is written
|
||||
return message.getBodyBuffer();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public final String getJMSMessageID() throws JMSException {
|
||||
return null;
|
||||
|
|
|
@ -21,13 +21,11 @@ import javax.jms.MessageEOFException;
|
|||
import javax.jms.MessageFormatException;
|
||||
import javax.jms.StreamMessage;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
|
||||
import static org.apache.activemq.artemis.reader.MessageUtil.getBodyBuffer;
|
||||
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean;
|
||||
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadByte;
|
||||
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBytes;
|
||||
|
@ -48,14 +46,14 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
|
||||
public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) {
|
||||
super(message, deliveryCount);
|
||||
|
||||
}
|
||||
|
||||
// StreamMessage implementation ----------------------------------
|
||||
|
||||
public boolean readBoolean() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadBoolean(message);
|
||||
return streamReadBoolean(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -67,7 +65,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
|
||||
public byte readByte() throws JMSException {
|
||||
try {
|
||||
return streamReadByte(message);
|
||||
return streamReadByte(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -80,7 +78,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public short readShort() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadShort(message);
|
||||
return streamReadShort(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -93,7 +91,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public char readChar() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadChar(message);
|
||||
return streamReadChar(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -106,7 +104,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public int readInt() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadInteger(message);
|
||||
return streamReadInteger(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -119,7 +117,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public long readLong() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadLong(message);
|
||||
return streamReadLong(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -132,7 +130,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public float readFloat() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadFloat(message);
|
||||
return streamReadFloat(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -145,7 +143,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public double readDouble() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadDouble(message);
|
||||
return streamReadDouble(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -158,7 +156,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public String readString() throws JMSException {
|
||||
|
||||
try {
|
||||
return streamReadString(message);
|
||||
return streamReadString(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -176,7 +174,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public int readBytes(final byte[] value) throws JMSException {
|
||||
|
||||
try {
|
||||
Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
|
||||
Pair<Integer, Integer> pairRead = streamReadBytes(getReadBodyBuffer(), len, value);
|
||||
|
||||
len = pairRead.getA();
|
||||
return pairRead.getB();
|
||||
|
@ -191,11 +189,11 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
|
||||
public Object readObject() throws JMSException {
|
||||
|
||||
if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition()) {
|
||||
if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) {
|
||||
throw new MessageEOFException("");
|
||||
}
|
||||
try {
|
||||
return streamReadObject(message);
|
||||
return streamReadObject(getReadBodyBuffer());
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
throw new MessageFormatException(e.getMessage());
|
||||
|
@ -207,70 +205,70 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
|
||||
public void writeBoolean(final boolean value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.BOOLEAN);
|
||||
getBuffer().writeBoolean(value);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.BOOLEAN);
|
||||
getWriteBodyBuffer().writeBoolean(value);
|
||||
}
|
||||
|
||||
public void writeByte(final byte value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.BYTE);
|
||||
getBuffer().writeByte(value);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.BYTE);
|
||||
getWriteBodyBuffer().writeByte(value);
|
||||
}
|
||||
|
||||
public void writeShort(final short value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.SHORT);
|
||||
getBuffer().writeShort(value);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.SHORT);
|
||||
getWriteBodyBuffer().writeShort(value);
|
||||
}
|
||||
|
||||
public void writeChar(final char value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.CHAR);
|
||||
getBuffer().writeShort((short) value);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.CHAR);
|
||||
getWriteBodyBuffer().writeShort((short) value);
|
||||
}
|
||||
|
||||
public void writeInt(final int value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.INT);
|
||||
getBuffer().writeInt(value);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.INT);
|
||||
getWriteBodyBuffer().writeInt(value);
|
||||
}
|
||||
|
||||
public void writeLong(final long value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.LONG);
|
||||
getBuffer().writeLong(value);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.LONG);
|
||||
getWriteBodyBuffer().writeLong(value);
|
||||
}
|
||||
|
||||
public void writeFloat(final float value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.FLOAT);
|
||||
getBuffer().writeInt(Float.floatToIntBits(value));
|
||||
getWriteBodyBuffer().writeByte(DataConstants.FLOAT);
|
||||
getWriteBodyBuffer().writeInt(Float.floatToIntBits(value));
|
||||
}
|
||||
|
||||
public void writeDouble(final double value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.DOUBLE);
|
||||
getBuffer().writeLong(Double.doubleToLongBits(value));
|
||||
getWriteBodyBuffer().writeByte(DataConstants.DOUBLE);
|
||||
getWriteBodyBuffer().writeLong(Double.doubleToLongBits(value));
|
||||
}
|
||||
|
||||
public void writeString(final String value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.STRING);
|
||||
getBuffer().writeNullableString(value);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.STRING);
|
||||
getWriteBodyBuffer().writeNullableString(value);
|
||||
}
|
||||
|
||||
public void writeBytes(final byte[] value) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.BYTES);
|
||||
getBuffer().writeInt(value.length);
|
||||
getBuffer().writeBytes(value);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.BYTES);
|
||||
getWriteBodyBuffer().writeInt(value.length);
|
||||
getWriteBodyBuffer().writeBytes(value);
|
||||
}
|
||||
|
||||
public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
|
||||
|
||||
getBuffer().writeByte(DataConstants.BYTES);
|
||||
getBuffer().writeInt(length);
|
||||
getBuffer().writeBytes(value, offset, length);
|
||||
getWriteBodyBuffer().writeByte(DataConstants.BYTES);
|
||||
getWriteBodyBuffer().writeInt(length);
|
||||
getWriteBodyBuffer().writeBytes(value, offset, length);
|
||||
}
|
||||
|
||||
public void writeObject(final Object value) throws JMSException {
|
||||
|
@ -313,7 +311,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
}
|
||||
|
||||
public void reset() throws JMSException {
|
||||
getBuffer().resetReaderIndex();
|
||||
getWriteBodyBuffer().resetReaderIndex();
|
||||
}
|
||||
|
||||
// ActiveMQRAMessage overrides ----------------------------------------
|
||||
|
@ -322,11 +320,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
|
|||
public void clearBody() throws JMSException {
|
||||
super.clearBody();
|
||||
|
||||
getBuffer().clear();
|
||||
}
|
||||
|
||||
private ActiveMQBuffer getBuffer() {
|
||||
return message.getBodyBuffer();
|
||||
getWriteBodyBuffer().clear();
|
||||
}
|
||||
|
||||
public void decode() throws Exception {
|
||||
|
|
|
@ -63,7 +63,7 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
|
|||
this.text = null;
|
||||
}
|
||||
|
||||
writeBodyText(message, this.text);
|
||||
writeBodyText(getWriteBodyBuffer(), this.text);
|
||||
}
|
||||
|
||||
public String getText() {
|
||||
|
@ -84,12 +84,12 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
|
|||
|
||||
public void encode() throws Exception {
|
||||
super.encode();
|
||||
writeBodyText(message, text);
|
||||
writeBodyText(getWriteBodyBuffer(), text);
|
||||
}
|
||||
|
||||
public void decode() throws Exception {
|
||||
super.decode();
|
||||
text = readBodyText(message);
|
||||
text = readBodyText(getReadBodyBuffer());
|
||||
}
|
||||
|
||||
}
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.proton;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
|
@ -473,7 +474,7 @@ public class TestConversions extends Assert {
|
|||
}
|
||||
|
||||
@Override
|
||||
public short readUnsignedByte() {
|
||||
public int readUnsignedByte() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -588,8 +589,8 @@ public class TestConversions extends Assert {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void skipBytes(int length) {
|
||||
|
||||
public int skipBytes(int length) {
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -682,6 +683,19 @@ public class TestConversions extends Assert {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(byte[] b) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(byte[] b, int off, int len) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readLine() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActiveMQBuffer copy() {
|
||||
return null;
|
||||
|
|
|
@ -1,228 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.utils.UTF8Util;
|
||||
import org.apache.activemq.artemis.utils.UTF8Util.StringUtilBuffer;
|
||||
|
||||
public class DataInputWrapper implements DataInput {
|
||||
|
||||
private static final int DEFAULT_CAPACITY = 1024 * 1024;
|
||||
private static final NotEnoughBytesException exception = new NotEnoughBytesException();
|
||||
private ByteBuffer internalBuffer;
|
||||
|
||||
public DataInputWrapper() {
|
||||
this(DEFAULT_CAPACITY);
|
||||
}
|
||||
|
||||
public DataInputWrapper(int capacity) {
|
||||
this.internalBuffer = ByteBuffer.allocateDirect(capacity);
|
||||
this.internalBuffer.mark();
|
||||
this.internalBuffer.limit(0);
|
||||
}
|
||||
|
||||
public void receiveData(byte[] data) {
|
||||
int newSize = data.length;
|
||||
int freeSpace = internalBuffer.capacity() - internalBuffer.limit();
|
||||
if (freeSpace < newSize) {
|
||||
internalBuffer.reset();
|
||||
internalBuffer.compact();
|
||||
if (internalBuffer.remaining() < newSize) {
|
||||
//need to enlarge
|
||||
}
|
||||
//make sure mark is at zero and position is at effective limit
|
||||
int pos = internalBuffer.position();
|
||||
internalBuffer.position(0);
|
||||
internalBuffer.mark();
|
||||
internalBuffer.position(pos);
|
||||
}
|
||||
else {
|
||||
internalBuffer.position(internalBuffer.limit());
|
||||
internalBuffer.limit(internalBuffer.capacity());
|
||||
}
|
||||
internalBuffer.put(data);
|
||||
internalBuffer.limit(internalBuffer.position());
|
||||
internalBuffer.reset();
|
||||
}
|
||||
|
||||
public void receiveData(ActiveMQBuffer buffer) {
|
||||
int newSize = buffer.readableBytes();
|
||||
byte[] newData = new byte[newSize];
|
||||
buffer.readBytes(newData);
|
||||
this.receiveData(newData);
|
||||
}
|
||||
|
||||
//invoke after each successful unmarshall
|
||||
public void mark() {
|
||||
this.internalBuffer.mark();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(byte[] b) throws IOException {
|
||||
readFully(b, 0, b.length);
|
||||
}
|
||||
|
||||
private void checkSize(int n) throws NotEnoughBytesException {
|
||||
if (internalBuffer.remaining() < n) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(byte[] b, int off, int len) throws IOException {
|
||||
checkSize(len);
|
||||
internalBuffer.get(b, off, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int skipBytes(int n) throws IOException {
|
||||
checkSize(n);
|
||||
int pos = internalBuffer.position();
|
||||
internalBuffer.position(pos + n);
|
||||
return n;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean readBoolean() throws IOException {
|
||||
checkSize(1);
|
||||
byte b = internalBuffer.get();
|
||||
return b != 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() throws IOException {
|
||||
checkSize(1);
|
||||
return this.internalBuffer.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readUnsignedByte() throws IOException {
|
||||
checkSize(1);
|
||||
return 0xFF & this.internalBuffer.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public short readShort() throws IOException {
|
||||
checkSize(2);
|
||||
return this.internalBuffer.getShort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readUnsignedShort() throws IOException {
|
||||
checkSize(2);
|
||||
return 0xFFFF & this.internalBuffer.getShort();
|
||||
}
|
||||
|
||||
@Override
|
||||
public char readChar() throws IOException {
|
||||
checkSize(2);
|
||||
return this.internalBuffer.getChar();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readInt() throws IOException {
|
||||
checkSize(4);
|
||||
return this.internalBuffer.getInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long readLong() throws IOException {
|
||||
checkSize(8);
|
||||
return this.internalBuffer.getLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float readFloat() throws IOException {
|
||||
checkSize(4);
|
||||
return this.internalBuffer.getFloat();
|
||||
}
|
||||
|
||||
@Override
|
||||
public double readDouble() throws IOException {
|
||||
checkSize(8);
|
||||
return this.internalBuffer.getDouble();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readLine() throws IOException {
|
||||
StringBuilder sb = new StringBuilder("");
|
||||
char c = this.readChar();
|
||||
while (c != '\n') {
|
||||
sb.append(c);
|
||||
c = this.readChar();
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String readUTF() throws IOException {
|
||||
StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();
|
||||
|
||||
final int size = this.readUnsignedShort();
|
||||
|
||||
if (size > buffer.byteBuffer.length) {
|
||||
buffer.resizeByteBuffer(size);
|
||||
}
|
||||
|
||||
if (size > buffer.charBuffer.length) {
|
||||
buffer.resizeCharBuffer(size);
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
int byte1, byte2, byte3;
|
||||
int charCount = 0;
|
||||
|
||||
this.readFully(buffer.byteBuffer, 0, size);
|
||||
|
||||
while (count < size) {
|
||||
byte1 = buffer.byteBuffer[count++];
|
||||
|
||||
if (byte1 > 0 && byte1 <= 0x7F) {
|
||||
buffer.charBuffer[charCount++] = (char) byte1;
|
||||
}
|
||||
else {
|
||||
int c = byte1 & 0xff;
|
||||
switch (c >> 4) {
|
||||
case 0xc:
|
||||
case 0xd:
|
||||
byte2 = buffer.byteBuffer[count++];
|
||||
buffer.charBuffer[charCount++] = (char) ((c & 0x1F) << 6 | byte2 & 0x3F);
|
||||
break;
|
||||
case 0xe:
|
||||
byte2 = buffer.byteBuffer[count++];
|
||||
byte3 = buffer.byteBuffer[count++];
|
||||
buffer.charBuffer[charCount++] = (char) ((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0);
|
||||
break;
|
||||
default:
|
||||
throw new InternalError("unhandled utf8 byte " + c);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new String(buffer.charBuffer, 0, charCount);
|
||||
}
|
||||
|
||||
public boolean readable() {
|
||||
return this.internalBuffer.hasRemaining();
|
||||
}
|
||||
|
||||
}
|
|
@ -40,9 +40,23 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
|
||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.BrokerInfo;
|
||||
|
@ -77,20 +91,7 @@ import org.apache.activemq.command.ShutdownInfo;
|
|||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.TransactionInfo;
|
||||
import org.apache.activemq.command.WireFormatInfo;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
|
||||
import org.apache.activemq.artemis.core.remoting.CloseListener;
|
||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.state.CommandVisitor;
|
||||
import org.apache.activemq.state.ConnectionState;
|
||||
import org.apache.activemq.state.ConsumerState;
|
||||
|
@ -100,7 +101,6 @@ import org.apache.activemq.thread.TaskRunner;
|
|||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transport.TransmitCallback;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
|
||||
/**
|
||||
|
@ -176,8 +176,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
|||
|
||||
private final Set<String> tempQueues = new ConcurrentHashSet<String>();
|
||||
|
||||
private DataInputWrapper dataInput = new DataInputWrapper();
|
||||
|
||||
private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
|
||||
|
||||
private volatile AMQSession advisorySession;
|
||||
|
@ -196,96 +194,78 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
|||
@Override
|
||||
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
|
||||
try {
|
||||
dataInput.receiveData(buffer);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
ActiveMQServerLogger.LOGGER.error("decoding error", t);
|
||||
return;
|
||||
}
|
||||
Object object = wireFormat.unmarshal(buffer);
|
||||
|
||||
// this.setDataReceived();
|
||||
while (dataInput.readable()) {
|
||||
try {
|
||||
Object object = null;
|
||||
try {
|
||||
object = wireFormat.unmarshal(dataInput);
|
||||
dataInput.mark();
|
||||
Command command = (Command) object;
|
||||
boolean responseRequired = command.isResponseRequired();
|
||||
int commandId = command.getCommandId();
|
||||
// the connection handles pings, negotiations directly.
|
||||
// and delegate all other commands to manager.
|
||||
if (command.getClass() == KeepAliveInfo.class) {
|
||||
KeepAliveInfo info = (KeepAliveInfo) command;
|
||||
if (info.isResponseRequired()) {
|
||||
info.setResponseRequired(false);
|
||||
protocolManager.sendReply(this, info);
|
||||
}
|
||||
catch (NotEnoughBytesException e) {
|
||||
//meaning the dataInput hasn't enough bytes for a command.
|
||||
//in that case we just return and waiting for the next
|
||||
//call of bufferReceived()
|
||||
return;
|
||||
}
|
||||
else if (command.getClass() == WireFormatInfo.class) {
|
||||
// amq here starts a read/write monitor thread (detect ttl?)
|
||||
negotiate((WireFormatInfo) command);
|
||||
}
|
||||
else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class ||
|
||||
command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) ||
|
||||
command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class ||
|
||||
command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) {
|
||||
Response response = null;
|
||||
|
||||
if (pendingStop) {
|
||||
response = new ExceptionResponse(this.stopError);
|
||||
}
|
||||
else {
|
||||
response = ((Command) command).visit(this);
|
||||
|
||||
Command command = (Command) object;
|
||||
boolean responseRequired = command.isResponseRequired();
|
||||
int commandId = command.getCommandId();
|
||||
// the connection handles pings, negotiations directly.
|
||||
// and delegate all other commands to manager.
|
||||
if (command.getClass() == KeepAliveInfo.class) {
|
||||
KeepAliveInfo info = (KeepAliveInfo) command;
|
||||
if (info.isResponseRequired()) {
|
||||
info.setResponseRequired(false);
|
||||
protocolManager.sendReply(this, info);
|
||||
}
|
||||
}
|
||||
else if (command.getClass() == WireFormatInfo.class) {
|
||||
// amq here starts a read/write monitor thread (detect ttl?)
|
||||
negotiate((WireFormatInfo) command);
|
||||
}
|
||||
else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class || command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) || command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class || command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) {
|
||||
Response response = null;
|
||||
|
||||
if (pendingStop) {
|
||||
response = new ExceptionResponse(this.stopError);
|
||||
}
|
||||
else {
|
||||
response = ((Command) command).visit(this);
|
||||
|
||||
if (response instanceof ExceptionResponse) {
|
||||
if (!responseRequired) {
|
||||
Throwable cause = ((ExceptionResponse) response).getException();
|
||||
serviceException(cause);
|
||||
response = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (responseRequired) {
|
||||
if (response == null) {
|
||||
response = new Response();
|
||||
}
|
||||
}
|
||||
|
||||
// The context may have been flagged so that the response is not
|
||||
// sent.
|
||||
if (context != null) {
|
||||
if (context.isDontSendReponse()) {
|
||||
context.setDontSendReponse(false);
|
||||
if (response instanceof ExceptionResponse) {
|
||||
if (!responseRequired) {
|
||||
Throwable cause = ((ExceptionResponse) response).getException();
|
||||
serviceException(cause);
|
||||
response = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (response != null && !protocolManager.isStopping()) {
|
||||
response.setCorrelationId(commandId);
|
||||
dispatchSync(response);
|
||||
if (responseRequired) {
|
||||
if (response == null) {
|
||||
response = new Response();
|
||||
}
|
||||
}
|
||||
|
||||
// The context may have been flagged so that the response is not
|
||||
// sent.
|
||||
if (context != null) {
|
||||
if (context.isDontSendReponse()) {
|
||||
context.setDontSendReponse(false);
|
||||
response = null;
|
||||
}
|
||||
}
|
||||
else {
|
||||
// note!!! wait for negotiation (e.g. use a countdown latch)
|
||||
// before handling any other commands
|
||||
this.protocolManager.handleCommand(this, command);
|
||||
|
||||
if (response != null && !protocolManager.isStopping()) {
|
||||
response.setCorrelationId(commandId);
|
||||
dispatchSync(response);
|
||||
}
|
||||
|
||||
}
|
||||
catch (IOException e) {
|
||||
ActiveMQServerLogger.LOGGER.error("error decoding", e);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
ActiveMQServerLogger.LOGGER.error("error decoding", t);
|
||||
else {
|
||||
// note!!! wait for negotiation (e.g. use a countdown latch)
|
||||
// before handling any other commands
|
||||
this.protocolManager.handleCommand(this, command);
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
ActiveMQServerLogger.LOGGER.error("error decoding", e);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
ActiveMQServerLogger.LOGGER.error("error decoding", t);
|
||||
}
|
||||
}
|
||||
|
||||
private void negotiate(WireFormatInfo command) throws IOException {
|
||||
|
@ -624,6 +604,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
|
|||
|
||||
public void serviceExceptionAsync(final IOException e) {
|
||||
if (asyncException.compareAndSet(false, true)) {
|
||||
// Why this is not through an executor?
|
||||
new Thread("Async Exception Handler") {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
|
@ -23,13 +24,19 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||
|
@ -51,14 +58,6 @@ import org.apache.activemq.util.ByteSequence;
|
|||
import org.apache.activemq.util.MarshallingSupport;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.fusesource.hawtbuf.UTF8Buffer;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.TypedProperties;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
||||
public class OpenWireMessageConverter implements MessageConverter {
|
||||
|
||||
|
@ -429,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
}
|
||||
amqMsg.setBrokerInTime(brokerInTime);
|
||||
|
||||
ActiveMQBuffer buffer = coreMessage.getBodyBuffer();
|
||||
ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
|
||||
if (buffer != null) {
|
||||
buffer.resetReaderIndex();
|
||||
byte[] bytes = null;
|
||||
|
|
|
@ -31,18 +31,38 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
||||
import org.apache.activemq.artemis.core.security.CheckType;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationListener;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
|
@ -66,26 +86,8 @@ import org.apache.activemq.command.TransactionId;
|
|||
import org.apache.activemq.command.TransactionInfo;
|
||||
import org.apache.activemq.command.WireFormatInfo;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
|
||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
||||
import org.apache.activemq.artemis.core.security.CheckType;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.openwire.OpenWireFormatFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||
import org.apache.activemq.state.ConnectionState;
|
||||
import org.apache.activemq.state.ProducerState;
|
||||
import org.apache.activemq.state.SessionState;
|
||||
|
@ -183,8 +185,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
|
|||
|
||||
@Override
|
||||
public void addChannelHandlers(ChannelPipeline pipeline) {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
// each read will have a full packet with this
|
||||
pipeline.addLast("packet-decipher", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, DataConstants.SIZE_INT));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -275,12 +275,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
return HandleStatus.BUSY;
|
||||
}
|
||||
|
||||
// TODO - https://jira.jboss.org/browse/HORNETQ-533
|
||||
// if (!writeReady.get())
|
||||
// {
|
||||
// return HandleStatus.BUSY;
|
||||
// }
|
||||
|
||||
synchronized (lock) {
|
||||
// If the consumer is stopped then we don't accept the message, it
|
||||
// should go back into the
|
||||
|
|
|
@ -27,18 +27,18 @@ import javax.jms.Session;
|
|||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SimpleOpenWireTest extends BasicOpenWireTest {
|
||||
|
||||
@Rule
|
||||
|
@ -300,60 +300,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the example shipped with the distribution
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testOpenWireExample() throws Exception {
|
||||
Connection exConn = null;
|
||||
|
||||
try {
|
||||
String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
|
||||
|
||||
// Step 2. Perfom a lookup on the queue
|
||||
Queue queue = new ActiveMQQueue(durableQueueName);
|
||||
|
||||
// Step 4.Create a JMS Connection
|
||||
exConn = exFact.createConnection();
|
||||
|
||||
// Step 10. Start the Connection
|
||||
exConn.start();
|
||||
|
||||
// Step 5. Create a JMS Session
|
||||
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// Step 6. Create a JMS Message Producer
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
// Step 7. Create a Text Message
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
|
||||
//System.out.println("Sent message: " + message.getText());
|
||||
|
||||
// Step 8. Send the Message
|
||||
producer.send(message);
|
||||
|
||||
// Step 9. Create a JMS Message Consumer
|
||||
MessageConsumer messageConsumer = session.createConsumer(queue);
|
||||
|
||||
// Step 11. Receive the message
|
||||
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
|
||||
|
||||
System.out.println("Received message: " + messageReceived);
|
||||
|
||||
assertEquals("This is a text message", messageReceived.getText());
|
||||
}
|
||||
finally {
|
||||
if (exConn != null) {
|
||||
exConn.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverTransportReconnect() throws Exception {
|
||||
Connection exConn = null;
|
||||
|
@ -389,4 +335,132 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the example shipped with the distribution
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testOpenWireExample() throws Exception {
|
||||
Connection exConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||
|
||||
Queue queue = new ActiveMQQueue(durableQueueName);
|
||||
|
||||
exConn = exFact.createConnection();
|
||||
|
||||
exConn.start();
|
||||
|
||||
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
|
||||
producer.send(message);
|
||||
|
||||
MessageConsumer messageConsumer = session.createConsumer(queue);
|
||||
|
||||
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
|
||||
|
||||
assertEquals("This is a text message", messageReceived.getText());
|
||||
}
|
||||
finally {
|
||||
if (exConn != null) {
|
||||
exConn.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This is the example shipped with the distribution
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testMultipleConsumers() throws Exception {
|
||||
Connection exConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||
|
||||
Queue queue = new ActiveMQQueue(durableQueueName);
|
||||
|
||||
exConn = exFact.createConnection();
|
||||
|
||||
exConn.start();
|
||||
|
||||
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
|
||||
producer.send(message);
|
||||
|
||||
MessageConsumer messageConsumer = session.createConsumer(queue);
|
||||
|
||||
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
|
||||
|
||||
assertEquals("This is a text message", messageReceived.getText());
|
||||
}
|
||||
finally {
|
||||
if (exConn != null) {
|
||||
exConn.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMixedOpenWireExample() throws Exception {
|
||||
Connection openConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
|
||||
ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
|
||||
|
||||
Queue queue = new ActiveMQQueue("exampleQueue");
|
||||
|
||||
openConn = openCF.createConnection();
|
||||
|
||||
openConn.start();
|
||||
|
||||
Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer producer = openSession.createProducer(queue);
|
||||
|
||||
TextMessage message = openSession.createTextMessage("This is a text message");
|
||||
|
||||
producer.send(message);
|
||||
|
||||
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
|
||||
|
||||
Connection artemisConn = artemisCF.createConnection();
|
||||
Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
artemisConn.start();
|
||||
MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
|
||||
|
||||
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
|
||||
|
||||
assertEquals("This is a text message", messageReceived.getText());
|
||||
|
||||
openConn.close();
|
||||
artemisConn.close();
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.Test;
|
||||
|
||||
/** This is useful to debug connection ordering. There's only one connection being made from these tests */
|
||||
public class VerySimpleOenwireTest extends OpenWireTestBase {
|
||||
|
||||
/**
|
||||
* This is the example shipped with the distribution
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testOpenWireExample() throws Exception {
|
||||
Connection exConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
|
||||
try {
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
|
||||
|
||||
Queue queue = new ActiveMQQueue("exampleQueue");
|
||||
|
||||
exConn = exFact.createConnection();
|
||||
|
||||
exConn.start();
|
||||
|
||||
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
TextMessage message = session.createTextMessage("This is a text message");
|
||||
|
||||
producer.send(message);
|
||||
|
||||
MessageConsumer messageConsumer = session.createConsumer(queue);
|
||||
|
||||
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
|
||||
|
||||
assertEquals("This is a text message", messageReceived.getText());
|
||||
}
|
||||
finally {
|
||||
if (exConn != null) {
|
||||
exConn.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMixedOpenWireExample() throws Exception {
|
||||
Connection openConn = null;
|
||||
|
||||
SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
|
||||
this.server.createQueue(durableQueue, durableQueue, null, true, false);
|
||||
|
||||
ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
|
||||
|
||||
Queue queue = new ActiveMQQueue("exampleQueue");
|
||||
|
||||
openConn = openCF.createConnection();
|
||||
|
||||
openConn.start();
|
||||
|
||||
Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
MessageProducer producer = openSession.createProducer(queue);
|
||||
|
||||
TextMessage message = openSession.createTextMessage("This is a text message");
|
||||
|
||||
producer.send(message);
|
||||
|
||||
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
|
||||
|
||||
Connection artemisConn = artemisCF.createConnection();
|
||||
Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
artemisConn.start();
|
||||
MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
|
||||
|
||||
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
|
||||
|
||||
assertEquals("This is a text message", messageReceived.getText());
|
||||
|
||||
openConn.close();
|
||||
artemisConn.close();
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -16,6 +16,10 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.transports.netty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
|
@ -24,10 +28,6 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
public class ActiveMQFrameDecoder2Test extends ActiveMQTestBase {
|
||||
|
||||
private static final int MSG_CNT = 10000;
|
||||
|
|
Loading…
Reference in New Issue