From 76f54f86115e42d16f700aa0d1c5bce22f830734 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 12 Dec 2014 09:47:19 -0500 Subject: [PATCH] NIFI-145: Allow a run.as user to be set in bootstrap.conf file; addressed concerns where an un-priveleged user could issue commands to running NiFi to shutdown; addressed concerns where an un-priveleged user could push large amounts of data to the Bootstrap or NiFi causing OOME --- .../repository/VolatileContentRepository.java | 2 +- .../src/main/resources/conf/bootstrap.conf | 2 +- .../org/apache/nifi/BootstrapListener.java | 46 ++++++-- .../src/main/java/org/apache/nifi/NiFi.java | 2 - .../apache/nifi/util/LimitingInputStream.java | 107 ++++++++++++++++++ .../apache/nifi/bootstrap/BootstrapCodec.java | 6 +- .../apache/nifi/bootstrap/NiFiListener.java | 18 ++- .../org/apache/nifi/bootstrap/RunNiFi.java | 81 +++++++++---- .../bootstrap/util/LimitingInputStream.java | 107 ++++++++++++++++++ 9 files changed, 332 insertions(+), 39 deletions(-) create mode 100644 nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java create mode 100644 nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index e14ec5dcea..1a44725ba3 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -92,7 +92,7 @@ public class VolatileContentRepository implements ContentRepository { public static final String MAX_SIZE_PROPERTY = "nifi.volatile.content.repository.max.size"; public static final String BLOCK_SIZE_PROPERTY = "nifi.volatile.content.repository.block.size"; - private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers"); + private final ScheduledExecutorService executor = new FlowEngine(3, "VolatileContentRepository Workers", true); private final ConcurrentMap claimMap = new ConcurrentHashMap<>(256); private final AtomicLong repoSize = new AtomicLong(0L); diff --git a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf index 6b32b2b6f7..37ec474575 100644 --- a/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf +++ b/nar-bundles/framework-bundle/framework/resources/src/main/resources/conf/bootstrap.conf @@ -13,7 +13,7 @@ java.arg.2=-Xms256m java.arg.3=-Xmx512m # Enable Remote Debugging -#java.arg.2=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 +#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 # Java command to use when running NiFi java=java diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java index 31f336cb83..3393952f46 100644 --- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -27,9 +27,11 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.nifi.util.LimitingInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,14 +40,16 @@ public class BootstrapListener { private final NiFi nifi; private final int bootstrapPort; - + private final String secretKey; + private volatile Listener listener; private volatile ServerSocket serverSocket; - public BootstrapListener(final NiFi nifi, final int port) { + public BootstrapListener(final NiFi nifi, final int bootstrapPort) { this.nifi = nifi; - this.bootstrapPort = port; + this.bootstrapPort = bootstrapPort; + secretKey = UUID.randomUUID().toString(); } public void start() throws IOException { @@ -71,7 +75,7 @@ public class BootstrapListener { socket.setSoTimeout(60000); final OutputStream out = socket.getOutputStream(); - out.write(("PORT " + localPort + "\n").getBytes(StandardCharsets.UTF_8)); + out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); logger.debug("Awaiting response from Bootstrap..."); @@ -121,6 +125,7 @@ public class BootstrapListener { try { final Socket socket; try { + logger.debug("Listening for Bootstrap Requests"); socket = serverSocket.accept(); } catch (final SocketTimeoutException ste) { if ( stopped ) { @@ -136,6 +141,9 @@ public class BootstrapListener { throw ioe; } + logger.debug("Received connection from Bootstrap"); + socket.setSoTimeout(5000); + executor.submit(new Runnable() { @Override public void run() { @@ -184,27 +192,42 @@ public class BootstrapListener { out.flush(); } - private BootstrapRequest readRequest(final InputStream in) throws IOException { - final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + @SuppressWarnings("resource") // we don't want to close the stream, as the caller will do that + private BootstrapRequest readRequest(final InputStream in) throws IOException { + // We want to ensure that we don't try to read data from an InputStream directly + // by a BufferedReader because any user on the system could open a socket and send + // a multi-gigabyte file without any new lines in order to crash the NiFi instance + // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance). + // So we will limit the Input Stream to only 4 KB, which should be plenty for any request. + final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096); + final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn)); final String line = reader.readLine(); final String[] splits = line.split(" "); if ( splits.length < 0 ) { - throw new IOException("Received invalid command from NiFi: " + line); + throw new IOException("Received invalid request from Bootstrap: " + line); } final String requestType = splits[0]; final String[] args; if ( splits.length == 1 ) { - args = new String[0]; + throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType); + } else if ( splits.length == 2 ) { + args = new String[0]; } else { - args = Arrays.copyOfRange(splits, 1, splits.length); + args = Arrays.copyOfRange(splits, 2, splits.length); + } + + final String requestKey = splits[1]; + if ( !secretKey.equals(requestKey) ) { + throw new IOException("Received invalid Secret Key for request type " + requestType); } try { return new BootstrapRequest(requestType, args); } catch (final Exception e) { - throw new IOException("Received invalid request from bootstrap; request type = " + requestType); + throw new IOException("Received invalid request from Bootstrap; request type = " + requestType); } } @@ -227,7 +250,8 @@ public class BootstrapListener { return requestType; } - public String[] getArgs() { + @SuppressWarnings("unused") + public String[] getArgs() { return args; } } diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java index 13cd4d691b..98489af16f 100644 --- a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/NiFi.java @@ -36,7 +36,6 @@ import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.nar.NarUnpacker; import org.apache.nifi.util.FileUtils; import org.apache.nifi.util.NiFiProperties; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; @@ -61,7 +60,6 @@ public class NiFi { // register the shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override public void run() { // shutdown the jetty server diff --git a/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java new file mode 100644 index 0000000000..ce3a6dbff0 --- /dev/null +++ b/nar-bundles/framework-bundle/framework/runtime/src/main/java/org/apache/nifi/util/LimitingInputStream.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.IOException; +import java.io.InputStream; + +public class LimitingInputStream extends InputStream { + + private final InputStream in; + private final long limit; + private long bytesRead = 0; + + public LimitingInputStream(final InputStream in, final long limit) { + this.in = in; + this.limit = limit; + } + + @Override + public int read() throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int val = in.read(); + if (val > -1) { + bytesRead++; + } + return val; + } + + @Override + public int read(final byte[] b) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(b.length, limit - bytesRead); + + final int val = in.read(b, 0, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(len, limit - bytesRead); + + final int val = in.read(b, off, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public long skip(final long n) throws IOException { + final long skipped = in.skip(Math.min(n, limit - bytesRead)); + bytesRead += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public void reset() throws IOException { + in.reset(); + } +} diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java index fb109305ff..f03bf1edcd 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/BootstrapCodec.java @@ -64,7 +64,7 @@ public class BootstrapCodec { private void processRequest(final String cmd, final String[] args) throws InvalidCommandException, IOException { switch (cmd) { case "PORT": { - if ( args.length != 1 ) { + if ( args.length != 2 ) { throw new InvalidCommandException(); } @@ -78,8 +78,10 @@ public class BootstrapCodec { if ( port < 1 || port > 65535 ) { throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535"); } + + final String secretKey = args[1]; - runner.setNiFiCommandControlPort(port); + runner.setNiFiCommandControlPort(port, secretKey); writer.write("OK"); writer.newLine(); writer.flush(); diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java index c83135199f..f05d45a395 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NiFiListener.java @@ -17,6 +17,7 @@ package org.apache.nifi.bootstrap; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; @@ -24,6 +25,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.nifi.bootstrap.util.LimitingInputStream; + public class NiFiListener { private ServerSocket serverSocket; private volatile Listener listener; @@ -92,17 +95,26 @@ public class NiFiListener { throw ioe; } - executor.submit(new Runnable() { @Override public void run() { try { - final BootstrapCodec codec = new BootstrapCodec(runner, socket.getInputStream(), socket.getOutputStream()); + // we want to ensure that we don't try to read data from an InputStream directly + // by a BufferedReader because any user on the system could open a socket and send + // a multi-gigabyte file without any new lines in order to crash the Bootstrap, + // which in turn may cause the Shutdown Hook to shutdown NiFi. + // So we will limit the amount of data to read to 4 KB + final InputStream limitingIn = new LimitingInputStream(socket.getInputStream(), 4096); + final BootstrapCodec codec = new BootstrapCodec(runner, limitingIn, socket.getOutputStream()); codec.communicate(); - socket.close(); } catch (final Throwable t) { System.out.println("Failed to communicate with NiFi due to " + t); t.printStackTrace(); + } finally { + try { + socket.close(); + } catch (final IOException ioe) { + } } } }); diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java index a5987bc016..a230711265 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java @@ -31,7 +31,10 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermission; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -205,6 +208,20 @@ public class RunNiFi { private synchronized void saveProperties(final Properties nifiProps) throws IOException { final File statusFile = getStatusFile(); + if ( statusFile.exists() && !statusFile.delete() ) { + logger.warning("Failed to delete " + statusFile); + } + + if ( !statusFile.createNewFile() ) { + throw new IOException("Failed to create file " + statusFile); + } + + try { + Files.setPosixFilePermissions(statusFile.toPath(), Collections.singleton(PosixFilePermission.OWNER_READ)); + } catch (final Exception e) { + logger.warning("Failed to set permissions so that only the owner can read status file " + statusFile + "; this may allows others to have access to the key needed to communicate with NiFi. Permissions should be changed so that only the owner can read this file"); + } + try (final FileOutputStream fos = new FileOutputStream(statusFile)) { nifiProps.store(fos, null); fos.getFD().sync(); @@ -213,16 +230,16 @@ public class RunNiFi { logger.fine("Saved Properties " + nifiProps + " to " + statusFile); } - private boolean isPingSuccessful(final int port) { + private boolean isPingSuccessful(final int port, final String secretKey) { logger.fine("Pinging " + port); try (final Socket socket = new Socket("localhost", port)) { final OutputStream out = socket.getOutputStream(); - out.write((PING_CMD + "\n").getBytes(StandardCharsets.UTF_8)); + out.write((PING_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); logger.fine("Sent PING command"); - + socket.setSoTimeout(5000); final InputStream in = socket.getInputStream(); final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final String response = reader.readLine(); @@ -245,7 +262,7 @@ public class RunNiFi { } final int port = Integer.parseInt(portVal); - final boolean success = isPingSuccessful(port); + final boolean success = isPingSuccessful(port, props.getProperty("secret.key")); if ( success ) { logger.fine("Successful PING on port " + port); return port; @@ -271,10 +288,7 @@ public class RunNiFi { // We use the "ps" command to check if the process is still running. final ProcessBuilder builder = new ProcessBuilder(); - // ps -p -o comm= - // -> -p to filter just the pid we care about - // -> -o comm= to remove headers from the output - builder.command("ps", "-p", pid, "-o", "comm="); + builder.command("ps", "-p", pid, "--no-headers"); final Process proc = builder.start(); // Read how many lines are output by the 'ps' command @@ -321,6 +335,7 @@ public class RunNiFi { final String portValue = props.getProperty("port"); final String pid = props.getProperty("pid"); + final String secretKey = props.getProperty("secret.key"); if ( portValue == null && pid == null ) { return new Status(null, null, false, false); @@ -331,7 +346,7 @@ public class RunNiFi { if ( portValue != null ) { try { port = Integer.parseInt(portValue); - pingSuccess = isPingSuccessful(port); + pingSuccess = isPingSuccessful(port, secretKey); } catch (final NumberFormatException nfe) { return new Status(null, null, false, false); } @@ -373,14 +388,19 @@ public class RunNiFi { return; } + final Properties nifiProps = loadProperties(); + final String secretKey = nifiProps.getProperty("secret.key"); + try (final Socket socket = new Socket()) { + logger.fine("Connecting to NiFi instance"); socket.setSoTimeout(60000); socket.connect(new InetSocketAddress("localhost", port)); + logger.fine("Established connection to NiFi instance."); socket.setSoTimeout(60000); logger.fine("Sending SHUTDOWN Command to port " + port); final OutputStream out = socket.getOutputStream(); - out.write((SHUTDOWN_CMD + "\n").getBytes(StandardCharsets.UTF_8)); + out.write((SHUTDOWN_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); out.flush(); final InputStream in = socket.getInputStream(); @@ -392,10 +412,8 @@ public class RunNiFi { if ( SHUTDOWN_CMD.equals(response) ) { logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now"); - final Properties nifiProps = loadProperties(); final String pid = nifiProps.getProperty("pid"); if ( pid != null ) { - final Properties bootstrapProperties = new Properties(); try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) { bootstrapProperties.load(fis); @@ -418,7 +436,7 @@ public class RunNiFi { if ( isProcessRunning(pid) ) { logger.warning("NiFi has not finished shutting down after " + gracefulShutdownSeconds + " seconds. Killing process."); try { - killProcess(pid); + killProcessTree(pid); } catch (final IOException ioe) { logger.severe("Failed to kill Process with PID " + pid); } @@ -448,7 +466,31 @@ public class RunNiFi { } - private static void killProcess(final String pid) throws IOException { + private static List getChildProcesses(final String ppid) throws IOException { + final Process proc = Runtime.getRuntime().exec(new String[] {"ps", "-o", "pid", "--no-headers", "--ppid", ppid}); + final List childPids = new ArrayList<>(); + try (final InputStream in = proc.getInputStream(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + + String line; + while ((line = reader.readLine()) != null) { + childPids.add(line.trim()); + } + } + + return childPids; + } + + private void killProcessTree(final String pid) throws IOException { + logger.fine("Killing Process Tree for PID " + pid); + + final List children = getChildProcesses(pid); + logger.fine("Children of PID " + pid + ": " + children); + + for ( final String childPid : children ) { + killProcessTree(childPid); + } + Runtime.getRuntime().exec(new String[] {"kill", "-9", pid}); } @@ -620,7 +662,7 @@ public class RunNiFi { nifiPid = pid; final Properties nifiProps = new Properties(); nifiProps.setProperty("pid", String.valueOf(nifiPid)); - saveProperties(properties); + saveProperties(nifiProps); } ShutdownHook shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); @@ -651,7 +693,7 @@ public class RunNiFi { nifiPid = pid; final Properties nifiProps = new Properties(); nifiProps.setProperty("pid", String.valueOf(nifiPid)); - saveProperties(properties); + saveProperties(nifiProps); } shutdownHook = new ShutdownHook(process, this, gracefulShutdownSeconds); @@ -677,7 +719,7 @@ public class RunNiFi { nifiPid = pid; final Properties nifiProps = new Properties(); nifiProps.setProperty("pid", String.valueOf(nifiPid)); - saveProperties(properties); + saveProperties(nifiProps); } boolean started = waitForStart(); @@ -758,9 +800,8 @@ public class RunNiFi { this.autoRestartNiFi = restart; } - void setNiFiCommandControlPort(final int port) { + void setNiFiCommandControlPort(final int port, final String secretKey) { this.ccPort = port; - final File statusFile = getStatusFile(); final Properties nifiProps = new Properties(); @@ -768,6 +809,8 @@ public class RunNiFi { nifiProps.setProperty("pid", String.valueOf(nifiPid)); } nifiProps.setProperty("port", String.valueOf(ccPort)); + nifiProps.setProperty("secret.key", secretKey); + try { saveProperties(nifiProps); } catch (final IOException ioe) { diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java new file mode 100644 index 0000000000..214934222c --- /dev/null +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/util/LimitingInputStream.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap.util; + +import java.io.IOException; +import java.io.InputStream; + +public class LimitingInputStream extends InputStream { + + private final InputStream in; + private final long limit; + private long bytesRead = 0; + + public LimitingInputStream(final InputStream in, final long limit) { + this.in = in; + this.limit = limit; + } + + @Override + public int read() throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int val = in.read(); + if (val > -1) { + bytesRead++; + } + return val; + } + + @Override + public int read(final byte[] b) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(b.length, limit - bytesRead); + + final int val = in.read(b, 0, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(len, limit - bytesRead); + + final int val = in.read(b, off, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public long skip(final long n) throws IOException { + final long skipped = in.skip(Math.min(n, limit - bytesRead)); + bytesRead += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public void reset() throws IOException { + in.reset(); + } +}