[reindex] Properly register status

Without this commit fetching the status of a reindex from a node that isn't
coordinating the reindex will fail. This commit properly registers reindex's
status so this doesn't happen. To do so it moves all task status registration
into NetworkModule and creates a method to register other statuses which the
reindex plugin calls.
This commit is contained in:
Nik Everett 2016-03-15 18:15:14 -04:00
parent d83e12094e
commit 7197172047
13 changed files with 107 additions and 34 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.common.network;
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -139,6 +140,7 @@ import org.elasticsearch.rest.action.template.RestPutSearchTemplateAction;
import org.elasticsearch.rest.action.termvectors.RestMultiTermVectorsAction;
import org.elasticsearch.rest.action.termvectors.RestTermVectorsAction;
import org.elasticsearch.rest.action.update.RestUpdateAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
@ -326,6 +328,7 @@ public class NetworkModule extends AbstractModule {
registerTransportService(NETTY_TRANSPORT, TransportService.class);
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
registerTaskStatus(ReplicationTask.Status.PROTOTYPE);
if (transportClient == false) {
registerHttpTransport(NETTY_TRANSPORT, NettyHttpServerTransport.class);
@ -371,6 +374,10 @@ public class NetworkModule extends AbstractModule {
}
}
public void registerTaskStatus(Task.Status prototype) {
namedWriteableRegistry.registerPrototype(Task.Status.class, prototype);
}
@Override
protected void configure() {
bind(NetworkService.class).toInstance(networkService);

View File

@ -20,13 +20,11 @@
package org.elasticsearch.transport;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.MeanMetric;
@ -43,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
@ -113,11 +110,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
volatile DiscoveryNode localNode = null;
public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool, new NamedWriteableRegistry());
this(EMPTY_SETTINGS, transport, threadPool);
}
@Inject
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
public TransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings);
this.transport = transport;
this.threadPool = threadPool;
@ -126,7 +123,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
tracerLog = Loggers.getLogger(logger, ".tracer");
adapter = createAdapter();
taskManager = createTaskManager();
namedWriteableRegistry.registerPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE);
}
/**

View File

@ -183,7 +183,7 @@ public abstract class TaskManagerTestCase extends ESTestCase {
public TestNode(String name, ThreadPool threadPool, Settings settings) {
transportService = new TransportService(settings,
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
threadPool, new NamedWriteableRegistry()) {
threadPool) {
@Override
protected TaskManager createTaskManager() {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {

View File

@ -128,8 +128,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
CountDownLatch clusterStateLatch = new CountDownLatch(1);
@Inject
public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
}
@Override @SuppressWarnings("unchecked")

View File

@ -71,7 +71,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
return new TestResponse();
}
};
transportService = new TransportService(Settings.EMPTY, transport, threadPool, new NamedWriteableRegistry());
transportService = new TransportService(Settings.EMPTY, transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);

View File

@ -19,12 +19,18 @@
package org.elasticsearch.common.network;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerAdapter;
import org.elasticsearch.http.HttpServerTransport;
@ -36,10 +42,16 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestNodesAction;
import org.elasticsearch.rest.action.main.RestMainAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.Task.Status;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import static org.hamcrest.Matchers.sameInstance;
public class NetworkModuleTests extends ModuleTestCase {
static class FakeTransportService extends TransportService {
@ -104,36 +116,36 @@ public class NetworkModuleTests extends ModuleTestCase {
public void testRegisterTransportService() {
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "custom").build();
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerTransportService("custom", FakeTransportService.class);
assertBinding(module, TransportService.class, FakeTransportService.class);
// check it works with transport only as well
module = new NetworkModule(new NetworkService(settings), settings, true, null);
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
module.registerTransportService("custom", FakeTransportService.class);
assertBinding(module, TransportService.class, FakeTransportService.class);
}
public void testRegisterTransport() {
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "custom").build();
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerTransport("custom", FakeTransport.class);
assertBinding(module, Transport.class, FakeTransport.class);
// check it works with transport only as well
module = new NetworkModule(new NetworkService(settings), settings, true, null);
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
module.registerTransport("custom", FakeTransport.class);
assertBinding(module, Transport.class, FakeTransport.class);
}
public void testRegisterHttpTransport() {
Settings settings = Settings.builder().put(NetworkModule.HTTP_TYPE_SETTING.getKey(), "custom").build();
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerHttpTransport("custom", FakeHttpTransport.class);
assertBinding(module, HttpServerTransport.class, FakeHttpTransport.class);
// check registration not allowed for transport only
module = new NetworkModule(new NetworkService(settings), settings, true, null);
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
try {
module.registerHttpTransport("custom", FakeHttpTransport.class);
fail();
@ -144,19 +156,19 @@ public class NetworkModuleTests extends ModuleTestCase {
// not added if http is disabled
settings = Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).build();
module = new NetworkModule(new NetworkService(settings), settings, false, null);
module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
assertNotBound(module, HttpServerTransport.class);
}
public void testRegisterRestHandler() {
Settings settings = Settings.EMPTY;
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerRestHandler(FakeRestHandler.class);
// also check a builtin is bound
assertSetMultiBinding(module, RestHandler.class, FakeRestHandler.class, RestMainAction.class);
// check registration not allowed for transport only
module = new NetworkModule(new NetworkService(settings), settings, true, null);
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
try {
module.registerRestHandler(FakeRestHandler.class);
fail();
@ -168,9 +180,44 @@ public class NetworkModuleTests extends ModuleTestCase {
public void testRegisterCatRestHandler() {
Settings settings = Settings.EMPTY;
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
module.registerRestHandler(FakeCatRestHandler.class);
// also check a builtin is bound
assertSetMultiBinding(module, AbstractCatAction.class, FakeCatRestHandler.class, RestNodesAction.class);
}
public void testRegisterTaskStatus() {
NamedWriteableRegistry registry = new NamedWriteableRegistry();
Settings settings = Settings.EMPTY;
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, registry);
// Builtin prototype comes back
assertNotNull(registry.getPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE.getWriteableName()));
Task.Status dummy = new DummyTaskStatus();
module.registerTaskStatus(dummy);
assertThat(registry.getPrototype(Task.Status.class, "dummy"), sameInstance(dummy));
}
private class DummyTaskStatus implements Task.Status {
@Override
public String getWriteableName() {
return "dummy";
}
@Override
public Status readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
throw new UnsupportedOperationException();
}
}
}

View File

@ -106,7 +106,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
protected MockTransportService build(Settings settings, Version version) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
MockTransportService transportService = new MockTransportService(Settings.EMPTY,
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool, namedWriteableRegistry);
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool);
transportService.start();
transportService.acceptIncomingRequests();
return transportService;

View File

@ -41,8 +41,8 @@ public class TransportModuleTests extends ModuleTestCase {
static class FakeTransportService extends TransportService {
@Inject
public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
}
}
}

View File

@ -54,13 +54,13 @@ public class NettyScheduledPingTests extends ESTestCase {
NamedWriteableRegistry registryA = new NamedWriteableRegistry();
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, registryA);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
serviceA.start();
serviceA.acceptIncomingRequests();
NamedWriteableRegistry registryB = new NamedWriteableRegistry();
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, registryB);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
serviceB.start();
serviceB.acceptIncomingRequests();

View File

@ -785,8 +785,8 @@ public class IndicesRequestTests extends ESIntegTestCase {
private final Map<String, List<TransportRequest>> requests = new HashMap<>();
@Inject
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, transport, threadPool, namedWriteableRegistry);
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
}
synchronized List<TransportRequest> consumeRequests(String action) {

View File

@ -41,8 +41,9 @@ public class ReindexPlugin extends Plugin {
actionModule.registerAction(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class);
}
public void onModule(NetworkModule restModule) {
restModule.registerRestHandler(RestReindexAction.class);
restModule.registerRestHandler(RestUpdateByQueryAction.class);
public void onModule(NetworkModule networkModule) {
networkModule.registerRestHandler(RestReindexAction.class);
networkModule.registerRestHandler(RestUpdateByQueryAction.class);
networkModule.registerTaskStatus(BulkByScrollTask.Status.PROTOTYPE);
}
}

View File

@ -21,7 +21,10 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.NativeScriptFactory;
@ -41,7 +44,10 @@ import java.util.concurrent.TimeoutException;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.ESIntegTestCase.client;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
/**
@ -76,10 +82,26 @@ public class CancelTestUtils {
// Wait until the script is on the second document.
barrier.await(30, TimeUnit.SECONDS);
// Status should show running
ListTasksResponse tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get();
assertThat(tasksList.getNodeFailures(), empty());
assertThat(tasksList.getTaskFailures(), empty());
assertThat(tasksList.getTasks(), hasSize(1));
BulkByScrollTask.Status status = (Status) tasksList.getTasks().get(0).getStatus();
assertNull(status.getReasonCancelled());
// Cancel the request while the script is running. This will prevent the request from being sent at all.
List<TaskInfo> cancelledTasks = client().admin().cluster().prepareCancelTasks().setActions(actionToCancel).get().getTasks();
assertThat(cancelledTasks, hasSize(1));
// The status should now show canceled. The request will still be in the list because the script is still blocked.
tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get();
assertThat(tasksList.getNodeFailures(), empty());
assertThat(tasksList.getTaskFailures(), empty());
assertThat(tasksList.getTasks(), hasSize(1));
status = (Status) tasksList.getTasks().get(0).getStatus();
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());
// Now let the next document through. It won't be sent because the request is cancelled but we need to unblock the script.
barrier.await();

View File

@ -99,22 +99,22 @@ public class MockTransportService extends TransportService {
public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry);
return new MockTransportService(settings, transport, threadPool, namedWriteableRegistry);
return new MockTransportService(settings, transport, threadPool);
}
public static MockTransportService nettyFromThreadPool(Settings settings, Version version, ThreadPool threadPool) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
version, namedWriteableRegistry);
return new MockTransportService(Settings.EMPTY, transport, threadPool, namedWriteableRegistry);
return new MockTransportService(Settings.EMPTY, transport, threadPool);
}
private final Transport original;
@Inject
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, new LookupTestTransport(transport), threadPool, namedWriteableRegistry);
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, new LookupTestTransport(transport), threadPool);
this.original = transport;
}