DeGuice some of IndicesModule

UpdateHelper, MetaDataIndexUpgradeService, and some recovery
stuff.

Move ClusterSettings to nullable ctor parameter of TransportService
so it isn't forgotten.
This commit is contained in:
Nik Everett 2016-10-06 16:04:45 -04:00
parent 6cf7a93837
commit cf4038b668
51 changed files with 330 additions and 272 deletions

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
@ -64,10 +63,8 @@ import java.util.Map;
* Helper for translating an update request to an index, delete request or update response.
*/
public class UpdateHelper extends AbstractComponent {
private final ScriptService scriptService;
@Inject
public UpdateHelper(Settings settings, ScriptService scriptService) {
super(settings);
this.scriptService = scriptService;
@ -76,7 +73,6 @@ public class UpdateHelper extends AbstractComponent {
/**
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
@SuppressWarnings("unchecked")
public Result prepare(UpdateRequest request, IndexShard indexShard) {
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME, TimestampFieldMapper.NAME},

View File

@ -151,7 +151,7 @@ public abstract class TransportClient extends AbstractClient {
bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor());
networkModule.getTransportInterceptor(), null);
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);

View File

@ -22,8 +22,6 @@ package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
@ -36,6 +34,8 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
import org.elasticsearch.index.mapper.GeoPointFieldMapper;
import org.elasticsearch.index.mapper.GeoShapeFieldMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.IpFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.LatLonPointFieldMapper;
@ -43,26 +43,24 @@ import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.index.mapper.TokenCountFieldMapper;
import org.elasticsearch.index.mapper.ScaledFloatFieldMapper;
import org.elasticsearch.index.mapper.StringFieldMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.ParentFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.ScaledFloatFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.TimestampFieldMapper;
import org.elasticsearch.index.mapper.StringFieldMapper;
import org.elasticsearch.index.mapper.TTLFieldMapper;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.index.mapper.TimestampFieldMapper;
import org.elasticsearch.index.mapper.TokenCountFieldMapper;
import org.elasticsearch.index.mapper.TypeFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -78,16 +76,11 @@ import java.util.Map;
* Configures classes and services that are shared by indices on each node.
*/
public class IndicesModule extends AbstractModule {
private final Map<String, Mapper.TypeParser> mapperParsers;
private final Map<String, MetadataFieldMapper.TypeParser> metadataMapperParsers;
private final MapperRegistry mapperRegistry;
private final List<Entry> namedWritables = new ArrayList<>();
private final MapperRegistry mapperRegistry;
public IndicesModule(List<MapperPlugin> mapperPlugins) {
this.mapperParsers = getMappers(mapperPlugins);
this.metadataMapperParsers = getMetadataMappers(mapperPlugins);
this.mapperRegistry = new MapperRegistry(mapperParsers, metadataMapperParsers);
this.mapperRegistry = new MapperRegistry(getMappers(mapperPlugins), getMetadataMappers(mapperPlugins));
registerBuiltinWritables();
}
@ -172,27 +165,18 @@ public class IndicesModule extends AbstractModule {
@Override
protected void configure() {
bindMapperExtension();
bind(RecoverySettings.class).asEagerSingleton();
bind(PeerRecoveryTargetService.class).asEagerSingleton();
bind(PeerRecoverySourceService.class).asEagerSingleton();
bind(IndicesStore.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton();
bind(SyncedFlushService.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
bind(NodeServicesProvider.class).asEagerSingleton();
}
// public for testing
/**
* A registry for all field mappers.
*/
public MapperRegistry getMapperRegistry() {
return mapperRegistry;
}
protected void bindMapperExtension() {
bind(MapperRegistry.class).toInstance(getMapperRegistry());
}
}

View File

@ -33,7 +33,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -91,7 +90,6 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
private final RecoveriesCollection onGoingRecoveries;
@Inject
public PeerRecoveryTargetService(Settings settings, ThreadPool threadPool, TransportService transportService, RecoverySettings
recoverySettings, ClusterService clusterService) {
super(settings);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.indices.recovery;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -85,7 +84,6 @@ public class RecoverySettings extends AbstractComponent {
private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
@Inject
public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
super(settings);
@ -142,7 +140,7 @@ public class RecoverySettings extends AbstractComponent {
public ByteSizeValue getChunkSize() { return chunkSize; }
void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterModule;
@ -41,6 +42,7 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -91,6 +93,9 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.ingest.IngestService;
@ -374,7 +379,7 @@ public class Node implements Closeable {
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor());
networkModule.getTransportInterceptor(), settingsModule.getClusterSettings());
final Consumer<Binder> httpBind;
if (networkModule.isHttpEnabled()) {
HttpServerTransport httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
@ -416,6 +421,17 @@ public class Node implements Closeable {
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
b.bind(AllocationCommandRegistry.class).toInstance(NetworkModule.getAllocationCommandRegistry());
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings,
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings()));
{
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
indicesService, recoverySettings, clusterService));
b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
transportService, recoverySettings, clusterService));
}
httpBind.accept(b);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
}
@ -458,8 +474,12 @@ public class Node implements Closeable {
}
protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor) {
return new TransportService(settings, transport, threadPool, interceptor);
TransportInterceptor interceptor, ClusterSettings clusterSettings) {
return new TransportService(settings, transport, threadPool, interceptor, clusterSettings);
}
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
// Noop in production, overridden by tests
}
/**

View File

@ -25,10 +25,10 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
@ -119,8 +119,14 @@ public class TransportService extends AbstractLifecycleComponent {
/** if set will call requests sent to this id to shortcut and executed locally */
volatile DiscoveryNode localNode = null;
@Inject
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor) {
/**
* Build the service.
*
* @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
*/
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
@Nullable ClusterSettings clusterSettings) {
super(settings);
this.transport = transport;
this.threadPool = threadPool;
@ -132,6 +138,10 @@ public class TransportService extends AbstractLifecycleComponent {
taskManager = createTaskManager();
this.interceptor = transportInterceptor;
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
if (clusterSettings != null) {
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
}
}
/**
@ -159,13 +169,6 @@ public class TransportService extends AbstractLifecycleComponent {
return new TaskManager(settings);
}
// These need to be optional as they don't exist in the context of a transport client
@Inject(optional = true)
public void setDynamicSettings(ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
}
void setTracerLogInclude(List<String> tracerLogInclude) {
this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY);
}

View File

@ -18,13 +18,6 @@
*/
package org.elasticsearch.action.admin.cluster.node.tasks;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction;
@ -59,6 +52,13 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
@ -172,7 +172,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
transportService = new TransportService(settings,
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()), new NetworkService(settings, Collections.emptyList())),
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR) {
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null) {
@Override
protected TaskManager createTaskManager() {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {

View File

@ -88,7 +88,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
private TransportBulkAction createAction(boolean controlled, AtomicLong expected) {
CapturingTransport capturingTransport = new CapturingTransport();
TransportService transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY);

View File

@ -88,7 +88,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.localNode()).thenReturn(localNode);
when(clusterService.state()).thenReturn(clusterState.build());
transportService = new TransportService(Settings.EMPTY, null, null, interceptor);
transportService = new TransportService(Settings.EMPTY, null, null, interceptor, null);
return new IngestProxyActionFilter(clusterService, transportService);
}

View File

@ -120,9 +120,10 @@ public class MainActionTests extends ESTestCase {
ClusterState state = ClusterState.builder(clusterName).blocks(blocks).build();
when(clusterService.state()).thenReturn(state);
TransportMainAction action = new TransportMainAction(settings, mock(ThreadPool.class), new TransportService(Settings.EMPTY,
null ,null, TransportService.NOOP_TRANSPORT_INTERCEPTOR),
mock(ActionFilters.class), mock(IndexNameExpressionResolver.class), clusterService);
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
TransportMainAction action = new TransportMainAction(settings, mock(ThreadPool.class), transportService, mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class), clusterService);
AtomicReference<MainResponse> responseRef = new AtomicReference<>();
action.doExecute(new MainRequest(), new ActionListener<MainResponse>() {
@Override

View File

@ -56,7 +56,8 @@ public class TransportMultiSearchActionTests extends ESTestCase {
when(actionFilters.filters()).thenReturn(new ActionFilter[0]);
ThreadPool threadPool = new ThreadPool(settings);
TaskManager taskManager = mock(TaskManager.class);
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR) {
TransportService transportService = new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null) {
@Override
public TaskManager getTaskManager() {
return taskManager;

View File

@ -191,7 +191,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
final TransportService transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
setClusterState(clusterService, TEST_INDEX);

View File

@ -86,7 +86,8 @@ public class TransportMasterNodeActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
localNode = new DiscoveryNode("local_node", buildNewFakeTransportAddress(), Collections.emptyMap(),

View File

@ -177,7 +177,7 @@ public class TransportNodesActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
int numNodes = randomIntBetween(3, 10);

View File

@ -96,10 +96,12 @@ public class BroadcastReplicationTests extends ESTestCase {
threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService, new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(Settings.EMPTY, Collections.emptyList()));
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService, new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), null);
broadcastReplicationAction = new TestBroadcastReplicationAction(Settings.EMPTY, threadPool, clusterService, transportService,
new ActionFilters(new HashSet<ActionFilter>()), new IndexNameExpressionResolver(Settings.EMPTY), null);
}
@After

View File

@ -149,7 +149,7 @@ public class TransportReplicationActionTests extends ESTestCase {
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);

View File

@ -130,9 +130,10 @@ public class TransportWriteActionTests extends ESTestCase {
private class TestAction extends TransportWriteAction<TestRequest, TestResponse> {
protected TestAction() {
super(Settings.EMPTY, "test", new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR),
null, null, null, null, new ActionFilters(new HashSet<>()),
new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, ThreadPool.Names.SAME);
super(Settings.EMPTY, "test",
new TransportService(Settings.EMPTY, null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null), null, null, null,
null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new,
ThreadPool.Names.SAME);
}
@Override

View File

@ -142,7 +142,8 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
action = new TestTransportInstanceSingleOperationAction(

View File

@ -75,7 +75,7 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
@Override
protected Client buildClient(Settings headersSettings, GenericAction[] testedActions) {
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool);
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null);
transportService.start();
transportService.acceptIncomingRequests();
TransportClient client = new MockTransportClient(Settings.builder()

View File

@ -97,7 +97,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
}
};
}
});
}, null);
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService =

View File

@ -148,7 +148,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
public void setUp() throws Exception {
super.setUp();
this.transport = new MockTransport();
transportService = new TransportService(Settings.EMPTY, transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService = new TransportService(Settings.EMPTY, transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
}

View File

@ -106,7 +106,8 @@ public class ShardStateActionTests extends ESTestCase {
super.setUp();
this.transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new TestShardStateAction(Settings.EMPTY, clusterService, transportService, null, null);

View File

@ -93,7 +93,7 @@ public class ClusterStateHealthTests extends ESTestCase {
super.setUp();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), new CapturingTransport(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
}

View File

@ -145,7 +145,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
.build(),
new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, circuitBreakerService,
namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version),
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
return transportService;

View File

@ -154,7 +154,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
try {
Set<DiscoveryNode> expectedFDNodes = null;
final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool);
final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
masterTransport.start();
DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT);
toClose.add(masterTransport);
@ -170,7 +170,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
toClose.add(masterZen);
masterTransport.acceptIncomingRequests();
final MockTransportService otherTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool);
final MockTransportService otherTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
otherTransport.start();
toClose.add(otherTransport);
DiscoveryNode otherNode = new DiscoveryNode("other", otherTransport.boundAddress().publishAddress(), Version.CURRENT);

View File

@ -210,7 +210,7 @@ public class UnicastZenPingTests extends ESTestCase {
MockTcpTransport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()), networkService, version);
final TransportService transportService = new TransportService(settings, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();

View File

@ -229,7 +229,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
private static MockTransportService buildTransportService(Settings settings, ThreadPool threadPool) {
MockTransportService transportService = MockTransportService.createNewService(settings, Version.CURRENT, threadPool);
MockTransportService transportService = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null );
transportService.start();
transportService.acceptIncomingRequests();
return transportService;

View File

@ -82,7 +82,7 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(settings, Collections.emptyList()));
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
indicesService = getInstanceFromNode(IndicesService.class);
shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL);
actionFilters = new ActionFilters(Collections.emptySet());

View File

@ -150,7 +150,7 @@ public class ClusterStateChanges extends AbstractComponent {
// services
TransportService transportService = new TransportService(settings, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, clusterSettings);
MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, null, null) {
// metaData upgrader should do nothing
@Override

View File

@ -368,7 +368,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
final MockIndicesService indicesService = indicesServiceSupplier.get();
final Settings settings = Settings.builder().put("node.name", discoveryNode.getName()).build();
final TransportService transportService = new TransportService(settings, null, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
transportService, null);

View File

@ -47,6 +47,7 @@ import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotState;
@ -73,6 +74,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@ -101,7 +103,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockFSIndexStore.TestPlugin.class);
return Arrays.asList(MockTransportService.TestPlugin.class, MockFSIndexStore.TestPlugin.class,
RecoverySettingsChunkSizePlugin.class);
}
private void assertRecoveryStateWithoutStage(RecoveryState state, int shardId, RecoverySource recoverySource, boolean primary,
@ -137,26 +140,21 @@ public class IndexRecoveryIT extends ESIntegTestCase {
private void slowDownRecovery(ByteSizeValue shardSize) {
long chunkSize = Math.max(1, shardSize.getBytes() / 10);
for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) {
setChunkSize(settings, new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES));
}
assertTrue(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
// one chunk per sec..
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
)
.get().isAcknowledged());
// small chunks
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
).get().isAcknowledged());
}
private void restoreRecoverySpeed() {
for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) {
setChunkSize(settings, RecoverySettings.DEFAULT_CHUNK_SIZE);
}
assertTrue(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
)
.get().isAcknowledged());
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
).get().isAcknowledged());
}
public void testGatewayRecovery() throws Exception {
@ -659,8 +657,4 @@ public class IndexRecoveryIT extends ESIntegTestCase {
transport.sendRequest(node, requestId, action, request, options);
}
}
public static void setChunkSize(RecoverySettings recoverySettings, ByteSizeValue chunksSize) {
recoverySettings.setChunkSize(chunksSize);
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.test.ESTestCase;
import java.util.concurrent.TimeUnit;
public class RecoverySettingsDynamicUpdateTests extends ESTestCase {
private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterSettings);
public void testZeroBytesPerSecondIsNoRateLimit() {
clusterSettings.applySettings(Settings.builder().put(
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build());
assertEquals(null, recoverySettings.rateLimiter());
}
public void testRetryDelayStateSync() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
clusterSettings.applySettings(Settings.builder().put(
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), duration, timeUnit
).build());
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.retryDelayStateSync());
}
public void testRetryDelayNetwork() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
clusterSettings.applySettings(Settings.builder().put(
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), duration, timeUnit
).build());
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.retryDelayNetwork());
}
public void testActivityTimeout() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
clusterSettings.applySettings(Settings.builder().put(
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(), duration, timeUnit
).build());
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.activityTimeout());
}
public void testInternalActionTimeout() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
clusterSettings.applySettings(Settings.builder().put(
RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.getKey(), duration, timeUnit
).build());
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionTimeout());
}
public void testInternalLongActionTimeout() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
clusterSettings.applySettings(Settings.builder().put(
RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.getKey(), duration, timeUnit
).build());
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
}
}

View File

@ -87,7 +87,9 @@ public class IndicesStoreTests extends ESTestCase {
public void before() {
localNode = new DiscoveryNode("abc", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
clusterService = createClusterService(threadPool);
indicesStore = new IndicesStore(Settings.EMPTY, null, clusterService, new TransportService(clusterService.getSettings(), null, null, TransportService.NOOP_TRANSPORT_INTERCEPTOR), null);
TransportService transportService = new TransportService(clusterService.getSettings(), null, null,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
indicesStore = new IndicesStore(Settings.EMPTY, null, clusterService, transportService, null);
}
@After

View File

@ -1,101 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.recovery;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.concurrent.TimeUnit;
public class RecoverySettingsTests extends ESSingleNodeTestCase {
@Override
protected boolean resetNodeAfterTest() {
return true;
}
public void testAllSettingsAreDynamicallyUpdatable() {
innerTestSettings(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(null, recoverySettings.rateLimiter());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), randomIntBetween(1, 200), TimeUnit.MILLISECONDS, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.retryDelayStateSync().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), randomIntBetween(1, 200), TimeUnit.MILLISECONDS, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.retryDelayNetwork().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 200), TimeUnit.MILLISECONDS, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.activityTimeout().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 200), TimeUnit.MILLISECONDS, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.internalActionTimeout().millis());
}
});
innerTestSettings(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 200), TimeUnit.MILLISECONDS, new Validator() {
@Override
public void validate(RecoverySettings recoverySettings, int expectedValue) {
assertEquals(expectedValue, recoverySettings.internalActionLongTimeout().millis());
}
});
}
private static class Validator {
public void validate(RecoverySettings recoverySettings, int expectedValue) {
}
public void validate(RecoverySettings recoverySettings, boolean expectedValue) {
}
}
private void innerTestSettings(String key, int newValue, Validator validator) {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(key, newValue)).get();
validator.validate(getInstanceFromNode(RecoverySettings.class), newValue);
}
private void innerTestSettings(String key, int newValue, TimeUnit timeUnit, Validator validator) {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(key, newValue, timeUnit)).get();
validator.validate(getInstanceFromNode(RecoverySettings.class), newValue);
}
private void innerTestSettings(String key, int newValue, ByteSizeUnit byteSizeUnit, Validator validator) {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(key, newValue, byteSizeUnit)).get();
validator.validate(getInstanceFromNode(RecoverySettings.class), newValue);
}
private void innerTestSettings(String key, boolean newValue, Validator validator) {
client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(key, newValue)).get();
validator.validate(getInstanceFromNode(RecoverySettings.class), newValue);
}
}

View File

@ -30,10 +30,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.recovery.IndexRecoveryIT;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
@ -51,6 +50,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -61,7 +61,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
}
/**
@ -71,9 +71,9 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
* Later we allow full recovery to ensure we can still recover and don't run into corruptions.
*/
public void testCancelRecoveryAndResume() throws Exception {
for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) {
IndexRecoveryIT.setChunkSize(settings, new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES));
}
assertTrue(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)))
.get().isAcknowledged());
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();

View File

@ -66,7 +66,7 @@ public class TransportServiceHandshakeTests extends ESTestCase {
new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(settings, Collections.emptyList()));
TransportService transportService = new MockTransportService(settings, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
DiscoveryNode node =

View File

@ -67,7 +67,7 @@ public class Netty3SizeHeaderFrameDecoderTests extends ESTestCase {
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
nettyTransport.start();
TransportService transportService = new TransportService(settings, nettyTransport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
nettyTransport.transportServiceAdapter(transportService.createAdapter());
TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses();

View File

@ -66,13 +66,15 @@ public class Netty3ScheduledPingTests extends ESTestCase {
NamedWriteableRegistry registry = new NamedWriteableRegistry(Collections.emptyList());
final Netty3Transport nettyA = new Netty3Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
serviceA.start();
serviceA.acceptIncomingRequests();
final Netty3Transport nettyB = new Netty3Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
serviceB.start();
serviceB.acceptIncomingRequests();

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
@ -45,9 +46,8 @@ import static org.hamcrest.Matchers.containsString;
public class SimpleNetty3TransportTests extends AbstractSimpleTransportTestCase {
public static MockTransportService nettyFromThreadPool(
Settings settings,
ThreadPool threadPool, final Version version) {
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
ClusterSettings clusterSettings) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new Netty3Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
@ -56,13 +56,14 @@ public class SimpleNetty3TransportTests extends AbstractSimpleTransportTestCase
return version;
}
};
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
clusterSettings);
}
@Override
protected MockTransportService build(Settings settings, Version version) {
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings) {
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version);
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings);
transportService.start();
return transportService;
}

View File

@ -66,13 +66,15 @@ public class Netty4ScheduledPingTests extends ESTestCase {
NamedWriteableRegistry registry = new NamedWriteableRegistry(Collections.emptyList());
final Netty4Transport nettyA = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
serviceA.start();
serviceA.acceptIncomingRequests();
final Netty4Transport nettyB = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
serviceB.start();
serviceB.acceptIncomingRequests();

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
@ -45,9 +46,8 @@ import static org.hamcrest.Matchers.containsString;
public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase {
public static MockTransportService nettyFromThreadPool(
Settings settings,
ThreadPool threadPool, final Version version) {
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
ClusterSettings clusterSettings) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(settings, Collections.emptyList()),
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
@ -56,13 +56,14 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
return version;
}
};
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
clusterSettings);
}
@Override
protected MockTransportService build(Settings settings, Version version) {
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings) {
settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), "0").build();
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version);
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings);
transportService.start();
return transportService;
}

View File

@ -86,7 +86,8 @@ public class Ec2DiscoveryTests extends ESTestCase {
return new TransportAddress[] {poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress())};
}
};
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
}
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes) {

View File

@ -74,7 +74,8 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(Settings.EMPTY, Collections.emptyList()));
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
}
public void testBuildDynamicNodes() throws Exception {

View File

@ -94,7 +94,7 @@ public class GceDiscoveryTests extends ESTestCase {
@Before
public void createTransportService() {
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool);
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null);
}
@After

View File

@ -27,7 +27,6 @@ import java.io.IOException;
import java.util.Locale;
public class GoogleCloudStorageBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
@Override
protected BlobStore newBlobStore() throws IOException {
String bucket = randomAsciiOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);

View File

@ -20,13 +20,13 @@
package org.elasticsearch.node;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
@ -84,15 +84,22 @@ public class MockNode extends Node {
@Override
protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor) {
// we use the MockTransportService.TestPlugin class as a marker to create a newtwork
TransportInterceptor interceptor, ClusterSettings clusterSettings) {
// we use the MockTransportService.TestPlugin class as a marker to create a network
// module with this MockNetworkService. NetworkService is such an integral part of the systme
// we don't allow to plug it in from plugins or anything. this is a test-only override and
// can't be done in a production env.
if (getPluginsService().filterPlugins(MockTransportService.TestPlugin.class).size() == 1) {
return new MockTransportService(settings, transport, threadPool, interceptor);
if (getPluginsService().filterPlugins(MockTransportService.TestPlugin.class).isEmpty()) {
return super.newTransportService(settings, transport, threadPool, interceptor, clusterSettings);
} else {
return super.newTransportService(settings, transport, threadPool, interceptor);
return new MockTransportService(settings, transport, threadPool, interceptor, clusterSettings);
}
}
@Override
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) {
clusterSettings.addSettingsUpdateConsumer(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING, recoverySettings::setChunkSize);
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.node;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import java.util.List;
import static java.util.Collections.singletonList;
/**
* Marker plugin that will trigger {@link MockNode} making {@link #CHUNK_SIZE_SETTING} dynamic.
*/
public class RecoverySettingsChunkSizePlugin extends Plugin {
/**
* The chunk size. Only exposed by tests.
*/
public static final Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("indices.recovery.chunk_size",
RecoverySettings.DEFAULT_CHUNK_SIZE, Property.Dynamic, Property.NodeScope);
@Override
public List<Setting<?>> getSettings() {
return singletonList(CHUNK_SIZE_SETTING);
}
}

View File

@ -20,37 +20,36 @@
package org.elasticsearch.test.transport;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.BigArray;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
import java.io.IOException;
@ -84,18 +83,25 @@ public final class MockTransportService extends TransportService {
}
}
public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool) {
public static MockTransportService createNewService(Settings settings, Version version, ThreadPool threadPool,
@Nullable ClusterSettings clusterSettings) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
final Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version);
return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR);
return new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, clusterSettings);
}
private final Transport original;
@Inject
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor) {
super(settings, new LookupTestTransport(transport), threadPool, interceptor);
/**
* Build the service.
*
* @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
*/
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor,
@Nullable ClusterSettings clusterSettings) {
super(settings, new LookupTestTransport(transport), threadPool, interceptor, clusterSettings);
this.original = transport;
}

View File

@ -72,6 +72,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
// we use always a non-alpha or beta version here otherwise minimumCompatibilityVersion will be different for the two used versions
private static final Version CURRENT_VERSION = Version.fromString(String.valueOf(Version.CURRENT.major) + ".0.0");
protected static final Version version0 = CURRENT_VERSION.minimumCompatibilityVersion();
private ClusterSettings clusterSettings;
protected volatile DiscoveryNode nodeA;
protected volatile MockTransportService serviceA;
@ -79,17 +82,18 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
protected volatile DiscoveryNode nodeB;
protected volatile MockTransportService serviceB;
protected abstract MockTransportService build(Settings settings, Version version);
protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings);
@Override
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
serviceA = buildService("TS_A", version0);
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates
nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
// serviceA.setLocalNode(nodeA);
serviceB = buildService("TS_B", version1);
serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates
nodeB = new DiscoveryNode("TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1);
//serviceB.setLocalNode(nodeB);
// wait till all nodes are properly connected and the event has been sent, so tests in this class
@ -128,14 +132,15 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceB.removeConnectionListener(waitForConnection);
}
private MockTransportService buildService(final String name, final Version version) {
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
MockTransportService service = build(
Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), name)
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.build(),
version);
version,
clusterSettings);
service.acceptIncomingRequests();
return service;
}
@ -582,7 +587,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
if (i % 3 == 0) {
// simulate restart of nodeB
serviceB.close();
MockTransportService newService = buildService("TS_B_" + i, version1);
MockTransportService newService = buildService("TS_B_" + i, version1, null);
newService.registerRequestHandler("test", TestRequest::new, ThreadPool.Names.SAME, ignoringRequestHandler);
serviceB = newService;
nodeB = new DiscoveryNode("TS_B_" + i, "TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1);
@ -864,9 +869,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
includeSettings = "test";
excludeSettings = "DOESN'T_MATCH";
}
ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
serviceA.setDynamicSettings(service);
service.applySettings(Settings.builder()
clusterSettings.applySettings(Settings.builder()
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), includeSettings)
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings)
.build());
@ -1423,7 +1426,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.build(),
version0);
version0,
null);
AtomicBoolean requestProcessed = new AtomicBoolean();
service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
@ -1540,7 +1544,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.build(),
version0);
version0,
null);
DiscoveryNode nodeC =
new DiscoveryNode("TS_C", "TS_C", serviceC.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceC.acceptIncomingRequests();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -30,12 +31,12 @@ import java.util.Collections;
public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
@Override
protected MockTransportService build(Settings settings, Version version) {
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(settings, Collections.emptyList()), version);
MockTransportService mockTransportService = new MockTransportService(Settings.EMPTY, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
TransportService.NOOP_TRANSPORT_INTERCEPTOR, clusterSettings);
mockTransportService.start();
return mockTransportService;
}