add heavy concurrent updates to same doc, and make sure it has the same data on all replicas

This commit is contained in:
kimchy 2011-01-04 17:44:33 +02:00
parent 45c1ab06b3
commit 9423378f27
2 changed files with 71 additions and 0 deletions

View File

@ -372,6 +372,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private void retry(boolean fromClusterEvent, final ShardId shardId) {
if (!fromClusterEvent) {
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
request.operationThreaded(true);
clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
@Override public void postAdded() {

View File

@ -0,0 +1,70 @@
package org.elasticsearch.test.integration.load;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
@Test
public class ConcurrentDocumentOperationTests extends AbstractNodesTests {
@AfterMethod public void closeNodes() {
closeAllNodes();
}
@Test public void concurrentOperationOnSameDocTest() throws Exception {
// start 5 nodes
Node[] nodes = new Node[5];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = startNode(Integer.toString(i));
}
logger.info("--> create an index with 1 shard and max replicas based on nodes");
nodes[0].client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("number_of_shards", 1).put("number_of_replicas", nodes.length - 1))
.execute().actionGet();
logger.info("execute concurrent updates on the same doc");
int numberOfUpdates = 100;
final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(numberOfUpdates);
for (int i = 0; i < numberOfUpdates; i++) {
nodes[0].client().prepareIndex("test", "type1", "1").setSource("field1", i).execute(new ActionListener<IndexResponse>() {
@Override public void onResponse(IndexResponse response) {
latch.countDown();
}
@Override public void onFailure(Throwable e) {
e.printStackTrace();
failure.set(e);
latch.countDown();
}
});
}
latch.await();
assertThat(failure.get(), nullValue());
nodes[0].client().admin().indices().prepareRefresh().execute().actionGet();
logger.info("done indexing, check all have the same field value");
Map masterSource = nodes[0].client().prepareGet("test", "type1", "1").execute().actionGet().sourceAsMap();
for (int i = 0; i < (nodes.length * 5); i++) {
assertThat(nodes[0].client().prepareGet("test", "type1", "1").execute().actionGet().sourceAsMap(), equalTo(masterSource));
}
}
}