diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
index 324f59f72f..8dba8999e1 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
@@ -16,445 +16,19 @@
*/
package org.apache.nifi.stream.io;
-import java.io.IOException;
import java.io.InputStream;
/**
- * This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means
- * that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance.
+ * @deprecated use java.io.BufferedInputStream instead
*/
-public class BufferedInputStream extends InputStream {
+@Deprecated
+public class BufferedInputStream extends java.io.BufferedInputStream {
- private final InputStream in;
-
- private static int DEFAULT_BUFFER_SIZE = 8192;
-
- /**
- * The maximum size of array to allocate.
- * Some VMs reserve some header words in an array.
- * Attempts to allocate larger arrays may result in
- * OutOfMemoryError: Requested array size exceeds VM limit
- */
- private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
-
- /**
- * The internal buffer array where the data is stored. When necessary,
- * it may be replaced by another array of
- * a different size.
- */
- protected byte buf[];
-
- /**
- * The index one greater than the index of the last valid byte in
- * the buffer.
- * This value is always
- * in the range 0
through buf.length
;
- * elements buf[0]
through buf[count-1]
- *
contain buffered input data obtained
- * from the underlying input stream.
- */
- private int count;
-
- /**
- * The current position in the buffer. This is the index of the next
- * character to be read from the buf
array.
- *
- * This value is always in the range 0
- * through count
. If it is less
- * than count
, then buf[pos]
- * is the next byte to be supplied as input;
- * if it is equal to count
, then
- * the next read
or skip
- * operation will require more bytes to be
- * read from the contained input stream.
- *
- * @see java.io.BufferedInputStream#buf
- */
- private int pos;
-
- /**
- * The value of the pos
field at the time the last
- * mark
method was called.
- *
- * This value is always
- * in the range -1
through pos
.
- * If there is no marked position in the input
- * stream, this field is -1
. If
- * there is a marked position in the input
- * stream, then buf[markpos]
- * is the first byte to be supplied as input
- * after a reset
operation. If
- * markpos
is not -1
,
- * then all bytes from positions buf[markpos]
- * through buf[pos-1]
must remain
- * in the buffer array (though they may be
- * moved to another place in the buffer array,
- * with suitable adjustments to the values
- * of count
, pos
,
- * and markpos
); they may not
- * be discarded unless and until the difference
- * between pos
and markpos
- * exceeds marklimit
.
- *
- * @see java.io.BufferedInputStream#mark(int)
- * @see java.io.BufferedInputStream#pos
- */
- protected int markpos = -1;
-
- /**
- * The maximum read ahead allowed after a call to the
- * mark
method before subsequent calls to the
- * reset
method fail.
- * Whenever the difference between pos
- * and markpos
exceeds marklimit
,
- * then the mark may be dropped by setting
- * markpos
to -1
.
- *
- * @see java.io.BufferedInputStream#mark(int)
- * @see java.io.BufferedInputStream#reset()
- */
- protected int marklimit;
-
- /**
- * Check to make sure that underlying input stream has not been
- * nulled out due to close; if not return it;
- */
- private InputStream getInIfOpen() throws IOException {
- InputStream input = in;
- if (input == null) {
- throw new IOException("Stream closed");
- }
- return input;
- }
-
- /**
- * Check to make sure that buffer has not been nulled out due to
- * close; if not return it;
- */
- private byte[] getBufIfOpen() throws IOException {
- if (buf == null) {
- throw new IOException("Stream closed");
- }
- return buf;
- }
-
- /**
- * Creates a BufferedInputStream
- * and saves its argument, the input stream
- * in
, for later use. An internal
- * buffer array is created and stored in buf
.
- *
- * @param in the underlying input stream.
- */
public BufferedInputStream(InputStream in) {
- this(in, DEFAULT_BUFFER_SIZE);
+ super(in);
}
- /**
- * Creates a BufferedInputStream
- * with the specified buffer size,
- * and saves its argument, the input stream
- * in
, for later use. An internal
- * buffer array of length size
- * is created and stored in buf
.
- *
- * @param in the underlying input stream.
- * @param size the buffer size.
- * @exception IllegalArgumentException if {@code size <= 0}.
- */
- public BufferedInputStream(InputStream in, int size) {
- this.in = in;
- if (size <= 0) {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- buf = new byte[size];
- }
-
- /**
- * Fills the buffer with more data, taking into account
- * shuffling and other tricks for dealing with marks.
- * Assumes that it is being called by a synchronized method.
- * This method also assumes that all data has already been read in,
- * hence pos > count.
- */
- private void fill() throws IOException {
- byte[] buffer = getBufIfOpen();
- if (markpos < 0) {
- pos = 0; /* no mark: throw away the buffer */
- } else if (pos >= buffer.length) {
- if (markpos > 0) { /* can throw away early part of the buffer */
- int sz = pos - markpos;
- System.arraycopy(buffer, markpos, buffer, 0, sz);
- pos = sz;
- markpos = 0;
- } else if (buffer.length >= marklimit) {
- markpos = -1; /* buffer got too big, invalidate mark */
- pos = 0; /* drop buffer contents */
- } else if (buffer.length >= MAX_BUFFER_SIZE) {
- throw new OutOfMemoryError("Required array size too large");
- } else { /* grow buffer */
- int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? pos * 2 : MAX_BUFFER_SIZE;
- if (nsz > marklimit) {
- nsz = marklimit;
- }
- byte nbuf[] = new byte[nsz];
- System.arraycopy(buffer, 0, nbuf, 0, pos);
- buffer = nbuf;
- }
- }
- count = pos;
- int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
- if (n > 0) {
- count = n + pos;
- }
- }
-
- /**
- * See
- * the general contract of the read
- * method of InputStream
.
- *
- * @return the next byte of data, or -1
if the end of the
- * stream is reached.
- * @exception IOException if this input stream has been closed by
- * invoking its {@link #close()} method,
- * or an I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- @Override
- public int read() throws IOException {
- if (pos >= count) {
- fill();
- if (pos >= count) {
- return -1;
- }
- }
- return getBufIfOpen()[pos++] & 0xff;
- }
-
- /**
- * Read characters into a portion of an array, reading from the underlying
- * stream at most once if necessary.
- */
- private int read1(byte[] b, int off, int len) throws IOException {
- int avail = count - pos;
- if (avail <= 0) {
- /*
- * If the requested length is at least as large as the buffer, and
- * if there is no mark/reset activity, do not bother to copy the
- * bytes into the local buffer. In this way buffered streams will
- * cascade harmlessly.
- */
- if (len >= getBufIfOpen().length && markpos < 0) {
- return getInIfOpen().read(b, off, len);
- }
- fill();
- avail = count - pos;
- if (avail <= 0) {
- return -1;
- }
- }
- int cnt = (avail < len) ? avail : len;
- System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
- pos += cnt;
- return cnt;
- }
-
- /**
- * Reads bytes from this byte-input stream into the specified byte array,
- * starting at the given offset.
- *
- *
- * This method implements the general contract of the corresponding
- * {@link InputStream#read(byte[], int, int) read}
method of
- * the {@link InputStream}
class. As an additional
- * convenience, it attempts to read as many bytes as possible by repeatedly
- * invoking the read
method of the underlying stream. This
- * iterated read
continues until one of the following
- * conditions becomes true:
- *
read
method of the underlying stream returns
- * -1
, indicating end-of-file, or
- *
- * available
method of the underlying stream
- * returns zero, indicating that further input requests would block.
- *
- * read
on the underlying stream returns
- * -1
to indicate end-of-file then this method returns
- * -1
. Otherwise this method returns the number of bytes
- * actually read.
- *
- *
- * Subclasses of this class are encouraged, but not required, to
- * attempt to read as many bytes as possible in the same fashion.
- *
- * @param b destination buffer.
- * @param off offset at which to start storing bytes.
- * @param len maximum number of bytes to read.
- * @return the number of bytes read, or -1
if the end of
- * the stream has been reached.
- * @exception IOException if this input stream has been closed by
- * invoking its {@link #close()} method,
- * or an I/O error occurs.
- */
- @Override
- public int read(byte b[], int off, int len)
- throws IOException {
- getBufIfOpen(); // Check for closed stream
- if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
-
- int n = 0;
- for (;;) {
- int nread = read1(b, off + n, len - n);
- if (nread <= 0) {
- return (n == 0) ? nread : n;
- }
- n += nread;
- if (n >= len) {
- return n;
- }
- // if not closed but no bytes available, return
- InputStream input = in;
- if (input != null && input.available() <= 0) {
- return n;
- }
- }
- }
-
- /**
- * See the general contract of the skip
- * method of InputStream
.
- *
- * @exception IOException if the stream does not support seek,
- * or if this input stream has been closed by
- * invoking its {@link #close()} method, or an
- * I/O error occurs.
- */
- @Override
- public long skip(long n) throws IOException {
- getBufIfOpen(); // Check for closed stream
- if (n <= 0) {
- return 0;
- }
- long avail = count - pos;
-
- if (avail <= 0) {
- // If no mark position set then don't keep in buffer
- if (markpos < 0) {
- return getInIfOpen().skip(n);
- }
-
- // Fill in buffer to save bytes for reset
- fill();
- avail = count - pos;
- if (avail <= 0) {
- return 0;
- }
- }
-
- long skipped = (avail < n) ? avail : n;
- pos += skipped;
- return skipped;
- }
-
- /**
- * Returns an estimate of the number of bytes that can be read (or
- * skipped over) from this input stream without blocking by the next
- * invocation of a method for this input stream. The next invocation might be
- * the same thread or another thread. A single read or skip of this
- * many bytes will not block, but may read or skip fewer bytes.
- *
- * This method returns the sum of the number of bytes remaining to be read in
- * the buffer (count - pos
) and the result of calling the
- * {@link java.io.FilterInputStream#in in}.available().
- *
- * @return an estimate of the number of bytes that can be read (or skipped
- * over) from this input stream without blocking.
- * @exception IOException if this input stream has been closed by
- * invoking its {@link #close()} method,
- * or an I/O error occurs.
- */
- @Override
- public int available() throws IOException {
- int n = count - pos;
- int avail = getInIfOpen().available();
- return n > (Integer.MAX_VALUE - avail) ? Integer.MAX_VALUE : n + avail;
- }
-
- /**
- * See the general contract of the mark
- * method of InputStream
.
- *
- * @param readlimit the maximum limit of bytes that can be read before
- * the mark position becomes invalid.
- * @see java.io.BufferedInputStream#reset()
- */
- @Override
- public void mark(int readlimit) {
- marklimit = readlimit;
- markpos = pos;
- }
-
- /**
- * See the general contract of the reset
- * method of InputStream
.
- *
- * If markpos
is -1
- * (no mark has been set or the mark has been
- * invalidated), an IOException
- * is thrown. Otherwise, pos
is
- * set equal to markpos
.
- *
- * @exception IOException if this stream has not been marked or,
- * if the mark has been invalidated, or the stream
- * has been closed by invoking its {@link #close()}
- * method, or an I/O error occurs.
- * @see java.io.BufferedInputStream#mark(int)
- */
- @Override
- public void reset() throws IOException {
- getBufIfOpen(); // Cause exception if closed
- if (markpos < 0) {
- throw new IOException("Resetting to invalid mark");
- }
- pos = markpos;
- }
-
- /**
- * Tests if this input stream supports the mark
- * and reset
methods. The markSupported
- * method of BufferedInputStream
returns
- * true
.
- *
- * @return a boolean
indicating if this stream type supports
- * the mark
and reset
methods.
- * @see java.io.InputStream#mark(int)
- * @see java.io.InputStream#reset()
- */
- @Override
- public boolean markSupported() {
- return true;
- }
-
- /**
- * Closes this input stream and releases any system resources
- * associated with the stream.
- * Once the stream has been closed, further read(), available(), reset(),
- * or skip() invocations will throw an IOException.
- * Closing a previously closed stream has no effect.
- *
- * @exception IOException if an I/O error occurs.
- */
- @Override
- public void close() throws IOException {
- this.in.close();
+ public BufferedInputStream(InputStream in, int bufferSize) {
+ super(in, bufferSize);
}
}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
index dc56927175..8c28eca682 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
@@ -16,114 +16,19 @@
*/
package org.apache.nifi.stream.io;
-import java.io.FilterOutputStream;
-import java.io.IOException;
import java.io.OutputStream;
/**
- * This class is a slight modification of the {@link java.io.BufferedOutputStream} class. This implementation differs in that it does not mark methods as synchronized. This means that this class is
- * not suitable for writing by multiple concurrent threads. However, the removal of the synchronized keyword results in potentially much better performance.
+ * @deprecated use java.io.BufferedOutputStream instead
*/
-public class BufferedOutputStream extends FilterOutputStream {
+@Deprecated
+public class BufferedOutputStream extends java.io.BufferedOutputStream {
- /**
- * The internal buffer where data is stored.
- */
- protected byte buf[];
-
- /**
- * The number of valid bytes in the buffer. This value is always in the range 0 through buf.length; elements
- * buf[0] through buf[count-1] contain valid byte data.
- */
- protected int count;
-
- /**
- * Creates a new buffered output stream to write data to the specified underlying output stream.
- *
- * @param out the underlying output stream.
- */
public BufferedOutputStream(OutputStream out) {
- this(out, 8192);
- }
-
- /**
- * Creates a new buffered output stream to write data to the specified underlying output stream with the specified buffer size.
- *
- * @param out the underlying output stream.
- * @param size the buffer size.
- * @exception IllegalArgumentException if size <= 0.
- */
- public BufferedOutputStream(OutputStream out, int size) {
super(out);
- if (size <= 0) {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- buf = new byte[size];
}
- /**
- * Flush the internal buffer
- */
- private void flushBuffer() throws IOException {
- if (count > 0) {
- out.write(buf, 0, count);
- count = 0;
- }
- }
-
- /**
- * Writes the specified byte to this buffered output stream.
- *
- * @param b the byte to be written.
- * @exception IOException if an I/O error occurs.
- */
- @Override
- public void write(int b) throws IOException {
- if (count >= buf.length) {
- flushBuffer();
- }
- buf[count++] = (byte) b;
- }
-
- /**
- * Writes len
bytes from the specified byte array starting at offset off
to this buffered output stream.
- *
- *
- * Ordinarily this method stores bytes from the given array into this stream's buffer, flushing the buffer to the underlying output stream as needed. If the requested length is at least as large
- * as this stream's buffer, however, then this method will flush the buffer and write the bytes directly to the underlying output stream. Thus redundant BufferedOutputStream
s will not
- * copy data unnecessarily.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @exception IOException if an I/O error occurs.
- */
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- if (len >= buf.length) {
- /* If the request length exceeds the size of the output buffer,
- flush the output buffer and then write the data directly.
- In this way buffered streams will cascade harmlessly. */
- flushBuffer();
- out.write(b, off, len);
- return;
- }
- if (len >= buf.length - count) {
- flushBuffer();
- }
- System.arraycopy(b, off, buf, count, len);
- count += len;
- }
-
- /**
- * Flushes this buffered output stream. This forces any buffered output bytes to be written out to the underlying output stream.
- *
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public void flush() throws IOException {
- flushBuffer();
- out.flush();
+ public BufferedOutputStream(OutputStream out, int bufferSize) {
+ super(out, bufferSize);
}
}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
index 85c8c4fdd7..fca5894819 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
@@ -16,190 +16,17 @@
*/
package org.apache.nifi.stream.io;
-import java.io.InputStream;
-
/**
- * This class performs the same function as java.io.ByteArrayInputStream but does not mark its methods as synchronized
+ * @deprecated use java.io.ByteArrayInputStream instead
*/
-public class ByteArrayInputStream extends InputStream {
+@Deprecated
+public class ByteArrayInputStream extends java.io.ByteArrayInputStream {
- /**
- * An array of bytes that was provided by the creator of the stream. Elements buf[0]
through buf[count-1]
are the only bytes that can ever be read from the stream;
- * element buf[pos]
is the next byte to be read.
- */
- protected byte buf[];
-
- /**
- * The index of the next character to read from the input stream buffer. This value should always be nonnegative and not larger than the value of count
. The next byte to be read from
- * the input stream buffer will be buf[pos]
.
- */
- protected int pos;
-
- /**
- * The currently marked position in the stream. ByteArrayInputStream objects are marked at position zero by default when constructed. They may be marked at another position within the buffer by
- * the mark()
method. The current buffer position is set to this point by the reset()
method.
- *
- * If no mark has been set, then the value of mark is the offset passed to the constructor (or 0 if the offset was not supplied).
- *
- * @since JDK1.1
- */
- protected int mark = 0;
-
- /**
- * The index one greater than the last valid character in the input stream buffer. This value should always be nonnegative and not larger than the length of buf
. It is one greater
- * than the position of the last byte within buf
that can ever be read from the input stream buffer.
- */
- protected int count;
-
- /**
- * Creates a ByteArrayInputStream
so that it uses buf
as its buffer array. The buffer array is not copied. The initial value of pos
is 0
and the
- * initial value of count
is the length of buf
.
- *
- * @param buf the input buffer.
- */
- public ByteArrayInputStream(byte buf[]) {
- this.buf = buf;
- this.pos = 0;
- this.count = buf.length;
+ public ByteArrayInputStream(byte[] buffer) {
+ super(buffer);
}
- /**
- * Creates ByteArrayInputStream
that uses buf
as its buffer array. The initial value of pos
is offset
and the initial value of
- * count
is the minimum of offset+length
and buf.length
. The buffer array is not copied. The buffer's mark is set to the specified offset.
- *
- * @param buf the input buffer.
- * @param offset the offset in the buffer of the first byte to read.
- * @param length the maximum number of bytes to read from the buffer.
- */
- public ByteArrayInputStream(byte buf[], int offset, int length) {
- this.buf = buf;
- this.pos = offset;
- this.count = Math.min(offset + length, buf.length);
- this.mark = offset;
+ public ByteArrayInputStream(byte[] buffer, int offset, int length) {
+ super(buffer, offset, length);
}
-
- /**
- * Reads the next byte of data from this input stream. The value byte is returned as an int
in the range 0
to 255
. If no byte is available because the end of
- * the stream has been reached, the value -1
is returned.
- *
- * This read
method cannot block.
- *
- * @return the next byte of data, or -1
if the end of the stream has been reached.
- */
- @Override
- public int read() {
- return (pos < count) ? (buf[pos++] & 0xff) : -1;
- }
-
- /**
- * Reads up to len
bytes of data into an array of bytes from this input stream. If pos
equals count
, then -1
is returned to indicate end of
- * file. Otherwise, the number k
of bytes read is equal to the smaller of len
and count-pos
. If k
is positive, then bytes buf[pos]
- * through buf[pos+k-1]
are copied into b[off]
through b[off+k-1]
in the manner performed by System.arraycopy
. The value k
is added
- * into pos
and k
is returned.
- *
- * This read
method cannot block.
- *
- * @param b the buffer into which the data is read.
- * @param off the start offset in the destination array b
- * @param len the maximum number of bytes read.
- * @return the total number of bytes read into the buffer, or -1
if there is no more data because the end of the stream has been reached.
- * @exception NullPointerException If b
is null
.
- * @exception IndexOutOfBoundsException If off
is negative, len
is negative, or len
is greater than b.length - off
- */
- @Override
- public int read(byte b[], int off, int len) {
- if (b == null) {
- throw new NullPointerException();
- } else if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- if (pos >= count) {
- return -1;
- }
-
- int avail = count - pos;
- if (len > avail) {
- len = avail;
- }
- if (len <= 0) {
- return 0;
- }
- System.arraycopy(buf, pos, b, off, len);
- pos += len;
- return len;
- }
-
- /**
- * Skips n
bytes of input from this input stream. Fewer bytes might be skipped if the end of the input stream is reached. The actual number k
of bytes to be skipped is
- * equal to the smaller of n
and count-pos
. The value k
is added into pos
and k
is returned.
- *
- * @param n the number of bytes to be skipped.
- * @return the actual number of bytes skipped.
- */
- @Override
- public long skip(long n) {
- long k = count - pos;
- if (n < k) {
- k = n < 0 ? 0 : n;
- }
-
- pos += k;
- return k;
- }
-
- /**
- * Returns the number of remaining bytes that can be read (or skipped over) from this input stream.
- *
- * The value returned is count - pos
, which is the number of bytes remaining to be read from the input buffer.
- *
- * @return the number of remaining bytes that can be read (or skipped over) from this input stream without blocking.
- */
- @Override
- public int available() {
- return count - pos;
- }
-
- /**
- * Tests if this InputStream
supports mark/reset. The markSupported
method of ByteArrayInputStream
always returns true
.
- *
- * @since JDK1.1
- */
- @Override
- public boolean markSupported() {
- return true;
- }
-
- /**
- * Set the current marked position in the stream. ByteArrayInputStream objects are marked at position zero by default when constructed. They may be marked at another position within the buffer by
- * this method.
- *
- * If no mark has been set, then the value of the mark is the offset passed to the constructor (or 0 if the offset was not supplied). - * - *
- * Note: The readAheadLimit
for this class has no meaning.
- *
- * @since JDK1.1
- */
- @Override
- public void mark(int readAheadLimit) {
- mark = pos;
- }
-
- /**
- * Resets the buffer to the marked position. The marked position is 0 unless another position was marked or an offset was specified in the constructor.
- */
- @Override
- public void reset() {
- pos = mark;
- }
-
- /**
- * Closing a ByteArrayInputStream has no effect. The methods in this class can be called after the stream has been closed without generating an IOException.
- *
- */ - @Override - public void close() { - } - } diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java index aade1999b2..bdc61aba9a 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java @@ -16,205 +16,16 @@ */ package org.apache.nifi.stream.io; -import java.io.IOException; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; -import java.util.Arrays; - /** - * This class provides a more efficient implementation of the java.io.ByteArrayOutputStream. The efficiency is gained in two ways: - *
len
bytes from the specified byte array starting at offset off
to this byte array output stream.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- */
- @Override
- public void write(byte b[], int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0)
- || ((off + len) - b.length > 0)) {
- throw new IndexOutOfBoundsException();
- }
- ensureCapacity(count + len);
- System.arraycopy(b, off, buf, count, len);
- count += len;
- }
-
- /**
- * Writes the complete contents of this byte array output stream to the specified output stream argument, as if by calling the output stream's write method using
- * out.write(buf, 0, count)
.
- *
- * @param out the output stream to which to write the data.
- * @exception IOException if an I/O error occurs.
- */
- public void writeTo(OutputStream out) throws IOException {
- out.write(buf, 0, count);
- }
-
- /**
- * Resets the count
field of this byte array output stream to zero, so that all currently accumulated output in the output stream is discarded. The output stream can be used again,
- * reusing the already allocated buffer space.
- *
- * @see java.io.ByteArrayInputStream#count
- */
- public void reset() {
- count = 0;
- }
-
- /**
- * Creates a newly allocated byte array. Its size is the current size of this output stream and the valid contents of the buffer have been copied into it.
- *
- * @return the current contents of this output stream, as a byte array.
- * @see java.io.ByteArrayOutputStream#size()
- */
- public byte[] toByteArray() {
- return Arrays.copyOf(buf, count);
- }
-
- /**
- * Returns the current size of the buffer.
- *
- * @return the value of the count
field, which is the number of valid bytes in this output stream.
- * @see java.io.ByteArrayOutputStream#count
- */
- public int size() {
- return count;
- }
-
- /**
- * Converts the buffer's contents into a string decoding bytes using the platform's default character set. The length of the new String
- * is a function of the character set, and hence may not be equal to the size of the buffer.
- *
- * - * This method always replaces malformed-input and unmappable-character sequences with the default replacement string for the platform's default character set. The - * {@linkplain java.nio.charset.CharsetDecoder} class should be used when more control over the decoding process is required. - * - * @return String decoded from the buffer's contents. - * @since JDK1.1 - */ - @Override - public String toString() { - return new String(buf, 0, count); - } - - /** - * Converts the buffer's contents into a string by decoding the bytes using the specified {@link java.nio.charset.Charset charsetName}. The length of the new String is a function of the - * charset, and hence may not be equal to the length of the byte array. - * - *
- * This method always replaces malformed-input and unmappable-character sequences with this charset's default replacement string. The {@link
- * java.nio.charset.CharsetDecoder} class should be used when more control over the decoding process is required.
- *
- * @param charsetName the name of a supported {@linkplain java.nio.charset.Charset charset
}
- * @return String decoded from the buffer's contents.
- * @exception UnsupportedEncodingException If the named charset is not supported
- * @since JDK1.1
- */
- public String toString(String charsetName) throws UnsupportedEncodingException {
- return new String(buf, 0, count, charsetName);
- }
-
- /**
- * Closing a ByteArrayOutputStream has no effect. The methods in this class can be called after the stream has been closed without generating an IOException.
- *
- *
- */
- @Override
- public void close() {
- }
-
- public byte[] getUnderlyingBuffer() {
- return buf;
- }
-
- public int getBufferLength() {
- return count;
+ public ByteArrayOutputStream(int initialBufferSize) {
+ super(initialBufferSize);
}
}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
index e2059965a3..02b8f1ebcf 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
@@ -16,355 +16,15 @@
*/
package org.apache.nifi.stream.io;
-import java.io.DataOutput;
-import java.io.FilterOutputStream;
-import java.io.IOException;
import java.io.OutputStream;
-import java.io.UTFDataFormatException;
/**
- * This class is different from java.io.DataOutputStream in that it does synchronize on its methods.
+ * @deprecated use java.io.DataOutputStream instead
*/
-public class DataOutputStream extends FilterOutputStream implements DataOutput {
+@Deprecated
+public class DataOutputStream extends java.io.DataOutputStream {
- /**
- * The number of bytes written to the data output stream so far. If this counter overflows, it will be wrapped to Integer.MAX_VALUE.
- */
- protected int written;
-
- /**
- * bytearr is initialized on demand by writeUTF
- */
- private byte[] bytearr = null;
-
- /**
- * Creates a new data output stream to write data to the specified underlying output stream. The counter written
is set to zero.
- *
- * @param out the underlying output stream, to be saved for later use.
- * @see java.io.FilterOutputStream#out
- */
public DataOutputStream(OutputStream out) {
super(out);
}
-
- /**
- * Increases the written counter by the specified value until it reaches Integer.MAX_VALUE.
- */
- private void incCount(int value) {
- int temp = written + value;
- if (temp < 0) {
- temp = Integer.MAX_VALUE;
- }
- written = temp;
- }
-
- /**
- * Writes the specified byte (the low eight bits of the argument b
) to the underlying output stream. If no exception is thrown, the counter written
is incremented by
- * 1
.
- *
- * Implements the write
method of OutputStream
.
- *
- * @param b the byte
to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- incCount(1);
- }
-
- /**
- * Writes len
bytes from the specified byte array starting at offset off
to the underlying output stream. If no exception is thrown, the counter written
is
- * incremented by len
.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- out.write(b, off, len);
- incCount(len);
- }
-
- /**
- * Flushes this data output stream. This forces any buffered output bytes to be written out to the stream.
- *
- * The flush
method of DataOutputStream
calls the flush
method of its underlying output stream.
- *
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.io.OutputStream#flush()
- */
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- /**
- * Writes a boolean
to the underlying output stream as a 1-byte value. The value true
is written out as the value (byte)1
; the value false
is
- * written out as the value (byte)0
. If no exception is thrown, the counter written
is incremented by 1
.
- *
- * @param v a boolean
value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeBoolean(boolean v) throws IOException {
- out.write(v ? 1 : 0);
- incCount(1);
- }
-
- /**
- * Writes out a byte
to the underlying output stream as a 1-byte value. If no exception is thrown, the counter written
is incremented by 1
.
- *
- * @param v a byte
value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeByte(int v) throws IOException {
- out.write(v);
- incCount(1);
- }
-
- /**
- * Writes a short
to the underlying output stream as two bytes, high byte first. If no exception is thrown, the counter written
is incremented by 2
.
- *
- * @param v a short
to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeShort(int v) throws IOException {
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(2);
- }
-
- /**
- * Writes a char
to the underlying output stream as a 2-byte value, high byte first. If no exception is thrown, the counter written
is incremented by 2
.
- *
- * @param v a char
value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeChar(int v) throws IOException {
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(2);
- }
-
- /**
- * Writes an int
to the underlying output stream as four bytes, high byte first. If no exception is thrown, the counter written
is incremented by 4
.
- *
- * @param v an int
to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeInt(int v) throws IOException {
- out.write((v >>> 24) & 0xFF);
- out.write((v >>> 16) & 0xFF);
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(4);
- }
-
- private final byte writeBuffer[] = new byte[8];
-
- /**
- * Writes a long
to the underlying output stream as eight bytes, high byte first. In no exception is thrown, the counter written
is incremented by 8
.
- *
- * @param v a long
to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeLong(long v) throws IOException {
- writeBuffer[0] = (byte) (v >>> 56);
- writeBuffer[1] = (byte) (v >>> 48);
- writeBuffer[2] = (byte) (v >>> 40);
- writeBuffer[3] = (byte) (v >>> 32);
- writeBuffer[4] = (byte) (v >>> 24);
- writeBuffer[5] = (byte) (v >>> 16);
- writeBuffer[6] = (byte) (v >>> 8);
- writeBuffer[7] = (byte) (v);
- out.write(writeBuffer, 0, 8);
- incCount(8);
- }
-
- /**
- * Converts the float argument to an int
using the floatToIntBits
method in class Float
, and then writes that int
value to the underlying output
- * stream as a 4-byte quantity, high byte first. If no exception is thrown, the counter written
is incremented by 4
.
- *
- * @param v a float
value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.lang.Float#floatToIntBits(float)
- */
- @Override
- public final void writeFloat(float v) throws IOException {
- writeInt(Float.floatToIntBits(v));
- }
-
- /**
- * Converts the double argument to a long
using the doubleToLongBits
method in class Double
, and then writes that long
value to the underlying
- * output stream as an 8-byte quantity, high byte first. If no exception is thrown, the counter written
is incremented by 8
.
- *
- * @param v a double
value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.lang.Double#doubleToLongBits(double)
- */
- @Override
- public final void writeDouble(double v) throws IOException {
- writeLong(Double.doubleToLongBits(v));
- }
-
- /**
- * Writes out the string to the underlying output stream as a sequence of bytes. Each character in the string is written out, in sequence, by discarding its high eight bits. If no exception is
- * thrown, the counter written
is incremented by the length of s
.
- *
- * @param s a string of bytes to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeBytes(String s) throws IOException {
- int len = s.length();
- for (int i = 0; i < len; i++) {
- out.write((byte) s.charAt(i));
- }
- incCount(len);
- }
-
- /**
- * Writes a string to the underlying output stream as a sequence of characters. Each character is written to the data output stream as if by the writeChar
method. If no exception is
- * thrown, the counter written
is incremented by twice the length of s
.
- *
- * @param s a String
value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.DataOutputStream#writeChar(int)
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeChars(String s) throws IOException {
- int len = s.length();
- for (int i = 0; i < len; i++) {
- int v = s.charAt(i);
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- }
- incCount(len * 2);
- }
-
- /**
- * Writes a string to the underlying output stream using
- * modified UTF-8
- * encoding in a machine-independent manner.
- *
- * First, two bytes are written to the output stream as if by the writeShort
method giving the number of bytes to follow. This value is the number of bytes actually written out, not
- * the length of the string. Following the length, each character of the string is output, in sequence, using the modified UTF-8 encoding for the character. If no exception is thrown, the counter
- * written
is incremented by the total number of bytes written to the output stream. This will be at least two plus the length of str
, and at most two plus thrice the
- * length of str
.
- *
- * @param str a string to be written.
- * @exception IOException if an I/O error occurs.
- */
- @Override
- public final void writeUTF(String str) throws IOException {
- writeUTF(str, this);
- }
-
- /**
- * Writes a string to the specified DataOutput using
- * modified UTF-8
- * encoding in a machine-independent manner.
- *
- * First, two bytes are written to out as if by the writeShort
method giving the number of bytes to follow. This value is the number of bytes actually written out, not the length of
- * the string. Following the length, each character of the string is output, in sequence, using the modified UTF-8 encoding for the character. If no exception is thrown, the counter
- * written
is incremented by the total number of bytes written to the output stream. This will be at least two plus the length of str
, and at most two plus thrice the
- * length of str
.
- *
- * @param str a string to be written.
- * @param out destination to write to
- * @return The number of bytes written out.
- * @exception IOException if an I/O error occurs.
- */
- static int writeUTF(String str, DataOutput out) throws IOException {
- int strlen = str.length();
- int utflen = 0;
- int c, count = 0;
-
- /* use charAt instead of copying String to char array */
- for (int i = 0; i < strlen; i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- utflen++;
- } else if (c > 0x07FF) {
- utflen += 3;
- } else {
- utflen += 2;
- }
- }
-
- if (utflen > 65535) {
- throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
- }
-
- byte[] bytearr = null;
- if (out instanceof DataOutputStream) {
- DataOutputStream dos = (DataOutputStream) out;
- if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) {
- dos.bytearr = new byte[(utflen * 2) + 2];
- }
- bytearr = dos.bytearr;
- } else {
- bytearr = new byte[utflen + 2];
- }
-
- bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
- bytearr[count++] = (byte) ((utflen) & 0xFF);
-
- int i = 0;
- for (i = 0; i < strlen; i++) {
- c = str.charAt(i);
- if (!((c >= 0x0001) && (c <= 0x007F))) {
- break;
- }
- bytearr[count++] = (byte) c;
- }
-
- for (; i < strlen; i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- bytearr[count++] = (byte) c;
-
- } else if (c > 0x07FF) {
- bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
- bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
- } else {
- bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
- }
- }
- out.write(bytearr, 0, utflen + 2);
- return utflen + 2;
- }
-
- /**
- * Returns the current value of the counter written
, the number of bytes written to this data output stream so far. If the counter overflows, it will be wrapped to Integer.MAX_VALUE.
- *
- * @return the value of the written
field.
- * @see java.io.DataOutputStream#written
- */
- public final int size() {
- return written;
- }
}
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
index 5153db5aff..0bee0d72c1 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
@@ -95,11 +95,11 @@ public class LeakyBucketStreamThrottler implements StreamThrottler {
public int read() throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1);
LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
- if (baos.getBufferLength() < 1) {
+ if (baos.size() < 1) {
return -1;
}
- return baos.getUnderlyingBuffer()[0] & 0xFF;
+ return baos.toByteArray()[0] & 0xFF;
}
@Override
@@ -124,7 +124,7 @@ public class LeakyBucketStreamThrottler implements StreamThrottler {
if (copied == 0) {
return -1;
}
- System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied);
+ System.arraycopy(baos.toByteArray(), 0, b, off, copied);
return copied;
}
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
index e46577a614..1e498b7459 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
@@ -107,8 +107,8 @@ public class TestLeakyBucketThrottler {
assertTrue(elapsed < 7000);
// ensure bytes were copied out appropriately
- assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength());
- assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]);
+ assertEquals(3 * (2 * 1024 * 1024 + 1), baos.size());
+ assertEquals((byte) 'A', baos.toByteArray()[baos.size() - 1]);
}
}
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 9e4aa125b4..0a09243621 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -184,11 +184,12 @@ public class PutSplunk extends AbstractPutEventProcessor {
// if TCP and we don't end in a new line then add one
final String protocol = context.getProperty(PROTOCOL).getValue();
- if (protocol.equals(TCP_VALUE.getValue())) {
- final byte[] buf = baos.getUnderlyingBuffer();
- if (buf[baos.size() - 1] != NEW_LINE_CHAR) {
- baos.write(NEW_LINE_CHAR);
- }
+ byte[] buf = baos.toByteArray();
+ if (protocol.equals(TCP_VALUE.getValue()) && buf[buf.length - 1] != NEW_LINE_CHAR) {
+ final byte[] updatedBuf = new byte[buf.length + 1];
+ System.arraycopy(buf, 0, updatedBuf, 0, buf.length);
+ updatedBuf[updatedBuf.length - 1] = NEW_LINE_CHAR;
+ buf = updatedBuf;
}
// create a message batch of one message and add to active batches
@@ -198,7 +199,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
// attempt to send the data and add the appropriate range
try {
- sender.send(baos.toByteArray());
+ sender.send(buf);
messageBatch.addSuccessfulRange(0L, flowFile.getSize());
} catch (IOException e) {
messageBatch.addFailedRange(0L, flowFile.getSize(), e);
@@ -281,6 +282,9 @@ public class PutSplunk extends AbstractPutEventProcessor {
});
messageBatch.setNumMessages(messagesSent.get());
+ } catch (final IOException ioe) {
+ // Since this can be thrown only from closing the ByteArrayOutputStream(), we have already
+ // completed everything that we need to do, so there's nothing really to be done here
}
}
@@ -299,7 +303,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
return null;
}
- final byte[] buf = baos.getUnderlyingBuffer();
+ final byte[] buf = baos.toByteArray();
// if TCP and we don't already end with a new line then add one
if (protocol.equals(TCP_VALUE.getValue()) && buf[length - 1] != NEW_LINE_CHAR) {
@@ -311,7 +315,7 @@ public class PutSplunk extends AbstractPutEventProcessor {
message[message.length - 1] = NEW_LINE_CHAR;
return message;
} else {
- return Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, length);
+ return Arrays.copyOfRange(baos.toByteArray(), 0, length);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index c5fcefbc2a..88b83b413e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -810,11 +810,12 @@ public class TailFile extends AbstractProcessor {
byte ch = buffer.get(i);
switch (ch) {
- case '\n':
+ case '\n': {
baos.write(ch);
seenCR = false;
baos.writeTo(out);
- checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
+ final byte[] baosBuffer = baos.toByteArray();
+ checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
}
@@ -823,15 +824,18 @@ public class TailFile extends AbstractProcessor {
rePos = pos + i + 1;
linesRead++;
break;
- case '\r':
+ }
+ case '\r': {
baos.write(ch);
seenCR = true;
break;
- default:
+ }
+ default: {
if (seenCR) {
seenCR = false;
baos.writeTo(out);
- checksum.update(baos.getUnderlyingBuffer(), 0, baos.size());
+ final byte[] baosBuffer = baos.toByteArray();
+ checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
}
@@ -843,6 +847,7 @@ public class TailFile extends AbstractProcessor {
} else {
baos.write(ch);
}
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
index 45758a4a35..19e8ac07ae 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java
@@ -185,7 +185,11 @@ public class JmsFactory {
baos.write(byteBuffer, 0, byteCount);
}
- baos.close();
+ try {
+ baos.close();
+ } catch (final IOException ioe) {
+ }
+
return baos.toByteArray();
}