HADOOP-6226. Moves BoundedByteArrayOutputStream from the tfile package to the io package and makes it available to other users (MAPREDUCE-318). Contributed by Jothi Padmanabhan.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@810451 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9ea11c5fb3
commit
a3c52b93ea
|
@ -162,6 +162,10 @@ Trunk (unreleased changes)
|
||||||
HADOOP-6120. Add support for Avro specific and reflect data.
|
HADOOP-6120. Add support for Avro specific and reflect data.
|
||||||
(sharad via cutting)
|
(sharad via cutting)
|
||||||
|
|
||||||
|
HADOOP-6226. Moves BoundedByteArrayOutputStream from the tfile package to
|
||||||
|
the io package and makes it available to other users (MAPREDUCE-318).
|
||||||
|
(Jothi Padmanabhan via ddas)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HADOOP-4565. Added CombineFileInputFormat to use data locality information
|
HADOOP-4565. Added CombineFileInputFormat to use data locality information
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
* the License.
|
* the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.io.file.tfile;
|
package org.apache.hadoop.io;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -26,15 +26,26 @@ import java.io.OutputStream;
|
||||||
* than the buffer capacity. The object can be reused through <code>reset</code>
|
* than the buffer capacity. The object can be reused through <code>reset</code>
|
||||||
* API and choose different limits in each round.
|
* API and choose different limits in each round.
|
||||||
*/
|
*/
|
||||||
class BoundedByteArrayOutputStream extends OutputStream {
|
public class BoundedByteArrayOutputStream extends OutputStream {
|
||||||
private final byte[] buffer;
|
private final byte[] buffer;
|
||||||
private int limit;
|
private int limit;
|
||||||
private int count;
|
private int count;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a BoundedByteArrayOutputStream with the specified
|
||||||
|
* capacity
|
||||||
|
* @param capacity The capacity of the underlying byte array
|
||||||
|
*/
|
||||||
public BoundedByteArrayOutputStream(int capacity) {
|
public BoundedByteArrayOutputStream(int capacity) {
|
||||||
this(capacity, capacity);
|
this(capacity, capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a BoundedByteArrayOutputStream with the specified
|
||||||
|
* capacity and limit.
|
||||||
|
* @param capacity The capacity of the underlying byte array
|
||||||
|
* @param limit The maximum limit upto which data can be written
|
||||||
|
*/
|
||||||
public BoundedByteArrayOutputStream(int capacity, int limit) {
|
public BoundedByteArrayOutputStream(int capacity, int limit) {
|
||||||
if ((capacity < limit) || (capacity | limit) < 0) {
|
if ((capacity < limit) || (capacity | limit) < 0) {
|
||||||
throw new IllegalArgumentException("Invalid capacity/limit");
|
throw new IllegalArgumentException("Invalid capacity/limit");
|
||||||
|
@ -69,6 +80,10 @@ class BoundedByteArrayOutputStream extends OutputStream {
|
||||||
count += len;
|
count += len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the limit
|
||||||
|
* @param newlim New Limit
|
||||||
|
*/
|
||||||
public void reset(int newlim) {
|
public void reset(int newlim) {
|
||||||
if (newlim > buffer.length) {
|
if (newlim > buffer.length) {
|
||||||
throw new IndexOutOfBoundsException("Limit exceeds buffer size");
|
throw new IndexOutOfBoundsException("Limit exceeds buffer size");
|
||||||
|
@ -77,19 +92,27 @@ class BoundedByteArrayOutputStream extends OutputStream {
|
||||||
this.count = 0;
|
this.count = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Reset the buffer */
|
||||||
public void reset() {
|
public void reset() {
|
||||||
this.limit = buffer.length;
|
this.limit = buffer.length;
|
||||||
this.count = 0;
|
this.count = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Return the current limit */
|
||||||
public int getLimit() {
|
public int getLimit() {
|
||||||
return limit;
|
return limit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Returns the underlying buffer.
|
||||||
|
* Data is only valid to {@link #size()}.
|
||||||
|
*/
|
||||||
public byte[] getBuffer() {
|
public byte[] getBuffer() {
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Returns the length of the valid data
|
||||||
|
* currently in the buffer.
|
||||||
|
*/
|
||||||
public int size() {
|
public int size() {
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
|
@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
|
||||||
import org.apache.hadoop.io.BytesWritable;
|
import org.apache.hadoop.io.BytesWritable;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.io;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
|
||||||
|
/** Unit tests for BoundedByteArrayOutputStream */
|
||||||
|
public class TestBoundedByteArrayOutputStream extends TestCase {
|
||||||
|
|
||||||
|
private static final int SIZE = 1024;
|
||||||
|
private static final byte[] INPUT = new byte[SIZE];
|
||||||
|
static {
|
||||||
|
new Random().nextBytes(INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testBoundedStream() throws IOException {
|
||||||
|
|
||||||
|
BoundedByteArrayOutputStream stream =
|
||||||
|
new BoundedByteArrayOutputStream(SIZE);
|
||||||
|
|
||||||
|
// Write to the stream, get the data back and check for contents
|
||||||
|
stream.write(INPUT, 0, SIZE);
|
||||||
|
assertTrue("Array Contents Mismatch",
|
||||||
|
Arrays.equals(INPUT, stream.getBuffer()));
|
||||||
|
|
||||||
|
// Try writing beyond end of buffer. Should throw an exception
|
||||||
|
boolean caughtException = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
stream.write(INPUT[0]);
|
||||||
|
} catch (Exception e) {
|
||||||
|
caughtException = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue("Writing beyond limit did not throw an exception",
|
||||||
|
caughtException);
|
||||||
|
|
||||||
|
//Reset the stream and try, should succeed
|
||||||
|
stream.reset();
|
||||||
|
assertTrue("Limit did not get reset correctly",
|
||||||
|
(stream.getLimit() == SIZE));
|
||||||
|
stream.write(INPUT, 0, SIZE);
|
||||||
|
assertTrue("Array Contents Mismatch",
|
||||||
|
Arrays.equals(INPUT, stream.getBuffer()));
|
||||||
|
|
||||||
|
// Try writing one more byte, should fail
|
||||||
|
caughtException = false;
|
||||||
|
try {
|
||||||
|
stream.write(INPUT[0]);
|
||||||
|
} catch (Exception e) {
|
||||||
|
caughtException = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset the stream, but set a lower limit. Writing beyond
|
||||||
|
// the limit should throw an exception
|
||||||
|
stream.reset(SIZE - 1);
|
||||||
|
assertTrue("Limit did not get reset correctly",
|
||||||
|
(stream.getLimit() == SIZE -1));
|
||||||
|
caughtException = false;
|
||||||
|
|
||||||
|
try {
|
||||||
|
stream.write(INPUT, 0, SIZE);
|
||||||
|
} catch (Exception e) {
|
||||||
|
caughtException = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue("Writing beyond limit did not throw an exception",
|
||||||
|
caughtException);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue