Merge remote-tracking branch 'es/master' into feature/ingest

Martijn van Groningen 2016-01-06 22:17:04 +01:00
commit 1d3dfa81aa
6 changed files with 63 additions and 13 deletions


@@ -844,11 +844,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                 // we never execute replication operation locally as primary operation has already completed locally
                 // hence, we ignore any local shard for replication
                 if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
-                    performOnReplica(shard, shard.currentNodeId());
+                    performOnReplica(shard);
                 }
                 // send operation to relocating shard
                 if (shard.relocating()) {
-                    performOnReplica(shard, shard.relocatingNodeId());
+                    performOnReplica(shard.buildTargetRelocatingShard());
                 }
             }
         }
@@ -856,9 +856,10 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
         /**
         * send replica operation to target node
         */
-        void performOnReplica(final ShardRouting shard, final String nodeId) {
+        void performOnReplica(final ShardRouting shard) {
             // if we don't have that node, it means that it might have failed and will be created again, in
             // this case, we don't have to do the operation, and just let it failover
+            String nodeId = shard.currentNodeId();
             if (!nodes.nodeExists(nodeId)) {
                 logger.trace("failed to send action [{}] on replica [{}] for request [{}] due to unknown node [{}]", transportReplicaAction, shard.shardId(), replicaRequest, nodeId);
                 onReplicaFailure(nodeId, null);
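
Both hunks make replica targeting depend only on the ShardRouting: the node id is now derived from the routing inside performOnReplica, and the extra copy involved in a relocation is addressed through buildTargetRelocatingShard(). The sketch below distills that targeting rule into a self-contained class; Copy and its members are hypothetical stand-ins for the real ShardRouting, not Elasticsearch API.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for ShardRouting: just enough state to show the rule.
    final class Copy {
        final String currentNodeId;    // node currently holding this copy
        final String relocatingNodeId; // relocation target node, or null

        Copy(String currentNodeId, String relocatingNodeId) {
            this.currentNodeId = currentNodeId;
            this.relocatingNodeId = relocatingNodeId;
        }

        boolean relocating() {
            return relocatingNodeId != null;
        }

        // Mirrors buildTargetRelocatingShard(): the same shard, viewed as the
        // copy being initialized on the relocation target node.
        Copy buildTargetRelocating() {
            return new Copy(relocatingNodeId, null);
        }

        static List<String> replicationTargets(List<Copy> copies, String localNodeId) {
            List<String> nodes = new ArrayList<>();
            for (Copy copy : copies) {
                // the primary operation already ran locally, so skip the local copy
                if (!localNodeId.equals(copy.currentNodeId)) {
                    nodes.add(copy.currentNodeId);
                }
                // a relocating copy must also receive the operation on its target node
                if (copy.relocating()) {
                    nodes.add(copy.buildTargetRelocating().currentNodeId);
                }
            }
            return nodes;
        }
    }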


@@ -302,6 +302,10 @@ public class ShardStateAction extends AbstractComponent {
             this.failure = failure;
         }
+        public ShardRouting getShardRouting() {
+            return shardRouting;
+        }
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);


@@ -254,7 +254,7 @@ public class PercolatorService extends AbstractComponent {
         }
         PercolatorQuery percolatorQuery = builder.build();
-        if (context.isOnlyCount()) {
+        if (context.isOnlyCount() || context.size() == 0) {
             TotalHitCountCollector collector = new TotalHitCountCollector();
             context.searcher().search(percolatorQuery, MultiCollector.wrap(collector, aggregatorCollector));
             if (aggregatorCollector != null) {
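
With this change a size of 0 takes the same path as an explicit count-only percolate: no hits are gathered, only a total is tallied. A short sketch of that Lucene counting pattern; the searcher, query, and optional extra collector are assumed inputs, and MultiCollector.wrap filters out null collectors, which is why an absent aggregation collector can be passed straight through.

    import java.io.IOException;

    import org.apache.lucene.search.Collector;
    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.MultiCollector;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.TotalHitCountCollector;

    final class CountOnlySearch {
        // Counts matches without collecting documents; extra is an optional
        // second collector (e.g. for aggregations) and may be null.
        static int count(IndexSearcher searcher, Query query, Collector extra) throws IOException {
            TotalHitCountCollector counter = new TotalHitCountCollector();
            // MultiCollector.wrap silently drops null collectors, so no null
            // check is needed at the call site
            searcher.search(query, MultiCollector.wrap(counter, extra));
            return counter.getTotalHits();
        }
    }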


@@ -65,6 +65,7 @@ import org.junit.BeforeClass;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -75,9 +76,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -486,7 +491,39 @@ public class TransportReplicationActionTests extends ESTestCase {
         replicationPhase.run();
         final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests();
         transport.clear();
-        assertThat(capturedRequests.length, equalTo(assignedReplicas));
+        HashMap<String, Request> nodesSentTo = new HashMap<>();
+        boolean executeOnReplica =
+            action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
+        for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
+            // no duplicate requests
+            Request replicationRequest = (Request) capturedRequest.request;
+            assertNull(nodesSentTo.put(capturedRequest.node.getId(), replicationRequest));
+            // the request is hitting the correct shard
+            assertEquals(request.shardId, replicationRequest.shardId);
+        }
+        // no request was sent to the local node
+        assertThat(nodesSentTo.keySet(), not(hasItem(clusterService.state().getNodes().localNodeId())));
+        // requests were sent to the correct shard copies
+        for (ShardRouting shard : clusterService.state().getRoutingTable().shardRoutingTable(shardId.getIndex(), shardId.id())) {
+            if (shard.primary() == false && executeOnReplica == false) {
+                continue;
+            }
+            if (shard.unassigned()) {
+                continue;
+            }
+            if (shard.primary() == false) {
+                nodesSentTo.remove(shard.currentNodeId());
+            }
+            if (shard.relocating()) {
+                nodesSentTo.remove(shard.relocatingNodeId());
+            }
+        }
+        assertThat(nodesSentTo.entrySet(), is(empty()));
         if (assignedReplicas > 0) {
             assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false));
         }
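
The rewritten assertion builds a node-to-request map and whittles it down to empty instead of only comparing counts. One idiom worth noting: Map.put returns the previous value for the key, so assertNull(map.put(...)) both records the request and proves no node was contacted twice. A tiny, self-contained illustration with hypothetical names:

    import static org.junit.Assert.assertNull;

    import java.util.HashMap;
    import java.util.Map;

    final class NoDuplicatesIdiom {
        static void demo() {
            Map<String, String> sent = new HashMap<>();
            // put returns null when the key is new, so each assertion proves
            // this node has not been contacted before
            assertNull(sent.put("node-1", "request-a"));
            assertNull(sent.put("node-2", "request-b"));
            // a second put for "node-1" would return "request-a" and fail
        }
    }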
@@ -511,6 +548,12 @@ public class TransportReplicationActionTests extends ESTestCase {
             transport.clear();
             assertEquals(1, shardFailedRequests.length);
             CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
+            // get the shard the request was sent to
+            ShardRouting routing = clusterService.state().getRoutingNodes().node(capturedRequest.node.id()).get(request.shardId.id());
+            // and the shard that was requested to be failed
+            ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry) shardFailedRequest.request;
+            // the shard the request was sent to and the shard to be failed should be the same
+            assertEquals(shardRoutingEntry.getShardRouting(), routing);
             failures.add(shardFailedRequest);
             transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
         }


@@ -251,7 +251,7 @@ public class ConcurrentPercolatorIT extends ESIntegTestCase {
                         .setSource(onlyField1Doc).execute().actionGet();
                     assertNoFailures(response);
                     assertThat(response.getSuccessfulShards(), equalTo(response.getTotalShards()));
-                    assertThat(response.getMatches().length, greaterThanOrEqualTo(atLeastExpected));
+                    assertThat(response.getCount(), greaterThanOrEqualTo((long) atLeastExpected));
                     break;
                 case 1:
                     atLeastExpected = type2.get();
@@ -259,7 +259,7 @@ public class ConcurrentPercolatorIT extends ESIntegTestCase {
                         .setSource(onlyField2Doc).execute().actionGet();
                     assertNoFailures(response);
                     assertThat(response.getSuccessfulShards(), equalTo(response.getTotalShards()));
-                    assertThat(response.getMatches().length, greaterThanOrEqualTo(atLeastExpected));
+                    assertThat(response.getCount(), greaterThanOrEqualTo((long) atLeastExpected));
                     break;
                 case 2:
                     atLeastExpected = type3.get();
@@ -267,7 +267,7 @@ public class ConcurrentPercolatorIT extends ESIntegTestCase {
                         .setSource(field1AndField2Doc).execute().actionGet();
                     assertNoFailures(response);
                     assertThat(response.getSuccessfulShards(), equalTo(response.getTotalShards()));
-                    assertThat(response.getMatches().length, greaterThanOrEqualTo(atLeastExpected));
+                    assertThat(response.getCount(), greaterThanOrEqualTo((long) atLeastExpected));
                     break;
                 }
             }
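
These assertions switch from counting the returned matches array to the response's total count. Since the count is reported as a long, the expected value has to be widened for hamcrest's typed matcher to compile. A minimal standalone illustration of the constraint, with hypothetical values:

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.greaterThanOrEqualTo;

    final class TypedMatcherExample {
        static void demo() {
            long actualCount = 5L;   // e.g. a total count reported as long
            int atLeastExpected = 3; // accumulated in an int counter
            // greaterThanOrEqualTo(T) yields a Matcher<T>; matching a long
            // against a Matcher<Integer> does not compile, hence the cast
            assertThat(actualCount, greaterThanOrEqualTo((long) atLeastExpected));
        }
    }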


@@ -621,11 +621,13 @@ The `french` analyzer could be reimplemented as a `custom` analyzer as follows:
       "analysis": {
         "filter": {
           "french_elision": {
-            "type": "elision",
-            "articles": [ "l", "m", "t", "qu", "n", "s",
-              "j", "d", "c", "jusqu", "quoiqu",
-              "lorsqu", "puisqu"
-            ]
+            "type": "elision",
+            "articles_case": true,
+            "articles": [
+              "l", "m", "t", "qu", "n", "s",
+              "j", "d", "c", "jusqu", "quoiqu",
+              "lorsqu", "puisqu"
+            ]
           },
           "french_stop": {
             "type": "stop",