Fixed issue #2197
This commit is contained in:
parent
b4b33bb205
commit
bbe735f2cc
|
@ -32,6 +32,8 @@ import org.elasticsearch.action.search.TransportSearchAction;
|
||||||
import org.elasticsearch.action.support.TransportAction;
|
import org.elasticsearch.action.support.TransportAction;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
import org.elasticsearch.cluster.routing.*;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.get.GetField;
|
import org.elasticsearch.index.get.GetField;
|
||||||
|
@ -41,12 +43,11 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||||
import org.elasticsearch.index.query.MoreLikeThisFieldQueryBuilder;
|
import org.elasticsearch.index.query.MoreLikeThisFieldQueryBuilder;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.BaseTransportRequestHandler;
|
import org.elasticsearch.transport.*;
|
||||||
import org.elasticsearch.transport.TransportChannel;
|
|
||||||
import org.elasticsearch.transport.TransportService;
|
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static com.google.common.collect.Sets.newHashSet;
|
import static com.google.common.collect.Sets.newHashSet;
|
||||||
|
@ -68,6 +69,8 @@ public class TransportMoreLikeThisAction extends TransportAction<MoreLikeThisReq
|
||||||
|
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
|
|
||||||
|
private final TransportService transportService;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TransportMoreLikeThisAction(Settings settings, ThreadPool threadPool, TransportSearchAction searchAction, TransportGetAction getAction,
|
public TransportMoreLikeThisAction(Settings settings, ThreadPool threadPool, TransportSearchAction searchAction, TransportGetAction getAction,
|
||||||
ClusterService clusterService, IndicesService indicesService, TransportService transportService) {
|
ClusterService clusterService, IndicesService indicesService, TransportService transportService) {
|
||||||
|
@ -76,6 +79,7 @@ public class TransportMoreLikeThisAction extends TransportAction<MoreLikeThisReq
|
||||||
this.getAction = getAction;
|
this.getAction = getAction;
|
||||||
this.indicesService = indicesService;
|
this.indicesService = indicesService;
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
|
this.transportService = transportService;
|
||||||
|
|
||||||
transportService.registerHandler(MoreLikeThisAction.NAME, new TransportHandler());
|
transportService.registerHandler(MoreLikeThisAction.NAME, new TransportHandler());
|
||||||
}
|
}
|
||||||
|
@ -87,6 +91,22 @@ public class TransportMoreLikeThisAction extends TransportAction<MoreLikeThisReq
|
||||||
// update to the concrete index
|
// update to the concrete index
|
||||||
final String concreteIndex = clusterState.metaData().concreteIndex(request.index());
|
final String concreteIndex = clusterState.metaData().concreteIndex(request.index());
|
||||||
|
|
||||||
|
RoutingNode routingNode = clusterState.getRoutingNodes().nodesToShards().get(clusterService.localNode().getId());
|
||||||
|
if (routingNode == null) {
|
||||||
|
redirect(request, listener, clusterState);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
boolean hasIndexLocally = false;
|
||||||
|
for (MutableShardRouting shardRouting : routingNode.shards()) {
|
||||||
|
if (concreteIndex.equals(shardRouting.index())) {
|
||||||
|
hasIndexLocally = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!hasIndexLocally) {
|
||||||
|
redirect(request, listener, clusterState);
|
||||||
|
return;
|
||||||
|
}
|
||||||
Set<String> getFields = newHashSet();
|
Set<String> getFields = newHashSet();
|
||||||
if (request.fields() != null) {
|
if (request.fields() != null) {
|
||||||
Collections.addAll(getFields, request.fields());
|
Collections.addAll(getFields, request.fields());
|
||||||
|
@ -204,6 +224,39 @@ public class TransportMoreLikeThisAction extends TransportAction<MoreLikeThisReq
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Redirects the request to a data node, that has the index meta data locally available.
|
||||||
|
private void redirect(MoreLikeThisRequest request, final ActionListener<SearchResponse> listener, ClusterState clusterState) {
|
||||||
|
ShardIterator shardIterator = clusterService.operationRouting().getShards(clusterState, request.index(), request.type(), request.id(), null, null);
|
||||||
|
ShardRouting shardRouting = shardIterator.firstOrNull();
|
||||||
|
if (shardRouting == null) {
|
||||||
|
throw new ElasticSearchException("No shards for index " + request.index());
|
||||||
|
}
|
||||||
|
String nodeId = shardRouting.currentNodeId();
|
||||||
|
DiscoveryNode discoveryNode = clusterState.nodes().get(nodeId);
|
||||||
|
transportService.sendRequest(discoveryNode, MoreLikeThisAction.NAME, request, new TransportResponseHandler<SearchResponse>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SearchResponse newInstance() {
|
||||||
|
return new SearchResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleResponse(SearchResponse response) {
|
||||||
|
listener.onResponse(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleException(TransportException exp) {
|
||||||
|
listener.onFailure(exp);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String executor() {
|
||||||
|
return ThreadPool.Names.SAME;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private void parseSource(GetResponse getResponse, final BoolQueryBuilder boolBuilder, DocumentMapper docMapper, final Set<String> fields, final MoreLikeThisRequest request) {
|
private void parseSource(GetResponse getResponse, final BoolQueryBuilder boolBuilder, DocumentMapper docMapper, final Set<String> fields, final MoreLikeThisRequest request) {
|
||||||
if (getResponse.source() == null) {
|
if (getResponse.source() == null) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -23,6 +23,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
|
import org.elasticsearch.indices.IndexMissingException;
|
||||||
import org.elasticsearch.test.integration.AbstractNodesTests;
|
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||||
import org.testng.annotations.AfterMethod;
|
import org.testng.annotations.AfterMethod;
|
||||||
import org.testng.annotations.BeforeMethod;
|
import org.testng.annotations.BeforeMethod;
|
||||||
|
@ -33,6 +35,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
|
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -125,4 +128,24 @@ public class MoreLikeThisActionTests extends AbstractNodesTests {
|
||||||
assertThat(mltResponse.hits().totalHits(), equalTo(1l));
|
assertThat(mltResponse.hits().totalHits(), equalTo(1l));
|
||||||
assertThat(mltResponse.hits().getAt(0).id(), equalTo("2"));
|
assertThat(mltResponse.hits().getAt(0).id(), equalTo("2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoreLikeThisIssue2197() throws Exception {
|
||||||
|
startNode("client-node", ImmutableSettings.settingsBuilder().put("node.client", true));
|
||||||
|
try {
|
||||||
|
client1.admin().indices().prepareDelete("foo").execute().actionGet();
|
||||||
|
} catch (IndexMissingException e) {}
|
||||||
|
client1.prepareIndex("foo", "bar", "1")
|
||||||
|
.setSource(jsonBuilder().startObject().startObject("foo").field("bar", "boz").endObject())
|
||||||
|
.execute().actionGet();
|
||||||
|
client1.admin().indices().prepareRefresh("foo").execute().actionGet();
|
||||||
|
|
||||||
|
SearchResponse searchResponse = client1.prepareMoreLikeThis("foo", "bar", "1").execute().actionGet();
|
||||||
|
assertThat(searchResponse, notNullValue());
|
||||||
|
Client client3 = client("client-node");
|
||||||
|
searchResponse = client3.prepareMoreLikeThis("foo", "bar", "1").execute().actionGet();
|
||||||
|
assertThat(searchResponse, notNullValue());
|
||||||
|
client3.close();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue