diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 930ec0b7ac9..f08e20a2d94 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -467,6 +467,9 @@ Release 2.0.3-alpha - Unreleased HADOOP-9231. Parametrize staging URL for the uniformity of distributionManagement. (Konstantin Boudnik via suresh) + HADOOP-9276. Allow BoundedByteArrayOutputStream to be resettable. + (Arun Murthy via hitesh) + OPTIMIZATIONS HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java index 60836ab26a3..c27449d3618 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/BoundedByteArrayOutputStream.java @@ -32,9 +32,10 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Unstable public class BoundedByteArrayOutputStream extends OutputStream { - private final byte[] buffer; + private byte[] buffer; + private int startOffset; private int limit; - private int count; + private int currentPointer; /** * Create a BoundedByteArrayOutputStream with the specified @@ -52,20 +53,30 @@ public class BoundedByteArrayOutputStream extends OutputStream { * @param limit The maximum limit upto which data can be written */ public BoundedByteArrayOutputStream(int capacity, int limit) { + this(new byte[capacity], 0, limit); + } + + protected BoundedByteArrayOutputStream(byte[] buf, int offset, int limit) { + resetBuffer(buf, offset, limit); + } + + protected void resetBuffer(byte[] buf, int offset, int limit) { + int capacity = buf.length - offset; if ((capacity < limit) || (capacity | limit) < 0) { throw new IllegalArgumentException("Invalid capacity/limit"); } - this.buffer = new byte[capacity]; - this.limit = limit; - this.count = 0; + this.buffer = buf; + this.startOffset = offset; + this.currentPointer = offset; + this.limit = offset + limit; } - + @Override public void write(int b) throws IOException { - if (count >= limit) { + if (currentPointer >= limit) { throw new EOFException("Reaching the limit of the buffer."); } - buffer[count++] = (byte) b; + buffer[currentPointer++] = (byte) b; } @Override @@ -77,12 +88,12 @@ public class BoundedByteArrayOutputStream extends OutputStream { return; } - if (count + len > limit) { + if (currentPointer + len > limit) { throw new EOFException("Reach the limit of the buffer"); } - System.arraycopy(b, off, buffer, count, len); - count += len; + System.arraycopy(b, off, buffer, currentPointer, len); + currentPointer += len; } /** @@ -90,17 +101,17 @@ public class BoundedByteArrayOutputStream extends OutputStream { * @param newlim New Limit */ public void reset(int newlim) { - if (newlim > buffer.length) { + if (newlim > (buffer.length - startOffset)) { throw new IndexOutOfBoundsException("Limit exceeds buffer size"); } this.limit = newlim; - this.count = 0; + this.currentPointer = startOffset; } /** Reset the buffer */ public void reset() { - this.limit = buffer.length; - this.count = 0; + this.limit = buffer.length - startOffset; + this.currentPointer = startOffset; } /** Return the current limit */ @@ -119,6 +130,10 @@ public class BoundedByteArrayOutputStream extends OutputStream { * currently in the buffer. */ public int size() { - return count; + return currentPointer - startOffset; + } + + public int available() { + return limit - currentPointer; } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestBoundedByteArrayOutputStream.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestBoundedByteArrayOutputStream.java index a00d38bfa63..44215278ca6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestBoundedByteArrayOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestBoundedByteArrayOutputStream.java @@ -88,4 +88,61 @@ public class TestBoundedByteArrayOutputStream extends TestCase { assertTrue("Writing beyond limit did not throw an exception", caughtException); } + + + static class ResettableBoundedByteArrayOutputStream + extends BoundedByteArrayOutputStream { + + public ResettableBoundedByteArrayOutputStream(int capacity) { + super(capacity); + } + + public void resetBuffer(byte[] buf, int offset, int length) { + super.resetBuffer(buf, offset, length); + } + + } + + public void testResetBuffer() throws IOException { + + ResettableBoundedByteArrayOutputStream stream = + new ResettableBoundedByteArrayOutputStream(SIZE); + + // Write to the stream, get the data back and check for contents + stream.write(INPUT, 0, SIZE); + assertTrue("Array Contents Mismatch", + Arrays.equals(INPUT, stream.getBuffer())); + + // Try writing beyond end of buffer. Should throw an exception + boolean caughtException = false; + + try { + stream.write(INPUT[0]); + } catch (Exception e) { + caughtException = true; + } + + assertTrue("Writing beyond limit did not throw an exception", + caughtException); + + //Reset the stream and try, should succeed + byte[] newBuf = new byte[SIZE]; + stream.resetBuffer(newBuf, 0, newBuf.length); + assertTrue("Limit did not get reset correctly", + (stream.getLimit() == SIZE)); + stream.write(INPUT, 0, SIZE); + assertTrue("Array Contents Mismatch", + Arrays.equals(INPUT, stream.getBuffer())); + + // Try writing one more byte, should fail + caughtException = false; + try { + stream.write(INPUT[0]); + } catch (Exception e) { + caughtException = true; + } + assertTrue("Writing beyond limit did not throw an exception", + caughtException); + } + }