From 8f8b8cdf464642e46e3cdda623c80047d079b921 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sun, 4 Dec 2016 11:43:41 -0500 Subject: [PATCH] NIFI-3071: Deprecated InputStreams & OutputStream sin org.apache.nifi.stream.io package in favor of using their Java counterparts --- .../nifi/stream/io/BufferedInputStream.java | 438 +----------------- .../nifi/stream/io/BufferedOutputStream.java | 105 +---- .../nifi/stream/io/ByteArrayInputStream.java | 187 +------- .../nifi/stream/io/ByteArrayOutputStream.java | 201 +------- .../nifi/stream/io/DataOutputStream.java | 346 +------------- .../stream/io/LeakyBucketStreamThrottler.java | 6 +- .../stream/io/TestLeakyBucketThrottler.java | 4 +- .../nifi/processors/splunk/PutSplunk.java | 20 +- .../nifi/processors/standard/TailFile.java | 15 +- .../processors/standard/util/JmsFactory.java | 6 +- 10 files changed, 59 insertions(+), 1269 deletions(-) diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java index 324f59f72f..8dba8999e1 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java @@ -16,445 +16,19 @@ */ package org.apache.nifi.stream.io; -import java.io.IOException; import java.io.InputStream; /** - * This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means - * that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance. + * @deprecated use java.io.BufferedInputStream instead */ -public class BufferedInputStream extends InputStream { +@Deprecated +public class BufferedInputStream extends java.io.BufferedInputStream { - private final InputStream in; - - private static int DEFAULT_BUFFER_SIZE = 8192; - - /** - * The maximum size of array to allocate. - * Some VMs reserve some header words in an array. - * Attempts to allocate larger arrays may result in - * OutOfMemoryError: Requested array size exceeds VM limit - */ - private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; - - /** - * The internal buffer array where the data is stored. When necessary, - * it may be replaced by another array of - * a different size. - */ - protected byte buf[]; - - /** - * The index one greater than the index of the last valid byte in - * the buffer. - * This value is always - * in the range 0 through buf.length; - * elements buf[0] through buf[count-1] - * contain buffered input data obtained - * from the underlying input stream. - */ - private int count; - - /** - * The current position in the buffer. This is the index of the next - * character to be read from the buf array. - *

- * This value is always in the range 0 - * through count. If it is less - * than count, then buf[pos] - * is the next byte to be supplied as input; - * if it is equal to count, then - * the next read or skip - * operation will require more bytes to be - * read from the contained input stream. - * - * @see java.io.BufferedInputStream#buf - */ - private int pos; - - /** - * The value of the pos field at the time the last - * mark method was called. - *

- * This value is always - * in the range -1 through pos. - * If there is no marked position in the input - * stream, this field is -1. If - * there is a marked position in the input - * stream, then buf[markpos] - * is the first byte to be supplied as input - * after a reset operation. If - * markpos is not -1, - * then all bytes from positions buf[markpos] - * through buf[pos-1] must remain - * in the buffer array (though they may be - * moved to another place in the buffer array, - * with suitable adjustments to the values - * of count, pos, - * and markpos); they may not - * be discarded unless and until the difference - * between pos and markpos - * exceeds marklimit. - * - * @see java.io.BufferedInputStream#mark(int) - * @see java.io.BufferedInputStream#pos - */ - protected int markpos = -1; - - /** - * The maximum read ahead allowed after a call to the - * mark method before subsequent calls to the - * reset method fail. - * Whenever the difference between pos - * and markpos exceeds marklimit, - * then the mark may be dropped by setting - * markpos to -1. - * - * @see java.io.BufferedInputStream#mark(int) - * @see java.io.BufferedInputStream#reset() - */ - protected int marklimit; - - /** - * Check to make sure that underlying input stream has not been - * nulled out due to close; if not return it; - */ - private InputStream getInIfOpen() throws IOException { - InputStream input = in; - if (input == null) { - throw new IOException("Stream closed"); - } - return input; - } - - /** - * Check to make sure that buffer has not been nulled out due to - * close; if not return it; - */ - private byte[] getBufIfOpen() throws IOException { - if (buf == null) { - throw new IOException("Stream closed"); - } - return buf; - } - - /** - * Creates a BufferedInputStream - * and saves its argument, the input stream - * in, for later use. An internal - * buffer array is created and stored in buf. - * - * @param in the underlying input stream. - */ public BufferedInputStream(InputStream in) { - this(in, DEFAULT_BUFFER_SIZE); + super(in); } - /** - * Creates a BufferedInputStream - * with the specified buffer size, - * and saves its argument, the input stream - * in, for later use. An internal - * buffer array of length size - * is created and stored in buf. - * - * @param in the underlying input stream. - * @param size the buffer size. - * @exception IllegalArgumentException if {@code size <= 0}. - */ - public BufferedInputStream(InputStream in, int size) { - this.in = in; - if (size <= 0) { - throw new IllegalArgumentException("Buffer size <= 0"); - } - buf = new byte[size]; - } - - /** - * Fills the buffer with more data, taking into account - * shuffling and other tricks for dealing with marks. - * Assumes that it is being called by a synchronized method. - * This method also assumes that all data has already been read in, - * hence pos > count. - */ - private void fill() throws IOException { - byte[] buffer = getBufIfOpen(); - if (markpos < 0) { - pos = 0; /* no mark: throw away the buffer */ - } else if (pos >= buffer.length) { - if (markpos > 0) { /* can throw away early part of the buffer */ - int sz = pos - markpos; - System.arraycopy(buffer, markpos, buffer, 0, sz); - pos = sz; - markpos = 0; - } else if (buffer.length >= marklimit) { - markpos = -1; /* buffer got too big, invalidate mark */ - pos = 0; /* drop buffer contents */ - } else if (buffer.length >= MAX_BUFFER_SIZE) { - throw new OutOfMemoryError("Required array size too large"); - } else { /* grow buffer */ - int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? pos * 2 : MAX_BUFFER_SIZE; - if (nsz > marklimit) { - nsz = marklimit; - } - byte nbuf[] = new byte[nsz]; - System.arraycopy(buffer, 0, nbuf, 0, pos); - buffer = nbuf; - } - } - count = pos; - int n = getInIfOpen().read(buffer, pos, buffer.length - pos); - if (n > 0) { - count = n + pos; - } - } - - /** - * See - * the general contract of the read - * method of InputStream. - * - * @return the next byte of data, or -1 if the end of the - * stream is reached. - * @exception IOException if this input stream has been closed by - * invoking its {@link #close()} method, - * or an I/O error occurs. - * @see java.io.FilterInputStream#in - */ - @Override - public int read() throws IOException { - if (pos >= count) { - fill(); - if (pos >= count) { - return -1; - } - } - return getBufIfOpen()[pos++] & 0xff; - } - - /** - * Read characters into a portion of an array, reading from the underlying - * stream at most once if necessary. - */ - private int read1(byte[] b, int off, int len) throws IOException { - int avail = count - pos; - if (avail <= 0) { - /* - * If the requested length is at least as large as the buffer, and - * if there is no mark/reset activity, do not bother to copy the - * bytes into the local buffer. In this way buffered streams will - * cascade harmlessly. - */ - if (len >= getBufIfOpen().length && markpos < 0) { - return getInIfOpen().read(b, off, len); - } - fill(); - avail = count - pos; - if (avail <= 0) { - return -1; - } - } - int cnt = (avail < len) ? avail : len; - System.arraycopy(getBufIfOpen(), pos, b, off, cnt); - pos += cnt; - return cnt; - } - - /** - * Reads bytes from this byte-input stream into the specified byte array, - * starting at the given offset. - * - *

- * This method implements the general contract of the corresponding - * {@link InputStream#read(byte[], int, int) read} method of - * the {@link InputStream} class. As an additional - * convenience, it attempts to read as many bytes as possible by repeatedly - * invoking the read method of the underlying stream. This - * iterated read continues until one of the following - * conditions becomes true: - *

- * If the first read on the underlying stream returns - * -1 to indicate end-of-file then this method returns - * -1. Otherwise this method returns the number of bytes - * actually read. - * - *

- * Subclasses of this class are encouraged, but not required, to - * attempt to read as many bytes as possible in the same fashion. - * - * @param b destination buffer. - * @param off offset at which to start storing bytes. - * @param len maximum number of bytes to read. - * @return the number of bytes read, or -1 if the end of - * the stream has been reached. - * @exception IOException if this input stream has been closed by - * invoking its {@link #close()} method, - * or an I/O error occurs. - */ - @Override - public int read(byte b[], int off, int len) - throws IOException { - getBufIfOpen(); // Check for closed stream - if ((off | len | (off + len) | (b.length - (off + len))) < 0) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } - - int n = 0; - for (;;) { - int nread = read1(b, off + n, len - n); - if (nread <= 0) { - return (n == 0) ? nread : n; - } - n += nread; - if (n >= len) { - return n; - } - // if not closed but no bytes available, return - InputStream input = in; - if (input != null && input.available() <= 0) { - return n; - } - } - } - - /** - * See the general contract of the skip - * method of InputStream. - * - * @exception IOException if the stream does not support seek, - * or if this input stream has been closed by - * invoking its {@link #close()} method, or an - * I/O error occurs. - */ - @Override - public long skip(long n) throws IOException { - getBufIfOpen(); // Check for closed stream - if (n <= 0) { - return 0; - } - long avail = count - pos; - - if (avail <= 0) { - // If no mark position set then don't keep in buffer - if (markpos < 0) { - return getInIfOpen().skip(n); - } - - // Fill in buffer to save bytes for reset - fill(); - avail = count - pos; - if (avail <= 0) { - return 0; - } - } - - long skipped = (avail < n) ? avail : n; - pos += skipped; - return skipped; - } - - /** - * Returns an estimate of the number of bytes that can be read (or - * skipped over) from this input stream without blocking by the next - * invocation of a method for this input stream. The next invocation might be - * the same thread or another thread. A single read or skip of this - * many bytes will not block, but may read or skip fewer bytes. - *

- * This method returns the sum of the number of bytes remaining to be read in - * the buffer (count - pos) and the result of calling the - * {@link java.io.FilterInputStream#in in}.available(). - * - * @return an estimate of the number of bytes that can be read (or skipped - * over) from this input stream without blocking. - * @exception IOException if this input stream has been closed by - * invoking its {@link #close()} method, - * or an I/O error occurs. - */ - @Override - public int available() throws IOException { - int n = count - pos; - int avail = getInIfOpen().available(); - return n > (Integer.MAX_VALUE - avail) ? Integer.MAX_VALUE : n + avail; - } - - /** - * See the general contract of the mark - * method of InputStream. - * - * @param readlimit the maximum limit of bytes that can be read before - * the mark position becomes invalid. - * @see java.io.BufferedInputStream#reset() - */ - @Override - public void mark(int readlimit) { - marklimit = readlimit; - markpos = pos; - } - - /** - * See the general contract of the reset - * method of InputStream. - *

- * If markpos is -1 - * (no mark has been set or the mark has been - * invalidated), an IOException - * is thrown. Otherwise, pos is - * set equal to markpos. - * - * @exception IOException if this stream has not been marked or, - * if the mark has been invalidated, or the stream - * has been closed by invoking its {@link #close()} - * method, or an I/O error occurs. - * @see java.io.BufferedInputStream#mark(int) - */ - @Override - public void reset() throws IOException { - getBufIfOpen(); // Cause exception if closed - if (markpos < 0) { - throw new IOException("Resetting to invalid mark"); - } - pos = markpos; - } - - /** - * Tests if this input stream supports the mark - * and reset methods. The markSupported - * method of BufferedInputStream returns - * true. - * - * @return a boolean indicating if this stream type supports - * the mark and reset methods. - * @see java.io.InputStream#mark(int) - * @see java.io.InputStream#reset() - */ - @Override - public boolean markSupported() { - return true; - } - - /** - * Closes this input stream and releases any system resources - * associated with the stream. - * Once the stream has been closed, further read(), available(), reset(), - * or skip() invocations will throw an IOException. - * Closing a previously closed stream has no effect. - * - * @exception IOException if an I/O error occurs. - */ - @Override - public void close() throws IOException { - this.in.close(); + public BufferedInputStream(InputStream in, int bufferSize) { + super(in, bufferSize); } } diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java index dc56927175..8c28eca682 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java @@ -16,114 +16,19 @@ */ package org.apache.nifi.stream.io; -import java.io.FilterOutputStream; -import java.io.IOException; import java.io.OutputStream; /** - * This class is a slight modification of the {@link java.io.BufferedOutputStream} class. This implementation differs in that it does not mark methods as synchronized. This means that this class is - * not suitable for writing by multiple concurrent threads. However, the removal of the synchronized keyword results in potentially much better performance. + * @deprecated use java.io.BufferedOutputStream instead */ -public class BufferedOutputStream extends FilterOutputStream { +@Deprecated +public class BufferedOutputStream extends java.io.BufferedOutputStream { - /** - * The internal buffer where data is stored. - */ - protected byte buf[]; - - /** - * The number of valid bytes in the buffer. This value is always in the range 0 through buf.length; elements - * buf[0] through buf[count-1] contain valid byte data. - */ - protected int count; - - /** - * Creates a new buffered output stream to write data to the specified underlying output stream. - * - * @param out the underlying output stream. - */ public BufferedOutputStream(OutputStream out) { - this(out, 8192); - } - - /** - * Creates a new buffered output stream to write data to the specified underlying output stream with the specified buffer size. - * - * @param out the underlying output stream. - * @param size the buffer size. - * @exception IllegalArgumentException if size <= 0. - */ - public BufferedOutputStream(OutputStream out, int size) { super(out); - if (size <= 0) { - throw new IllegalArgumentException("Buffer size <= 0"); - } - buf = new byte[size]; } - /** - * Flush the internal buffer - */ - private void flushBuffer() throws IOException { - if (count > 0) { - out.write(buf, 0, count); - count = 0; - } - } - - /** - * Writes the specified byte to this buffered output stream. - * - * @param b the byte to be written. - * @exception IOException if an I/O error occurs. - */ - @Override - public void write(int b) throws IOException { - if (count >= buf.length) { - flushBuffer(); - } - buf[count++] = (byte) b; - } - - /** - * Writes len bytes from the specified byte array starting at offset off to this buffered output stream. - * - *

- * Ordinarily this method stores bytes from the given array into this stream's buffer, flushing the buffer to the underlying output stream as needed. If the requested length is at least as large - * as this stream's buffer, however, then this method will flush the buffer and write the bytes directly to the underlying output stream. Thus redundant BufferedOutputStreams will not - * copy data unnecessarily. - * - * @param b the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - * @exception IOException if an I/O error occurs. - */ - @Override - public void write(byte b[], int off, int len) throws IOException { - if (len >= buf.length) { - /* If the request length exceeds the size of the output buffer, - flush the output buffer and then write the data directly. - In this way buffered streams will cascade harmlessly. */ - flushBuffer(); - out.write(b, off, len); - return; - } - if (len >= buf.length - count) { - flushBuffer(); - } - System.arraycopy(b, off, buf, count, len); - count += len; - } - - /** - * Flushes this buffered output stream. This forces any buffered output bytes to be written out to the underlying output stream. - * - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public void flush() throws IOException { - flushBuffer(); - out.flush(); + public BufferedOutputStream(OutputStream out, int bufferSize) { + super(out, bufferSize); } } diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java index 85c8c4fdd7..fca5894819 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java @@ -16,190 +16,17 @@ */ package org.apache.nifi.stream.io; -import java.io.InputStream; - /** - * This class performs the same function as java.io.ByteArrayInputStream but does not mark its methods as synchronized + * @deprecated use java.io.ByteArrayInputStream instead */ -public class ByteArrayInputStream extends InputStream { +@Deprecated +public class ByteArrayInputStream extends java.io.ByteArrayInputStream { - /** - * An array of bytes that was provided by the creator of the stream. Elements buf[0] through buf[count-1] are the only bytes that can ever be read from the stream; - * element buf[pos] is the next byte to be read. - */ - protected byte buf[]; - - /** - * The index of the next character to read from the input stream buffer. This value should always be nonnegative and not larger than the value of count. The next byte to be read from - * the input stream buffer will be buf[pos]. - */ - protected int pos; - - /** - * The currently marked position in the stream. ByteArrayInputStream objects are marked at position zero by default when constructed. They may be marked at another position within the buffer by - * the mark() method. The current buffer position is set to this point by the reset() method. - *

- * If no mark has been set, then the value of mark is the offset passed to the constructor (or 0 if the offset was not supplied). - * - * @since JDK1.1 - */ - protected int mark = 0; - - /** - * The index one greater than the last valid character in the input stream buffer. This value should always be nonnegative and not larger than the length of buf. It is one greater - * than the position of the last byte within buf that can ever be read from the input stream buffer. - */ - protected int count; - - /** - * Creates a ByteArrayInputStream so that it uses buf as its buffer array. The buffer array is not copied. The initial value of pos is 0 and the - * initial value of count is the length of buf. - * - * @param buf the input buffer. - */ - public ByteArrayInputStream(byte buf[]) { - this.buf = buf; - this.pos = 0; - this.count = buf.length; + public ByteArrayInputStream(byte[] buffer) { + super(buffer); } - /** - * Creates ByteArrayInputStream that uses buf as its buffer array. The initial value of pos is offset and the initial value of - * count is the minimum of offset+length and buf.length. The buffer array is not copied. The buffer's mark is set to the specified offset. - * - * @param buf the input buffer. - * @param offset the offset in the buffer of the first byte to read. - * @param length the maximum number of bytes to read from the buffer. - */ - public ByteArrayInputStream(byte buf[], int offset, int length) { - this.buf = buf; - this.pos = offset; - this.count = Math.min(offset + length, buf.length); - this.mark = offset; + public ByteArrayInputStream(byte[] buffer, int offset, int length) { + super(buffer, offset, length); } - - /** - * Reads the next byte of data from this input stream. The value byte is returned as an int in the range 0 to 255. If no byte is available because the end of - * the stream has been reached, the value -1 is returned. - *

- * This read method cannot block. - * - * @return the next byte of data, or -1 if the end of the stream has been reached. - */ - @Override - public int read() { - return (pos < count) ? (buf[pos++] & 0xff) : -1; - } - - /** - * Reads up to len bytes of data into an array of bytes from this input stream. If pos equals count, then -1 is returned to indicate end of - * file. Otherwise, the number k of bytes read is equal to the smaller of len and count-pos. If k is positive, then bytes buf[pos] - * through buf[pos+k-1] are copied into b[off] through b[off+k-1] in the manner performed by System.arraycopy. The value k is added - * into pos and k is returned. - *

- * This read method cannot block. - * - * @param b the buffer into which the data is read. - * @param off the start offset in the destination array b - * @param len the maximum number of bytes read. - * @return the total number of bytes read into the buffer, or -1 if there is no more data because the end of the stream has been reached. - * @exception NullPointerException If b is null. - * @exception IndexOutOfBoundsException If off is negative, len is negative, or len is greater than b.length - off - */ - @Override - public int read(byte b[], int off, int len) { - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - if (pos >= count) { - return -1; - } - - int avail = count - pos; - if (len > avail) { - len = avail; - } - if (len <= 0) { - return 0; - } - System.arraycopy(buf, pos, b, off, len); - pos += len; - return len; - } - - /** - * Skips n bytes of input from this input stream. Fewer bytes might be skipped if the end of the input stream is reached. The actual number k of bytes to be skipped is - * equal to the smaller of n and count-pos. The value k is added into pos and k is returned. - * - * @param n the number of bytes to be skipped. - * @return the actual number of bytes skipped. - */ - @Override - public long skip(long n) { - long k = count - pos; - if (n < k) { - k = n < 0 ? 0 : n; - } - - pos += k; - return k; - } - - /** - * Returns the number of remaining bytes that can be read (or skipped over) from this input stream. - *

- * The value returned is count - pos, which is the number of bytes remaining to be read from the input buffer. - * - * @return the number of remaining bytes that can be read (or skipped over) from this input stream without blocking. - */ - @Override - public int available() { - return count - pos; - } - - /** - * Tests if this InputStream supports mark/reset. The markSupported method of ByteArrayInputStream always returns true. - * - * @since JDK1.1 - */ - @Override - public boolean markSupported() { - return true; - } - - /** - * Set the current marked position in the stream. ByteArrayInputStream objects are marked at position zero by default when constructed. They may be marked at another position within the buffer by - * this method. - *

- * If no mark has been set, then the value of the mark is the offset passed to the constructor (or 0 if the offset was not supplied). - * - *

- * Note: The readAheadLimit for this class has no meaning. - * - * @since JDK1.1 - */ - @Override - public void mark(int readAheadLimit) { - mark = pos; - } - - /** - * Resets the buffer to the marked position. The marked position is 0 unless another position was marked or an offset was specified in the constructor. - */ - @Override - public void reset() { - pos = mark; - } - - /** - * Closing a ByteArrayInputStream has no effect. The methods in this class can be called after the stream has been closed without generating an IOException. - *

- */ - @Override - public void close() { - } - } diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java index aade1999b2..bdc61aba9a 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java @@ -16,205 +16,16 @@ */ package org.apache.nifi.stream.io; -import java.io.IOException; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; -import java.util.Arrays; - /** - * This class provides a more efficient implementation of the java.io.ByteArrayOutputStream. The efficiency is gained in two ways: - *

- * + * @deprecated use java.io.ByteArrayOutputStream instead */ -public class ByteArrayOutputStream extends OutputStream { - - /** - * The buffer where data is stored. - */ - protected byte buf[]; - - /** - * The number of valid bytes in the buffer. - */ - protected int count; - - /** - * Creates a new byte array output stream. The buffer capacity is initially 32 bytes, though its size increases if necessary. - */ +@Deprecated +public class ByteArrayOutputStream extends java.io.ByteArrayOutputStream { public ByteArrayOutputStream() { - this(32); + super(); } - /** - * Creates a new byte array output stream, with a buffer capacity of the specified size, in bytes. - * - * @param size the initial size. - * @exception IllegalArgumentException if size is negative. - */ - public ByteArrayOutputStream(int size) { - if (size < 0) { - throw new IllegalArgumentException("Negative initial size: " - + size); - } - buf = new byte[size]; - } - - /** - * Increases the capacity if necessary to ensure that it can hold at least the number of elements specified by the minimum capacity argument. - * - * @param minCapacity the desired minimum capacity - * @throws OutOfMemoryError if {@code minCapacity < 0}. This is interpreted as a request for the unsatisfiably large capacity {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}. - */ - private void ensureCapacity(int minCapacity) { - // overflow-conscious code - if (minCapacity - buf.length > 0) { - grow(minCapacity); - } - } - - /** - * Increases the capacity to ensure that it can hold at least the number of elements specified by the minimum capacity argument. - * - * @param minCapacity the desired minimum capacity - */ - private void grow(int minCapacity) { - // overflow-conscious code - int oldCapacity = buf.length; - int newCapacity = oldCapacity << 1; - if (newCapacity - minCapacity < 0) { - newCapacity = minCapacity; - } - if (newCapacity < 0) { - if (minCapacity < 0) { // overflow - throw new OutOfMemoryError(); - } - newCapacity = Integer.MAX_VALUE; - } - buf = Arrays.copyOf(buf, newCapacity); - } - - /** - * Writes the specified byte to this byte array output stream. - * - * @param b the byte to be written. - */ - @Override - public void write(int b) { - ensureCapacity(count + 1); - buf[count] = (byte) b; - count += 1; - } - - /** - * Writes len bytes from the specified byte array starting at offset off to this byte array output stream. - * - * @param b the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - */ - @Override - public void write(byte b[], int off, int len) { - if ((off < 0) || (off > b.length) || (len < 0) - || ((off + len) - b.length > 0)) { - throw new IndexOutOfBoundsException(); - } - ensureCapacity(count + len); - System.arraycopy(b, off, buf, count, len); - count += len; - } - - /** - * Writes the complete contents of this byte array output stream to the specified output stream argument, as if by calling the output stream's write method using - * out.write(buf, 0, count). - * - * @param out the output stream to which to write the data. - * @exception IOException if an I/O error occurs. - */ - public void writeTo(OutputStream out) throws IOException { - out.write(buf, 0, count); - } - - /** - * Resets the count field of this byte array output stream to zero, so that all currently accumulated output in the output stream is discarded. The output stream can be used again, - * reusing the already allocated buffer space. - * - * @see java.io.ByteArrayInputStream#count - */ - public void reset() { - count = 0; - } - - /** - * Creates a newly allocated byte array. Its size is the current size of this output stream and the valid contents of the buffer have been copied into it. - * - * @return the current contents of this output stream, as a byte array. - * @see java.io.ByteArrayOutputStream#size() - */ - public byte[] toByteArray() { - return Arrays.copyOf(buf, count); - } - - /** - * Returns the current size of the buffer. - * - * @return the value of the count field, which is the number of valid bytes in this output stream. - * @see java.io.ByteArrayOutputStream#count - */ - public int size() { - return count; - } - - /** - * Converts the buffer's contents into a string decoding bytes using the platform's default character set. The length of the new String - * is a function of the character set, and hence may not be equal to the size of the buffer. - * - *

- * This method always replaces malformed-input and unmappable-character sequences with the default replacement string for the platform's default character set. The - * {@linkplain java.nio.charset.CharsetDecoder} class should be used when more control over the decoding process is required. - * - * @return String decoded from the buffer's contents. - * @since JDK1.1 - */ - @Override - public String toString() { - return new String(buf, 0, count); - } - - /** - * Converts the buffer's contents into a string by decoding the bytes using the specified {@link java.nio.charset.Charset charsetName}. The length of the new String is a function of the - * charset, and hence may not be equal to the length of the byte array. - * - *

- * This method always replaces malformed-input and unmappable-character sequences with this charset's default replacement string. The {@link - * java.nio.charset.CharsetDecoder} class should be used when more control over the decoding process is required. - * - * @param charsetName the name of a supported {@linkplain java.nio.charset.Charset charset} - * @return String decoded from the buffer's contents. - * @exception UnsupportedEncodingException If the named charset is not supported - * @since JDK1.1 - */ - public String toString(String charsetName) throws UnsupportedEncodingException { - return new String(buf, 0, count, charsetName); - } - - /** - * Closing a ByteArrayOutputStream has no effect. The methods in this class can be called after the stream has been closed without generating an IOException. - *

- * - */ - @Override - public void close() { - } - - public byte[] getUnderlyingBuffer() { - return buf; - } - - public int getBufferLength() { - return count; + public ByteArrayOutputStream(int initialBufferSize) { + super(initialBufferSize); } } diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java index e2059965a3..02b8f1ebcf 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java @@ -16,355 +16,15 @@ */ package org.apache.nifi.stream.io; -import java.io.DataOutput; -import java.io.FilterOutputStream; -import java.io.IOException; import java.io.OutputStream; -import java.io.UTFDataFormatException; /** - * This class is different from java.io.DataOutputStream in that it does synchronize on its methods. + * @deprecated use java.io.DataOutputStream instead */ -public class DataOutputStream extends FilterOutputStream implements DataOutput { +@Deprecated +public class DataOutputStream extends java.io.DataOutputStream { - /** - * The number of bytes written to the data output stream so far. If this counter overflows, it will be wrapped to Integer.MAX_VALUE. - */ - protected int written; - - /** - * bytearr is initialized on demand by writeUTF - */ - private byte[] bytearr = null; - - /** - * Creates a new data output stream to write data to the specified underlying output stream. The counter written is set to zero. - * - * @param out the underlying output stream, to be saved for later use. - * @see java.io.FilterOutputStream#out - */ public DataOutputStream(OutputStream out) { super(out); } - - /** - * Increases the written counter by the specified value until it reaches Integer.MAX_VALUE. - */ - private void incCount(int value) { - int temp = written + value; - if (temp < 0) { - temp = Integer.MAX_VALUE; - } - written = temp; - } - - /** - * Writes the specified byte (the low eight bits of the argument b) to the underlying output stream. If no exception is thrown, the counter written is incremented by - * 1. - *

- * Implements the write method of OutputStream. - * - * @param b the byte to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public void write(int b) throws IOException { - out.write(b); - incCount(1); - } - - /** - * Writes len bytes from the specified byte array starting at offset off to the underlying output stream. If no exception is thrown, the counter written is - * incremented by len. - * - * @param b the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public void write(byte b[], int off, int len) throws IOException { - out.write(b, off, len); - incCount(len); - } - - /** - * Flushes this data output stream. This forces any buffered output bytes to be written out to the stream. - *

- * The flush method of DataOutputStream calls the flush method of its underlying output stream. - * - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.io.OutputStream#flush() - */ - @Override - public void flush() throws IOException { - out.flush(); - } - - /** - * Writes a boolean to the underlying output stream as a 1-byte value. The value true is written out as the value (byte)1; the value false is - * written out as the value (byte)0. If no exception is thrown, the counter written is incremented by 1. - * - * @param v a boolean value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeBoolean(boolean v) throws IOException { - out.write(v ? 1 : 0); - incCount(1); - } - - /** - * Writes out a byte to the underlying output stream as a 1-byte value. If no exception is thrown, the counter written is incremented by 1. - * - * @param v a byte value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeByte(int v) throws IOException { - out.write(v); - incCount(1); - } - - /** - * Writes a short to the underlying output stream as two bytes, high byte first. If no exception is thrown, the counter written is incremented by 2. - * - * @param v a short to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeShort(int v) throws IOException { - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - incCount(2); - } - - /** - * Writes a char to the underlying output stream as a 2-byte value, high byte first. If no exception is thrown, the counter written is incremented by 2. - * - * @param v a char value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeChar(int v) throws IOException { - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - incCount(2); - } - - /** - * Writes an int to the underlying output stream as four bytes, high byte first. If no exception is thrown, the counter written is incremented by 4. - * - * @param v an int to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeInt(int v) throws IOException { - out.write((v >>> 24) & 0xFF); - out.write((v >>> 16) & 0xFF); - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - incCount(4); - } - - private final byte writeBuffer[] = new byte[8]; - - /** - * Writes a long to the underlying output stream as eight bytes, high byte first. In no exception is thrown, the counter written is incremented by 8. - * - * @param v a long to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeLong(long v) throws IOException { - writeBuffer[0] = (byte) (v >>> 56); - writeBuffer[1] = (byte) (v >>> 48); - writeBuffer[2] = (byte) (v >>> 40); - writeBuffer[3] = (byte) (v >>> 32); - writeBuffer[4] = (byte) (v >>> 24); - writeBuffer[5] = (byte) (v >>> 16); - writeBuffer[6] = (byte) (v >>> 8); - writeBuffer[7] = (byte) (v); - out.write(writeBuffer, 0, 8); - incCount(8); - } - - /** - * Converts the float argument to an int using the floatToIntBits method in class Float, and then writes that int value to the underlying output - * stream as a 4-byte quantity, high byte first. If no exception is thrown, the counter written is incremented by 4. - * - * @param v a float value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.lang.Float#floatToIntBits(float) - */ - @Override - public final void writeFloat(float v) throws IOException { - writeInt(Float.floatToIntBits(v)); - } - - /** - * Converts the double argument to a long using the doubleToLongBits method in class Double, and then writes that long value to the underlying - * output stream as an 8-byte quantity, high byte first. If no exception is thrown, the counter written is incremented by 8. - * - * @param v a double value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.lang.Double#doubleToLongBits(double) - */ - @Override - public final void writeDouble(double v) throws IOException { - writeLong(Double.doubleToLongBits(v)); - } - - /** - * Writes out the string to the underlying output stream as a sequence of bytes. Each character in the string is written out, in sequence, by discarding its high eight bits. If no exception is - * thrown, the counter written is incremented by the length of s. - * - * @param s a string of bytes to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeBytes(String s) throws IOException { - int len = s.length(); - for (int i = 0; i < len; i++) { - out.write((byte) s.charAt(i)); - } - incCount(len); - } - - /** - * Writes a string to the underlying output stream as a sequence of characters. Each character is written to the data output stream as if by the writeChar method. If no exception is - * thrown, the counter written is incremented by twice the length of s. - * - * @param s a String value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.DataOutputStream#writeChar(int) - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeChars(String s) throws IOException { - int len = s.length(); - for (int i = 0; i < len; i++) { - int v = s.charAt(i); - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - } - incCount(len * 2); - } - - /** - * Writes a string to the underlying output stream using - * modified UTF-8 - * encoding in a machine-independent manner. - *

- * First, two bytes are written to the output stream as if by the writeShort method giving the number of bytes to follow. This value is the number of bytes actually written out, not - * the length of the string. Following the length, each character of the string is output, in sequence, using the modified UTF-8 encoding for the character. If no exception is thrown, the counter - * written is incremented by the total number of bytes written to the output stream. This will be at least two plus the length of str, and at most two plus thrice the - * length of str. - * - * @param str a string to be written. - * @exception IOException if an I/O error occurs. - */ - @Override - public final void writeUTF(String str) throws IOException { - writeUTF(str, this); - } - - /** - * Writes a string to the specified DataOutput using - * modified UTF-8 - * encoding in a machine-independent manner. - *

- * First, two bytes are written to out as if by the writeShort method giving the number of bytes to follow. This value is the number of bytes actually written out, not the length of - * the string. Following the length, each character of the string is output, in sequence, using the modified UTF-8 encoding for the character. If no exception is thrown, the counter - * written is incremented by the total number of bytes written to the output stream. This will be at least two plus the length of str, and at most two plus thrice the - * length of str. - * - * @param str a string to be written. - * @param out destination to write to - * @return The number of bytes written out. - * @exception IOException if an I/O error occurs. - */ - static int writeUTF(String str, DataOutput out) throws IOException { - int strlen = str.length(); - int utflen = 0; - int c, count = 0; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) { - throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes"); - } - - byte[] bytearr = null; - if (out instanceof DataOutputStream) { - DataOutputStream dos = (DataOutputStream) out; - if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) { - dos.bytearr = new byte[(utflen * 2) + 2]; - } - bytearr = dos.bytearr; - } else { - bytearr = new byte[utflen + 2]; - } - - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) ((utflen) & 0xFF); - - int i = 0; - for (i = 0; i < strlen; i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) { - break; - } - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | ((c) & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | ((c) & 0x3F)); - } - } - out.write(bytearr, 0, utflen + 2); - return utflen + 2; - } - - /** - * Returns the current value of the counter written, the number of bytes written to this data output stream so far. If the counter overflows, it will be wrapped to Integer.MAX_VALUE. - * - * @return the value of the written field. - * @see java.io.DataOutputStream#written - */ - public final int size() { - return written; - } } diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java index 5153db5aff..0bee0d72c1 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java @@ -95,11 +95,11 @@ public class LeakyBucketStreamThrottler implements StreamThrottler { public int read() throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(1); LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L); - if (baos.getBufferLength() < 1) { + if (baos.size() < 1) { return -1; } - return baos.getUnderlyingBuffer()[0] & 0xFF; + return baos.toByteArray()[0] & 0xFF; } @Override @@ -124,7 +124,7 @@ public class LeakyBucketStreamThrottler implements StreamThrottler { if (copied == 0) { return -1; } - System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied); + System.arraycopy(baos.toByteArray(), 0, b, off, copied); return copied; } diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java index e46577a614..1e498b7459 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java @@ -107,8 +107,8 @@ public class TestLeakyBucketThrottler { assertTrue(elapsed < 7000); // ensure bytes were copied out appropriately - assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength()); - assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]); + assertEquals(3 * (2 * 1024 * 1024 + 1), baos.size()); + assertEquals((byte) 'A', baos.toByteArray()[baos.size() - 1]); } } diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 9e4aa125b4..0a09243621 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -184,11 +184,12 @@ public class PutSplunk extends AbstractPutEventProcessor { // if TCP and we don't end in a new line then add one final String protocol = context.getProperty(PROTOCOL).getValue(); - if (protocol.equals(TCP_VALUE.getValue())) { - final byte[] buf = baos.getUnderlyingBuffer(); - if (buf[baos.size() - 1] != NEW_LINE_CHAR) { - baos.write(NEW_LINE_CHAR); - } + byte[] buf = baos.toByteArray(); + if (protocol.equals(TCP_VALUE.getValue()) && buf[buf.length - 1] != NEW_LINE_CHAR) { + final byte[] updatedBuf = new byte[buf.length + 1]; + System.arraycopy(buf, 0, updatedBuf, 0, buf.length); + updatedBuf[updatedBuf.length - 1] = NEW_LINE_CHAR; + buf = updatedBuf; } // create a message batch of one message and add to active batches @@ -198,7 +199,7 @@ public class PutSplunk extends AbstractPutEventProcessor { // attempt to send the data and add the appropriate range try { - sender.send(baos.toByteArray()); + sender.send(buf); messageBatch.addSuccessfulRange(0L, flowFile.getSize()); } catch (IOException e) { messageBatch.addFailedRange(0L, flowFile.getSize(), e); @@ -281,6 +282,9 @@ public class PutSplunk extends AbstractPutEventProcessor { }); messageBatch.setNumMessages(messagesSent.get()); + } catch (final IOException ioe) { + // Since this can be thrown only from closing the ByteArrayOutputStream(), we have already + // completed everything that we need to do, so there's nothing really to be done here } } @@ -299,7 +303,7 @@ public class PutSplunk extends AbstractPutEventProcessor { return null; } - final byte[] buf = baos.getUnderlyingBuffer(); + final byte[] buf = baos.toByteArray(); // if TCP and we don't already end with a new line then add one if (protocol.equals(TCP_VALUE.getValue()) && buf[length - 1] != NEW_LINE_CHAR) { @@ -311,7 +315,7 @@ public class PutSplunk extends AbstractPutEventProcessor { message[message.length - 1] = NEW_LINE_CHAR; return message; } else { - return Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, length); + return Arrays.copyOfRange(baos.toByteArray(), 0, length); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index c5fcefbc2a..88b83b413e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -810,11 +810,12 @@ public class TailFile extends AbstractProcessor { byte ch = buffer.get(i); switch (ch) { - case '\n': + case '\n': { baos.write(ch); seenCR = false; baos.writeTo(out); - checksum.update(baos.getUnderlyingBuffer(), 0, baos.size()); + final byte[] baosBuffer = baos.toByteArray(); + checksum.update(baosBuffer, 0, baos.size()); if (getLogger().isTraceEnabled()) { getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()}); } @@ -823,15 +824,18 @@ public class TailFile extends AbstractProcessor { rePos = pos + i + 1; linesRead++; break; - case '\r': + } + case '\r': { baos.write(ch); seenCR = true; break; - default: + } + default: { if (seenCR) { seenCR = false; baos.writeTo(out); - checksum.update(baos.getUnderlyingBuffer(), 0, baos.size()); + final byte[] baosBuffer = baos.toByteArray(); + checksum.update(baosBuffer, 0, baos.size()); if (getLogger().isTraceEnabled()) { getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()}); } @@ -843,6 +847,7 @@ public class TailFile extends AbstractProcessor { } else { baos.write(ch); } + } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java index 45758a4a35..19e8ac07ae 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java @@ -185,7 +185,11 @@ public class JmsFactory { baos.write(byteBuffer, 0, byteCount); } - baos.close(); + try { + baos.close(); + } catch (final IOException ioe) { + } + return baos.toByteArray(); }