Start to break lines at 140 characters

Its what we say our maximum line length is in CONTRIBUTING.md and it'd be
nice to have a check on that. Unfortunately, we don't actually wrap all lines
at 140.
This commit is contained in:
Nik Everett 2016-01-30 20:18:58 -05:00
parent 7b5ed21d0d
commit 07cba65c1b
10 changed files with 171 additions and 89 deletions

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Scope;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@ -39,8 +40,8 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -56,6 +57,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.settings.Setting.listSetting;
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
/**
@ -92,9 +95,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
// tracer log
public static final Setting<List<String>> TRACE_LOG_INCLUDE_SETTING = Setting.listSetting("transport.tracer.include", Collections.emptyList(), Function.identity(), true, Setting.Scope.CLUSTER);
public static final Setting<List<String>> TRACE_LOG_EXCLUDE_SETTING = Setting.listSetting("transport.tracer.exclude", Arrays.asList("internal:discovery/zen/fd*", TransportLivenessAction.NAME), Function.identity(), true, Setting.Scope.CLUSTER);
public static final Setting<List<String>> TRACE_LOG_INCLUDE_SETTING = listSetting("transport.tracer.include", emptyList(),
Function.identity(), true, Scope.CLUSTER);
public static final Setting<List<String>> TRACE_LOG_EXCLUDE_SETTING = listSetting("transport.tracer.exclude",
Arrays.asList("internal:discovery/zen/fd*", TransportLivenessAction.NAME), Function.identity(), true, Scope.CLUSTER);
private final ESLogger tracerLog;
@ -757,7 +761,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
final TransportServiceAdapter adapter;
final ThreadPool threadPool;
public DirectResponseChannel(ESLogger logger, DiscoveryNode localNode, String action, long requestId, TransportServiceAdapter adapter, ThreadPool threadPool) {
public DirectResponseChannel(ESLogger logger, DiscoveryNode localNode, String action, long requestId,
TransportServiceAdapter adapter, ThreadPool threadPool) {
this.logger = logger;
this.localNode = localNode;
this.action = action;

View File

@ -19,11 +19,15 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Scope;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.settings.Setting.groupSetting;
import static org.elasticsearch.common.settings.Setting.intSetting;
import static org.elasticsearch.common.settings.Setting.listSetting;
/**
* a collection of settings related to transport components, which are also needed in org.elasticsearch.bootstrap.Security
@ -31,13 +35,13 @@ import static java.util.Collections.emptyList;
*/
final public class TransportSettings {
public static final Setting<List<String>> HOST = Setting.listSetting("transport.host", emptyList(), s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> PUBLISH_HOST = Setting.listSetting("transport.publish_host", HOST, s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> BIND_HOST = Setting.listSetting("transport.bind_host", HOST, s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<String> PORT = new Setting<>("transport.tcp.port", "9300-9400", s -> s, false, Setting.Scope.CLUSTER);
public static final Setting<Integer> PUBLISH_PORT = Setting.intSetting("transport.publish_port", -1, -1, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> HOST = listSetting("transport.host", emptyList(), s -> s, false, Scope.CLUSTER);
public static final Setting<List<String>> PUBLISH_HOST = listSetting("transport.publish_host", HOST, s -> s, false, Scope.CLUSTER);
public static final Setting<List<String>> BIND_HOST = listSetting("transport.bind_host", HOST, s -> s, false, Scope.CLUSTER);
public static final Setting<String> PORT = new Setting<>("transport.tcp.port", "9300-9400", s -> s, false, Scope.CLUSTER);
public static final Setting<Integer> PUBLISH_PORT = intSetting("transport.publish_port", -1, -1, false, Scope.CLUSTER);
public static final String DEFAULT_PROFILE = "default";
public static final Setting<Settings> TRANSPORT_PROFILES_SETTING = Setting.groupSetting("transport.profiles.", true, Setting.Scope.CLUSTER);
public static final Setting<Settings> TRANSPORT_PROFILES_SETTING = groupSetting("transport.profiles.", true, Scope.CLUSTER);
private TransportSettings() {

View File

@ -97,7 +97,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX);
this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory, threadPool.getThreadContext());
this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory,
threadPool.getThreadContext());
this.namedWriteableRegistry = namedWriteableRegistry;
}
@ -199,7 +200,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
final Version version = Version.smallest(node.version(), this.version);
try (BytesStreamOutput stream = new BytesStreamOutput()) {
@ -237,7 +239,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
return this.workers;
}
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version,
@Nullable final Long sendRequestId) {
Transports.assertTransportThread();
try {
transportServiceAdapter.received(data.length);
@ -278,7 +281,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry);
final String action = stream.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, requestId, version);
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action,
requestId, version);
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
@ -334,7 +338,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
try {
response.readFrom(buffer);
} catch (Throwable e) {
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
handleException(handler, new TransportSerializationException(
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
return;
}
handleParsedResponse(response, handler);

View File

@ -46,7 +46,8 @@ public class LocalTransportChannel implements TransportChannel {
private final long requestId;
private final Version version;
public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter, LocalTransport targetTransport, String action, long requestId, Version version) {
public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter,
LocalTransport targetTransport, String action, long requestId, Version version) {
this.sourceTransport = sourceTransport;
this.sourceTransportServiceAdapter = sourceTransportServiceAdapter;
this.targetTransport = targetTransport;
@ -94,7 +95,8 @@ public class LocalTransportChannel implements TransportChannel {
public void sendResponse(Throwable error) throws IOException {
BytesStreamOutput stream = new BytesStreamOutput();
writeResponseExceptionHeader(stream);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddresses()[0], action, error);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(),
targetTransport.boundAddress().boundAddresses()[0], action, error);
stream.writeThrowable(tx);
final byte[] data = stream.bytes().toBytes();

View File

@ -116,7 +116,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} catch (NotCompressedException ex) {
int maxToRead = Math.min(buffer.readableBytes(), 10);
int offset = buffer.readerIndex();
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(buffer.readableBytes()).append("] readable bytes with message size [").append(size).append("] ").append("] are [");
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)
.append("] content bytes out of [").append(buffer.readableBytes())
.append("] readable bytes with message size [").append(size).append("] ").append("] are [");
for (int i = 0; i < maxToRead; i++) {
sb.append(buffer.getByte(offset + i)).append(",");
}
@ -134,15 +136,17 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
final int nextByte = streamIn.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}
if (buffer.readerIndex() < expectedIndexReader) {
throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
throw new IllegalStateException("Message is fully read (request), yet there are "
+ (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedIndexReader) {
throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
throw new IllegalStateException(
"Message read past expected size (request) for requestId [" + requestId + "], action [" + action
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}
} else {
@ -163,11 +167,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
+ handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
}
if (buffer.readerIndex() < expectedIndexReader) {
throw new IllegalStateException("Message is fully read (response), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
throw new IllegalStateException("Message is fully read (response), yet there are "
+ (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedIndexReader) {
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler ["
+ handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId
+ "], handler [" + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
}
}
@ -193,7 +198,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
try {
response.readFrom(buffer);
} catch (Throwable e) {
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
handleException(handler, new TransportSerializationException(
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
return;
}
try {
@ -247,7 +253,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
final String action = buffer.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName);
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {

View File

@ -42,6 +42,7 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkService.TcpSettings;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Scope;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -119,6 +120,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.settings.Setting.byteSizeSetting;
import static org.elasticsearch.common.settings.Setting.intSetting;
import static org.elasticsearch.common.settings.Setting.timeSetting;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
@ -143,21 +148,33 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
public static final Setting<Integer> WORKER_COUNT = new Setting<>("transport.netty.worker_count",
(s) -> Integer.toString(EsExecutors.boundedNumberOfProcessors(s) * 2), (s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"),
false, Setting.Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY = Setting.intSetting("transport.connections_per_node.recovery", 2, 1, false, Setting.Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK = Setting.intSetting("transport.connections_per_node.bulk", 3, 1, false, Setting.Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG = Setting.intSetting("transport.connections_per_node.reg", 6, 1, false, Setting.Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE = Setting.intSetting("transport.connections_per_node.state", 1, 1, false, Setting.Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING = Setting.intSetting("transport.connections_per_node.ping", 1, 1, false, Setting.Scope.CLUSTER);
(s) -> Integer.toString(EsExecutors.boundedNumberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"), false, Setting.Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY = intSetting("transport.connections_per_node.recovery", 2, 1, false,
Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK = intSetting("transport.connections_per_node.bulk", 3, 1, false,
Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG = intSetting("transport.connections_per_node.reg", 6, 1, false,
Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE = intSetting("transport.connections_per_node.state", 1, 1, false,
Scope.CLUSTER);
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING = intSetting("transport.connections_per_node.ping", 1, 1, false,
Scope.CLUSTER);
// the scheduled internal ping interval setting, defaults to disabled (-1)
public static final Setting<TimeValue> PING_SCHEDULE = Setting.timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_CLIENT = Setting.boolSetting("transport.tcp.blocking_client", TcpSettings.TCP_BLOCKING_CLIENT, false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT = Setting.timeSetting("transport.tcp.connect_timeout", TcpSettings.TCP_CONNECT_TIMEOUT, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_NO_DELAY = Setting.boolSetting("transport.tcp_no_delay", TcpSettings.TCP_NO_DELAY, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_KEEP_ALIVE = Setting.boolSetting("transport.tcp.keep_alive", TcpSettings.TCP_KEEP_ALIVE, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_SERVER = Setting.boolSetting("transport.tcp.blocking_server", TcpSettings.TCP_BLOCKING_SERVER, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_REUSE_ADDRESS = Setting.boolSetting("transport.tcp.reuse_address", TcpSettings.TCP_REUSE_ADDRESS, false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> PING_SCHEDULE = timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), false,
Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_CLIENT = boolSetting("transport.tcp.blocking_client", TcpSettings.TCP_BLOCKING_CLIENT,
false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT = timeSetting("transport.tcp.connect_timeout",
TcpSettings.TCP_CONNECT_TIMEOUT, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_NO_DELAY = boolSetting("transport.tcp_no_delay", TcpSettings.TCP_NO_DELAY, false,
Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_KEEP_ALIVE = boolSetting("transport.tcp.keep_alive", TcpSettings.TCP_KEEP_ALIVE, false,
Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_BLOCKING_SERVER = boolSetting("transport.tcp.blocking_server", TcpSettings.TCP_BLOCKING_SERVER,
false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> TCP_REUSE_ADDRESS = boolSetting("transport.tcp.reuse_address", TcpSettings.TCP_REUSE_ADDRESS,
false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE = Setting.byteSizeSetting("transport.tcp.send_buffer_size", TcpSettings.TCP_SEND_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE = Setting.byteSizeSetting("transport.tcp.receive_buffer_size", TcpSettings.TCP_RECEIVE_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
@ -165,9 +182,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static final Setting<ByteSizeValue> NETTY_MAX_CUMULATION_BUFFER_CAPACITY = Setting.byteSizeSetting("transport.netty.max_cumulation_buffer_capacity", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
public static final Setting<Integer> NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS = Setting.intSetting("transport.netty.max_composite_buffer_components", -1, -1, false, Setting.Scope.CLUSTER);
// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting("transport.netty.receive_predictor_size",
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting(
"transport.netty.receive_predictor_size",
settings -> {
long defaultReceiverPredictor = 512 * 1024;
if (JvmInfo.jvmInfo().getMem().getDirectMemoryMax().bytes() > 0) {
@ -177,10 +194,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
return new ByteSizeValue(defaultReceiverPredictor).toString();
}, false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MIN = Setting.byteSizeSetting("transport.netty.receive_predictor_min", NETTY_RECEIVE_PREDICTOR_SIZE, false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MAX = Setting.byteSizeSetting("transport.netty.receive_predictor_max", NETTY_RECEIVE_PREDICTOR_SIZE, false, Setting.Scope.CLUSTER);
public static final Setting<Integer> NETTY_BOSS_COUNT = Setting.intSetting("transport.netty.boss_count", 1, 1, false, Setting.Scope.CLUSTER);
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MIN = byteSizeSetting("transport.netty.receive_predictor_min",
NETTY_RECEIVE_PREDICTOR_SIZE, false, Scope.CLUSTER);
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MAX = byteSizeSetting("transport.netty.receive_predictor_max",
NETTY_RECEIVE_PREDICTOR_SIZE, false, Scope.CLUSTER);
public static final Setting<Integer> NETTY_BOSS_COUNT = intSetting("transport.netty.boss_count", 1, 1, false, Scope.CLUSTER);
protected final NetworkService networkService;
protected final Version version;
@ -226,7 +244,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final ScheduledPing scheduledPing;
@Inject
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) {
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version,
NamedWriteableRegistry namedWriteableRegistry) {
super(settings);
this.threadPool = threadPool;
this.networkService = networkService;
@ -252,7 +271,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
} else {
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(),
(int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
}
this.scheduledPing = new ScheduledPing();
@ -305,7 +325,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
String name = entry.getKey();
if (!Strings.hasLength(name)) {
logger.info("transport profile configured without a name. skipping profile with settings [{}]", profileSettings.toDelimitedString(','));
logger.info("transport profile configured without a name. skipping profile with settings [{}]",
profileSettings.toDelimitedString(','));
continue;
} else if (TransportSettings.DEFAULT_PROFILE.equals(name)) {
profileSettings = settingsBuilder()
@ -345,13 +366,16 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private ClientBootstrap createClientBootstrap() {
if (blockingClient) {
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
} else {
int bossCount = NETTY_BOSS_COUNT.get(settings);
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
bossCount,
new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
clientBootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
bossCount,
new NioWorkerPool(Executors.newCachedThreadPool(
daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
}
clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
@ -403,12 +427,14 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
boolean fallbackReuseAddress = settings.getAsBoolean("transport.netty.reuse_address", TcpSettings.TCP_REUSE_ADDRESS.get(settings));
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", TcpSettings.TCP_SEND_BUFFER_SIZE.get(settings));
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size",
TCP_SEND_BUFFER_SIZE.get(settings));
if (fallbackTcpSendBufferSize.bytes() >= 0) {
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
}
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", TcpSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings));
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size",
TCP_RECEIVE_BUFFER_SIZE.get(settings));
if (fallbackTcpBufferSize.bytes() >= 0) {
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
}
@ -485,7 +511,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return boundSocket.get();
}
private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, List<InetSocketAddress> boundAddresses) {
private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings,
List<InetSocketAddress> boundAddresses) {
String[] boundAddressesHostStrings = new String[boundAddresses.size()];
TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()];
for (int i = 0; i < boundAddresses.size(); i++) {
@ -531,7 +558,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// TODO: In case of DEFAULT_PROFILE we should probably fail here, as publish address does not match any bound address
// In case of a custom profile, we might use the publish address of the default profile
publishPort = boundAddresses.get(0).getPort();
logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort);
logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], "
+ "falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort);
}
final TransportAddress publishAddress = new InetSocketTransportAddress(new InetSocketAddress(publishInetAddress, publishPort));
@ -549,8 +577,13 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.getDefault(settings);
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.getDefault(settings);
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
if (logger.isDebugEnabled()) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery,
connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin,
receivePredictorMax);
}
final ThreadFactory bossFactory = daemonThreadFactory(this.settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX, name);
final ThreadFactory workerFactory = daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, name);
@ -739,7 +772,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return;
}
if (isCloseConnectionException(e.getCause())) {
logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(),
ctx.getChannel());
// close the channel, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
@ -754,7 +788,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
} else if (e.getCause() instanceof CancelledKeyException) {
logger.trace("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
logger.trace("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(),
ctx.getChannel());
// close the channel as safe measure, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
@ -800,7 +835,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
Channel targetChannel = nodeChannel(node, options);
@ -902,7 +938,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (light) {
nodeChannels = connectToChannelsLight(node);
} else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk],
new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState],
new Channel[connectionsPerNodePing]);
try {
connectToChannels(nodeChannels, node);
} catch (Throwable e) {

View File

@ -53,7 +53,8 @@ public class NettyTransportChannel implements TransportChannel {
private final long requestId;
private final String profileName;
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel, long requestId, Version version, String profileName) {
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel,
long requestId, Version version, String profileName) {
this.transportServiceAdapter = transportServiceAdapter;
this.version = version;
this.transport = transport;
@ -119,7 +120,8 @@ public class NettyTransportChannel implements TransportChannel {
public void sendResponse(Throwable error) throws IOException {
BytesStreamOutput stream = new BytesStreamOutput();
stream.skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()),
action, error);
stream.writeThrowable(tx);
byte status = 0;
status = TransportStatus.setResponse(status);

View File

@ -80,8 +80,8 @@ public class SizeHeaderFrameDecoder extends FrameDecoder {
}
// safety against too large frames being sent
if (dataLen > NINETY_PER_HEAP_SIZE) {
throw new TooLongFrameException(
"transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded [" + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
throw new TooLongFrameException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded ["
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
}
if (buffer.readableBytes() < dataLen + 6) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.tribe;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -83,8 +84,10 @@ import static java.util.Collections.unmodifiableMap;
*/
public class TribeService extends AbstractLifecycleComponent<TribeService> {
public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE));
public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false,
RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE));
public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false,
RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
public static Settings processSettings(Settings settings) {
if (TRIBE_NAME_SETTING.exists(settings)) {
@ -106,7 +109,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
Settings.Builder sb = Settings.builder().put(settings);
sb.put(Node.NODE_CLIENT_SETTING.getKey(), true); // this node should just act as a node client
sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); // a tribe node should not use zen discovery
sb.put(DiscoveryService.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); // nothing is going to be discovered, since no master will be elected
// nothing is going to be discovered, since no master will be elected
sb.put(DiscoveryService.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
if (sb.get("cluster.name") == null) {
sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM
}
@ -114,7 +118,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
return sb.build();
}
private static final Setting<String> TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", false, Setting.Scope.CLUSTER); // internal settings only
// internal settings only
private static final Setting<String> TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", false, Setting.Scope.CLUSTER);
private final ClusterService clusterService;
private final String[] blockIndicesWrite;
private final String[] blockIndicesRead;
@ -125,14 +130,20 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
if (ON_CONFLICT_ANY.equals(s) || ON_CONFLICT_DROP.equals(s) || s.startsWith(ON_CONFLICT_PREFER)) {
return s;
}
throw new IllegalArgumentException("Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: " +s);
throw new IllegalArgumentException(
"Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: " + s);
}, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> BLOCKS_METADATA_SETTING = Setting.boolSetting("tribe.blocks.metadata", false, false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> BLOCKS_WRITE_SETTING = Setting.boolSetting("tribe.blocks.write", false, false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> BLOCKS_WRITE_INDICES_SETTING = Setting.listSetting("tribe.blocks.write.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> BLOCKS_READ_INDICES_SETTING = Setting.listSetting("tribe.blocks.read.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> BLOCKS_METADATA_INDICES_SETTING = Setting.listSetting("tribe.blocks.metadata.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
public static final Setting<Boolean> BLOCKS_METADATA_SETTING = Setting.boolSetting("tribe.blocks.metadata", false, false,
Setting.Scope.CLUSTER);
public static final Setting<Boolean> BLOCKS_WRITE_SETTING = Setting.boolSetting("tribe.blocks.write", false, false,
Setting.Scope.CLUSTER);
public static final Setting<List<String>> BLOCKS_WRITE_INDICES_SETTING = Setting.listSetting("tribe.blocks.write.indices",
Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> BLOCKS_READ_INDICES_SETTING = Setting.listSetting("tribe.blocks.read.indices",
Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
public static final Setting<List<String>> BLOCKS_METADATA_INDICES_SETTING = Setting.listSetting("tribe.blocks.metadata.indices",
Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
private final String onConflict;
private final Set<String> droppedIndices = ConcurrentCollections.newConcurrentSet();
@ -304,7 +315,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
tribeAttr.put(attr.key, attr.value);
}
tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName);
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version());
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(),
tribe.address(), unmodifiableMap(tribeAttr), tribe.version());
clusterStateChanged = true;
logger.info("[{}] adding node [{}]", tribeName, discoNode);
nodes.put(discoNode);
@ -328,7 +340,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
// always make sure to update the metadata and routing table, in case
// there are changes in them (new mapping, shards moving from initializing to started)
routingTable.add(tribeState.routingTable().index(index.getIndex()));
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings())
.put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
}
}
@ -357,7 +370,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
} else if (ON_CONFLICT_DROP.equals(onConflict)) {
// drop the indices, there is a conflict
clusterStateChanged = true;
logger.info("[{}] dropping index {} due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
logger.info("[{}] dropping index {} due to conflict with [{}]", tribeName, tribeIndex.getIndex(),
existingFromTribe);
removeIndex(blocks, metaData, routingTable, tribeIndex);
droppedIndices.add(tribeIndex.getIndex().getName());
} else if (onConflict.startsWith(ON_CONFLICT_PREFER)) {
@ -366,7 +380,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
if (tribeName.equals(preferredTribeName)) {
// the new one is hte preferred one, replace...
clusterStateChanged = true;
logger.info("[{}] adding index {}, preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
logger.info("[{}] adding index {}, preferred over [{}]", tribeName, tribeIndex.getIndex(),
existingFromTribe);
removeIndex(blocks, metaData, routingTable, tribeIndex);
addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
} // else: either the existing one is the preferred one, or we haven't seen one, carry on
@ -378,17 +393,20 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
if (!clusterStateChanged) {
return currentState;
} else {
return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build();
return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData)
.routingTable(routingTable.build()).build();
}
}
private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) {
private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable,
IndexMetaData index) {
metaData.remove(index.getIndex().getName());
routingTable.remove(index.getIndex().getName());
blocks.removeIndexBlocks(index.getIndex().getName());
}
private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData,
RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex()));

View File

@ -83,7 +83,8 @@ public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceW
TimeValue interval = settings.getAsTime("resource.reload.interval.low", Frequency.LOW.interval);
lowMonitor = new ResourceMonitor(interval, Frequency.LOW);
interval = settings.getAsTime("resource.reload.interval.medium", settings.getAsTime("resource.reload.interval", Frequency.MEDIUM.interval));
interval = settings.getAsTime("resource.reload.interval.medium",
settings.getAsTime("resource.reload.interval", Frequency.MEDIUM.interval));
mediumMonitor = new ResourceMonitor(interval, Frequency.MEDIUM);
interval = settings.getAsTime("resource.reload.interval.high", Frequency.HIGH.interval);
highMonitor = new ResourceMonitor(interval, Frequency.HIGH);