Merge branch 'master' into feature-suggest-refactoring

This commit is contained in:
Christoph Büscher 2016-03-16 15:27:02 +01:00
commit 6ddf9ae92f
17 changed files with 152 additions and 36 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.common.network;
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -139,6 +140,7 @@ import org.elasticsearch.rest.action.template.RestPutSearchTemplateAction;
import org.elasticsearch.rest.action.termvectors.RestMultiTermVectorsAction;
import org.elasticsearch.rest.action.termvectors.RestTermVectorsAction;
import org.elasticsearch.rest.action.update.RestUpdateAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
@ -326,6 +328,7 @@ public class NetworkModule extends AbstractModule {
registerTransportService(NETTY_TRANSPORT, TransportService.class);
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
registerTaskStatus(ReplicationTask.Status.PROTOTYPE);
if (transportClient == false) {
registerHttpTransport(NETTY_TRANSPORT, NettyHttpServerTransport.class);
@ -371,6 +374,10 @@ public class NetworkModule extends AbstractModule {
}
}
public void registerTaskStatus(Task.Status prototype) {
namedWriteableRegistry.registerPrototype(Task.Status.class, prototype);
}
@Override
protected void configure() {
bind(NetworkService.class).toInstance(networkService);

View File

@ -20,13 +20,11 @@
package org.elasticsearch.transport;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.MeanMetric;
@ -43,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
@ -113,11 +110,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
volatile DiscoveryNode localNode = null;
public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool, new NamedWriteableRegistry());
this(EMPTY_SETTINGS, transport, threadPool);
}
@Inject
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
public TransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings);
this.transport = transport;
this.threadPool = threadPool;
@ -126,7 +123,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
tracerLog = Loggers.getLogger(logger, ".tracer");
adapter = createAdapter();
taskManager = createTaskManager();
namedWriteableRegistry.registerPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE);
}
/**

View File

@ -183,7 +183,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
public TestNode(String name, ThreadPool threadPool, Settings settings) {
transportService = new TransportService(settings,
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
threadPool, new NamedWriteableRegistry()) {
threadPool) {
@Override
protected TaskManager createTaskManager() {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {

View File

@ -128,8 +128,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
CountDownLatch clusterStateLatch = new CountDownLatch(1);
@Inject
public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
}
@Override @SuppressWarnings("unchecked")

View File

@ -71,7 +71,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
return new TestResponse();
}
};
transportService = new TransportService(Settings.EMPTY, transport, threadPool, new NamedWriteableRegistry());
transportService = new TransportService(Settings.EMPTY, transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);

View File

@ -19,12 +19,18 @@
package org.elasticsearch.common.network;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerAdapter;
import org.elasticsearch.http.HttpServerTransport;
@ -36,10 +42,16 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestNodesAction;
import org.elasticsearch.rest.action.main.RestMainAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.Task.Status;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import static org.hamcrest.Matchers.sameInstance;
public class NetworkModuleTests extends ModuleTestCase {
static class FakeTransportService extends TransportService {
@ -104,36 +116,36 @@ public class NetworkModuleTests extends ModuleTestCase {
public void testRegisterTransportService() {
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "custom").build();
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerTransportService("custom", FakeTransportService.class);
assertBinding(module, TransportService.class, FakeTransportService.class);
// check it works with transport only as well
module = new NetworkModule(new NetworkService(settings), settings, true, null);
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
module.registerTransportService("custom", FakeTransportService.class);
assertBinding(module, TransportService.class, FakeTransportService.class);
}
public void testRegisterTransport() {
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "custom").build();
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerTransport("custom", FakeTransport.class);
assertBinding(module, Transport.class, FakeTransport.class);
// check it works with transport only as well
module = new NetworkModule(new NetworkService(settings), settings, true, null);
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
module.registerTransport("custom", FakeTransport.class);
assertBinding(module, Transport.class, FakeTransport.class);
}
public void testRegisterHttpTransport() {
Settings settings = Settings.builder().put(NetworkModule.HTTP_TYPE_SETTING.getKey(), "custom").build();
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerHttpTransport("custom", FakeHttpTransport.class);
assertBinding(module, HttpServerTransport.class, FakeHttpTransport.class);
// check registration not allowed for transport only
module = new NetworkModule(new NetworkService(settings), settings, true, null);
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
try {
module.registerHttpTransport("custom", FakeHttpTransport.class);
fail();
@ -144,19 +156,19 @@ public class NetworkModuleTests extends ModuleTestCase {
// not added if http is disabled
settings = Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).build();
module = new NetworkModule(new NetworkService(settings), settings, false, null);
module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
assertNotBound(module, HttpServerTransport.class);
}
public void testRegisterRestHandler() {
Settings settings = Settings.EMPTY;
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerRestHandler(FakeRestHandler.class);
// also check a builtin is bound
assertSetMultiBinding(module, RestHandler.class, FakeRestHandler.class, RestMainAction.class);
// check registration not allowed for transport only
module = new NetworkModule(new NetworkService(settings), settings, true, null);
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
try {
module.registerRestHandler(FakeRestHandler.class);
fail();
@ -168,9 +180,44 @@ public class NetworkModuleTests extends ModuleTestCase {
public void testRegisterCatRestHandler() {
Settings settings = Settings.EMPTY;
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerRestHandler(FakeCatRestHandler.class);
// also check a builtin is bound
assertSetMultiBinding(module, AbstractCatAction.class, FakeCatRestHandler.class, RestNodesAction.class);
}
public void testRegisterTaskStatus() {
NamedWriteableRegistry registry = new NamedWriteableRegistry();
Settings settings = Settings.EMPTY;
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, registry);
// Builtin prototype comes back
assertNotNull(registry.getPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE.getWriteableName()));
Task.Status dummy = new DummyTaskStatus();
module.registerTaskStatus(dummy);
assertThat(registry.getPrototype(Task.Status.class, "dummy"), sameInstance(dummy));
}
private class DummyTaskStatus implements Task.Status {
@Override
public String getWriteableName() {
return "dummy";
}
@Override
public Status readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
throw new UnsupportedOperationException();
}
}
}

View File

@ -106,7 +106,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
protected MockTransportService build(Settings settings, Version version) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
MockTransportService transportService = new MockTransportService(Settings.EMPTY,
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool, namedWriteableRegistry);
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool);
transportService.start();
transportService.acceptIncomingRequests();
return transportService;

View File

@ -41,8 +41,8 @@ public class TransportModuleTests extends ModuleTestCase {
static class FakeTransportService extends TransportService {
@Inject
public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
}
}
}

View File

@ -54,13 +54,13 @@ public class NettyScheduledPingTests extends ESTestCase {
NamedWriteableRegistry registryA = new NamedWriteableRegistry();
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, registryA);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
serviceA.start();
serviceA.acceptIncomingRequests();
NamedWriteableRegistry registryB = new NamedWriteableRegistry();
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, registryB);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
serviceB.start();
serviceB.acceptIncomingRequests();

View File

@ -67,3 +67,6 @@ include::integrations.asciidoc[]
include::authors.asciidoc[]
include::redirects.asciidoc[]

View File

@ -0,0 +1,40 @@
["appendix",role="exclude",id="redirects"]
= Deleted pages
The following pages have moved or been deleted.
[role="exclude",id="discovery-multicast"]
=== Multicast Discovery Plugin
The `multicast-discovery` plugin has been removed. Instead, configure networking
using unicast (see {ref}/modules-network.html[Network settings]) or using
one of the <<discovery,cloud discovery plugins>>.
[role="exclude",id="cloud-aws"]
=== AWS Cloud Plugin
The `cloud-aws` plugin has been split into two separate plugins:
* <<discovery-ec2>> (`discovery-ec2`)
* <<repository-s3>> (`repository-s3`)
[role="exclude",id="cloud-azure"]
=== Azure Cloud Plugin
The `cloud-azure` plugin has been split into two separate plugins:
* <<discovery-azure>> (`discovery-azure`)
* <<repository-azure>> (`repository-azure`)
[role="exclude",id="cloud-gce"]
=== GCE Cloud Plugin
The `cloud-gce` plugin has been renamed to <<discovery-gce>> (`discovery-gce`).

View File

@ -786,8 +786,8 @@ public class IndicesRequestTests extends ESIntegTestCase {
private final Map<String, List<TransportRequest>> requests = new HashMap<>();
@Inject
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
}
synchronized List<TransportRequest> consumeRequests(String action) {

View File

@ -41,8 +41,9 @@ public class ReindexPlugin extends Plugin {
actionModule.registerAction(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class);
}
public void onModule(NetworkModule restModule) {
restModule.registerRestHandler(RestReindexAction.class);
restModule.registerRestHandler(RestUpdateByQueryAction.class);
public void onModule(NetworkModule networkModule) {
networkModule.registerRestHandler(RestReindexAction.class);
networkModule.registerRestHandler(RestUpdateByQueryAction.class);
networkModule.registerTaskStatus(BulkByScrollTask.Status.PROTOTYPE);
}
}

View File

@ -21,7 +21,10 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.NativeScriptFactory;
@ -41,7 +44,10 @@ import java.util.concurrent.TimeoutException;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.ESIntegTestCase.client;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
/**
@ -76,10 +82,26 @@ public class CancelTestUtils {
// Wait until the script is on the second document.
barrier.await(30, TimeUnit.SECONDS);
// Status should show running
ListTasksResponse tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get();
assertThat(tasksList.getNodeFailures(), empty());
assertThat(tasksList.getTaskFailures(), empty());
assertThat(tasksList.getTasks(), hasSize(1));
BulkByScrollTask.Status status = (Status) tasksList.getTasks().get(0).getStatus();
assertNull(status.getReasonCancelled());
// Cancel the request while the script is running. This will prevent the request from being sent at all.
List<TaskInfo> cancelledTasks = client().admin().cluster().prepareCancelTasks().setActions(actionToCancel).get().getTasks();
assertThat(cancelledTasks, hasSize(1));
// The status should now show canceled. The request will still be in the list because the script is still blocked.
tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get();
assertThat(tasksList.getNodeFailures(), empty());
assertThat(tasksList.getTaskFailures(), empty());
assertThat(tasksList.getTasks(), hasSize(1));
status = (Status) tasksList.getTasks().get(0).getStatus();
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());
// Now let the next document through. It won't be sent because the request is cancelled but we need to unblock the script.
barrier.await();

View File

@ -1100,7 +1100,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
// remove local node reference
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null);
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
int masterClusterStateSize = masterClusterState.toString().length();
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
String masterId = masterClusterState.nodes().masterNodeId();
for (Client client : cluster().getClients()) {
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
@ -1108,7 +1108,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
// remove local node reference
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null);
final Map<String, Object> localStateMap = convertToMap(localClusterState);
final int localClusterStateSize = localClusterState.toString().length();
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version() && masterId.equals(localClusterState.nodes().masterNodeId())) {

View File

@ -99,22 +99,22 @@ public class MockTransportService extends TransportService {
public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry);
return new MockTransportService(settings, transport, threadPool, namedWriteableRegistry);
return new MockTransportService(settings, transport, threadPool);
}
public static MockTransportService nettyFromThreadPool(Settings settings, Version version, ThreadPool threadPool) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
version, namedWriteableRegistry);
return new MockTransportService(Settings.EMPTY, transport, threadPool, namedWriteableRegistry);
return new MockTransportService(Settings.EMPTY, transport, threadPool);
}
private final Transport original;
@Inject
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, new LookupTestTransport(transport), threadPool, namedWriteableRegistry);
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, new LookupTestTransport(transport), threadPool);
this.original = transport;
}