NIFI-3648 removed message copying when not in debug mode. This closes #1637.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Mike Moser 2017-03-30 14:34:26 +00:00 committed by Mark Payne
parent 674c9e4687
commit bcac2766bc
1 changed files with 14 additions and 10 deletions

View File

@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolContext;
@ -45,6 +44,7 @@ import org.apache.nifi.io.socket.SocketListener;
import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.security.util.CertificateUtils; import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -121,9 +121,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
@Override @Override
public void dispatchRequest(final Socket socket) { public void dispatchRequest(final Socket socket) {
byte[] receivedMessage = null;
String hostname = null; String hostname = null;
final int maxMsgBuffer = 1024 * 1024; // don't buffer more than 1 MB of the message
try { try {
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
hostname = socket.getInetAddress().getHostName(); hostname = socket.getInetAddress().getHostName();
@ -134,15 +132,21 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
// unmarshall message // unmarshall message
final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller(); final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
final InputStream inStream = socket.getInputStream(); final ByteCountingInputStream countingIn = new ByteCountingInputStream(socket.getInputStream());
final CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, maxMsgBuffer); // don't copy more than 1 MB InputStream wrappedInStream = countingIn;
if (logger.isDebugEnabled()) {
final int maxMsgBuffer = 1024 * 1024; // don't buffer more than 1 MB of the message
final CopyingInputStream copyingInputStream = new CopyingInputStream(wrappedInStream, maxMsgBuffer);
wrappedInStream = copyingInputStream;
}
final ProtocolMessage request; final ProtocolMessage request;
try { try {
request = unmarshaller.unmarshal(copyingInputStream); request = unmarshaller.unmarshal(wrappedInStream);
} finally { } finally {
receivedMessage = copyingInputStream.getBytesRead(); if (logger.isDebugEnabled() && wrappedInStream instanceof CopyingInputStream) {
if (logger.isDebugEnabled()) { final CopyingInputStream copyingInputStream = (CopyingInputStream) wrappedInStream;
byte[] receivedMessage = copyingInputStream.getBytesRead();
logger.debug("Received message: " + new String(receivedMessage)); logger.debug("Received message: " + new String(receivedMessage));
} }
} }
@ -181,8 +185,8 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
stopWatch.stop(); stopWatch.stop();
final NodeIdentifier nodeId = getNodeIdentifier(request); final NodeIdentifier nodeId = getNodeIdentifier(request);
final String from = nodeId == null ? hostname : nodeId.toString(); final String from = nodeId == null ? hostname : nodeId.toString();
logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {} millis", logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {}",
requestId, request.getType(), receivedMessage.length, from, stopWatch.getDuration(TimeUnit.MILLISECONDS)); requestId, request.getType(), countingIn.getBytesRead(), from, stopWatch.getDuration());
} catch (final IOException | ProtocolException e) { } catch (final IOException | ProtocolException e) {
logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e); logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);