NIFI-3071: Deprecated InputStreams & OutputStream sin org.apache.nifi.stream.io package in favor of using their Java counterparts

This commit is contained in:
Mark Payne 2016-12-04 11:43:41 -05:00
parent bfe91db857
commit 8f8b8cdf46
10 changed files with 59 additions and 1269 deletions

View File

@ -16,445 +16,19 @@
*/
package org.apache.nifi.stream.io;
import java.io.IOException;
import java.io.InputStream;
/**
* This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means
* that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance.
* @deprecated use java.io.BufferedInputStream instead
*/
public class BufferedInputStream extends InputStream {
@Deprecated
public class BufferedInputStream extends java.io.BufferedInputStream {
private final InputStream in;
private static int DEFAULT_BUFFER_SIZE = 8192;
/**
* The maximum size of array to allocate.
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in
* OutOfMemoryError: Requested array size exceeds VM limit
*/
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
/**
* The internal buffer array where the data is stored. When necessary,
* it may be replaced by another array of
* a different size.
*/
protected byte buf[];
/**
* The index one greater than the index of the last valid byte in
* the buffer.
* This value is always
* in the range <code>0</code> through <code>buf.length</code>;
* elements <code>buf[0]</code> through <code>buf[count-1]
* </code>contain buffered input data obtained
* from the underlying input stream.
*/
private int count;
/**
* The current position in the buffer. This is the index of the next
* character to be read from the <code>buf</code> array.
* <p>
* This value is always in the range <code>0</code>
* through <code>count</code>. If it is less
* than <code>count</code>, then <code>buf[pos]</code>
* is the next byte to be supplied as input;
* if it is equal to <code>count</code>, then
* the next <code>read</code> or <code>skip</code>
* operation will require more bytes to be
* read from the contained input stream.
*
* @see java.io.BufferedInputStream#buf
*/
private int pos;
/**
* The value of the <code>pos</code> field at the time the last
* <code>mark</code> method was called.
* <p>
* This value is always
* in the range <code>-1</code> through <code>pos</code>.
* If there is no marked position in the input
* stream, this field is <code>-1</code>. If
* there is a marked position in the input
* stream, then <code>buf[markpos]</code>
* is the first byte to be supplied as input
* after a <code>reset</code> operation. If
* <code>markpos</code> is not <code>-1</code>,
* then all bytes from positions <code>buf[markpos]</code>
* through <code>buf[pos-1]</code> must remain
* in the buffer array (though they may be
* moved to another place in the buffer array,
* with suitable adjustments to the values
* of <code>count</code>, <code>pos</code>,
* and <code>markpos</code>); they may not
* be discarded unless and until the difference
* between <code>pos</code> and <code>markpos</code>
* exceeds <code>marklimit</code>.
*
* @see java.io.BufferedInputStream#mark(int)
* @see java.io.BufferedInputStream#pos
*/
protected int markpos = -1;
/**
* The maximum read ahead allowed after a call to the
* <code>mark</code> method before subsequent calls to the
* <code>reset</code> method fail.
* Whenever the difference between <code>pos</code>
* and <code>markpos</code> exceeds <code>marklimit</code>,
* then the mark may be dropped by setting
* <code>markpos</code> to <code>-1</code>.
*
* @see java.io.BufferedInputStream#mark(int)
* @see java.io.BufferedInputStream#reset()
*/
protected int marklimit;
/**
* Check to make sure that underlying input stream has not been
* nulled out due to close; if not return it;
*/
private InputStream getInIfOpen() throws IOException {
InputStream input = in;
if (input == null) {
throw new IOException("Stream closed");
}
return input;
}
/**
* Check to make sure that buffer has not been nulled out due to
* close; if not return it;
*/
private byte[] getBufIfOpen() throws IOException {
if (buf == null) {
throw new IOException("Stream closed");
}
return buf;
}
/**
* Creates a <code>BufferedInputStream</code>
* and saves its argument, the input stream
* <code>in</code>, for later use. An internal
* buffer array is created and stored in <code>buf</code>.
*
* @param in the underlying input stream.
*/
public BufferedInputStream(InputStream in) {
this(in, DEFAULT_BUFFER_SIZE);
super(in);
}
/**
* Creates a <code>BufferedInputStream</code>
* with the specified buffer size,
* and saves its argument, the input stream
* <code>in</code>, for later use. An internal
* buffer array of length <code>size</code>
* is created and stored in <code>buf</code>.
*
* @param in the underlying input stream.
* @param size the buffer size.
* @exception IllegalArgumentException if {@code size <= 0}.
*/
public BufferedInputStream(InputStream in, int size) {
this.in = in;
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}
/**
* Fills the buffer with more data, taking into account
* shuffling and other tricks for dealing with marks.
* Assumes that it is being called by a synchronized method.
* This method also assumes that all data has already been read in,
* hence pos > count.
*/
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0) {
pos = 0; /* no mark: throw away the buffer */
} else if (pos >= buffer.length) {
if (markpos > 0) { /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
} else { /* grow buffer */
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit) {
nsz = marklimit;
}
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
buffer = nbuf;
}
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0) {
count = n + pos;
}
}
/**
* See
* the general contract of the <code>read</code>
* method of <code>InputStream</code>.
*
* @return the next byte of data, or <code>-1</code> if the end of the
* stream is reached.
* @exception IOException if this input stream has been closed by
* invoking its {@link #close()} method,
* or an I/O error occurs.
* @see java.io.FilterInputStream#in
*/
@Override
public int read() throws IOException {
if (pos >= count) {
fill();
if (pos >= count) {
return -1;
}
}
return getBufIfOpen()[pos++] & 0xff;
}
/**
* Read characters into a portion of an array, reading from the underlying
* stream at most once if necessary.
*/
private int read1(byte[] b, int off, int len) throws IOException {
int avail = count - pos;
if (avail <= 0) {
/*
* If the requested length is at least as large as the buffer, and
* if there is no mark/reset activity, do not bother to copy the
* bytes into the local buffer. In this way buffered streams will
* cascade harmlessly.
*/
if (len >= getBufIfOpen().length && markpos < 0) {
return getInIfOpen().read(b, off, len);
}
fill();
avail = count - pos;
if (avail <= 0) {
return -1;
}
}
int cnt = (avail < len) ? avail : len;
System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
pos += cnt;
return cnt;
}
/**
* Reads bytes from this byte-input stream into the specified byte array,
* starting at the given offset.
*
* <p>
* This method implements the general contract of the corresponding
* <code>{@link InputStream#read(byte[], int, int) read}</code> method of
* the <code>{@link InputStream}</code> class. As an additional
* convenience, it attempts to read as many bytes as possible by repeatedly
* invoking the <code>read</code> method of the underlying stream. This
* iterated <code>read</code> continues until one of the following
* conditions becomes true:
* <ul>
*
* <li>The specified number of bytes have been read,
*
* <li>The <code>read</code> method of the underlying stream returns
* <code>-1</code>, indicating end-of-file, or
*
* <li>The <code>available</code> method of the underlying stream
* returns zero, indicating that further input requests would block.
*
* </ul>
* If the first <code>read</code> on the underlying stream returns
* <code>-1</code> to indicate end-of-file then this method returns
* <code>-1</code>. Otherwise this method returns the number of bytes
* actually read.
*
* <p>
* Subclasses of this class are encouraged, but not required, to
* attempt to read as many bytes as possible in the same fashion.
*
* @param b destination buffer.
* @param off offset at which to start storing bytes.
* @param len maximum number of bytes to read.
* @return the number of bytes read, or <code>-1</code> if the end of
* the stream has been reached.
* @exception IOException if this input stream has been closed by
* invoking its {@link #close()} method,
* or an I/O error occurs.
*/
@Override
public int read(byte b[], int off, int len)
throws IOException {
getBufIfOpen(); // Check for closed stream
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int n = 0;
for (;;) {
int nread = read1(b, off + n, len - n);
if (nread <= 0) {
return (n == 0) ? nread : n;
}
n += nread;
if (n >= len) {
return n;
}
// if not closed but no bytes available, return
InputStream input = in;
if (input != null && input.available() <= 0) {
return n;
}
}
}
/**
* See the general contract of the <code>skip</code>
* method of <code>InputStream</code>.
*
* @exception IOException if the stream does not support seek,
* or if this input stream has been closed by
* invoking its {@link #close()} method, or an
* I/O error occurs.
*/
@Override
public long skip(long n) throws IOException {
getBufIfOpen(); // Check for closed stream
if (n <= 0) {
return 0;
}
long avail = count - pos;
if (avail <= 0) {
// If no mark position set then don't keep in buffer
if (markpos < 0) {
return getInIfOpen().skip(n);
}
// Fill in buffer to save bytes for reset
fill();
avail = count - pos;
if (avail <= 0) {
return 0;
}
}
long skipped = (avail < n) ? avail : n;
pos += skipped;
return skipped;
}
/**
* Returns an estimate of the number of bytes that can be read (or
* skipped over) from this input stream without blocking by the next
* invocation of a method for this input stream. The next invocation might be
* the same thread or another thread. A single read or skip of this
* many bytes will not block, but may read or skip fewer bytes.
* <p>
* This method returns the sum of the number of bytes remaining to be read in
* the buffer (<code>count&nbsp;- pos</code>) and the result of calling the
* {@link java.io.FilterInputStream#in in}.available().
*
* @return an estimate of the number of bytes that can be read (or skipped
* over) from this input stream without blocking.
* @exception IOException if this input stream has been closed by
* invoking its {@link #close()} method,
* or an I/O error occurs.
*/
@Override
public int available() throws IOException {
int n = count - pos;
int avail = getInIfOpen().available();
return n > (Integer.MAX_VALUE - avail) ? Integer.MAX_VALUE : n + avail;
}
/**
* See the general contract of the <code>mark</code>
* method of <code>InputStream</code>.
*
* @param readlimit the maximum limit of bytes that can be read before
* the mark position becomes invalid.
* @see java.io.BufferedInputStream#reset()
*/
@Override
public void mark(int readlimit) {
marklimit = readlimit;
markpos = pos;
}
/**
* See the general contract of the <code>reset</code>
* method of <code>InputStream</code>.
* <p>
* If <code>markpos</code> is <code>-1</code>
* (no mark has been set or the mark has been
* invalidated), an <code>IOException</code>
* is thrown. Otherwise, <code>pos</code> is
* set equal to <code>markpos</code>.
*
* @exception IOException if this stream has not been marked or,
* if the mark has been invalidated, or the stream
* has been closed by invoking its {@link #close()}
* method, or an I/O error occurs.
* @see java.io.BufferedInputStream#mark(int)
*/
@Override
public void reset() throws IOException {
getBufIfOpen(); // Cause exception if closed
if (markpos < 0) {
throw new IOException("Resetting to invalid mark");
}
pos = markpos;
}
/**
* Tests if this input stream supports the <code>mark</code>
* and <code>reset</code> methods. The <code>markSupported</code>
* method of <code>BufferedInputStream</code> returns
* <code>true</code>.
*
* @return a <code>boolean</code> indicating if this stream type supports
* the <code>mark</code> and <code>reset</code> methods.
* @see java.io.InputStream#mark(int)
* @see java.io.InputStream#reset()
*/
@Override
public boolean markSupported() {
return true;
}
/**
* Closes this input stream and releases any system resources
* associated with the stream.
* Once the stream has been closed, further read(), available(), reset(),
* or skip() invocations will throw an IOException.
* Closing a previously closed stream has no effect.
*
* @exception IOException if an I/O error occurs.
*/
@Override
public void close() throws IOException {
this.in.close();
public BufferedInputStream(InputStream in, int bufferSize) {
super(in, bufferSize);
}
}

View File

@ -16,114 +16,19 @@
*/
package org.apache.nifi.stream.io;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
/**
* This class is a slight modification of the {@link java.io.BufferedOutputStream} class. This implementation differs in that it does not mark methods as synchronized. This means that this class is
* not suitable for writing by multiple concurrent threads. However, the removal of the synchronized keyword results in potentially much better performance.
* @deprecated use java.io.BufferedOutputStream instead
*/
public class BufferedOutputStream extends FilterOutputStream {
@Deprecated
public class BufferedOutputStream extends java.io.BufferedOutputStream {
/**
* The internal buffer where data is stored.
*/
protected byte buf[];
/**
* The number of valid bytes in the buffer. This value is always in the range <tt>0</tt> through <tt>buf.length</tt>; elements
* <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte data.
*/
protected int count;
/**
* Creates a new buffered output stream to write data to the specified underlying output stream.
*
* @param out the underlying output stream.
*/
public BufferedOutputStream(OutputStream out) {
this(out, 8192);
}
/**
* Creates a new buffered output stream to write data to the specified underlying output stream with the specified buffer size.
*
* @param out the underlying output stream.
* @param size the buffer size.
* @exception IllegalArgumentException if size &lt;= 0.
*/
public BufferedOutputStream(OutputStream out, int size) {
super(out);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}
/**
* Flush the internal buffer
*/
private void flushBuffer() throws IOException {
if (count > 0) {
out.write(buf, 0, count);
count = 0;
}
}
/**
* Writes the specified byte to this buffered output stream.
*
* @param b the byte to be written.
* @exception IOException if an I/O error occurs.
*/
@Override
public void write(int b) throws IOException {
if (count >= buf.length) {
flushBuffer();
}
buf[count++] = (byte) b;
}
/**
* Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> to this buffered output stream.
*
* <p>
* Ordinarily this method stores bytes from the given array into this stream's buffer, flushing the buffer to the underlying output stream as needed. If the requested length is at least as large
* as this stream's buffer, however, then this method will flush the buffer and write the bytes directly to the underlying output stream. Thus redundant <code>BufferedOutputStream</code>s will not
* copy data unnecessarily.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @exception IOException if an I/O error occurs.
*/
@Override
public void write(byte b[], int off, int len) throws IOException {
if (len >= buf.length) {
/* If the request length exceeds the size of the output buffer,
flush the output buffer and then write the data directly.
In this way buffered streams will cascade harmlessly. */
flushBuffer();
out.write(b, off, len);
return;
}
if (len >= buf.length - count) {
flushBuffer();
}
System.arraycopy(b, off, buf, count, len);
count += len;
}
/**
* Flushes this buffered output stream. This forces any buffered output bytes to be written out to the underlying output stream.
*
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public void flush() throws IOException {
flushBuffer();
out.flush();
public BufferedOutputStream(OutputStream out, int bufferSize) {
super(out, bufferSize);
}
}

View File

@ -16,190 +16,17 @@
*/
package org.apache.nifi.stream.io;
import java.io.InputStream;
/**
* This class performs the same function as java.io.ByteArrayInputStream but does not mark its methods as synchronized
* @deprecated use java.io.ByteArrayInputStream instead
*/
public class ByteArrayInputStream extends InputStream {
@Deprecated
public class ByteArrayInputStream extends java.io.ByteArrayInputStream {
/**
* An array of bytes that was provided by the creator of the stream. Elements <code>buf[0]</code> through <code>buf[count-1]</code> are the only bytes that can ever be read from the stream;
* element <code>buf[pos]</code> is the next byte to be read.
*/
protected byte buf[];
/**
* The index of the next character to read from the input stream buffer. This value should always be nonnegative and not larger than the value of <code>count</code>. The next byte to be read from
* the input stream buffer will be <code>buf[pos]</code>.
*/
protected int pos;
/**
* The currently marked position in the stream. ByteArrayInputStream objects are marked at position zero by default when constructed. They may be marked at another position within the buffer by
* the <code>mark()</code> method. The current buffer position is set to this point by the <code>reset()</code> method.
* <p>
* If no mark has been set, then the value of mark is the offset passed to the constructor (or 0 if the offset was not supplied).
*
* @since JDK1.1
*/
protected int mark = 0;
/**
* The index one greater than the last valid character in the input stream buffer. This value should always be nonnegative and not larger than the length of <code>buf</code>. It is one greater
* than the position of the last byte within <code>buf</code> that can ever be read from the input stream buffer.
*/
protected int count;
/**
* Creates a <code>ByteArrayInputStream</code> so that it uses <code>buf</code> as its buffer array. The buffer array is not copied. The initial value of <code>pos</code> is <code>0</code> and the
* initial value of <code>count</code> is the length of <code>buf</code>.
*
* @param buf the input buffer.
*/
public ByteArrayInputStream(byte buf[]) {
this.buf = buf;
this.pos = 0;
this.count = buf.length;
public ByteArrayInputStream(byte[] buffer) {
super(buffer);
}
/**
* Creates <code>ByteArrayInputStream</code> that uses <code>buf</code> as its buffer array. The initial value of <code>pos</code> is <code>offset</code> and the initial value of
* <code>count</code> is the minimum of <code>offset+length</code> and <code>buf.length</code>. The buffer array is not copied. The buffer's mark is set to the specified offset.
*
* @param buf the input buffer.
* @param offset the offset in the buffer of the first byte to read.
* @param length the maximum number of bytes to read from the buffer.
*/
public ByteArrayInputStream(byte buf[], int offset, int length) {
this.buf = buf;
this.pos = offset;
this.count = Math.min(offset + length, buf.length);
this.mark = offset;
public ByteArrayInputStream(byte[] buffer, int offset, int length) {
super(buffer, offset, length);
}
/**
* Reads the next byte of data from this input stream. The value byte is returned as an <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available because the end of
* the stream has been reached, the value <code>-1</code> is returned.
* <p>
* This <code>read</code> method cannot block.
*
* @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
*/
@Override
public int read() {
return (pos < count) ? (buf[pos++] & 0xff) : -1;
}
/**
* Reads up to <code>len</code> bytes of data into an array of bytes from this input stream. If <code>pos</code> equals <code>count</code>, then <code>-1</code> is returned to indicate end of
* file. Otherwise, the number <code>k</code> of bytes read is equal to the smaller of <code>len</code> and <code>count-pos</code>. If <code>k</code> is positive, then bytes <code>buf[pos]</code>
* through <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> through <code>b[off+k-1]</code> in the manner performed by <code>System.arraycopy</code>. The value <code>k</code> is added
* into <code>pos</code> and <code>k</code> is returned.
* <p>
* This <code>read</code> method cannot block.
*
* @param b the buffer into which the data is read.
* @param off the start offset in the destination array <code>b</code>
* @param len the maximum number of bytes read.
* @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the end of the stream has been reached.
* @exception NullPointerException If <code>b</code> is <code>null</code>.
* @exception IndexOutOfBoundsException If <code>off</code> is negative, <code>len</code> is negative, or <code>len</code> is greater than <code>b.length - off</code>
*/
@Override
public int read(byte b[], int off, int len) {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (pos >= count) {
return -1;
}
int avail = count - pos;
if (len > avail) {
len = avail;
}
if (len <= 0) {
return 0;
}
System.arraycopy(buf, pos, b, off, len);
pos += len;
return len;
}
/**
* Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is
* equal to the smaller of <code>n</code> and <code>count-pos</code>. The value <code>k</code> is added into <code>pos</code> and <code>k</code> is returned.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
*/
@Override
public long skip(long n) {
long k = count - pos;
if (n < k) {
k = n < 0 ? 0 : n;
}
pos += k;
return k;
}
/**
* Returns the number of remaining bytes that can be read (or skipped over) from this input stream.
* <p>
* The value returned is <code>count&nbsp;- pos</code>, which is the number of bytes remaining to be read from the input buffer.
*
* @return the number of remaining bytes that can be read (or skipped over) from this input stream without blocking.
*/
@Override
public int available() {
return count - pos;
}
/**
* Tests if this <code>InputStream</code> supports mark/reset. The <code>markSupported</code> method of <code>ByteArrayInputStream</code> always returns <code>true</code>.
*
* @since JDK1.1
*/
@Override
public boolean markSupported() {
return true;
}
/**
* Set the current marked position in the stream. ByteArrayInputStream objects are marked at position zero by default when constructed. They may be marked at another position within the buffer by
* this method.
* <p>
* If no mark has been set, then the value of the mark is the offset passed to the constructor (or 0 if the offset was not supplied).
*
* <p>
* Note: The <code>readAheadLimit</code> for this class has no meaning.
*
* @since JDK1.1
*/
@Override
public void mark(int readAheadLimit) {
mark = pos;
}
/**
* Resets the buffer to the marked position. The marked position is 0 unless another position was marked or an offset was specified in the constructor.
*/
@Override
public void reset() {
pos = mark;
}
/**
* Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in this class can be called after the stream has been closed without generating an <tt>IOException</tt>.
* <p>
*/
@Override
public void close() {
}
}

View File

@ -16,205 +16,16 @@
*/
package org.apache.nifi.stream.io;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
/**
* This class provides a more efficient implementation of the java.io.ByteArrayOutputStream. The efficiency is gained in two ways:
* <ul>
* <li>The write methods are not synchronized</li>
* <li>The class provides {@link #getUnderlyingBuffer()} and {@link #getBufferLength()}, which can be used to access the underlying byte array directly, rather than the System.arraycopy that
* {@link #toByteArray()} uses
* </ul>
*
*/
public class ByteArrayOutputStream extends OutputStream {
/**
* The buffer where data is stored.
*/
protected byte buf[];
/**
* The number of valid bytes in the buffer.
*/
protected int count;
/**
* Creates a new byte array output stream. The buffer capacity is initially 32 bytes, though its size increases if necessary.
* @deprecated use java.io.ByteArrayOutputStream instead
*/
@Deprecated
public class ByteArrayOutputStream extends java.io.ByteArrayOutputStream {
public ByteArrayOutputStream() {
this(32);
super();
}
/**
* Creates a new byte array output stream, with a buffer capacity of the specified size, in bytes.
*
* @param size the initial size.
* @exception IllegalArgumentException if size is negative.
*/
public ByteArrayOutputStream(int size) {
if (size < 0) {
throw new IllegalArgumentException("Negative initial size: "
+ size);
}
buf = new byte[size];
}
/**
* Increases the capacity if necessary to ensure that it can hold at least the number of elements specified by the minimum capacity argument.
*
* @param minCapacity the desired minimum capacity
* @throws OutOfMemoryError if {@code minCapacity < 0}. This is interpreted as a request for the unsatisfiably large capacity {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
*/
private void ensureCapacity(int minCapacity) {
// overflow-conscious code
if (minCapacity - buf.length > 0) {
grow(minCapacity);
}
}
/**
* Increases the capacity to ensure that it can hold at least the number of elements specified by the minimum capacity argument.
*
* @param minCapacity the desired minimum capacity
*/
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = buf.length;
int newCapacity = oldCapacity << 1;
if (newCapacity - minCapacity < 0) {
newCapacity = minCapacity;
}
if (newCapacity < 0) {
if (minCapacity < 0) { // overflow
throw new OutOfMemoryError();
}
newCapacity = Integer.MAX_VALUE;
}
buf = Arrays.copyOf(buf, newCapacity);
}
/**
* Writes the specified byte to this byte array output stream.
*
* @param b the byte to be written.
*/
@Override
public void write(int b) {
ensureCapacity(count + 1);
buf[count] = (byte) b;
count += 1;
}
/**
* Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> to this byte array output stream.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
*/
@Override
public void write(byte b[], int off, int len) {
if ((off < 0) || (off > b.length) || (len < 0)
|| ((off + len) - b.length > 0)) {
throw new IndexOutOfBoundsException();
}
ensureCapacity(count + len);
System.arraycopy(b, off, buf, count, len);
count += len;
}
/**
* Writes the complete contents of this byte array output stream to the specified output stream argument, as if by calling the output stream's write method using
* <code>out.write(buf, 0, count)</code>.
*
* @param out the output stream to which to write the data.
* @exception IOException if an I/O error occurs.
*/
public void writeTo(OutputStream out) throws IOException {
out.write(buf, 0, count);
}
/**
* Resets the <code>count</code> field of this byte array output stream to zero, so that all currently accumulated output in the output stream is discarded. The output stream can be used again,
* reusing the already allocated buffer space.
*
* @see java.io.ByteArrayInputStream#count
*/
public void reset() {
count = 0;
}
/**
* Creates a newly allocated byte array. Its size is the current size of this output stream and the valid contents of the buffer have been copied into it.
*
* @return the current contents of this output stream, as a byte array.
* @see java.io.ByteArrayOutputStream#size()
*/
public byte[] toByteArray() {
return Arrays.copyOf(buf, count);
}
/**
* Returns the current size of the buffer.
*
* @return the value of the <code>count</code> field, which is the number of valid bytes in this output stream.
* @see java.io.ByteArrayOutputStream#count
*/
public int size() {
return count;
}
/**
* Converts the buffer's contents into a string decoding bytes using the platform's default character set. The length of the new <tt>String</tt>
* is a function of the character set, and hence may not be equal to the size of the buffer.
*
* <p>
* This method always replaces malformed-input and unmappable-character sequences with the default replacement string for the platform's default character set. The
* {@linkplain java.nio.charset.CharsetDecoder} class should be used when more control over the decoding process is required.
*
* @return String decoded from the buffer's contents.
* @since JDK1.1
*/
@Override
public String toString() {
return new String(buf, 0, count);
}
/**
* Converts the buffer's contents into a string by decoding the bytes using the specified {@link java.nio.charset.Charset charsetName}. The length of the new <tt>String</tt> is a function of the
* charset, and hence may not be equal to the length of the byte array.
*
* <p>
* This method always replaces malformed-input and unmappable-character sequences with this charset's default replacement string. The {@link
* java.nio.charset.CharsetDecoder} class should be used when more control over the decoding process is required.
*
* @param charsetName the name of a supported {@linkplain java.nio.charset.Charset <code>charset</code>}
* @return String decoded from the buffer's contents.
* @exception UnsupportedEncodingException If the named charset is not supported
* @since JDK1.1
*/
public String toString(String charsetName) throws UnsupportedEncodingException {
return new String(buf, 0, count, charsetName);
}
/**
* Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in this class can be called after the stream has been closed without generating an <tt>IOException</tt>.
* <p>
*
*/
@Override
public void close() {
}
public byte[] getUnderlyingBuffer() {
return buf;
}
public int getBufferLength() {
return count;
public ByteArrayOutputStream(int initialBufferSize) {
super(initialBufferSize);
}
}

View File

@ -16,355 +16,15 @@
*/
package org.apache.nifi.stream.io;
import java.io.DataOutput;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
/**
* This class is different from java.io.DataOutputStream in that it does synchronize on its methods.
* @deprecated use java.io.DataOutputStream instead
*/
public class DataOutputStream extends FilterOutputStream implements DataOutput {
@Deprecated
public class DataOutputStream extends java.io.DataOutputStream {
/**
* The number of bytes written to the data output stream so far. If this counter overflows, it will be wrapped to Integer.MAX_VALUE.
*/
protected int written;
/**
* bytearr is initialized on demand by writeUTF
*/
private byte[] bytearr = null;
/**
* Creates a new data output stream to write data to the specified underlying output stream. The counter <code>written</code> is set to zero.
*
* @param out the underlying output stream, to be saved for later use.
* @see java.io.FilterOutputStream#out
*/
public DataOutputStream(OutputStream out) {
super(out);
}
/**
* Increases the written counter by the specified value until it reaches Integer.MAX_VALUE.
*/
private void incCount(int value) {
int temp = written + value;
if (temp < 0) {
temp = Integer.MAX_VALUE;
}
written = temp;
}
/**
* Writes the specified byte (the low eight bits of the argument <code>b</code>) to the underlying output stream. If no exception is thrown, the counter <code>written</code> is incremented by
* <code>1</code>.
* <p>
* Implements the <code>write</code> method of <code>OutputStream</code>.
*
* @param b the <code>byte</code> to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public void write(int b) throws IOException {
out.write(b);
incCount(1);
}
/**
* Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> to the underlying output stream. If no exception is thrown, the counter <code>written</code> is
* incremented by <code>len</code>.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public void write(byte b[], int off, int len) throws IOException {
out.write(b, off, len);
incCount(len);
}
/**
* Flushes this data output stream. This forces any buffered output bytes to be written out to the stream.
* <p>
* The <code>flush</code> method of <code>DataOutputStream</code> calls the <code>flush</code> method of its underlying output stream.
*
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
* @see java.io.OutputStream#flush()
*/
@Override
public void flush() throws IOException {
out.flush();
}
/**
* Writes a <code>boolean</code> to the underlying output stream as a 1-byte value. The value <code>true</code> is written out as the value <code>(byte)1</code>; the value <code>false</code> is
* written out as the value <code>(byte)0</code>. If no exception is thrown, the counter <code>written</code> is incremented by <code>1</code>.
*
* @param v a <code>boolean</code> value to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public final void writeBoolean(boolean v) throws IOException {
out.write(v ? 1 : 0);
incCount(1);
}
/**
* Writes out a <code>byte</code> to the underlying output stream as a 1-byte value. If no exception is thrown, the counter <code>written</code> is incremented by <code>1</code>.
*
* @param v a <code>byte</code> value to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public final void writeByte(int v) throws IOException {
out.write(v);
incCount(1);
}
/**
* Writes a <code>short</code> to the underlying output stream as two bytes, high byte first. If no exception is thrown, the counter <code>written</code> is incremented by <code>2</code>.
*
* @param v a <code>short</code> to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public final void writeShort(int v) throws IOException {
out.write((v >>> 8) & 0xFF);
out.write((v) & 0xFF);
incCount(2);
}
/**
* Writes a <code>char</code> to the underlying output stream as a 2-byte value, high byte first. If no exception is thrown, the counter <code>written</code> is incremented by <code>2</code>.
*
* @param v a <code>char</code> value to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public final void writeChar(int v) throws IOException {
out.write((v >>> 8) & 0xFF);
out.write((v) & 0xFF);
incCount(2);
}
/**
* Writes an <code>int</code> to the underlying output stream as four bytes, high byte first. If no exception is thrown, the counter <code>written</code> is incremented by <code>4</code>.
*
* @param v an <code>int</code> to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public final void writeInt(int v) throws IOException {
out.write((v >>> 24) & 0xFF);
out.write((v >>> 16) & 0xFF);
out.write((v >>> 8) & 0xFF);
out.write((v) & 0xFF);
incCount(4);
}
private final byte writeBuffer[] = new byte[8];
/**
* Writes a <code>long</code> to the underlying output stream as eight bytes, high byte first. In no exception is thrown, the counter <code>written</code> is incremented by <code>8</code>.
*
* @param v a <code>long</code> to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public final void writeLong(long v) throws IOException {
writeBuffer[0] = (byte) (v >>> 56);
writeBuffer[1] = (byte) (v >>> 48);
writeBuffer[2] = (byte) (v >>> 40);
writeBuffer[3] = (byte) (v >>> 32);
writeBuffer[4] = (byte) (v >>> 24);
writeBuffer[5] = (byte) (v >>> 16);
writeBuffer[6] = (byte) (v >>> 8);
writeBuffer[7] = (byte) (v);
out.write(writeBuffer, 0, 8);
incCount(8);
}
/**
* Converts the float argument to an <code>int</code> using the <code>floatToIntBits</code> method in class <code>Float</code>, and then writes that <code>int</code> value to the underlying output
* stream as a 4-byte quantity, high byte first. If no exception is thrown, the counter <code>written</code> is incremented by <code>4</code>.
*
* @param v a <code>float</code> value to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
* @see java.lang.Float#floatToIntBits(float)
*/
@Override
public final void writeFloat(float v) throws IOException {
writeInt(Float.floatToIntBits(v));
}
/**
* Converts the double argument to a <code>long</code> using the <code>doubleToLongBits</code> method in class <code>Double</code>, and then writes that <code>long</code> value to the underlying
* output stream as an 8-byte quantity, high byte first. If no exception is thrown, the counter <code>written</code> is incremented by <code>8</code>.
*
* @param v a <code>double</code> value to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
* @see java.lang.Double#doubleToLongBits(double)
*/
@Override
public final void writeDouble(double v) throws IOException {
writeLong(Double.doubleToLongBits(v));
}
/**
* Writes out the string to the underlying output stream as a sequence of bytes. Each character in the string is written out, in sequence, by discarding its high eight bits. If no exception is
* thrown, the counter <code>written</code> is incremented by the length of <code>s</code>.
*
* @param s a string of bytes to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.FilterOutputStream#out
*/
@Override
public final void writeBytes(String s) throws IOException {
int len = s.length();
for (int i = 0; i < len; i++) {
out.write((byte) s.charAt(i));
}
incCount(len);
}
/**
* Writes a string to the underlying output stream as a sequence of characters. Each character is written to the data output stream as if by the <code>writeChar</code> method. If no exception is
* thrown, the counter <code>written</code> is incremented by twice the length of <code>s</code>.
*
* @param s a <code>String</code> value to be written.
* @exception IOException if an I/O error occurs.
* @see java.io.DataOutputStream#writeChar(int)
* @see java.io.FilterOutputStream#out
*/
@Override
public final void writeChars(String s) throws IOException {
int len = s.length();
for (int i = 0; i < len; i++) {
int v = s.charAt(i);
out.write((v >>> 8) & 0xFF);
out.write((v) & 0xFF);
}
incCount(len * 2);
}
/**
* Writes a string to the underlying output stream using
* <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
* encoding in a machine-independent manner.
* <p>
* First, two bytes are written to the output stream as if by the <code>writeShort</code> method giving the number of bytes to follow. This value is the number of bytes actually written out, not
* the length of the string. Following the length, each character of the string is output, in sequence, using the modified UTF-8 encoding for the character. If no exception is thrown, the counter
* <code>written</code> is incremented by the total number of bytes written to the output stream. This will be at least two plus the length of <code>str</code>, and at most two plus thrice the
* length of <code>str</code>.
*
* @param str a string to be written.
* @exception IOException if an I/O error occurs.
*/
@Override
public final void writeUTF(String str) throws IOException {
writeUTF(str, this);
}
/**
* Writes a string to the specified DataOutput using
* <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
* encoding in a machine-independent manner.
* <p>
* First, two bytes are written to out as if by the <code>writeShort</code> method giving the number of bytes to follow. This value is the number of bytes actually written out, not the length of
* the string. Following the length, each character of the string is output, in sequence, using the modified UTF-8 encoding for the character. If no exception is thrown, the counter
* <code>written</code> is incremented by the total number of bytes written to the output stream. This will be at least two plus the length of <code>str</code>, and at most two plus thrice the
* length of <code>str</code>.
*
* @param str a string to be written.
* @param out destination to write to
* @return The number of bytes written out.
* @exception IOException if an I/O error occurs.
*/
static int writeUTF(String str, DataOutput out) throws IOException {
int strlen = str.length();
int utflen = 0;
int c, count = 0;
/* use charAt instead of copying String to char array */
for (int i = 0; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
utflen++;
} else if (c > 0x07FF) {
utflen += 3;
} else {
utflen += 2;
}
}
if (utflen > 65535) {
throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
}
byte[] bytearr = null;
if (out instanceof DataOutputStream) {
DataOutputStream dos = (DataOutputStream) out;
if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) {
dos.bytearr = new byte[(utflen * 2) + 2];
}
bytearr = dos.bytearr;
} else {
bytearr = new byte[utflen + 2];
}
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) ((utflen) & 0xFF);
int i = 0;
for (i = 0; i < strlen; i++) {
c = str.charAt(i);
if (!((c >= 0x0001) && (c <= 0x007F))) {
break;
}
bytearr[count++] = (byte) c;
}
for (; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
bytearr[count++] = (byte) c;
} else if (c > 0x07FF) {
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
} else {
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
}
}
out.write(bytearr, 0, utflen + 2);
return utflen + 2;
}
/**
* Returns the current value of the counter <code>written</code>, the number of bytes written to this data output stream so far. If the counter overflows, it will be wrapped to Integer.MAX_VALUE.
*
* @return the value of the <code>written</code> field.
* @see java.io.DataOutputStream#written
*/
public final int size() {
return written;
}
}

View File

@ -95,11 +95,11 @@ public class LeakyBucketStreamThrottler implements StreamThrottler {
public int read() throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1);
LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
if (baos.getBufferLength() < 1) {
if (baos.size() < 1) {
return -1;
}
return baos.getUnderlyingBuffer()[0] & 0xFF;
return baos.toByteArray()[0] & 0xFF;
}
@Override
@ -124,7 +124,7 @@ public class LeakyBucketStreamThrottler implements StreamThrottler {
if (copied == 0) {
return -1;
}
System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied);
System.arraycopy(baos.toByteArray(), 0, b, off, copied);
return copied;
}

View File

@ -107,8 +107,8 @@ public class TestLeakyBucketThrottler {
assertTrue(elapsed < 7000);
// ensure bytes were copied out appropriately
assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
assertEquals(3 * (2 * 1024 * 1024 + 1), baos.size());
assertEquals((byte) 'A', baos.toByteArray()[baos.size() - 1]);
}
}

View File

@ -184,11 +184,12 @@ public class PutSplunk extends AbstractPutEventProcessor {
// if TCP and we don't end in a new line then add one
final String protocol = context.getProperty(PROTOCOL).getValue();
if (protocol.equals(TCP_VALUE.getValue())) {
final byte[] buf = baos.getUnderlyingBuffer();
if (buf[baos.size() - 1] != NEW_LINE_CHAR) {
baos.write(NEW_LINE_CHAR);
}
byte[] buf = baos.toByteArray();
if (protocol.equals(TCP_VALUE.getValue()) && buf[buf.length - 1] != NEW_LINE_CHAR) {
final byte[] updatedBuf = new byte[buf.length + 1];
System.arraycopy(buf, 0, updatedBuf, 0, buf.length);
updatedBuf[updatedBuf.length - 1] = NEW_LINE_CHAR;
buf = updatedBuf;
}
// create a message batch of one message and add to active batches
@ -198,7 +199,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
// attempt to send the data and add the appropriate range
try {
sender.send(baos.toByteArray());
sender.send(buf);
messageBatch.addSuccessfulRange(0L, flowFile.getSize());
} catch (IOException e) {
messageBatch.addFailedRange(0L, flowFile.getSize(), e);
@ -281,6 +282,9 @@ public class PutSplunk extends AbstractPutEventProcessor {
});
messageBatch.setNumMessages(messagesSent.get());
} catch (final IOException ioe) {
// Since this can be thrown only from closing the ByteArrayOutputStream(), we have already
// completed everything that we need to do, so there's nothing really to be done here
}
}
@ -299,7 +303,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
return null;
}
final byte[] buf = baos.getUnderlyingBuffer();
final byte[] buf = baos.toByteArray();
// if TCP and we don't already end with a new line then add one
if (protocol.equals(TCP_VALUE.getValue()) && buf[length - 1] != NEW_LINE_CHAR) {
@ -311,7 +315,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
message[message.length - 1] = NEW_LINE_CHAR;
return message;
} else {
return Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, length);
return Arrays.copyOfRange(baos.toByteArray(), 0, length);
}
}

View File

@ -810,11 +810,12 @@ public class TailFile extends AbstractProcessor {
byte ch = buffer.get(i);
switch (ch) {
case '\n':
case '\n': {
baos.write(ch);
seenCR = false;
baos.writeTo(out);
checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
final byte[] baosBuffer = baos.toByteArray();
checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
}
@ -823,15 +824,18 @@ public class TailFile extends AbstractProcessor {
rePos = pos + i + 1;
linesRead++;
break;
case '\r':
}
case '\r': {
baos.write(ch);
seenCR = true;
break;
default:
}
default: {
if (seenCR) {
seenCR = false;
baos.writeTo(out);
checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
final byte[] baosBuffer = baos.toByteArray();
checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
}
@ -845,6 +849,7 @@ public class TailFile extends AbstractProcessor {
}
}
}
}
pos = reader.position();
}

View File

@ -185,7 +185,11 @@ public class JmsFactory {
baos.write(byteBuffer, 0, byteCount);
}
try {
baos.close();
} catch (final IOException ioe) {
}
return baos.toByteArray();
}