Close InputStream when receiving cluster state in PublishClusterStateAction (#22711)
Not closing the InputStream will leak native memory as the DeflateCompressor/Inflater won't be closed.
This commit is contained in:
parent
5d806bf93e
commit
1f0e0a2170
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.discovery.zen;
|
package org.elasticsearch.discovery.zen;
|
||||||
|
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
|
@ -375,34 +376,37 @@ public class PublishClusterStateAction extends AbstractComponent {
|
||||||
|
|
||||||
protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
|
protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
|
||||||
Compressor compressor = CompressorFactory.compressor(request.bytes());
|
Compressor compressor = CompressorFactory.compressor(request.bytes());
|
||||||
StreamInput in;
|
StreamInput in = request.bytes().streamInput();
|
||||||
if (compressor != null) {
|
try {
|
||||||
in = compressor.streamInput(request.bytes().streamInput());
|
if (compressor != null) {
|
||||||
} else {
|
in = compressor.streamInput(in);
|
||||||
in = request.bytes().streamInput();
|
|
||||||
}
|
|
||||||
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
|
|
||||||
in.setVersion(request.version());
|
|
||||||
synchronized (lastSeenClusterStateMutex) {
|
|
||||||
final ClusterState incomingState;
|
|
||||||
// If true we received full cluster state - otherwise diffs
|
|
||||||
if (in.readBoolean()) {
|
|
||||||
incomingState = ClusterState.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
|
|
||||||
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
|
|
||||||
} else if (lastSeenClusterState != null) {
|
|
||||||
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
|
|
||||||
incomingState = diff.apply(lastSeenClusterState);
|
|
||||||
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
|
|
||||||
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
|
|
||||||
} else {
|
|
||||||
logger.debug("received diff for but don't have any local cluster state - requesting full state");
|
|
||||||
throw new IncompatibleClusterStateVersionException("have no local cluster state");
|
|
||||||
}
|
}
|
||||||
// sanity check incoming state
|
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
|
||||||
validateIncomingState(incomingState, lastSeenClusterState);
|
in.setVersion(request.version());
|
||||||
|
synchronized (lastSeenClusterStateMutex) {
|
||||||
|
final ClusterState incomingState;
|
||||||
|
// If true we received full cluster state - otherwise diffs
|
||||||
|
if (in.readBoolean()) {
|
||||||
|
incomingState = ClusterState.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
|
||||||
|
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
|
||||||
|
request.bytes().length());
|
||||||
|
} else if (lastSeenClusterState != null) {
|
||||||
|
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
|
||||||
|
incomingState = diff.apply(lastSeenClusterState);
|
||||||
|
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
|
||||||
|
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
|
||||||
|
} else {
|
||||||
|
logger.debug("received diff for but don't have any local cluster state - requesting full state");
|
||||||
|
throw new IncompatibleClusterStateVersionException("have no local cluster state");
|
||||||
|
}
|
||||||
|
// sanity check incoming state
|
||||||
|
validateIncomingState(incomingState, lastSeenClusterState);
|
||||||
|
|
||||||
pendingStatesQueue.addPending(incomingState);
|
pendingStatesQueue.addPending(incomingState);
|
||||||
lastSeenClusterState = incomingState;
|
lastSeenClusterState = incomingState;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.close(in);
|
||||||
}
|
}
|
||||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue