HADOOP-9669 Reduce the number of byte array creations and copies in XDR data manipulation. Contributed by Haohui Mai
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1524259 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ac618c6429
commit
c16442c459
|
@ -430,6 +430,9 @@ Release 2.1.1-beta - 2013-09-23
|
||||||
HADOOP-9962. in order to avoid dependency divergence within Hadoop itself
|
HADOOP-9962. in order to avoid dependency divergence within Hadoop itself
|
||||||
lets enable DependencyConvergence. (rvs via tucu)
|
lets enable DependencyConvergence. (rvs via tucu)
|
||||||
|
|
||||||
|
HADOOP-9669. Reduce the number of byte array creations and copies in
|
||||||
|
XDR data manipulation. (Haohui Mai via brandonli)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -57,8 +57,7 @@ public class SimpleUdpClient {
|
||||||
clientSocket.receive(receivePacket);
|
clientSocket.receive(receivePacket);
|
||||||
|
|
||||||
// Check reply status
|
// Check reply status
|
||||||
XDR xdr = new XDR();
|
XDR xdr = new XDR(Arrays.copyOfRange(receiveData, 0,
|
||||||
xdr.writeFixedOpaque(Arrays.copyOfRange(receiveData, 0,
|
|
||||||
receivePacket.getLength()));
|
receivePacket.getLength()));
|
||||||
RpcReply reply = RpcReply.read(xdr);
|
RpcReply reply = RpcReply.read(xdr);
|
||||||
if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
|
if (reply.getState() != RpcReply.ReplyState.MSG_ACCEPTED) {
|
||||||
|
|
|
@ -43,13 +43,14 @@ public class SimpleUdpServerHandler extends SimpleChannelHandler {
|
||||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
||||||
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
|
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
|
||||||
|
|
||||||
XDR request = new XDR();
|
XDR request = new XDR(buf.array());
|
||||||
|
|
||||||
request.writeFixedOpaque(buf.array());
|
|
||||||
InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
|
InetAddress remoteInetAddr = ((InetSocketAddress) e.getRemoteAddress())
|
||||||
.getAddress();
|
.getAddress();
|
||||||
XDR response = rpcProgram.handle(request, remoteInetAddr, null);
|
XDR response = rpcProgram.handle(request, remoteInetAddr, null);
|
||||||
e.getChannel().write(XDR.writeMessageUdp(response), e.getRemoteAddress());
|
|
||||||
|
e.getChannel().write(XDR.writeMessageUdp(response.asReadOnlyWrap()),
|
||||||
|
e.getRemoteAddress());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -17,402 +17,253 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.oncrpc;
|
package org.apache.hadoop.oncrpc;
|
||||||
|
|
||||||
import java.io.PrintStream;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
import org.jboss.netty.buffer.ChannelBuffers;
|
import org.jboss.netty.buffer.ChannelBuffers;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class for building XDR messages based on RFC 4506.
|
* Utility class for building XDR messages based on RFC 4506.
|
||||||
* <p>
|
*
|
||||||
* This class maintains a buffer into which java types are written as
|
* Key points of the format:
|
||||||
* XDR types for building XDR messages. Similarly this class can
|
*
|
||||||
* be used to get java types from an XDR request or response.
|
* <ul>
|
||||||
* <p>
|
* <li>Primitives are stored in big-endian order (i.e., the default byte order
|
||||||
* Currently only a subset of XDR types defined in RFC 4506 are supported.
|
* of ByteBuffer).</li>
|
||||||
|
* <li>Booleans are stored as an integer.</li>
|
||||||
|
* <li>Each field in the message is always aligned by 4.</li>
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
public class XDR {
|
public final class XDR {
|
||||||
private final static String HEXES = "0123456789abcdef";
|
private static final int DEFAULT_INITIAL_CAPACITY = 256;
|
||||||
|
private static final int SIZEOF_INT = 4;
|
||||||
|
private static final int SIZEOF_LONG = 8;
|
||||||
|
private static final byte[] PADDING_BYTES = new byte[] { 0, 0, 0, 0 };
|
||||||
|
|
||||||
/** Internal buffer for reading or writing to */
|
private ByteBuffer buf;
|
||||||
private byte[] bytearr;
|
|
||||||
|
|
||||||
/** Place to read from or write to */
|
private enum State {
|
||||||
private int cursor;
|
READING, WRITING,
|
||||||
|
|
||||||
public XDR() {
|
|
||||||
this(new byte[0]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public XDR(byte[] data) {
|
private final State state;
|
||||||
bytearr = Arrays.copyOf(data, data.length);
|
|
||||||
cursor = 0;
|
/**
|
||||||
|
* Construct a new XDR message buffer.
|
||||||
|
*
|
||||||
|
* @param initialCapacity
|
||||||
|
* the initial capacity of the buffer.
|
||||||
|
*/
|
||||||
|
public XDR(int initialCapacity) {
|
||||||
|
this(ByteBuffer.allocate(initialCapacity), State.WRITING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public XDR() {
|
||||||
|
this(DEFAULT_INITIAL_CAPACITY);
|
||||||
|
}
|
||||||
|
|
||||||
|
private XDR(ByteBuffer buf, State state) {
|
||||||
|
this.buf = buf;
|
||||||
|
this.state = state;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param bytes bytes to be appended to internal buffer
|
* Wraps a byte array as a read-only XDR message. There's no copy involved,
|
||||||
|
* thus it is the client's responsibility to ensure that the byte array
|
||||||
|
* remains unmodified when using the XDR object.
|
||||||
|
*
|
||||||
|
* @param src
|
||||||
|
* the byte array to be wrapped.
|
||||||
*/
|
*/
|
||||||
private void append(byte[] bytesToAdd) {
|
public XDR(byte[] src) {
|
||||||
bytearr = append(bytearr, bytesToAdd);
|
this(ByteBuffer.wrap(src).asReadOnlyBuffer(), State.READING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public XDR asReadOnlyWrap() {
|
||||||
|
ByteBuffer b = buf.asReadOnlyBuffer();
|
||||||
|
if (state == State.WRITING) {
|
||||||
|
b.flip();
|
||||||
|
}
|
||||||
|
|
||||||
|
XDR n = new XDR(b, State.READING);
|
||||||
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int size() {
|
public int size() {
|
||||||
return bytearr.length;
|
// TODO: This overloading intends to be compatible with the semantics of
|
||||||
|
// the previous version of the class. This function should be separated into
|
||||||
|
// two with clear semantics.
|
||||||
|
return state == State.READING ? buf.limit() : buf.position();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Skip some bytes by moving the cursor */
|
|
||||||
public void skip(int size) {
|
|
||||||
cursor += size;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Write Java primitive integer as XDR signed integer.
|
|
||||||
*
|
|
||||||
* Definition of XDR signed integer from RFC 4506:
|
|
||||||
* <pre>
|
|
||||||
* An XDR signed integer is a 32-bit datum that encodes an integer in
|
|
||||||
* the range [-2147483648,2147483647]. The integer is represented in
|
|
||||||
* two's complement notation. The most and least significant bytes are
|
|
||||||
* 0 and 3, respectively. Integers are declared as follows:
|
|
||||||
*
|
|
||||||
* int identifier;
|
|
||||||
*
|
|
||||||
* (MSB) (LSB)
|
|
||||||
* +-------+-------+-------+-------+
|
|
||||||
* |byte 0 |byte 1 |byte 2 |byte 3 | INTEGER
|
|
||||||
* +-------+-------+-------+-------+
|
|
||||||
* <------------32 bits------------>
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
public void writeInt(int data) {
|
|
||||||
append(toBytes(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Read an XDR signed integer and return as Java primitive integer.
|
|
||||||
*/
|
|
||||||
public int readInt() {
|
public int readInt() {
|
||||||
byte byte0 = bytearr[cursor++];
|
Preconditions.checkState(state == State.READING);
|
||||||
byte byte1 = bytearr[cursor++];
|
return buf.getInt();
|
||||||
byte byte2 = bytearr[cursor++];
|
|
||||||
byte byte3 = bytearr[cursor++];
|
|
||||||
return (XDR.toShort(byte0) << 24) + (XDR.toShort(byte1) << 16)
|
|
||||||
+ (XDR.toShort(byte2) << 8) + XDR.toShort(byte3);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public void writeInt(int v) {
|
||||||
* Write Java primitive boolean as an XDR boolean.
|
ensureFreeSpace(SIZEOF_INT);
|
||||||
*
|
buf.putInt(v);
|
||||||
* Definition of XDR boolean from RFC 4506:
|
|
||||||
* <pre>
|
|
||||||
* Booleans are important enough and occur frequently enough to warrant
|
|
||||||
* their own explicit type in the standard. Booleans are declared as
|
|
||||||
* follows:
|
|
||||||
*
|
|
||||||
* bool identifier;
|
|
||||||
*
|
|
||||||
* This is equivalent to:
|
|
||||||
*
|
|
||||||
* enum { FALSE = 0, TRUE = 1 } identifier;
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
public void writeBoolean(boolean data) {
|
|
||||||
this.writeInt(data ? 1 : 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Read an XDR boolean and return as Java primitive boolean.
|
|
||||||
*/
|
|
||||||
public boolean readBoolean() {
|
public boolean readBoolean() {
|
||||||
return readInt() == 0 ? false : true;
|
Preconditions.checkState(state == State.READING);
|
||||||
|
return buf.getInt() != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public void writeBoolean(boolean v) {
|
||||||
* Write Java primitive long to an XDR signed long.
|
ensureFreeSpace(SIZEOF_INT);
|
||||||
*
|
buf.putInt(v ? 1 : 0);
|
||||||
* Definition of XDR signed long from RFC 4506:
|
|
||||||
* <pre>
|
|
||||||
* The standard also defines 64-bit (8-byte) numbers called hyper
|
|
||||||
* integers and unsigned hyper integers. Their representations are the
|
|
||||||
* obvious extensions of integer and unsigned integer defined above.
|
|
||||||
* They are represented in two's complement notation.The most and
|
|
||||||
* least significant bytes are 0 and 7, respectively. Their
|
|
||||||
* declarations:
|
|
||||||
*
|
|
||||||
* hyper identifier; unsigned hyper identifier;
|
|
||||||
*
|
|
||||||
* (MSB) (LSB)
|
|
||||||
* +-------+-------+-------+-------+-------+-------+-------+-------+
|
|
||||||
* |byte 0 |byte 1 |byte 2 |byte 3 |byte 4 |byte 5 |byte 6 |byte 7 |
|
|
||||||
* +-------+-------+-------+-------+-------+-------+-------+-------+
|
|
||||||
* <----------------------------64 bits---------------------------->
|
|
||||||
* HYPER INTEGER
|
|
||||||
* UNSIGNED HYPER INTEGER
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
public void writeLongAsHyper(long data) {
|
|
||||||
byte byte0 = (byte) ((data & 0xff00000000000000l) >> 56);
|
|
||||||
byte byte1 = (byte) ((data & 0x00ff000000000000l) >> 48);
|
|
||||||
byte byte2 = (byte) ((data & 0x0000ff0000000000l) >> 40);
|
|
||||||
byte byte3 = (byte) ((data & 0x000000ff00000000l) >> 32);
|
|
||||||
byte byte4 = (byte) ((data & 0x00000000ff000000l) >> 24);
|
|
||||||
byte byte5 = (byte) ((data & 0x0000000000ff0000l) >> 16);
|
|
||||||
byte byte6 = (byte) ((data & 0x000000000000ff00l) >> 8);
|
|
||||||
byte byte7 = (byte) ((data & 0x00000000000000ffl));
|
|
||||||
this.append(new byte[] { byte0, byte1, byte2, byte3, byte4, byte5, byte6, byte7 });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Read XDR signed hyper and return as java primitive long.
|
|
||||||
*/
|
|
||||||
public long readHyper() {
|
public long readHyper() {
|
||||||
byte byte0 = bytearr[cursor++];
|
Preconditions.checkState(state == State.READING);
|
||||||
byte byte1 = bytearr[cursor++];
|
return buf.getLong();
|
||||||
byte byte2 = bytearr[cursor++];
|
|
||||||
byte byte3 = bytearr[cursor++];
|
|
||||||
byte byte4 = bytearr[cursor++];
|
|
||||||
byte byte5 = bytearr[cursor++];
|
|
||||||
byte byte6 = bytearr[cursor++];
|
|
||||||
byte byte7 = bytearr[cursor++];
|
|
||||||
return ((long) XDR.toShort(byte0) << 56)
|
|
||||||
+ ((long) XDR.toShort(byte1) << 48) + ((long) XDR.toShort(byte2) << 40)
|
|
||||||
+ ((long) XDR.toShort(byte3) << 32) + ((long) XDR.toShort(byte4) << 24)
|
|
||||||
+ ((long) XDR.toShort(byte5) << 16) + ((long) XDR.toShort(byte6) << 8)
|
|
||||||
+ XDR.toShort(byte7);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public void writeLongAsHyper(long v) {
|
||||||
* Write a Java primitive byte array to XDR fixed-length opaque data.
|
ensureFreeSpace(SIZEOF_LONG);
|
||||||
*
|
buf.putLong(v);
|
||||||
* Defintion of fixed-length opaque data from RFC 4506:
|
|
||||||
* <pre>
|
|
||||||
* At times, fixed-length uninterpreted data needs to be passed among
|
|
||||||
* machines. This data is called "opaque" and is declared as follows:
|
|
||||||
*
|
|
||||||
* opaque identifier[n];
|
|
||||||
*
|
|
||||||
* where the constant n is the (static) number of bytes necessary to
|
|
||||||
* contain the opaque data. If n is not a multiple of four, then the n
|
|
||||||
* bytes are followed by enough (0 to 3) residual zero bytes, r, to make
|
|
||||||
* the total byte count of the opaque object a multiple of four.
|
|
||||||
*
|
|
||||||
* 0 1 ...
|
|
||||||
* +--------+--------+...+--------+--------+...+--------+
|
|
||||||
* | byte 0 | byte 1 |...|byte n-1| 0 |...| 0 |
|
|
||||||
* +--------+--------+...+--------+--------+...+--------+
|
|
||||||
* |<-----------n bytes---------->|<------r bytes------>|
|
|
||||||
* |<-----------n+r (where (n+r) mod 4 = 0)------------>|
|
|
||||||
* FIXED-LENGTH OPAQUE
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
public void writeFixedOpaque(byte[] data) {
|
|
||||||
writeFixedOpaque(data, data.length);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void writeFixedOpaque(byte[] data, int length) {
|
|
||||||
append(Arrays.copyOf(data, length + XDR.pad(length, 4)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] readFixedOpaque(int size) {
|
public byte[] readFixedOpaque(int size) {
|
||||||
byte[] ret = new byte[size];
|
Preconditions.checkState(state == State.READING);
|
||||||
for(int i = 0; i < size; i++) {
|
byte[] r = new byte[size];
|
||||||
ret[i] = bytearr[cursor];
|
buf.get(r);
|
||||||
cursor++;
|
alignPosition();
|
||||||
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int i = 0; i < XDR.pad(size, 4); i++) {
|
public void writeFixedOpaque(byte[] src, int length) {
|
||||||
cursor++;
|
ensureFreeSpace(alignUp(length));
|
||||||
}
|
buf.put(src, 0, length);
|
||||||
return ret;
|
writePadding();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public void writeFixedOpaque(byte[] src) {
|
||||||
* Write a Java primitive byte array as XDR variable-length opque data.
|
writeFixedOpaque(src, src.length);
|
||||||
*
|
|
||||||
* Definition of XDR variable-length opaque data RFC 4506:
|
|
||||||
*
|
|
||||||
* <pre>
|
|
||||||
* The standard also provides for variable-length (counted) opaque data,
|
|
||||||
* defined as a sequence of n (numbered 0 through n-1) arbitrary bytes
|
|
||||||
* to be the number n encoded as an unsigned integer (as described
|
|
||||||
* below), and followed by the n bytes of the sequence.
|
|
||||||
*
|
|
||||||
* Byte m of the sequence always precedes byte m+1 of the sequence, and
|
|
||||||
* byte 0 of the sequence always follows the sequence's length (count).
|
|
||||||
* If n is not a multiple of four, then the n bytes are followed by
|
|
||||||
* enough (0 to 3) residual zero bytes, r, to make the total byte count
|
|
||||||
* a multiple of four. Variable-length opaque data is declared in the
|
|
||||||
* following way:
|
|
||||||
*
|
|
||||||
* opaque identifier<m>;
|
|
||||||
* or
|
|
||||||
* opaque identifier<>;
|
|
||||||
*
|
|
||||||
* The constant m denotes an upper bound of the number of bytes that the
|
|
||||||
* sequence may contain. If m is not specified, as in the second
|
|
||||||
* declaration, it is assumed to be (2**32) - 1, the maximum length.
|
|
||||||
*
|
|
||||||
* The constant m would normally be found in a protocol specification.
|
|
||||||
* For example, a filing protocol may state that the maximum data
|
|
||||||
* transfer size is 8192 bytes, as follows:
|
|
||||||
*
|
|
||||||
* opaque filedata<8192>;
|
|
||||||
*
|
|
||||||
* 0 1 2 3 4 5 ...
|
|
||||||
* +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
|
|
||||||
* | length n |byte0|byte1|...| n-1 | 0 |...| 0 |
|
|
||||||
* +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
|
|
||||||
* |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
|
|
||||||
* |<----n+r (where (n+r) mod 4 = 0)---->|
|
|
||||||
* VARIABLE-LENGTH OPAQUE
|
|
||||||
*
|
|
||||||
* It is an error to encode a length greater than the maximum described
|
|
||||||
* in the specification.
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
public void writeVariableOpaque(byte[] data) {
|
|
||||||
this.writeInt(data.length);
|
|
||||||
this.writeFixedOpaque(data);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] readVariableOpaque() {
|
public byte[] readVariableOpaque() {
|
||||||
int size = this.readInt();
|
Preconditions.checkState(state == State.READING);
|
||||||
return size != 0 ? this.readFixedOpaque(size) : new byte[0];
|
int size = readInt();
|
||||||
|
return readFixedOpaque(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void skipVariableOpaque() {
|
public void writeVariableOpaque(byte[] src) {
|
||||||
int length= this.readInt();
|
ensureFreeSpace(SIZEOF_INT + alignUp(src.length));
|
||||||
this.skip(length+XDR.pad(length, 4));
|
buf.putInt(src.length);
|
||||||
}
|
writeFixedOpaque(src);
|
||||||
|
|
||||||
/**
|
|
||||||
* Write Java String as XDR string.
|
|
||||||
*
|
|
||||||
* Definition of XDR string from RFC 4506:
|
|
||||||
*
|
|
||||||
* <pre>
|
|
||||||
* The standard defines a string of n (numbered 0 through n-1) ASCII
|
|
||||||
* bytes to be the number n encoded as an unsigned integer (as described
|
|
||||||
* above), and followed by the n bytes of the string. Byte m of the
|
|
||||||
* string always precedes byte m+1 of the string, and byte 0 of the
|
|
||||||
* string always follows the string's length. If n is not a multiple of
|
|
||||||
* four, then the n bytes are followed by enough (0 to 3) residual zero
|
|
||||||
* bytes, r, to make the total byte count a multiple of four. Counted
|
|
||||||
* byte strings are declared as follows:
|
|
||||||
*
|
|
||||||
* string object<m>;
|
|
||||||
* or
|
|
||||||
* string object<>;
|
|
||||||
*
|
|
||||||
* The constant m denotes an upper bound of the number of bytes that a
|
|
||||||
* string may contain. If m is not specified, as in the second
|
|
||||||
* declaration, it is assumed to be (2**32) - 1, the maximum length.
|
|
||||||
* The constant m would normally be found in a protocol specification.
|
|
||||||
* For example, a filing protocol may state that a file name can be no
|
|
||||||
* longer than 255 bytes, as follows:
|
|
||||||
*
|
|
||||||
* string filename<255>;
|
|
||||||
*
|
|
||||||
* 0 1 2 3 4 5 ...
|
|
||||||
* +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
|
|
||||||
* | length n |byte0|byte1|...| n-1 | 0 |...| 0 |
|
|
||||||
* +-----+-----+-----+-----+-----+-----+...+-----+-----+...+-----+
|
|
||||||
* |<-------4 bytes------->|<------n bytes------>|<---r bytes--->|
|
|
||||||
* |<----n+r (where (n+r) mod 4 = 0)---->|
|
|
||||||
* STRING
|
|
||||||
* It is an error to encode a length greater than the maximum described
|
|
||||||
* in the specification.
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
public void writeString(String data) {
|
|
||||||
this.writeVariableOpaque(data.getBytes());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String readString() {
|
public String readString() {
|
||||||
return new String(this.readVariableOpaque());
|
return new String(readVariableOpaque());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void dump(PrintStream out) {
|
public void writeString(String s) {
|
||||||
for(int i = 0; i < bytearr.length; i += 4) {
|
writeVariableOpaque(s.getBytes());
|
||||||
out.println(hex(bytearr[i]) + " " + hex(bytearr[i + 1]) + " "
|
}
|
||||||
+ hex(bytearr[i + 2]) + " " + hex(bytearr[i + 3]));
|
|
||||||
|
private void writePadding() {
|
||||||
|
Preconditions.checkState(state == State.WRITING);
|
||||||
|
int p = pad(buf.position());
|
||||||
|
ensureFreeSpace(p);
|
||||||
|
buf.put(PADDING_BYTES, 0, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int alignUp(int length) {
|
||||||
|
return length + pad(length);
|
||||||
|
}
|
||||||
|
|
||||||
|
private int pad(int length) {
|
||||||
|
switch (length % 4) {
|
||||||
|
case 1:
|
||||||
|
return 3;
|
||||||
|
case 2:
|
||||||
|
return 2;
|
||||||
|
case 3:
|
||||||
|
return 1;
|
||||||
|
default:
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
private void alignPosition() {
|
||||||
public byte[] getBytes() {
|
buf.position(alignUp(buf.position()));
|
||||||
return Arrays.copyOf(bytearr, bytearr.length);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static byte[] append(byte[] bytes, byte[] bytesToAdd) {
|
private void ensureFreeSpace(int size) {
|
||||||
byte[] newByteArray = new byte[bytes.length + bytesToAdd.length];
|
Preconditions.checkState(state == State.WRITING);
|
||||||
System.arraycopy(bytes, 0, newByteArray, 0, bytes.length);
|
if (buf.remaining() < size) {
|
||||||
System.arraycopy(bytesToAdd, 0, newByteArray, bytes.length, bytesToAdd.length);
|
int newCapacity = buf.capacity() * 2;
|
||||||
return newByteArray;
|
int newRemaining = buf.capacity() + buf.remaining();
|
||||||
|
|
||||||
|
while (newRemaining < size) {
|
||||||
|
newRemaining += newCapacity;
|
||||||
|
newCapacity *= 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int pad(int x, int y) {
|
ByteBuffer newbuf = ByteBuffer.allocate(newCapacity);
|
||||||
return x % y == 0 ? 0 : y - (x % y);
|
buf.flip();
|
||||||
|
newbuf.put(buf);
|
||||||
|
buf = newbuf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static byte[] toBytes(int n) {
|
/** check if the rest of data has more than len bytes */
|
||||||
byte[] ret = { (byte) ((n & 0xff000000) >> 24),
|
public static boolean verifyLength(XDR xdr, int len) {
|
||||||
(byte) ((n & 0x00ff0000) >> 16), (byte) ((n & 0x0000ff00) >> 8),
|
return xdr.buf.remaining() >= len;
|
||||||
(byte) (n & 0x000000ff) };
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static short toShort(byte b) {
|
|
||||||
return b < 0 ? (short) (b + 256): (short) b;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String hex(byte b) {
|
|
||||||
return "" + HEXES.charAt((b & 0xF0) >> 4) + HEXES.charAt((b & 0x0F));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] recordMark(int size, boolean last) {
|
private static byte[] recordMark(int size, boolean last) {
|
||||||
return toBytes(!last ? size : size | 0x80000000);
|
byte[] b = new byte[SIZEOF_INT];
|
||||||
}
|
ByteBuffer buf = ByteBuffer.wrap(b);
|
||||||
|
buf.putInt(!last ? size : size | 0x80000000);
|
||||||
public static byte[] getVariableOpque(byte[] data) {
|
return b;
|
||||||
byte[] bytes = toBytes(data.length);
|
|
||||||
return append(bytes, Arrays.copyOf(data, data.length + XDR.pad(data.length, 4)));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static int fragmentSize(byte[] mark) {
|
|
||||||
int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
|
|
||||||
+ (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
|
|
||||||
return n & 0x7fffffff;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static boolean isLastFragment(byte[] mark) {
|
|
||||||
int n = (XDR.toShort(mark[0]) << 24) + (XDR.toShort(mark[1]) << 16)
|
|
||||||
+ (XDR.toShort(mark[2]) << 8) + XDR.toShort(mark[3]);
|
|
||||||
return (n & 0x80000000) != 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** check if the rest of data has more than <len> bytes */
|
|
||||||
public static boolean verifyLength(XDR xdr, int len) {
|
|
||||||
return (xdr.bytearr.length - xdr.cursor) >= len;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Write an XDR message to a TCP ChannelBuffer */
|
/** Write an XDR message to a TCP ChannelBuffer */
|
||||||
public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
|
public static ChannelBuffer writeMessageTcp(XDR request, boolean last) {
|
||||||
byte[] fragmentHeader = XDR.recordMark(request.bytearr.length, last);
|
Preconditions.checkState(request.state == XDR.State.WRITING);
|
||||||
ChannelBuffer outBuf = ChannelBuffers.buffer(fragmentHeader.length
|
ByteBuffer b = request.buf.duplicate();
|
||||||
+ request.bytearr.length);
|
b.flip();
|
||||||
outBuf.writeBytes(fragmentHeader);
|
byte[] fragmentHeader = XDR.recordMark(b.limit(), last);
|
||||||
outBuf.writeBytes(request.bytearr);
|
ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);
|
||||||
return outBuf;
|
|
||||||
|
// TODO: Investigate whether making a copy of the buffer is necessary.
|
||||||
|
return ChannelBuffers.copiedBuffer(headerBuf, b);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Write an XDR message to a UDP ChannelBuffer */
|
/** Write an XDR message to a UDP ChannelBuffer */
|
||||||
public static ChannelBuffer writeMessageUdp(XDR response) {
|
public static ChannelBuffer writeMessageUdp(XDR response) {
|
||||||
ChannelBuffer outBuf = ChannelBuffers.buffer(response.bytearr.length);
|
Preconditions.checkState(response.state == XDR.State.READING);
|
||||||
outBuf.writeBytes(response.bytearr);
|
// TODO: Investigate whether making a copy of the buffer is necessary.
|
||||||
return outBuf;
|
return ChannelBuffers.copiedBuffer(response.buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int fragmentSize(byte[] mark) {
|
||||||
|
ByteBuffer b = ByteBuffer.wrap(mark);
|
||||||
|
int n = b.getInt();
|
||||||
|
return n & 0x7fffffff;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isLastFragment(byte[] mark) {
|
||||||
|
ByteBuffer b = ByteBuffer.wrap(mark);
|
||||||
|
int n = b.getInt();
|
||||||
|
return (n & 0x80000000) != 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public byte[] getBytes() {
|
||||||
|
ByteBuffer d = buf.duplicate();
|
||||||
|
byte[] b = new byte[d.position()];
|
||||||
|
d.flip();
|
||||||
|
d.get(b);
|
||||||
|
|
||||||
|
return b;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -39,7 +39,7 @@ public class TestNfsTime {
|
||||||
t1.serialize(xdr);
|
t1.serialize(xdr);
|
||||||
|
|
||||||
// Deserialize it back
|
// Deserialize it back
|
||||||
NfsTime t2 = NfsTime.deserialize(xdr);
|
NfsTime t2 = NfsTime.deserialize(xdr.asReadOnlyWrap());
|
||||||
|
|
||||||
// Ensure the NfsTimes are equal
|
// Ensure the NfsTimes are equal
|
||||||
Assert.assertEquals(t1, t2);
|
Assert.assertEquals(t1, t2);
|
||||||
|
|
|
@ -33,7 +33,7 @@ public class TestFileHandle {
|
||||||
|
|
||||||
// Deserialize it back
|
// Deserialize it back
|
||||||
FileHandle handle2 = new FileHandle();
|
FileHandle handle2 = new FileHandle();
|
||||||
handle2.deserialize(xdr);
|
handle2.deserialize(xdr.asReadOnlyWrap());
|
||||||
Assert.assertEquals(handle.getFileId(), 1024);
|
Assert.assertEquals(handle.getFileId(), 1024);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,23 +17,35 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.oncrpc;
|
package org.apache.hadoop.oncrpc;
|
||||||
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
import junit.framework.Assert;
|
||||||
* Tests for {@link XDR}
|
|
||||||
*/
|
|
||||||
public class TestXDR {
|
public class TestXDR {
|
||||||
/**
|
private void serializeInt(int times) {
|
||||||
* Test {@link XDR#append(byte[], byte[])}
|
XDR w = new XDR();
|
||||||
*/
|
for (int i = 0; i < times; ++i)
|
||||||
|
w.writeInt(23);
|
||||||
|
|
||||||
|
XDR r = w.asReadOnlyWrap();
|
||||||
|
for (int i = 0; i < times; ++i)
|
||||||
|
Assert.assertEquals(r.readInt(), 23);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void serializeLong(int times) {
|
||||||
|
XDR w = new XDR();
|
||||||
|
for (int i = 0; i < times; ++i)
|
||||||
|
w.writeLongAsHyper(23);
|
||||||
|
|
||||||
|
XDR r = w.asReadOnlyWrap();
|
||||||
|
for (int i = 0; i < times; ++i)
|
||||||
|
Assert.assertEquals(r.readHyper(), 23);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppendBytes() {
|
public void testPerformance() {
|
||||||
byte[] arr1 = new byte[] {0, 1};
|
final int TEST_TIMES = 8 << 20;
|
||||||
byte[] arr2 = new byte[] {2, 3};
|
serializeInt(TEST_TIMES);
|
||||||
assertTrue(Arrays.equals(new byte[]{0, 1, 2, 3}, XDR.append(arr1, arr2)));
|
serializeLong(TEST_TIMES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,7 @@ public class TestCredentialsSys {
|
||||||
credential.write(xdr);
|
credential.write(xdr);
|
||||||
|
|
||||||
CredentialsSys newCredential = new CredentialsSys();
|
CredentialsSys newCredential = new CredentialsSys();
|
||||||
newCredential.read(xdr);
|
newCredential.read(xdr.asReadOnlyWrap());
|
||||||
|
|
||||||
assertEquals(0, newCredential.getUID());
|
assertEquals(0, newCredential.getUID());
|
||||||
assertEquals(1, newCredential.getGID());
|
assertEquals(1, newCredential.getGID());
|
||||||
|
|
Loading…
Reference in New Issue