HADOOP-10694. Remove synchronized input streams from Writable deserialization. Contributed by Gopal V and Rajesh Balamohan.

This commit is contained in:
Tsuyoshi Ozawa 2016-05-10 17:27:10 -07:00
parent 27242f211e
commit 6e56578031
1 changed files with 74 additions and 7 deletions

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.io; package org.apache.hadoop.io;
import java.io.*;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
/** A reusable {@link DataInput} implementation that reads from an in-memory import java.io.ByteArrayInputStream;
* buffer. import java.io.DataInputStream;
/** A reusable {@link java.io.DataInput} implementation
* that reads from an in-memory buffer.
* *
* <p>This saves memory over creating a new DataInputStream and * <p>This saves memory over creating a new DataInputStream and
* ByteArrayInputStream each time data is read. * ByteArrayInputStream each time data is read.
@ -56,9 +57,75 @@ public void reset(byte[] input, int start, int length) {
this.pos = start; this.pos = start;
} }
public byte[] getData() { return buf; } public byte[] getData() {
public int getPosition() { return pos; } return buf;
public int getLength() { return count; } }
public int getPosition() {
return pos;
}
public int getLength() {
return count;
}
/* functions below comes verbatim from
hive.common.io.NonSyncByteArrayInputStream */
/**
* {@inheritDoc}
*/
@Override
public int read() {
return (pos < count) ? (buf[pos++] & 0xff) : -1;
}
/**
* {@inheritDoc}
*/
@Override
public int read(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (pos >= count) {
return -1;
}
if (pos + len > count) {
len = count - pos;
}
if (len <= 0) {
return 0;
}
System.arraycopy(buf, pos, b, off, len);
pos += len;
return len;
}
/**
* {@inheritDoc}
*/
@Override
public long skip(long n) {
if (pos + n > count) {
n = count - pos;
}
if (n < 0) {
return 0;
}
pos += n;
return n;
}
/**
* {@inheritDoc}
*/
@Override
public int available() {
return count - pos;
}
} }
private Buffer buffer; private Buffer buffer;