From 1f0e0a21705e1ef3d9194ca53590f4d51ef81a35 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Fri, 20 Jan 2017 12:26:07 +0100 Subject: [PATCH] Close InputStream when receiving cluster state in PublishClusterStateAction (#22711) Not closing the InputStream will leak native memory as the DeflateCompressor/Inflater won't be closed. --- .../zen/PublishClusterStateAction.java | 56 ++++++++++--------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java index 11ef5b9ee14..4150783a8fd 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -375,34 +376,37 @@ public class PublishClusterStateAction extends AbstractComponent { protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException { Compressor compressor = CompressorFactory.compressor(request.bytes()); - StreamInput in; - if (compressor != null) { - in = compressor.streamInput(request.bytes().streamInput()); - } else { - in = request.bytes().streamInput(); - } - in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); - in.setVersion(request.version()); - synchronized (lastSeenClusterStateMutex) { - final ClusterState incomingState; - // If true we received full cluster state - otherwise diffs - if (in.readBoolean()) { - incomingState = ClusterState.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode()); - logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); - } else if (lastSeenClusterState != null) { - Diff diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode()); - incomingState = diff.apply(lastSeenClusterState); - logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", - incomingState.version(), incomingState.stateUUID(), request.bytes().length()); - } else { - logger.debug("received diff for but don't have any local cluster state - requesting full state"); - throw new IncompatibleClusterStateVersionException("have no local cluster state"); + StreamInput in = request.bytes().streamInput(); + try { + if (compressor != null) { + in = compressor.streamInput(in); } - // sanity check incoming state - validateIncomingState(incomingState, lastSeenClusterState); + in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry); + in.setVersion(request.version()); + synchronized (lastSeenClusterStateMutex) { + final ClusterState incomingState; + // If true we received full cluster state - otherwise diffs + if (in.readBoolean()) { + incomingState = ClusterState.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode()); + logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), + request.bytes().length()); + } else if (lastSeenClusterState != null) { + Diff diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode()); + incomingState = diff.apply(lastSeenClusterState); + logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", + incomingState.version(), incomingState.stateUUID(), request.bytes().length()); + } else { + logger.debug("received diff for but don't have any local cluster state - requesting full state"); + throw new IncompatibleClusterStateVersionException("have no local cluster state"); + } + // sanity check incoming state + validateIncomingState(incomingState, lastSeenClusterState); - pendingStatesQueue.addPending(incomingState); - lastSeenClusterState = incomingState; + pendingStatesQueue.addPending(incomingState); + lastSeenClusterState = incomingState; + } + } finally { + IOUtils.close(in); } channel.sendResponse(TransportResponse.Empty.INSTANCE); }