mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 21:05:23 +00:00
Improve MockTcpTransport memory usage (#35402)
The MockTcpTransport is not friendly in regards to memory usage. It must allocate multiple byte arrays for every message. This improves the memory situation by failing fast if the message is improperly formatted. Additionally, it uses reusable big arrays for at least half of the allocated byte arrays.
This commit is contained in:
parent
67f9e8fa23
commit
ba478827ad
@ -179,7 +179,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
||||
private final CircuitBreakerService circuitBreakerService;
|
||||
private final Version version;
|
||||
protected final ThreadPool threadPool;
|
||||
private final BigArrays bigArrays;
|
||||
protected final BigArrays bigArrays;
|
||||
protected final NetworkService networkService;
|
||||
protected final Set<ProfileSettings> profileSettings;
|
||||
|
||||
|
@ -18,15 +18,17 @@
|
||||
*/
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.cli.SuppressForbidden;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cli.SuppressForbidden;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.concurrent.CompletableContext;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -35,15 +37,16 @@ import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.CancellableThreads;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.mocksocket.MockServerSocket;
|
||||
import org.elasticsearch.mocksocket.MockSocket;
|
||||
import org.elasticsearch.common.concurrent.CompletableContext;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.Closeable;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -139,24 +142,26 @@ public class MockTcpTransport extends TcpTransport {
|
||||
|
||||
private void readMessage(MockChannel mockChannel, StreamInput input) throws IOException {
|
||||
Socket socket = mockChannel.activeChannel;
|
||||
byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE];
|
||||
int firstByte = input.read();
|
||||
if (firstByte == -1) {
|
||||
byte[] minimalHeader = new byte[TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE];
|
||||
try {
|
||||
input.readFully(minimalHeader);
|
||||
} catch (EOFException eof) {
|
||||
throw new IOException("Connection reset by peer");
|
||||
}
|
||||
minimalHeader[0] = (byte) firstByte;
|
||||
minimalHeader[1] = (byte) input.read();
|
||||
int msgSize = input.readInt();
|
||||
|
||||
// Read message length will throw stream corrupted exception if the marker bytes incorrect
|
||||
int msgSize = TcpTransport.readMessageLength(new BytesArray(minimalHeader));
|
||||
if (msgSize == -1) {
|
||||
socket.getOutputStream().flush();
|
||||
} else {
|
||||
BytesStreamOutput output = new BytesStreamOutput();
|
||||
final byte[] buffer = new byte[msgSize];
|
||||
input.readFully(buffer);
|
||||
output.write(minimalHeader);
|
||||
output.writeInt(msgSize);
|
||||
output.write(buffer);
|
||||
consumeNetworkReads(mockChannel, output.bytes());
|
||||
int expectedSize = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE + msgSize;
|
||||
try (BytesStreamOutput output = new ReleasableBytesStreamOutput(expectedSize, bigArrays)) {
|
||||
output.write(minimalHeader);
|
||||
output.write(buffer);
|
||||
consumeNetworkReads(mockChannel, output.bytes());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user