mirror of https://github.com/apache/nifi.git
NIFI-145: Allow a run.as user to be set in bootstrap.conf file; addressed concerns where an un-priveleged user could issue commands to running NiFi to shutdown; addressed concerns where an un-priveleged user could push large amounts of data to the Bootstrap or NiFi causing OOME
This commit is contained in:
parent
d517b3fd09
commit
76f54f8611
|
@ -92,7 +92,7 @@ public class VolatileContentRepository implements ContentRepository {
|
|||
public static final String MAX_SIZE_PROPERTY = "nifi.volatile.content.repository.max.size";
|
||||
public static final String BLOCK_SIZE_PROPERTY = "nifi.volatile.content.repository.block.size";
|
||||
|
||||
private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers");
|
||||
private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers", true);
|
||||
private final ConcurrentMap<ContentClaim, ContentBlock> claimMap = new ConcurrentHashMap<>(256);
|
||||
private final AtomicLong repoSize = new AtomicLong(0L);
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ java.arg.2=-Xms256m
|
|||
java.arg.3=-Xmx512m
|
||||
|
||||
# Enable Remote Debugging
|
||||
#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
|
||||
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
|
||||
|
||||
# Java command to use when running NiFi
|
||||
java=java
|
||||
|
|
|
@ -27,9 +27,11 @@ import java.net.Socket;
|
|||
import java.net.SocketTimeoutException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.apache.nifi.util.LimitingInputStream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -38,14 +40,16 @@ public class BootstrapListener {
|
|||
|
||||
private final NiFi nifi;
|
||||
private final int bootstrapPort;
|
||||
|
||||
private final String secretKey;
|
||||
|
||||
private volatile Listener listener;
|
||||
private volatile ServerSocket serverSocket;
|
||||
|
||||
|
||||
public BootstrapListener(final NiFi nifi, final int port) {
|
||||
public BootstrapListener(final NiFi nifi, final int bootstrapPort) {
|
||||
this.nifi = nifi;
|
||||
this.bootstrapPort = port;
|
||||
this.bootstrapPort = bootstrapPort;
|
||||
secretKey = UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
|
@ -71,7 +75,7 @@ public class BootstrapListener {
|
|||
socket.setSoTimeout(60000);
|
||||
|
||||
final OutputStream out = socket.getOutputStream();
|
||||
out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8));
|
||||
out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
|
||||
out.flush();
|
||||
|
||||
logger.debug("Awaiting response from Bootstrap...");
|
||||
|
@ -121,6 +125,7 @@ public class BootstrapListener {
|
|||
try {
|
||||
final Socket socket;
|
||||
try {
|
||||
logger.debug("Listening for Bootstrap Requests");
|
||||
socket = serverSocket.accept();
|
||||
} catch (final SocketTimeoutException ste) {
|
||||
if ( stopped ) {
|
||||
|
@ -136,6 +141,9 @@ public class BootstrapListener {
|
|||
throw ioe;
|
||||
}
|
||||
|
||||
logger.debug("Received connection from Bootstrap");
|
||||
socket.setSoTimeout(5000);
|
||||
|
||||
executor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -184,27 +192,42 @@ public class BootstrapListener {
|
|||
out.flush();
|
||||
}
|
||||
|
||||
private BootstrapRequest readRequest(final InputStream in) throws IOException {
|
||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
||||
|
||||
@SuppressWarnings("resource") // we don't want to close the stream, as the caller will do that
|
||||
private BootstrapRequest readRequest(final InputStream in) throws IOException {
|
||||
// We want to ensure that we don't try to read data from an InputStream directly
|
||||
// by a BufferedReader because any user on the system could open a socket and send
|
||||
// a multi-gigabyte file without any new lines in order to crash the NiFi instance
|
||||
// (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance).
|
||||
// So we will limit the Input Stream to only 4 KB, which should be plenty for any request.
|
||||
final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096);
|
||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn));
|
||||
|
||||
final String line = reader.readLine();
|
||||
final String[] splits = line.split(" ");
|
||||
if ( splits.length < 0 ) {
|
||||
throw new IOException("Received invalid command from NiFi: " + line);
|
||||
throw new IOException("Received invalid request from Bootstrap: " + line);
|
||||
}
|
||||
|
||||
final String requestType = splits[0];
|
||||
final String[] args;
|
||||
if ( splits.length == 1 ) {
|
||||
args = new String[0];
|
||||
throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType);
|
||||
} else if ( splits.length == 2 ) {
|
||||
args = new String[0];
|
||||
} else {
|
||||
args = Arrays.copyOfRange(splits, 1, splits.length);
|
||||
args = Arrays.copyOfRange(splits, 2, splits.length);
|
||||
}
|
||||
|
||||
final String requestKey = splits[1];
|
||||
if ( !secretKey.equals(requestKey) ) {
|
||||
throw new IOException("Received invalid Secret Key for request type " + requestType);
|
||||
}
|
||||
|
||||
try {
|
||||
return new BootstrapRequest(requestType, args);
|
||||
} catch (final Exception e) {
|
||||
throw new IOException("Received invalid request from bootstrap; request type = " + requestType);
|
||||
throw new IOException("Received invalid request from Bootstrap; request type = " + requestType);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -227,7 +250,8 @@ public class BootstrapListener {
|
|||
return requestType;
|
||||
}
|
||||
|
||||
public String[] getArgs() {
|
||||
@SuppressWarnings("unused")
|
||||
public String[] getArgs() {
|
||||
return args;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.nifi.nar.NarClassLoaders;
|
|||
import org.apache.nifi.nar.NarUnpacker;
|
||||
import org.apache.nifi.util.FileUtils;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.bridge.SLF4JBridgeHandler;
|
||||
|
@ -61,7 +60,6 @@ public class NiFi {
|
|||
|
||||
// register the shutdown hook
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// shutdown the jetty server
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class LimitingInputStream extends InputStream {
|
||||
|
||||
private final InputStream in;
|
||||
private final long limit;
|
||||
private long bytesRead = 0;
|
||||
|
||||
public LimitingInputStream(final InputStream in, final long limit) {
|
||||
this.in = in;
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
if (bytesRead >= limit) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
final int val = in.read();
|
||||
if (val > -1) {
|
||||
bytesRead++;
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(final byte[] b) throws IOException {
|
||||
if (bytesRead >= limit) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
|
||||
|
||||
final int val = in.read(b, 0, maxToRead);
|
||||
if (val > 0) {
|
||||
bytesRead += val;
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
if (bytesRead >= limit) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
final int maxToRead = (int) Math.min(len, limit - bytesRead);
|
||||
|
||||
final int val = in.read(b, off, maxToRead);
|
||||
if (val > 0) {
|
||||
bytesRead += val;
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long skip(final long n) throws IOException {
|
||||
final long skipped = in.skip(Math.min(n, limit - bytesRead));
|
||||
bytesRead += skipped;
|
||||
return skipped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
return in.available();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
in.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mark(int readlimit) {
|
||||
in.mark(readlimit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean markSupported() {
|
||||
return in.markSupported();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
in.reset();
|
||||
}
|
||||
}
|
|
@ -64,7 +64,7 @@ public class BootstrapCodec {
|
|||
private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException {
|
||||
switch (cmd) {
|
||||
case "PORT": {
|
||||
if ( args.length != 1 ) {
|
||||
if ( args.length != 2 ) {
|
||||
throw new InvalidCommandException();
|
||||
}
|
||||
|
||||
|
@ -78,8 +78,10 @@ public class BootstrapCodec {
|
|||
if ( port < 1 || port > 65535 ) {
|
||||
throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
|
||||
}
|
||||
|
||||
final String secretKey = args[1];
|
||||
|
||||
runner.setNiFiCommandControlPort(port);
|
||||
runner.setNiFiCommandControlPort(port, secretKey);
|
||||
writer.write("OK");
|
||||
writer.newLine();
|
||||
writer.flush();
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.nifi.bootstrap;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
|
@ -24,6 +25,8 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.bootstrap.util.LimitingInputStream;
|
||||
|
||||
public class NiFiListener {
|
||||
private ServerSocket serverSocket;
|
||||
private volatile Listener listener;
|
||||
|
@ -92,17 +95,26 @@ public class NiFiListener {
|
|||
throw ioe;
|
||||
}
|
||||
|
||||
|
||||
executor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream());
|
||||
// we want to ensure that we don't try to read data from an InputStream directly
|
||||
// by a BufferedReader because any user on the system could open a socket and send
|
||||
// a multi-gigabyte file without any new lines in order to crash the Bootstrap,
|
||||
// which in turn may cause the Shutdown Hook to shutdown NiFi.
|
||||
// So we will limit the amount of data to read to 4 KB
|
||||
final InputStream limitingIn = new LimitingInputStream(socket.getInputStream(), 4096);
|
||||
final BootstrapCodec codec = new BootstrapCodec(runner, limitingIn, socket.getOutputStream());
|
||||
codec.communicate();
|
||||
socket.close();
|
||||
} catch (final Throwable t) {
|
||||
System.out.println("Failed to communicate with NiFi due to " + t);
|
||||
t.printStackTrace();
|
||||
} finally {
|
||||
try {
|
||||
socket.close();
|
||||
} catch (final IOException ioe) {
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -31,7 +31,10 @@ import java.lang.reflect.Field;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.attribute.PosixFilePermission;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -205,6 +208,20 @@ public class RunNiFi {
|
|||
|
||||
private synchronized void saveProperties(final Properties nifiProps) throws IOException {
|
||||
final File statusFile = getStatusFile();
|
||||
if ( statusFile.exists() && !statusFile.delete() ) {
|
||||
logger.warning("Failed to delete " + statusFile);
|
||||
}
|
||||
|
||||
if ( !statusFile.createNewFile() ) {
|
||||
throw new IOException("Failed to create file " + statusFile);
|
||||
}
|
||||
|
||||
try {
|
||||
Files.setPosixFilePermissions(statusFile.toPath(), Collections.singleton(PosixFilePermission.OWNER_READ));
|
||||
} catch (final Exception e) {
|
||||
logger.warning("Failed to set permissions so that only the owner can read status file " + statusFile + "; this may allows others to have access to the key needed to communicate with NiFi. Permissions should be changed so that only the owner can read this file");
|
||||
}
|
||||
|
||||
try (final FileOutputStream fos = new FileOutputStream(statusFile)) {
|
||||
nifiProps.store(fos, null);
|
||||
fos.getFD().sync();
|
||||
|
@ -213,16 +230,16 @@ public class RunNiFi {
|
|||
logger.fine("Saved Properties " + nifiProps + " to " + statusFile);
|
||||
}
|
||||
|
||||
private boolean isPingSuccessful(final int port) {
|
||||
private boolean isPingSuccessful(final int port, final String secretKey) {
|
||||
logger.fine("Pinging " + port);
|
||||
|
||||
try (final Socket socket = new Socket("localhost", port)) {
|
||||
final OutputStream out = socket.getOutputStream();
|
||||
out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8));
|
||||
out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
|
||||
out.flush();
|
||||
|
||||
logger.fine("Sent PING command");
|
||||
|
||||
socket.setSoTimeout(5000);
|
||||
final InputStream in = socket.getInputStream();
|
||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
|
||||
final String response = reader.readLine();
|
||||
|
@ -245,7 +262,7 @@ public class RunNiFi {
|
|||
}
|
||||
|
||||
final int port = Integer.parseInt(portVal);
|
||||
final boolean success = isPingSuccessful(port);
|
||||
final boolean success = isPingSuccessful(port, props.getProperty("secret.key"));
|
||||
if ( success ) {
|
||||
logger.fine("Successful PING on port " + port);
|
||||
return port;
|
||||
|
@ -271,10 +288,7 @@ public class RunNiFi {
|
|||
// We use the "ps" command to check if the process is still running.
|
||||
final ProcessBuilder builder = new ProcessBuilder();
|
||||
|
||||
// ps -p <pid> -o comm=
|
||||
// -> -p <pid> to filter just the pid we care about
|
||||
// -> -o comm= to remove headers from the output
|
||||
builder.command("ps", "-p", pid, "-o", "comm=");
|
||||
builder.command("ps", "-p", pid, "--no-headers");
|
||||
final Process proc = builder.start();
|
||||
|
||||
// Read how many lines are output by the 'ps' command
|
||||
|
@ -321,6 +335,7 @@ public class RunNiFi {
|
|||
|
||||
final String portValue = props.getProperty("port");
|
||||
final String pid = props.getProperty("pid");
|
||||
final String secretKey = props.getProperty("secret.key");
|
||||
|
||||
if ( portValue == null && pid == null ) {
|
||||
return new Status(null, null, false, false);
|
||||
|
@ -331,7 +346,7 @@ public class RunNiFi {
|
|||
if ( portValue != null ) {
|
||||
try {
|
||||
port = Integer.parseInt(portValue);
|
||||
pingSuccess = isPingSuccessful(port);
|
||||
pingSuccess = isPingSuccessful(port, secretKey);
|
||||
} catch (final NumberFormatException nfe) {
|
||||
return new Status(null, null, false, false);
|
||||
}
|
||||
|
@ -373,14 +388,19 @@ public class RunNiFi {
|
|||
return;
|
||||
}
|
||||
|
||||
final Properties nifiProps = loadProperties();
|
||||
final String secretKey = nifiProps.getProperty("secret.key");
|
||||
|
||||
try (final Socket socket = new Socket()) {
|
||||
logger.fine("Connecting to NiFi instance");
|
||||
socket.setSoTimeout(60000);
|
||||
socket.connect(new InetSocketAddress("localhost", port));
|
||||
logger.fine("Established connection to NiFi instance.");
|
||||
socket.setSoTimeout(60000);
|
||||
|
||||
logger.fine("Sending SHUTDOWN Command to port " + port);
|
||||
final OutputStream out = socket.getOutputStream();
|
||||
out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8));
|
||||
out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
|
||||
out.flush();
|
||||
|
||||
final InputStream in = socket.getInputStream();
|
||||
|
@ -392,10 +412,8 @@ public class RunNiFi {
|
|||
if ( SHUTDOWN_CMD.equals(response) ) {
|
||||
logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");
|
||||
|
||||
final Properties nifiProps = loadProperties();
|
||||
final String pid = nifiProps.getProperty("pid");
|
||||
if ( pid != null ) {
|
||||
|
||||
final Properties bootstrapProperties = new Properties();
|
||||
try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
|
||||
bootstrapProperties.load(fis);
|
||||
|
@ -418,7 +436,7 @@ public class RunNiFi {
|
|||
if ( isProcessRunning(pid) ) {
|
||||
logger.warning("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process.");
|
||||
try {
|
||||
killProcess(pid);
|
||||
killProcessTree(pid);
|
||||
} catch (final IOException ioe) {
|
||||
logger.severe("Failed to kill Process with PID " + pid);
|
||||
}
|
||||
|
@ -448,7 +466,31 @@ public class RunNiFi {
|
|||
}
|
||||
|
||||
|
||||
private static void killProcess(final String pid) throws IOException {
|
||||
private static List<String> getChildProcesses(final String ppid) throws IOException {
|
||||
final Process proc = Runtime.getRuntime().exec(new String[] {"ps", "-o", "pid", "--no-headers", "--ppid", ppid});
|
||||
final List<String> childPids = new ArrayList<>();
|
||||
try (final InputStream in = proc.getInputStream();
|
||||
final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
|
||||
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
childPids.add(line.trim());
|
||||
}
|
||||
}
|
||||
|
||||
return childPids;
|
||||
}
|
||||
|
||||
private void killProcessTree(final String pid) throws IOException {
|
||||
logger.fine("Killing Process Tree for PID " + pid);
|
||||
|
||||
final List<String> children = getChildProcesses(pid);
|
||||
logger.fine("Children of PID " + pid + ": " + children);
|
||||
|
||||
for ( final String childPid : children ) {
|
||||
killProcessTree(childPid);
|
||||
}
|
||||
|
||||
Runtime.getRuntime().exec(new String[] {"kill", "-9", pid});
|
||||
}
|
||||
|
||||
|
@ -620,7 +662,7 @@ public class RunNiFi {
|
|||
nifiPid = pid;
|
||||
final Properties nifiProps = new Properties();
|
||||
nifiProps.setProperty("pid", String.valueOf(nifiPid));
|
||||
saveProperties(properties);
|
||||
saveProperties(nifiProps);
|
||||
}
|
||||
|
||||
ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
|
||||
|
@ -651,7 +693,7 @@ public class RunNiFi {
|
|||
nifiPid = pid;
|
||||
final Properties nifiProps = new Properties();
|
||||
nifiProps.setProperty("pid", String.valueOf(nifiPid));
|
||||
saveProperties(properties);
|
||||
saveProperties(nifiProps);
|
||||
}
|
||||
|
||||
shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds);
|
||||
|
@ -677,7 +719,7 @@ public class RunNiFi {
|
|||
nifiPid = pid;
|
||||
final Properties nifiProps = new Properties();
|
||||
nifiProps.setProperty("pid", String.valueOf(nifiPid));
|
||||
saveProperties(properties);
|
||||
saveProperties(nifiProps);
|
||||
}
|
||||
|
||||
boolean started = waitForStart();
|
||||
|
@ -758,9 +800,8 @@ public class RunNiFi {
|
|||
this.autoRestartNiFi = restart;
|
||||
}
|
||||
|
||||
void setNiFiCommandControlPort(final int port) {
|
||||
void setNiFiCommandControlPort(final int port, final String secretKey) {
|
||||
this.ccPort = port;
|
||||
|
||||
final File statusFile = getStatusFile();
|
||||
|
||||
final Properties nifiProps = new Properties();
|
||||
|
@ -768,6 +809,8 @@ public class RunNiFi {
|
|||
nifiProps.setProperty("pid", String.valueOf(nifiPid));
|
||||
}
|
||||
nifiProps.setProperty("port", String.valueOf(ccPort));
|
||||
nifiProps.setProperty("secret.key", secretKey);
|
||||
|
||||
try {
|
||||
saveProperties(nifiProps);
|
||||
} catch (final IOException ioe) {
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class LimitingInputStream extends InputStream {
|
||||
|
||||
private final InputStream in;
|
||||
private final long limit;
|
||||
private long bytesRead = 0;
|
||||
|
||||
public LimitingInputStream(final InputStream in, final long limit) {
|
||||
this.in = in;
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
if (bytesRead >= limit) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
final int val = in.read();
|
||||
if (val > -1) {
|
||||
bytesRead++;
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(final byte[] b) throws IOException {
|
||||
if (bytesRead >= limit) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
final int maxToRead = (int) Math.min(b.length, limit - bytesRead);
|
||||
|
||||
final int val = in.read(b, 0, maxToRead);
|
||||
if (val > 0) {
|
||||
bytesRead += val;
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
if (bytesRead >= limit) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
final int maxToRead = (int) Math.min(len, limit - bytesRead);
|
||||
|
||||
final int val = in.read(b, off, maxToRead);
|
||||
if (val > 0) {
|
||||
bytesRead += val;
|
||||
}
|
||||
return val;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long skip(final long n) throws IOException {
|
||||
final long skipped = in.skip(Math.min(n, limit - bytesRead));
|
||||
bytesRead += skipped;
|
||||
return skipped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
return in.available();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
in.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mark(int readlimit) {
|
||||
in.mark(readlimit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean markSupported() {
|
||||
return in.markSupported();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
in.reset();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue