From 07cba65c1b1e61d227d66f774116434edc180927 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Sat, 30 Jan 2016 20:18:58 -0500 Subject: [PATCH] Start to break lines at 140 characters Its what we say our maximum line length is in CONTRIBUTING.md and it'd be nice to have a check on that. Unfortunately, we don't actually wrap all lines at 140. --- .../transport/TransportService.java | 15 ++- .../transport/TransportSettings.java | 16 ++- .../transport/local/LocalTransport.java | 15 ++- .../local/LocalTransportChannel.java | 6 +- .../netty/MessageChannelHandler.java | 29 +++-- .../transport/netty/NettyTransport.java | 114 ++++++++++++------ .../netty/NettyTransportChannel.java | 6 +- .../netty/SizeHeaderFrameDecoder.java | 4 +- .../org/elasticsearch/tribe/TribeService.java | 52 +++++--- .../watcher/ResourceWatcherService.java | 3 +- 10 files changed, 171 insertions(+), 89 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index a6a1cab4f05..b050b2cb71f 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Scope; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; @@ -39,8 +40,8 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -56,6 +57,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; +import static java.util.Collections.emptyList; +import static org.elasticsearch.common.settings.Setting.listSetting; import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS; /** @@ -92,9 +95,10 @@ public class TransportService extends AbstractLifecycleComponent> TRACE_LOG_INCLUDE_SETTING = Setting.listSetting("transport.tracer.include", Collections.emptyList(), Function.identity(), true, Setting.Scope.CLUSTER); - public static final Setting> TRACE_LOG_EXCLUDE_SETTING = Setting.listSetting("transport.tracer.exclude", Arrays.asList("internal:discovery/zen/fd*", TransportLivenessAction.NAME), Function.identity(), true, Setting.Scope.CLUSTER); - + public static final Setting> TRACE_LOG_INCLUDE_SETTING = listSetting("transport.tracer.include", emptyList(), + Function.identity(), true, Scope.CLUSTER); + public static final Setting> TRACE_LOG_EXCLUDE_SETTING = listSetting("transport.tracer.exclude", + Arrays.asList("internal:discovery/zen/fd*", TransportLivenessAction.NAME), Function.identity(), true, Scope.CLUSTER); private final ESLogger tracerLog; @@ -757,7 +761,8 @@ public class TransportService extends AbstractLifecycleComponent> HOST = Setting.listSetting("transport.host", emptyList(), s -> s, false, Setting.Scope.CLUSTER); - public static final Setting> PUBLISH_HOST = Setting.listSetting("transport.publish_host", HOST, s -> s, false, Setting.Scope.CLUSTER); - public static final Setting> BIND_HOST = Setting.listSetting("transport.bind_host", HOST, s -> s, false, Setting.Scope.CLUSTER); - public static final Setting PORT = new Setting<>("transport.tcp.port", "9300-9400", s -> s, false, Setting.Scope.CLUSTER); - public static final Setting PUBLISH_PORT = Setting.intSetting("transport.publish_port", -1, -1, false, Setting.Scope.CLUSTER); + public static final Setting> HOST = listSetting("transport.host", emptyList(), s -> s, false, Scope.CLUSTER); + public static final Setting> PUBLISH_HOST = listSetting("transport.publish_host", HOST, s -> s, false, Scope.CLUSTER); + public static final Setting> BIND_HOST = listSetting("transport.bind_host", HOST, s -> s, false, Scope.CLUSTER); + public static final Setting PORT = new Setting<>("transport.tcp.port", "9300-9400", s -> s, false, Scope.CLUSTER); + public static final Setting PUBLISH_PORT = intSetting("transport.publish_port", -1, -1, false, Scope.CLUSTER); public static final String DEFAULT_PROFILE = "default"; - public static final Setting TRANSPORT_PROFILES_SETTING = Setting.groupSetting("transport.profiles.", true, Setting.Scope.CLUSTER); + public static final Setting TRANSPORT_PROFILES_SETTING = groupSetting("transport.profiles.", true, Scope.CLUSTER); private TransportSettings() { diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 7a41bf626c6..a5db72b9b5f 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -97,7 +97,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1); logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX); - this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory, threadPool.getThreadContext()); + this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory, + threadPool.getThreadContext()); this.namedWriteableRegistry = namedWriteableRegistry; } @@ -199,7 +200,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem } @Override - public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { final Version version = Version.smallest(node.version(), this.version); try (BytesStreamOutput stream = new BytesStreamOutput()) { @@ -237,7 +239,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem return this.workers; } - protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) { + protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, + @Nullable final Long sendRequestId) { Transports.assertTransportThread(); try { transportServiceAdapter.received(data.length); @@ -278,7 +281,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry); final String action = stream.readString(); transportServiceAdapter.onRequestReceived(requestId, action); - final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, requestId, version); + final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, + requestId, version); try { final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); if (reg == null) { @@ -334,7 +338,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem try { response.readFrom(buffer); } catch (Throwable e) { - handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); + handleException(handler, new TransportSerializationException( + "Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); return; } handleParsedResponse(response, handler); diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index aad31fd8ccd..41eb7354098 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -46,7 +46,8 @@ public class LocalTransportChannel implements TransportChannel { private final long requestId; private final Version version; - public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter, LocalTransport targetTransport, String action, long requestId, Version version) { + public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter, + LocalTransport targetTransport, String action, long requestId, Version version) { this.sourceTransport = sourceTransport; this.sourceTransportServiceAdapter = sourceTransportServiceAdapter; this.targetTransport = targetTransport; @@ -94,7 +95,8 @@ public class LocalTransportChannel implements TransportChannel { public void sendResponse(Throwable error) throws IOException { BytesStreamOutput stream = new BytesStreamOutput(); writeResponseExceptionHeader(stream); - RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddresses()[0], action, error); + RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), + targetTransport.boundAddress().boundAddresses()[0], action, error); stream.writeThrowable(tx); final byte[] data = stream.bytes().toBytes(); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 6732b26ddbb..fca979f9bc9 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -116,7 +116,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } catch (NotCompressedException ex) { int maxToRead = Math.min(buffer.readableBytes(), 10); int offset = buffer.readerIndex(); - StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(buffer.readableBytes()).append("] readable bytes with message size [").append(size).append("] ").append("] are ["); + StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead) + .append("] content bytes out of [").append(buffer.readableBytes()) + .append("] readable bytes with message size [").append(size).append("] ").append("] are ["); for (int i = 0; i < maxToRead; i++) { sb.append(buffer.getByte(offset + i)).append(","); } @@ -134,15 +136,17 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { final int nextByte = streamIn.read(); // calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker if (nextByte != -1) { - throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" - + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); + throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action + + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); } if (buffer.readerIndex() < expectedIndexReader) { - throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting"); + throw new IllegalStateException("Message is fully read (request), yet there are " + + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting"); } if (buffer.readerIndex() > expectedIndexReader) { - throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action [" - + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); + throw new IllegalStateException( + "Message read past expected size (request) for requestId [" + requestId + "], action [" + action + + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting"); } } else { @@ -163,11 +167,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { + handler + "], error [" + TransportStatus.isError(status) + "]; resetting"); } if (buffer.readerIndex() < expectedIndexReader) { - throw new IllegalStateException("Message is fully read (response), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting"); + throw new IllegalStateException("Message is fully read (response), yet there are " + + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting"); } if (buffer.readerIndex() > expectedIndexReader) { - throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler [" - + handler + "], error [" + TransportStatus.isError(status) + "]; resetting"); + throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + + "], handler [" + handler + "], error [" + TransportStatus.isError(status) + "]; resetting"); } } @@ -193,7 +198,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { try { response.readFrom(buffer); } catch (Throwable e) { - handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); + handleException(handler, new TransportSerializationException( + "Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); return; } try { @@ -247,7 +253,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry); final String action = buffer.readString(); transportServiceAdapter.onRequestReceived(requestId, action); - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName); + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, + requestId, version, profileName); try { final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action); if (reg == null) { diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 99fbac17b69..8b174ecb19c 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService.TcpSettings; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Scope; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -119,6 +120,10 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.common.settings.Setting.boolSetting; +import static org.elasticsearch.common.settings.Setting.byteSizeSetting; +import static org.elasticsearch.common.settings.Setting.intSetting; +import static org.elasticsearch.common.settings.Setting.timeSetting; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException; import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException; @@ -143,21 +148,33 @@ public class NettyTransport extends AbstractLifecycleComponent implem public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss"; public static final Setting WORKER_COUNT = new Setting<>("transport.netty.worker_count", - (s) -> Integer.toString(EsExecutors.boundedNumberOfProcessors(s) * 2), (s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"), - false, Setting.Scope.CLUSTER); - public static final Setting CONNECTIONS_PER_NODE_RECOVERY = Setting.intSetting("transport.connections_per_node.recovery", 2, 1, false, Setting.Scope.CLUSTER); - public static final Setting CONNECTIONS_PER_NODE_BULK = Setting.intSetting("transport.connections_per_node.bulk", 3, 1, false, Setting.Scope.CLUSTER); - public static final Setting CONNECTIONS_PER_NODE_REG = Setting.intSetting("transport.connections_per_node.reg", 6, 1, false, Setting.Scope.CLUSTER); - public static final Setting CONNECTIONS_PER_NODE_STATE = Setting.intSetting("transport.connections_per_node.state", 1, 1, false, Setting.Scope.CLUSTER); - public static final Setting CONNECTIONS_PER_NODE_PING = Setting.intSetting("transport.connections_per_node.ping", 1, 1, false, Setting.Scope.CLUSTER); + (s) -> Integer.toString(EsExecutors.boundedNumberOfProcessors(s) * 2), + (s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"), false, Setting.Scope.CLUSTER); + public static final Setting CONNECTIONS_PER_NODE_RECOVERY = intSetting("transport.connections_per_node.recovery", 2, 1, false, + Scope.CLUSTER); + public static final Setting CONNECTIONS_PER_NODE_BULK = intSetting("transport.connections_per_node.bulk", 3, 1, false, + Scope.CLUSTER); + public static final Setting CONNECTIONS_PER_NODE_REG = intSetting("transport.connections_per_node.reg", 6, 1, false, + Scope.CLUSTER); + public static final Setting CONNECTIONS_PER_NODE_STATE = intSetting("transport.connections_per_node.state", 1, 1, false, + Scope.CLUSTER); + public static final Setting CONNECTIONS_PER_NODE_PING = intSetting("transport.connections_per_node.ping", 1, 1, false, + Scope.CLUSTER); // the scheduled internal ping interval setting, defaults to disabled (-1) - public static final Setting PING_SCHEDULE = Setting.timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), false, Setting.Scope.CLUSTER); - public static final Setting TCP_BLOCKING_CLIENT = Setting.boolSetting("transport.tcp.blocking_client", TcpSettings.TCP_BLOCKING_CLIENT, false, Setting.Scope.CLUSTER); - public static final Setting TCP_CONNECT_TIMEOUT = Setting.timeSetting("transport.tcp.connect_timeout", TcpSettings.TCP_CONNECT_TIMEOUT, false, Setting.Scope.CLUSTER); - public static final Setting TCP_NO_DELAY = Setting.boolSetting("transport.tcp_no_delay", TcpSettings.TCP_NO_DELAY, false, Setting.Scope.CLUSTER); - public static final Setting TCP_KEEP_ALIVE = Setting.boolSetting("transport.tcp.keep_alive", TcpSettings.TCP_KEEP_ALIVE, false, Setting.Scope.CLUSTER); - public static final Setting TCP_BLOCKING_SERVER = Setting.boolSetting("transport.tcp.blocking_server", TcpSettings.TCP_BLOCKING_SERVER, false, Setting.Scope.CLUSTER); - public static final Setting TCP_REUSE_ADDRESS = Setting.boolSetting("transport.tcp.reuse_address", TcpSettings.TCP_REUSE_ADDRESS, false, Setting.Scope.CLUSTER); + public static final Setting PING_SCHEDULE = timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), false, + Setting.Scope.CLUSTER); + public static final Setting TCP_BLOCKING_CLIENT = boolSetting("transport.tcp.blocking_client", TcpSettings.TCP_BLOCKING_CLIENT, + false, Setting.Scope.CLUSTER); + public static final Setting TCP_CONNECT_TIMEOUT = timeSetting("transport.tcp.connect_timeout", + TcpSettings.TCP_CONNECT_TIMEOUT, false, Setting.Scope.CLUSTER); + public static final Setting TCP_NO_DELAY = boolSetting("transport.tcp_no_delay", TcpSettings.TCP_NO_DELAY, false, + Setting.Scope.CLUSTER); + public static final Setting TCP_KEEP_ALIVE = boolSetting("transport.tcp.keep_alive", TcpSettings.TCP_KEEP_ALIVE, false, + Setting.Scope.CLUSTER); + public static final Setting TCP_BLOCKING_SERVER = boolSetting("transport.tcp.blocking_server", TcpSettings.TCP_BLOCKING_SERVER, + false, Setting.Scope.CLUSTER); + public static final Setting TCP_REUSE_ADDRESS = boolSetting("transport.tcp.reuse_address", TcpSettings.TCP_REUSE_ADDRESS, + false, Setting.Scope.CLUSTER); public static final Setting TCP_SEND_BUFFER_SIZE = Setting.byteSizeSetting("transport.tcp.send_buffer_size", TcpSettings.TCP_SEND_BUFFER_SIZE, false, Setting.Scope.CLUSTER); public static final Setting TCP_RECEIVE_BUFFER_SIZE = Setting.byteSizeSetting("transport.tcp.receive_buffer_size", TcpSettings.TCP_RECEIVE_BUFFER_SIZE, false, Setting.Scope.CLUSTER); @@ -165,9 +182,9 @@ public class NettyTransport extends AbstractLifecycleComponent implem public static final Setting NETTY_MAX_CUMULATION_BUFFER_CAPACITY = Setting.byteSizeSetting("transport.netty.max_cumulation_buffer_capacity", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER); public static final Setting NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS = Setting.intSetting("transport.netty.max_composite_buffer_components", -1, -1, false, Setting.Scope.CLUSTER); - // See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one - public static final Setting NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting("transport.netty.receive_predictor_size", + public static final Setting NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting( + "transport.netty.receive_predictor_size", settings -> { long defaultReceiverPredictor = 512 * 1024; if (JvmInfo.jvmInfo().getMem().getDirectMemoryMax().bytes() > 0) { @@ -177,10 +194,11 @@ public class NettyTransport extends AbstractLifecycleComponent implem } return new ByteSizeValue(defaultReceiverPredictor).toString(); }, false, Setting.Scope.CLUSTER); - public static final Setting NETTY_RECEIVE_PREDICTOR_MIN = Setting.byteSizeSetting("transport.netty.receive_predictor_min", NETTY_RECEIVE_PREDICTOR_SIZE, false, Setting.Scope.CLUSTER); - public static final Setting NETTY_RECEIVE_PREDICTOR_MAX = Setting.byteSizeSetting("transport.netty.receive_predictor_max", NETTY_RECEIVE_PREDICTOR_SIZE, false, Setting.Scope.CLUSTER); - public static final Setting NETTY_BOSS_COUNT = Setting.intSetting("transport.netty.boss_count", 1, 1, false, Setting.Scope.CLUSTER); - + public static final Setting NETTY_RECEIVE_PREDICTOR_MIN = byteSizeSetting("transport.netty.receive_predictor_min", + NETTY_RECEIVE_PREDICTOR_SIZE, false, Scope.CLUSTER); + public static final Setting NETTY_RECEIVE_PREDICTOR_MAX = byteSizeSetting("transport.netty.receive_predictor_max", + NETTY_RECEIVE_PREDICTOR_SIZE, false, Scope.CLUSTER); + public static final Setting NETTY_BOSS_COUNT = intSetting("transport.netty.boss_count", 1, 1, false, Scope.CLUSTER); protected final NetworkService networkService; protected final Version version; @@ -226,7 +244,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem final ScheduledPing scheduledPing; @Inject - public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) { + public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, + NamedWriteableRegistry namedWriteableRegistry) { super(settings); this.threadPool = threadPool; this.networkService = networkService; @@ -252,7 +271,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) { receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes()); } else { - receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes()); + receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), + (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes()); } this.scheduledPing = new ScheduledPing(); @@ -305,7 +325,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem String name = entry.getKey(); if (!Strings.hasLength(name)) { - logger.info("transport profile configured without a name. skipping profile with settings [{}]", profileSettings.toDelimitedString(',')); + logger.info("transport profile configured without a name. skipping profile with settings [{}]", + profileSettings.toDelimitedString(',')); continue; } else if (TransportSettings.DEFAULT_PROFILE.equals(name)) { profileSettings = settingsBuilder() @@ -345,13 +366,16 @@ public class NettyTransport extends AbstractLifecycleComponent implem private ClientBootstrap createClientBootstrap() { if (blockingClient) { - clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)))); + clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory( + Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)))); } else { int bossCount = NETTY_BOSS_COUNT.get(settings); - clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)), - bossCount, - new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount), + clientBootstrap = new ClientBootstrap( + new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)), + bossCount, + new NioWorkerPool(Executors.newCachedThreadPool( + daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount), new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer")))); } clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory()); @@ -403,12 +427,14 @@ public class NettyTransport extends AbstractLifecycleComponent implem boolean fallbackReuseAddress = settings.getAsBoolean("transport.netty.reuse_address", TcpSettings.TCP_REUSE_ADDRESS.get(settings)); fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress); - ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", TcpSettings.TCP_SEND_BUFFER_SIZE.get(settings)); + ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", + TCP_SEND_BUFFER_SIZE.get(settings)); if (fallbackTcpSendBufferSize.bytes() >= 0) { fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize); } - ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", TcpSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings)); + ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", + TCP_RECEIVE_BUFFER_SIZE.get(settings)); if (fallbackTcpBufferSize.bytes() >= 0) { fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize); } @@ -485,7 +511,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem return boundSocket.get(); } - private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, List boundAddresses) { + private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, + List boundAddresses) { String[] boundAddressesHostStrings = new String[boundAddresses.size()]; TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()]; for (int i = 0; i < boundAddresses.size(); i++) { @@ -531,7 +558,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem // TODO: In case of DEFAULT_PROFILE we should probably fail here, as publish address does not match any bound address // In case of a custom profile, we might use the publish address of the default profile publishPort = boundAddresses.get(0).getPort(); - logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort); + logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], " + + "falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort); } final TransportAddress publishAddress = new InetSocketTransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); @@ -549,8 +577,13 @@ public class NettyTransport extends AbstractLifecycleComponent implem ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.getDefault(settings); ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.getDefault(settings); - logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", - name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax); + if (logger.isDebugEnabled()) { + logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " + + "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", + name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, + connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, + receivePredictorMax); + } final ThreadFactory bossFactory = daemonThreadFactory(this.settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX, name); final ThreadFactory workerFactory = daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, name); @@ -739,7 +772,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem return; } if (isCloseConnectionException(e.getCause())) { - logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel()); + logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), + ctx.getChannel()); // close the channel, which will cause a node to be disconnected if relevant ctx.getChannel().close(); disconnectFromNodeChannel(ctx.getChannel(), e.getCause()); @@ -754,7 +788,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem ctx.getChannel().close(); disconnectFromNodeChannel(ctx.getChannel(), e.getCause()); } else if (e.getCause() instanceof CancelledKeyException) { - logger.trace("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel()); + logger.trace("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), + ctx.getChannel()); // close the channel as safe measure, which will cause a node to be disconnected if relevant ctx.getChannel().close(); disconnectFromNodeChannel(ctx.getChannel(), e.getCause()); @@ -800,7 +835,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem } @Override - public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { Channel targetChannel = nodeChannel(node, options); @@ -902,7 +938,9 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (light) { nodeChannels = connectToChannelsLight(node); } else { - nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]); + nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], + new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], + new Channel[connectionsPerNodePing]); try { connectToChannels(nodeChannels, node); } catch (Throwable e) { diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index aaf33c2fd5a..c89523074dc 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -53,7 +53,8 @@ public class NettyTransportChannel implements TransportChannel { private final long requestId; private final String profileName; - public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel, long requestId, Version version, String profileName) { + public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel, + long requestId, Version version, String profileName) { this.transportServiceAdapter = transportServiceAdapter; this.version = version; this.transport = transport; @@ -119,7 +120,8 @@ public class NettyTransportChannel implements TransportChannel { public void sendResponse(Throwable error) throws IOException { BytesStreamOutput stream = new BytesStreamOutput(); stream.skip(NettyHeader.HEADER_SIZE); - RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); + RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), + action, error); stream.writeThrowable(tx); byte status = 0; status = TransportStatus.setResponse(status); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java b/core/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java index f38dc1dc02d..aab83d293d8 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java @@ -80,8 +80,8 @@ public class SizeHeaderFrameDecoder extends FrameDecoder { } // safety against too large frames being sent if (dataLen > NINETY_PER_HEAP_SIZE) { - throw new TooLongFrameException( - "transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded [" + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]"); + throw new TooLongFrameException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded [" + + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]"); } if (buffer.readableBytes() < dataLen + 6) { diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index 44d35305a60..88c4dc75222 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -20,6 +20,7 @@ package org.elasticsearch.tribe; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; + import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -83,8 +84,10 @@ import static java.util.Collections.unmodifiableMap; */ public class TribeService extends AbstractLifecycleComponent { - public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE)); - public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE)); + public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false, + RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE)); + public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, + RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE)); public static Settings processSettings(Settings settings) { if (TRIBE_NAME_SETTING.exists(settings)) { @@ -106,7 +109,8 @@ public class TribeService extends AbstractLifecycleComponent { Settings.Builder sb = Settings.builder().put(settings); sb.put(Node.NODE_CLIENT_SETTING.getKey(), true); // this node should just act as a node client sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); // a tribe node should not use zen discovery - sb.put(DiscoveryService.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); // nothing is going to be discovered, since no master will be elected + // nothing is going to be discovered, since no master will be elected + sb.put(DiscoveryService.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); if (sb.get("cluster.name") == null) { sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM } @@ -114,7 +118,8 @@ public class TribeService extends AbstractLifecycleComponent { return sb.build(); } - private static final Setting TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", false, Setting.Scope.CLUSTER); // internal settings only + // internal settings only + private static final Setting TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", false, Setting.Scope.CLUSTER); private final ClusterService clusterService; private final String[] blockIndicesWrite; private final String[] blockIndicesRead; @@ -125,14 +130,20 @@ public class TribeService extends AbstractLifecycleComponent { if (ON_CONFLICT_ANY.equals(s) || ON_CONFLICT_DROP.equals(s) || s.startsWith(ON_CONFLICT_PREFER)) { return s; } - throw new IllegalArgumentException("Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: " +s); + throw new IllegalArgumentException( + "Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: " + s); }, false, Setting.Scope.CLUSTER); - public static final Setting BLOCKS_METADATA_SETTING = Setting.boolSetting("tribe.blocks.metadata", false, false, Setting.Scope.CLUSTER); - public static final Setting BLOCKS_WRITE_SETTING = Setting.boolSetting("tribe.blocks.write", false, false, Setting.Scope.CLUSTER); - public static final Setting> BLOCKS_WRITE_INDICES_SETTING = Setting.listSetting("tribe.blocks.write.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); - public static final Setting> BLOCKS_READ_INDICES_SETTING = Setting.listSetting("tribe.blocks.read.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); - public static final Setting> BLOCKS_METADATA_INDICES_SETTING = Setting.listSetting("tribe.blocks.metadata.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); + public static final Setting BLOCKS_METADATA_SETTING = Setting.boolSetting("tribe.blocks.metadata", false, false, + Setting.Scope.CLUSTER); + public static final Setting BLOCKS_WRITE_SETTING = Setting.boolSetting("tribe.blocks.write", false, false, + Setting.Scope.CLUSTER); + public static final Setting> BLOCKS_WRITE_INDICES_SETTING = Setting.listSetting("tribe.blocks.write.indices", + Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); + public static final Setting> BLOCKS_READ_INDICES_SETTING = Setting.listSetting("tribe.blocks.read.indices", + Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); + public static final Setting> BLOCKS_METADATA_INDICES_SETTING = Setting.listSetting("tribe.blocks.metadata.indices", + Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER); private final String onConflict; private final Set droppedIndices = ConcurrentCollections.newConcurrentSet(); @@ -304,7 +315,8 @@ public class TribeService extends AbstractLifecycleComponent { tribeAttr.put(attr.key, attr.value); } tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName); - DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version()); + DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), + tribe.address(), unmodifiableMap(tribeAttr), tribe.version()); clusterStateChanged = true; logger.info("[{}] adding node [{}]", tribeName, discoNode); nodes.put(discoNode); @@ -328,7 +340,8 @@ public class TribeService extends AbstractLifecycleComponent { // always make sure to update the metadata and routing table, in case // there are changes in them (new mapping, shards moving from initializing to started) routingTable.add(tribeState.routingTable().index(index.getIndex())); - Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build(); + Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()) + .put(TRIBE_NAME_SETTING.getKey(), tribeName).build(); metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); } } @@ -357,7 +370,8 @@ public class TribeService extends AbstractLifecycleComponent { } else if (ON_CONFLICT_DROP.equals(onConflict)) { // drop the indices, there is a conflict clusterStateChanged = true; - logger.info("[{}] dropping index {} due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe); + logger.info("[{}] dropping index {} due to conflict with [{}]", tribeName, tribeIndex.getIndex(), + existingFromTribe); removeIndex(blocks, metaData, routingTable, tribeIndex); droppedIndices.add(tribeIndex.getIndex().getName()); } else if (onConflict.startsWith(ON_CONFLICT_PREFER)) { @@ -366,7 +380,8 @@ public class TribeService extends AbstractLifecycleComponent { if (tribeName.equals(preferredTribeName)) { // the new one is hte preferred one, replace... clusterStateChanged = true; - logger.info("[{}] adding index {}, preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe); + logger.info("[{}] adding index {}, preferred over [{}]", tribeName, tribeIndex.getIndex(), + existingFromTribe); removeIndex(blocks, metaData, routingTable, tribeIndex); addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex); } // else: either the existing one is the preferred one, or we haven't seen one, carry on @@ -378,17 +393,20 @@ public class TribeService extends AbstractLifecycleComponent { if (!clusterStateChanged) { return currentState; } else { - return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build(); + return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData) + .routingTable(routingTable.build()).build(); } } - private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) { + private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, + IndexMetaData index) { metaData.remove(index.getIndex().getName()); routingTable.remove(index.getIndex().getName()); blocks.removeIndexBlocks(index.getIndex().getName()); } - private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) { + private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, + RoutingTable.Builder routingTable, IndexMetaData tribeIndex) { Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build(); metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings)); routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex())); diff --git a/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java b/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java index 5ff6525a428..7c1cd060952 100644 --- a/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java +++ b/core/src/main/java/org/elasticsearch/watcher/ResourceWatcherService.java @@ -83,7 +83,8 @@ public class ResourceWatcherService extends AbstractLifecycleComponent