Merge pull request #16330 from nik9000/line_length_1
Start to break lines at 140 characters
This commit is contained in:
commit
d91a898f6a
|
@ -31,6 +31,7 @@ import org.elasticsearch.common.metrics.MeanMetric;
|
|||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Scope;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
|
@ -39,8 +40,8 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -56,6 +57,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.elasticsearch.common.settings.Setting.listSetting;
|
||||
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
|
||||
|
||||
/**
|
||||
|
@ -92,9 +95,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
|
||||
// tracer log
|
||||
|
||||
public static final Setting<List<String>> TRACE_LOG_INCLUDE_SETTING = Setting.listSetting("transport.tracer.include", Collections.emptyList(), Function.identity(), true, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> TRACE_LOG_EXCLUDE_SETTING = Setting.listSetting("transport.tracer.exclude", Arrays.asList("internal:discovery/zen/fd*", TransportLivenessAction.NAME), Function.identity(), true, Setting.Scope.CLUSTER);
|
||||
|
||||
public static final Setting<List<String>> TRACE_LOG_INCLUDE_SETTING = listSetting("transport.tracer.include", emptyList(),
|
||||
Function.identity(), true, Scope.CLUSTER);
|
||||
public static final Setting<List<String>> TRACE_LOG_EXCLUDE_SETTING = listSetting("transport.tracer.exclude",
|
||||
Arrays.asList("internal:discovery/zen/fd*", TransportLivenessAction.NAME), Function.identity(), true, Scope.CLUSTER);
|
||||
|
||||
private final ESLogger tracerLog;
|
||||
|
||||
|
@ -757,7 +761,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
final TransportServiceAdapter adapter;
|
||||
final ThreadPool threadPool;
|
||||
|
||||
public DirectResponseChannel(ESLogger logger, DiscoveryNode localNode, String action, long requestId, TransportServiceAdapter adapter, ThreadPool threadPool) {
|
||||
public DirectResponseChannel(ESLogger logger, DiscoveryNode localNode, String action, long requestId,
|
||||
TransportServiceAdapter adapter, ThreadPool threadPool) {
|
||||
this.logger = logger;
|
||||
this.localNode = localNode;
|
||||
this.action = action;
|
||||
|
|
|
@ -19,11 +19,15 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Scope;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.elasticsearch.common.settings.Setting.groupSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.intSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.listSetting;
|
||||
|
||||
/**
|
||||
* a collection of settings related to transport components, which are also needed in org.elasticsearch.bootstrap.Security
|
||||
|
@ -31,13 +35,13 @@ import static java.util.Collections.emptyList;
|
|||
*/
|
||||
final public class TransportSettings {
|
||||
|
||||
public static final Setting<List<String>> HOST = Setting.listSetting("transport.host", emptyList(), s -> s, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> PUBLISH_HOST = Setting.listSetting("transport.publish_host", HOST, s -> s, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BIND_HOST = Setting.listSetting("transport.bind_host", HOST, s -> s, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<String> PORT = new Setting<>("transport.tcp.port", "9300-9400", s -> s, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> PUBLISH_PORT = Setting.intSetting("transport.publish_port", -1, -1, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> HOST = listSetting("transport.host", emptyList(), s -> s, false, Scope.CLUSTER);
|
||||
public static final Setting<List<String>> PUBLISH_HOST = listSetting("transport.publish_host", HOST, s -> s, false, Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BIND_HOST = listSetting("transport.bind_host", HOST, s -> s, false, Scope.CLUSTER);
|
||||
public static final Setting<String> PORT = new Setting<>("transport.tcp.port", "9300-9400", s -> s, false, Scope.CLUSTER);
|
||||
public static final Setting<Integer> PUBLISH_PORT = intSetting("transport.publish_port", -1, -1, false, Scope.CLUSTER);
|
||||
public static final String DEFAULT_PROFILE = "default";
|
||||
public static final Setting<Settings> TRANSPORT_PROFILES_SETTING = Setting.groupSetting("transport.profiles.", true, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Settings> TRANSPORT_PROFILES_SETTING = groupSetting("transport.profiles.", true, Scope.CLUSTER);
|
||||
|
||||
private TransportSettings() {
|
||||
|
||||
|
|
|
@ -97,7 +97,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
|
||||
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
|
||||
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, LOCAL_TRANSPORT_THREAD_NAME_PREFIX);
|
||||
this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory, threadPool.getThreadContext());
|
||||
this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory,
|
||||
threadPool.getThreadContext());
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
}
|
||||
|
||||
|
@ -199,7 +200,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
final Version version = Version.smallest(node.version(), this.version);
|
||||
|
||||
try (BytesStreamOutput stream = new BytesStreamOutput()) {
|
||||
|
@ -237,7 +239,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
return this.workers;
|
||||
}
|
||||
|
||||
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
|
||||
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version,
|
||||
@Nullable final Long sendRequestId) {
|
||||
Transports.assertTransportThread();
|
||||
try {
|
||||
transportServiceAdapter.received(data.length);
|
||||
|
@ -278,7 +281,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry);
|
||||
final String action = stream.readString();
|
||||
transportServiceAdapter.onRequestReceived(requestId, action);
|
||||
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action, requestId, version);
|
||||
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action,
|
||||
requestId, version);
|
||||
try {
|
||||
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
|
||||
if (reg == null) {
|
||||
|
@ -334,7 +338,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
try {
|
||||
response.readFrom(buffer);
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
|
||||
handleException(handler, new TransportSerializationException(
|
||||
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
|
||||
return;
|
||||
}
|
||||
handleParsedResponse(response, handler);
|
||||
|
|
|
@ -46,7 +46,8 @@ public class LocalTransportChannel implements TransportChannel {
|
|||
private final long requestId;
|
||||
private final Version version;
|
||||
|
||||
public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter, LocalTransport targetTransport, String action, long requestId, Version version) {
|
||||
public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter,
|
||||
LocalTransport targetTransport, String action, long requestId, Version version) {
|
||||
this.sourceTransport = sourceTransport;
|
||||
this.sourceTransportServiceAdapter = sourceTransportServiceAdapter;
|
||||
this.targetTransport = targetTransport;
|
||||
|
@ -94,7 +95,8 @@ public class LocalTransportChannel implements TransportChannel {
|
|||
public void sendResponse(Throwable error) throws IOException {
|
||||
BytesStreamOutput stream = new BytesStreamOutput();
|
||||
writeResponseExceptionHeader(stream);
|
||||
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddresses()[0], action, error);
|
||||
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(),
|
||||
targetTransport.boundAddress().boundAddresses()[0], action, error);
|
||||
stream.writeThrowable(tx);
|
||||
|
||||
final byte[] data = stream.bytes().toBytes();
|
||||
|
|
|
@ -116,7 +116,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
} catch (NotCompressedException ex) {
|
||||
int maxToRead = Math.min(buffer.readableBytes(), 10);
|
||||
int offset = buffer.readerIndex();
|
||||
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(buffer.readableBytes()).append("] readable bytes with message size [").append(size).append("] ").append("] are [");
|
||||
StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)
|
||||
.append("] content bytes out of [").append(buffer.readableBytes())
|
||||
.append("] readable bytes with message size [").append(size).append("] ").append("] are [");
|
||||
for (int i = 0; i < maxToRead; i++) {
|
||||
sb.append(buffer.getByte(offset + i)).append(",");
|
||||
}
|
||||
|
@ -134,15 +136,17 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
final int nextByte = streamIn.read();
|
||||
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
|
||||
if (nextByte != -1) {
|
||||
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
|
||||
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
|
||||
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
|
||||
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() < expectedIndexReader) {
|
||||
throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
throw new IllegalStateException("Message is fully read (request), yet there are "
|
||||
+ (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() > expectedIndexReader) {
|
||||
throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
|
||||
+ action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
|
||||
throw new IllegalStateException(
|
||||
"Message read past expected size (request) for requestId [" + requestId + "], action [" + action
|
||||
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
|
||||
}
|
||||
|
||||
} else {
|
||||
|
@ -163,11 +167,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
+ handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() < expectedIndexReader) {
|
||||
throw new IllegalStateException("Message is fully read (response), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
throw new IllegalStateException("Message is fully read (response), yet there are "
|
||||
+ (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
|
||||
}
|
||||
if (buffer.readerIndex() > expectedIndexReader) {
|
||||
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler ["
|
||||
+ handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
|
||||
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId
|
||||
+ "], handler [" + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -193,7 +198,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
try {
|
||||
response.readFrom(buffer);
|
||||
} catch (Throwable e) {
|
||||
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
|
||||
handleException(handler, new TransportSerializationException(
|
||||
"Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
@ -247,7 +253,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
|
||||
final String action = buffer.readString();
|
||||
transportServiceAdapter.onRequestReceived(requestId, action);
|
||||
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
|
||||
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
|
||||
requestId, version, profileName);
|
||||
try {
|
||||
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
|
||||
if (reg == null) {
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.network.NetworkService.TcpSettings;
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Scope;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
|
@ -119,6 +120,10 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.common.settings.Setting.boolSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.byteSizeSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.intSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.timeSetting;
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
|
||||
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException;
|
||||
|
@ -143,21 +148,33 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
|
||||
|
||||
public static final Setting<Integer> WORKER_COUNT = new Setting<>("transport.netty.worker_count",
|
||||
(s) -> Integer.toString(EsExecutors.boundedNumberOfProcessors(s) * 2), (s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"),
|
||||
false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY = Setting.intSetting("transport.connections_per_node.recovery", 2, 1, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK = Setting.intSetting("transport.connections_per_node.bulk", 3, 1, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG = Setting.intSetting("transport.connections_per_node.reg", 6, 1, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE = Setting.intSetting("transport.connections_per_node.state", 1, 1, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING = Setting.intSetting("transport.connections_per_node.ping", 1, 1, false, Setting.Scope.CLUSTER);
|
||||
(s) -> Integer.toString(EsExecutors.boundedNumberOfProcessors(s) * 2),
|
||||
(s) -> Setting.parseInt(s, 1, "transport.netty.worker_count"), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY = intSetting("transport.connections_per_node.recovery", 2, 1, false,
|
||||
Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK = intSetting("transport.connections_per_node.bulk", 3, 1, false,
|
||||
Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_REG = intSetting("transport.connections_per_node.reg", 6, 1, false,
|
||||
Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE = intSetting("transport.connections_per_node.state", 1, 1, false,
|
||||
Scope.CLUSTER);
|
||||
public static final Setting<Integer> CONNECTIONS_PER_NODE_PING = intSetting("transport.connections_per_node.ping", 1, 1, false,
|
||||
Scope.CLUSTER);
|
||||
// the scheduled internal ping interval setting, defaults to disabled (-1)
|
||||
public static final Setting<TimeValue> PING_SCHEDULE = Setting.timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_BLOCKING_CLIENT = Setting.boolSetting("transport.tcp.blocking_client", TcpSettings.TCP_BLOCKING_CLIENT, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT = Setting.timeSetting("transport.tcp.connect_timeout", TcpSettings.TCP_CONNECT_TIMEOUT, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_NO_DELAY = Setting.boolSetting("transport.tcp_no_delay", TcpSettings.TCP_NO_DELAY, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_KEEP_ALIVE = Setting.boolSetting("transport.tcp.keep_alive", TcpSettings.TCP_KEEP_ALIVE, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_BLOCKING_SERVER = Setting.boolSetting("transport.tcp.blocking_server", TcpSettings.TCP_BLOCKING_SERVER, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_REUSE_ADDRESS = Setting.boolSetting("transport.tcp.reuse_address", TcpSettings.TCP_REUSE_ADDRESS, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<TimeValue> PING_SCHEDULE = timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), false,
|
||||
Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_BLOCKING_CLIENT = boolSetting("transport.tcp.blocking_client", TcpSettings.TCP_BLOCKING_CLIENT,
|
||||
false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT = timeSetting("transport.tcp.connect_timeout",
|
||||
TcpSettings.TCP_CONNECT_TIMEOUT, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_NO_DELAY = boolSetting("transport.tcp_no_delay", TcpSettings.TCP_NO_DELAY, false,
|
||||
Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_KEEP_ALIVE = boolSetting("transport.tcp.keep_alive", TcpSettings.TCP_KEEP_ALIVE, false,
|
||||
Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_BLOCKING_SERVER = boolSetting("transport.tcp.blocking_server", TcpSettings.TCP_BLOCKING_SERVER,
|
||||
false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> TCP_REUSE_ADDRESS = boolSetting("transport.tcp.reuse_address", TcpSettings.TCP_REUSE_ADDRESS,
|
||||
false, Setting.Scope.CLUSTER);
|
||||
|
||||
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE = Setting.byteSizeSetting("transport.tcp.send_buffer_size", TcpSettings.TCP_SEND_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE = Setting.byteSizeSetting("transport.tcp.receive_buffer_size", TcpSettings.TCP_RECEIVE_BUFFER_SIZE, false, Setting.Scope.CLUSTER);
|
||||
|
@ -165,9 +182,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
public static final Setting<ByteSizeValue> NETTY_MAX_CUMULATION_BUFFER_CAPACITY = Setting.byteSizeSetting("transport.netty.max_cumulation_buffer_capacity", new ByteSizeValue(-1), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS = Setting.intSetting("transport.netty.max_composite_buffer_components", -1, -1, false, Setting.Scope.CLUSTER);
|
||||
|
||||
|
||||
// See AdaptiveReceiveBufferSizePredictor#DEFAULT_XXX for default values in netty..., we can use higher ones for us, even fixed one
|
||||
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting("transport.netty.receive_predictor_size",
|
||||
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_SIZE = Setting.byteSizeSetting(
|
||||
"transport.netty.receive_predictor_size",
|
||||
settings -> {
|
||||
long defaultReceiverPredictor = 512 * 1024;
|
||||
if (JvmInfo.jvmInfo().getMem().getDirectMemoryMax().bytes() > 0) {
|
||||
|
@ -177,10 +194,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
return new ByteSizeValue(defaultReceiverPredictor).toString();
|
||||
}, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MIN = Setting.byteSizeSetting("transport.netty.receive_predictor_min", NETTY_RECEIVE_PREDICTOR_SIZE, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MAX = Setting.byteSizeSetting("transport.netty.receive_predictor_max", NETTY_RECEIVE_PREDICTOR_SIZE, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Integer> NETTY_BOSS_COUNT = Setting.intSetting("transport.netty.boss_count", 1, 1, false, Setting.Scope.CLUSTER);
|
||||
|
||||
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MIN = byteSizeSetting("transport.netty.receive_predictor_min",
|
||||
NETTY_RECEIVE_PREDICTOR_SIZE, false, Scope.CLUSTER);
|
||||
public static final Setting<ByteSizeValue> NETTY_RECEIVE_PREDICTOR_MAX = byteSizeSetting("transport.netty.receive_predictor_max",
|
||||
NETTY_RECEIVE_PREDICTOR_SIZE, false, Scope.CLUSTER);
|
||||
public static final Setting<Integer> NETTY_BOSS_COUNT = intSetting("transport.netty.boss_count", 1, 1, false, Scope.CLUSTER);
|
||||
|
||||
protected final NetworkService networkService;
|
||||
protected final Version version;
|
||||
|
@ -226,7 +244,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
final ScheduledPing scheduledPing;
|
||||
|
||||
@Inject
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version,
|
||||
NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.networkService = networkService;
|
||||
|
@ -252,7 +271,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
|
||||
receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
|
||||
} else {
|
||||
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
|
||||
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(),
|
||||
(int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
|
||||
}
|
||||
|
||||
this.scheduledPing = new ScheduledPing();
|
||||
|
@ -305,7 +325,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
String name = entry.getKey();
|
||||
|
||||
if (!Strings.hasLength(name)) {
|
||||
logger.info("transport profile configured without a name. skipping profile with settings [{}]", profileSettings.toDelimitedString(','));
|
||||
logger.info("transport profile configured without a name. skipping profile with settings [{}]",
|
||||
profileSettings.toDelimitedString(','));
|
||||
continue;
|
||||
} else if (TransportSettings.DEFAULT_PROFILE.equals(name)) {
|
||||
profileSettings = settingsBuilder()
|
||||
|
@ -345,13 +366,16 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
private ClientBootstrap createClientBootstrap() {
|
||||
|
||||
if (blockingClient) {
|
||||
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
|
||||
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX))));
|
||||
} else {
|
||||
int bossCount = NETTY_BOSS_COUNT.get(settings);
|
||||
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
|
||||
clientBootstrap = new ClientBootstrap(
|
||||
new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)),
|
||||
bossCount,
|
||||
new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
|
||||
new NioWorkerPool(Executors.newCachedThreadPool(
|
||||
daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)), workerCount),
|
||||
new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
|
||||
}
|
||||
clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
|
||||
|
@ -403,12 +427,14 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
boolean fallbackReuseAddress = settings.getAsBoolean("transport.netty.reuse_address", TcpSettings.TCP_REUSE_ADDRESS.get(settings));
|
||||
fallbackSettingsBuilder.put("reuse_address", fallbackReuseAddress);
|
||||
|
||||
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size", TcpSettings.TCP_SEND_BUFFER_SIZE.get(settings));
|
||||
ByteSizeValue fallbackTcpSendBufferSize = settings.getAsBytesSize("transport.netty.tcp_send_buffer_size",
|
||||
TCP_SEND_BUFFER_SIZE.get(settings));
|
||||
if (fallbackTcpSendBufferSize.bytes() >= 0) {
|
||||
fallbackSettingsBuilder.put("tcp_send_buffer_size", fallbackTcpSendBufferSize);
|
||||
}
|
||||
|
||||
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size", TcpSettings.TCP_RECEIVE_BUFFER_SIZE.get(settings));
|
||||
ByteSizeValue fallbackTcpBufferSize = settings.getAsBytesSize("transport.netty.tcp_receive_buffer_size",
|
||||
TCP_RECEIVE_BUFFER_SIZE.get(settings));
|
||||
if (fallbackTcpBufferSize.bytes() >= 0) {
|
||||
fallbackSettingsBuilder.put("tcp_receive_buffer_size", fallbackTcpBufferSize);
|
||||
}
|
||||
|
@ -485,7 +511,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
return boundSocket.get();
|
||||
}
|
||||
|
||||
private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, List<InetSocketAddress> boundAddresses) {
|
||||
private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings,
|
||||
List<InetSocketAddress> boundAddresses) {
|
||||
String[] boundAddressesHostStrings = new String[boundAddresses.size()];
|
||||
TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()];
|
||||
for (int i = 0; i < boundAddresses.size(); i++) {
|
||||
|
@ -531,7 +558,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
// TODO: In case of DEFAULT_PROFILE we should probably fail here, as publish address does not match any bound address
|
||||
// In case of a custom profile, we might use the publish address of the default profile
|
||||
publishPort = boundAddresses.get(0).getPort();
|
||||
logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort);
|
||||
logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], "
|
||||
+ "falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort);
|
||||
}
|
||||
|
||||
final TransportAddress publishAddress = new InetSocketTransportAddress(new InetSocketAddress(publishInetAddress, publishPort));
|
||||
|
@ -549,8 +577,13 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
ByteSizeValue tcpSendBufferSize = TCP_SEND_BUFFER_SIZE.getDefault(settings);
|
||||
ByteSizeValue tcpReceiveBufferSize = TCP_RECEIVE_BUFFER_SIZE.getDefault(settings);
|
||||
|
||||
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
|
||||
name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
|
||||
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
|
||||
name, workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery,
|
||||
connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin,
|
||||
receivePredictorMax);
|
||||
}
|
||||
|
||||
final ThreadFactory bossFactory = daemonThreadFactory(this.settings, HTTP_SERVER_BOSS_THREAD_NAME_PREFIX, name);
|
||||
final ThreadFactory workerFactory = daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, name);
|
||||
|
@ -739,7 +772,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
return;
|
||||
}
|
||||
if (isCloseConnectionException(e.getCause())) {
|
||||
logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
|
||||
logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(),
|
||||
ctx.getChannel());
|
||||
// close the channel, which will cause a node to be disconnected if relevant
|
||||
ctx.getChannel().close();
|
||||
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
|
||||
|
@ -754,7 +788,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
ctx.getChannel().close();
|
||||
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
|
||||
} else if (e.getCause() instanceof CancelledKeyException) {
|
||||
logger.trace("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
|
||||
logger.trace("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(),
|
||||
ctx.getChannel());
|
||||
// close the channel as safe measure, which will cause a node to be disconnected if relevant
|
||||
ctx.getChannel().close();
|
||||
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
|
||||
|
@ -800,7 +835,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
|
||||
Channel targetChannel = nodeChannel(node, options);
|
||||
|
||||
|
@ -902,7 +938,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
if (light) {
|
||||
nodeChannels = connectToChannelsLight(node);
|
||||
} else {
|
||||
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);
|
||||
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk],
|
||||
new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState],
|
||||
new Channel[connectionsPerNodePing]);
|
||||
try {
|
||||
connectToChannels(nodeChannels, node);
|
||||
} catch (Throwable e) {
|
||||
|
|
|
@ -53,7 +53,8 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
private final long requestId;
|
||||
private final String profileName;
|
||||
|
||||
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel, long requestId, Version version, String profileName) {
|
||||
public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel,
|
||||
long requestId, Version version, String profileName) {
|
||||
this.transportServiceAdapter = transportServiceAdapter;
|
||||
this.version = version;
|
||||
this.transport = transport;
|
||||
|
@ -119,7 +120,8 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
public void sendResponse(Throwable error) throws IOException {
|
||||
BytesStreamOutput stream = new BytesStreamOutput();
|
||||
stream.skip(NettyHeader.HEADER_SIZE);
|
||||
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
|
||||
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()),
|
||||
action, error);
|
||||
stream.writeThrowable(tx);
|
||||
byte status = 0;
|
||||
status = TransportStatus.setResponse(status);
|
||||
|
|
|
@ -80,8 +80,8 @@ public class SizeHeaderFrameDecoder extends FrameDecoder {
|
|||
}
|
||||
// safety against too large frames being sent
|
||||
if (dataLen > NINETY_PER_HEAP_SIZE) {
|
||||
throw new TooLongFrameException(
|
||||
"transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded [" + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
throw new TooLongFrameException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded ["
|
||||
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() < dataLen + 6) {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.tribe;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
|
@ -83,8 +84,10 @@ import static java.util.Collections.unmodifiableMap;
|
|||
*/
|
||||
public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
||||
|
||||
public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE));
|
||||
public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
|
||||
public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false,
|
||||
RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.METADATA_READ, ClusterBlockLevel.METADATA_WRITE));
|
||||
public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false,
|
||||
RestStatus.BAD_REQUEST, EnumSet.of(ClusterBlockLevel.WRITE));
|
||||
|
||||
public static Settings processSettings(Settings settings) {
|
||||
if (TRIBE_NAME_SETTING.exists(settings)) {
|
||||
|
@ -106,7 +109,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
Settings.Builder sb = Settings.builder().put(settings);
|
||||
sb.put(Node.NODE_CLIENT_SETTING.getKey(), true); // this node should just act as a node client
|
||||
sb.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "local"); // a tribe node should not use zen discovery
|
||||
sb.put(DiscoveryService.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); // nothing is going to be discovered, since no master will be elected
|
||||
// nothing is going to be discovered, since no master will be elected
|
||||
sb.put(DiscoveryService.INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0);
|
||||
if (sb.get("cluster.name") == null) {
|
||||
sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM
|
||||
}
|
||||
|
@ -114,7 +118,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
return sb.build();
|
||||
}
|
||||
|
||||
private static final Setting<String> TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", false, Setting.Scope.CLUSTER); // internal settings only
|
||||
// internal settings only
|
||||
private static final Setting<String> TRIBE_NAME_SETTING = Setting.simpleString("tribe.name", false, Setting.Scope.CLUSTER);
|
||||
private final ClusterService clusterService;
|
||||
private final String[] blockIndicesWrite;
|
||||
private final String[] blockIndicesRead;
|
||||
|
@ -125,14 +130,20 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
if (ON_CONFLICT_ANY.equals(s) || ON_CONFLICT_DROP.equals(s) || s.startsWith(ON_CONFLICT_PREFER)) {
|
||||
return s;
|
||||
}
|
||||
throw new IllegalArgumentException("Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: " +s);
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid value for [tribe.on_conflict] must be either [any, drop or start with prefer_] but was: " + s);
|
||||
}, false, Setting.Scope.CLUSTER);
|
||||
|
||||
public static final Setting<Boolean> BLOCKS_METADATA_SETTING = Setting.boolSetting("tribe.blocks.metadata", false, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> BLOCKS_WRITE_SETTING = Setting.boolSetting("tribe.blocks.write", false, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_WRITE_INDICES_SETTING = Setting.listSetting("tribe.blocks.write.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_READ_INDICES_SETTING = Setting.listSetting("tribe.blocks.read.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_METADATA_INDICES_SETTING = Setting.listSetting("tribe.blocks.metadata.indices", Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> BLOCKS_METADATA_SETTING = Setting.boolSetting("tribe.blocks.metadata", false, false,
|
||||
Setting.Scope.CLUSTER);
|
||||
public static final Setting<Boolean> BLOCKS_WRITE_SETTING = Setting.boolSetting("tribe.blocks.write", false, false,
|
||||
Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_WRITE_INDICES_SETTING = Setting.listSetting("tribe.blocks.write.indices",
|
||||
Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_READ_INDICES_SETTING = Setting.listSetting("tribe.blocks.read.indices",
|
||||
Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<List<String>> BLOCKS_METADATA_INDICES_SETTING = Setting.listSetting("tribe.blocks.metadata.indices",
|
||||
Collections.emptyList(), Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
|
||||
private final String onConflict;
|
||||
private final Set<String> droppedIndices = ConcurrentCollections.newConcurrentSet();
|
||||
|
@ -304,7 +315,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
tribeAttr.put(attr.key, attr.value);
|
||||
}
|
||||
tribeAttr.put(TRIBE_NAME_SETTING.getKey(), tribeName);
|
||||
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version());
|
||||
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(),
|
||||
tribe.address(), unmodifiableMap(tribeAttr), tribe.version());
|
||||
clusterStateChanged = true;
|
||||
logger.info("[{}] adding node [{}]", tribeName, discoNode);
|
||||
nodes.put(discoNode);
|
||||
|
@ -328,7 +340,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
// always make sure to update the metadata and routing table, in case
|
||||
// there are changes in them (new mapping, shards moving from initializing to started)
|
||||
routingTable.add(tribeState.routingTable().index(index.getIndex()));
|
||||
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
|
||||
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings())
|
||||
.put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
|
||||
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
|
||||
}
|
||||
}
|
||||
|
@ -357,7 +370,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
} else if (ON_CONFLICT_DROP.equals(onConflict)) {
|
||||
// drop the indices, there is a conflict
|
||||
clusterStateChanged = true;
|
||||
logger.info("[{}] dropping index {} due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
|
||||
logger.info("[{}] dropping index {} due to conflict with [{}]", tribeName, tribeIndex.getIndex(),
|
||||
existingFromTribe);
|
||||
removeIndex(blocks, metaData, routingTable, tribeIndex);
|
||||
droppedIndices.add(tribeIndex.getIndex().getName());
|
||||
} else if (onConflict.startsWith(ON_CONFLICT_PREFER)) {
|
||||
|
@ -366,7 +380,8 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
if (tribeName.equals(preferredTribeName)) {
|
||||
// the new one is hte preferred one, replace...
|
||||
clusterStateChanged = true;
|
||||
logger.info("[{}] adding index {}, preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
|
||||
logger.info("[{}] adding index {}, preferred over [{}]", tribeName, tribeIndex.getIndex(),
|
||||
existingFromTribe);
|
||||
removeIndex(blocks, metaData, routingTable, tribeIndex);
|
||||
addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
|
||||
} // else: either the existing one is the preferred one, or we haven't seen one, carry on
|
||||
|
@ -378,17 +393,20 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
|
|||
if (!clusterStateChanged) {
|
||||
return currentState;
|
||||
} else {
|
||||
return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build();
|
||||
return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData)
|
||||
.routingTable(routingTable.build()).build();
|
||||
}
|
||||
}
|
||||
|
||||
private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) {
|
||||
private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable,
|
||||
IndexMetaData index) {
|
||||
metaData.remove(index.getIndex().getName());
|
||||
routingTable.remove(index.getIndex().getName());
|
||||
blocks.removeIndexBlocks(index.getIndex().getName());
|
||||
}
|
||||
|
||||
private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
|
||||
private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData,
|
||||
RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
|
||||
Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME_SETTING.getKey(), tribeName).build();
|
||||
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
|
||||
routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex()));
|
||||
|
|
|
@ -83,7 +83,8 @@ public class ResourceWatcherService extends AbstractLifecycleComponent<ResourceW
|
|||
|
||||
TimeValue interval = settings.getAsTime("resource.reload.interval.low", Frequency.LOW.interval);
|
||||
lowMonitor = new ResourceMonitor(interval, Frequency.LOW);
|
||||
interval = settings.getAsTime("resource.reload.interval.medium", settings.getAsTime("resource.reload.interval", Frequency.MEDIUM.interval));
|
||||
interval = settings.getAsTime("resource.reload.interval.medium",
|
||||
settings.getAsTime("resource.reload.interval", Frequency.MEDIUM.interval));
|
||||
mediumMonitor = new ResourceMonitor(interval, Frequency.MEDIUM);
|
||||
interval = settings.getAsTime("resource.reload.interval.high", Frequency.HIGH.interval);
|
||||
highMonitor = new ResourceMonitor(interval, Frequency.HIGH);
|
||||
|
|
Loading…
Reference in New Issue