* This class' methods are all effectively `static` => make them `static` and stop instantiating it needlessly
This commit is contained in:
parent
cd441f6906
commit
a48242c371
|
@ -49,7 +49,6 @@ public class InboundHandler {
|
||||||
private final OutboundHandler outboundHandler;
|
private final OutboundHandler outboundHandler;
|
||||||
private final CircuitBreakerService circuitBreakerService;
|
private final CircuitBreakerService circuitBreakerService;
|
||||||
private final InboundMessage.Reader reader;
|
private final InboundMessage.Reader reader;
|
||||||
private final TransportLogger transportLogger;
|
|
||||||
private final TransportHandshaker handshaker;
|
private final TransportHandshaker handshaker;
|
||||||
private final TransportKeepAlive keepAlive;
|
private final TransportKeepAlive keepAlive;
|
||||||
|
|
||||||
|
@ -58,13 +57,11 @@ public class InboundHandler {
|
||||||
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
|
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
|
||||||
|
|
||||||
InboundHandler(ThreadPool threadPool, OutboundHandler outboundHandler, InboundMessage.Reader reader,
|
InboundHandler(ThreadPool threadPool, OutboundHandler outboundHandler, InboundMessage.Reader reader,
|
||||||
CircuitBreakerService circuitBreakerService, TransportLogger transportLogger, TransportHandshaker handshaker,
|
CircuitBreakerService circuitBreakerService, TransportHandshaker handshaker, TransportKeepAlive keepAlive) {
|
||||||
TransportKeepAlive keepAlive) {
|
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.outboundHandler = outboundHandler;
|
this.outboundHandler = outboundHandler;
|
||||||
this.circuitBreakerService = circuitBreakerService;
|
this.circuitBreakerService = circuitBreakerService;
|
||||||
this.reader = reader;
|
this.reader = reader;
|
||||||
this.transportLogger = transportLogger;
|
|
||||||
this.handshaker = handshaker;
|
this.handshaker = handshaker;
|
||||||
this.keepAlive = keepAlive;
|
this.keepAlive = keepAlive;
|
||||||
}
|
}
|
||||||
|
@ -98,7 +95,7 @@ public class InboundHandler {
|
||||||
|
|
||||||
void inboundMessage(TcpChannel channel, BytesReference message) throws Exception {
|
void inboundMessage(TcpChannel channel, BytesReference message) throws Exception {
|
||||||
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
|
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
|
||||||
transportLogger.logInboundMessage(channel, message);
|
TransportLogger.logInboundMessage(channel, message);
|
||||||
readBytesMetric.inc(message.length() + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
|
readBytesMetric.inc(message.length() + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
|
||||||
// Message length of 0 is a ping
|
// Message length of 0 is a ping
|
||||||
if (message.length() != 0) {
|
if (message.length() != 0) {
|
||||||
|
@ -226,7 +223,7 @@ public class InboundHandler {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handlerResponseError(StreamInput stream, final TransportResponseHandler handler) {
|
private void handlerResponseError(StreamInput stream, final TransportResponseHandler<?> handler) {
|
||||||
Exception error;
|
Exception error;
|
||||||
try {
|
try {
|
||||||
error = stream.readException();
|
error = stream.readException();
|
||||||
|
@ -236,7 +233,7 @@ public class InboundHandler {
|
||||||
handleException(handler, error);
|
handleException(handler, error);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleException(final TransportResponseHandler handler, Throwable error) {
|
private void handleException(final TransportResponseHandler<?> handler, Throwable error) {
|
||||||
if (!(error instanceof RemoteTransportException)) {
|
if (!(error instanceof RemoteTransportException)) {
|
||||||
error = new RemoteTransportException(error.getMessage(), error);
|
error = new RemoteTransportException(error.getMessage(), error);
|
||||||
}
|
}
|
||||||
|
@ -255,7 +252,7 @@ public class InboundHandler {
|
||||||
private final TransportRequest request;
|
private final TransportRequest request;
|
||||||
private final TransportChannel transportChannel;
|
private final TransportChannel transportChannel;
|
||||||
|
|
||||||
RequestHandler(RequestHandlerRegistry reg, TransportRequest request, TransportChannel transportChannel) {
|
RequestHandler(RequestHandlerRegistry<?> reg, TransportRequest request, TransportChannel transportChannel) {
|
||||||
this.reg = reg;
|
this.reg = reg;
|
||||||
this.request = request;
|
this.request = request;
|
||||||
this.transportChannel = transportChannel;
|
this.transportChannel = transportChannel;
|
||||||
|
|
|
@ -53,17 +53,14 @@ final class OutboundHandler {
|
||||||
private final String[] features;
|
private final String[] features;
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final BigArrays bigArrays;
|
private final BigArrays bigArrays;
|
||||||
private final TransportLogger transportLogger;
|
|
||||||
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
|
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
|
||||||
|
|
||||||
OutboundHandler(String nodeName, Version version, String[] features, ThreadPool threadPool, BigArrays bigArrays,
|
OutboundHandler(String nodeName, Version version, String[] features, ThreadPool threadPool, BigArrays bigArrays) {
|
||||||
TransportLogger transportLogger) {
|
|
||||||
this.nodeName = nodeName;
|
this.nodeName = nodeName;
|
||||||
this.version = version;
|
this.version = version;
|
||||||
this.features = features;
|
this.features = features;
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.bigArrays = bigArrays;
|
this.bigArrays = bigArrays;
|
||||||
this.transportLogger = transportLogger;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
|
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
|
||||||
|
@ -201,7 +198,7 @@ final class OutboundHandler {
|
||||||
try {
|
try {
|
||||||
message = messageSupplier.get();
|
message = messageSupplier.get();
|
||||||
messageSize = message.length();
|
messageSize = message.length();
|
||||||
transportLogger.logOutboundMessage(channel, message);
|
TransportLogger.logOutboundMessage(channel, message);
|
||||||
return message;
|
return message;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
onFailure(e);
|
onFailure(e);
|
||||||
|
|
|
@ -134,7 +134,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.pageCacheRecycler = pageCacheRecycler;
|
this.pageCacheRecycler = pageCacheRecycler;
|
||||||
this.networkService = networkService;
|
this.networkService = networkService;
|
||||||
TransportLogger transportLogger = new TransportLogger();
|
|
||||||
String nodeName = Node.NODE_NAME_SETTING.get(settings);
|
String nodeName = Node.NODE_NAME_SETTING.get(settings);
|
||||||
final Settings defaultFeatures = TransportSettings.DEFAULT_FEATURES_SETTING.get(settings);
|
final Settings defaultFeatures = TransportSettings.DEFAULT_FEATURES_SETTING.get(settings);
|
||||||
String[] features;
|
String[] features;
|
||||||
|
@ -151,7 +150,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
||||||
}
|
}
|
||||||
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
|
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
|
||||||
|
|
||||||
this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays, transportLogger);
|
this.outboundHandler = new OutboundHandler(nodeName, version, features, threadPool, bigArrays);
|
||||||
this.handshaker = new TransportHandshaker(version, threadPool,
|
this.handshaker = new TransportHandshaker(version, threadPool,
|
||||||
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
|
(node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
|
||||||
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
|
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
|
||||||
|
@ -160,7 +159,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
||||||
TransportHandshaker.HANDSHAKE_ACTION_NAME, response, false, true));
|
TransportHandshaker.HANDSHAKE_ACTION_NAME, response, false, true));
|
||||||
InboundMessage.Reader reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
|
InboundMessage.Reader reader = new InboundMessage.Reader(version, namedWriteableRegistry, threadPool.getThreadContext());
|
||||||
this.keepAlive = new TransportKeepAlive(threadPool, this.outboundHandler::sendBytes);
|
this.keepAlive = new TransportKeepAlive(threadPool, this.outboundHandler::sendBytes);
|
||||||
this.inboundHandler = new InboundHandler(threadPool, outboundHandler, reader, circuitBreakerService, transportLogger, handshaker,
|
this.inboundHandler = new InboundHandler(threadPool, outboundHandler, reader, circuitBreakerService, handshaker,
|
||||||
keepAlive);
|
keepAlive);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ public final class TransportLogger {
|
||||||
private static final Logger logger = LogManager.getLogger(TransportLogger.class);
|
private static final Logger logger = LogManager.getLogger(TransportLogger.class);
|
||||||
private static final int HEADER_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
|
private static final int HEADER_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
|
||||||
|
|
||||||
void logInboundMessage(TcpChannel channel, BytesReference message) {
|
static void logInboundMessage(TcpChannel channel, BytesReference message) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
try {
|
try {
|
||||||
String logMessage = format(channel, message, "READ");
|
String logMessage = format(channel, message, "READ");
|
||||||
|
@ -47,7 +47,7 @@ public final class TransportLogger {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void logOutboundMessage(TcpChannel channel, BytesReference message) {
|
static void logOutboundMessage(TcpChannel channel, BytesReference message) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
try {
|
try {
|
||||||
if (message.get(0) != 'E') {
|
if (message.get(0) != 'E') {
|
||||||
|
@ -63,7 +63,7 @@ public final class TransportLogger {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String format(TcpChannel channel, BytesReference message, String event) throws IOException {
|
private static String format(TcpChannel channel, BytesReference message, String event) throws IOException {
|
||||||
final StringBuilder sb = new StringBuilder();
|
final StringBuilder sb = new StringBuilder();
|
||||||
sb.append(channel);
|
sb.append(channel);
|
||||||
int messageLengthWithHeader = HEADER_SIZE + message.length();
|
int messageLengthWithHeader = HEADER_SIZE + message.length();
|
||||||
|
|
|
@ -54,7 +54,6 @@ public class InboundHandlerTests extends ESTestCase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
TransportLogger transportLogger = new TransportLogger();
|
|
||||||
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
|
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
|
||||||
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
|
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
|
||||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
|
||||||
|
@ -63,10 +62,9 @@ public class InboundHandlerTests extends ESTestCase {
|
||||||
}, (v, f, c, r, r_id) -> {
|
}, (v, f, c, r, r_id) -> {
|
||||||
});
|
});
|
||||||
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
|
TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
|
||||||
OutboundHandler outboundHandler = new OutboundHandler("node", version, new String[0], threadPool, BigArrays.NON_RECYCLING_INSTANCE,
|
OutboundHandler outboundHandler =
|
||||||
transportLogger);
|
new OutboundHandler("node", version, new String[0], threadPool, BigArrays.NON_RECYCLING_INSTANCE);
|
||||||
handler = new InboundHandler(threadPool, outboundHandler, reader, new NoneCircuitBreakerService(), transportLogger, handshaker,
|
handler = new InboundHandler(threadPool, outboundHandler, reader, new NoneCircuitBreakerService(), handshaker, keepAlive);
|
||||||
keepAlive);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -62,12 +62,11 @@ public class OutboundHandlerTests extends ESTestCase {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
TransportLogger transportLogger = new TransportLogger();
|
|
||||||
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
|
channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address());
|
||||||
TransportAddress transportAddress = buildNewFakeTransportAddress();
|
TransportAddress transportAddress = buildNewFakeTransportAddress();
|
||||||
node = new DiscoveryNode("", transportAddress, Version.CURRENT);
|
node = new DiscoveryNode("", transportAddress, Version.CURRENT);
|
||||||
String[] features = {feature1, feature2};
|
String[] features = {feature1, feature2};
|
||||||
handler = new OutboundHandler("node", Version.CURRENT, features, threadPool, BigArrays.NON_RECYCLING_INSTANCE, transportLogger);
|
handler = new OutboundHandler("node", Version.CURRENT, features, threadPool, BigArrays.NON_RECYCLING_INSTANCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -56,8 +56,6 @@ public class TransportLoggerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testLoggingHandler() throws IOException {
|
public void testLoggingHandler() throws IOException {
|
||||||
TransportLogger transportLogger = new TransportLogger();
|
|
||||||
|
|
||||||
final String writePattern =
|
final String writePattern =
|
||||||
".*\\[length: \\d+" +
|
".*\\[length: \\d+" +
|
||||||
", request id: \\d+" +
|
", request id: \\d+" +
|
||||||
|
@ -84,8 +82,8 @@ public class TransportLoggerTests extends ESTestCase {
|
||||||
appender.addExpectation(writeExpectation);
|
appender.addExpectation(writeExpectation);
|
||||||
appender.addExpectation(readExpectation);
|
appender.addExpectation(readExpectation);
|
||||||
BytesReference bytesReference = buildRequest();
|
BytesReference bytesReference = buildRequest();
|
||||||
transportLogger.logInboundMessage(mock(TcpChannel.class), bytesReference.slice(6, bytesReference.length() - 6));
|
TransportLogger.logInboundMessage(mock(TcpChannel.class), bytesReference.slice(6, bytesReference.length() - 6));
|
||||||
transportLogger.logOutboundMessage(mock(TcpChannel.class), bytesReference);
|
TransportLogger.logOutboundMessage(mock(TcpChannel.class), bytesReference);
|
||||||
appender.assertAllExpectationsMatched();
|
appender.assertAllExpectationsMatched();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue