Fixed testConcurrentAddingAndRemovingWhilePercolating test

This commit is contained in:
Martijn van Groningen 2013-07-24 20:48:55 +02:00
parent 70bbcb4c48
commit 426c2867d9
1 changed files with 19 additions and 10 deletions

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.integration.AbstractNodesTests; import org.elasticsearch.test.integration.AbstractNodesTests;
@ -37,10 +38,12 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
@ -327,9 +330,9 @@ public class ConcurrentPercolatorTests extends AbstractNodesTests {
final int numIndexThreads = 3; final int numIndexThreads = 3;
final int numberPercolateOperation = 100; final int numberPercolateOperation = 100;
final AtomicBoolean assertionFailure = new AtomicBoolean(false); final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>(null);
final AtomicInteger indexOperations = new AtomicInteger(); final AtomicInteger idGen = new AtomicInteger(0);
final AtomicInteger deleteOperations = new AtomicInteger(); final Set<String> liveIds = ConcurrentCollections.newConcurrentSet();
final AtomicBoolean run = new AtomicBoolean(true); final AtomicBoolean run = new AtomicBoolean(true);
Thread[] indexThreads = new Thread[numIndexThreads]; Thread[] indexThreads = new Thread[numIndexThreads];
final Semaphore semaphore = new Semaphore(numIndexThreads, true); final Semaphore semaphore = new Semaphore(numIndexThreads, true);
@ -343,17 +346,23 @@ public class ConcurrentPercolatorTests extends AbstractNodesTests {
while (run.get()) { while (run.get()) {
semaphore.acquire(); semaphore.acquire();
try { try {
if ((indexOperations.get() - deleteOperations.get()) > 0 && getRandom().nextInt(100) < 19) { if (!liveIds.isEmpty() && getRandom().nextInt(100) < 19) {
String id = Integer.toString(deleteOperations.incrementAndGet()); String id;
do {
id = Integer.toString(randomInt(idGen.get()));
} while (!liveIds.remove(id));
DeleteResponse response = client().prepareDelete("index", "_percolator", id) DeleteResponse response = client().prepareDelete("index", "_percolator", id)
.execute().actionGet(); .execute().actionGet();
assertThat(response.getId(), equalTo(id)); assertThat(response.getId(), equalTo(id));
assertThat(response.isNotFound(), equalTo(false)); assertThat("doc[" + id + "] should have been deleted, but isn't", response.isNotFound(), equalTo(false));
} else { } else {
String id = Integer.toString(indexOperations.incrementAndGet()); String id = Integer.toString(idGen.getAndIncrement());
IndexResponse response = client().prepareIndex("index", "_percolator", id) IndexResponse response = client().prepareIndex("index", "_percolator", id)
.setSource(doc) .setSource(doc)
.execute().actionGet(); .execute().actionGet();
liveIds.add(id);
assertThat(response.isCreated(), equalTo(true)); // We only add new docs
assertThat(response.getId(), equalTo(id)); assertThat(response.getId(), equalTo(id));
} }
} finally { } finally {
@ -365,7 +374,7 @@ public class ConcurrentPercolatorTests extends AbstractNodesTests {
run.set(false); run.set(false);
} catch (Throwable t) { } catch (Throwable t) {
run.set(false); run.set(false);
assertionFailure.set(true); exceptionHolder.set(t);
logger.error("Error in indexing thread...", t); logger.error("Error in indexing thread...", t);
} }
} }
@ -384,7 +393,7 @@ public class ConcurrentPercolatorTests extends AbstractNodesTests {
if (!run.get()) { if (!run.get()) {
break; break;
} }
int atLeastExpected = indexOperations.get() - deleteOperations.get(); int atLeastExpected = liveIds.size();
PercolateResponse response = client().preparePercolate("index", "type") PercolateResponse response = client().preparePercolate("index", "type")
.setSource(percolateDoc).execute().actionGet(); .setSource(percolateDoc).execute().actionGet();
assertThat(response.getShardFailures(), emptyArray()); assertThat(response.getShardFailures(), emptyArray());
@ -398,7 +407,7 @@ public class ConcurrentPercolatorTests extends AbstractNodesTests {
for (Thread thread : indexThreads) { for (Thread thread : indexThreads) {
thread.join(); thread.join();
} }
assertThat(assertionFailure.get(), equalTo(false)); assertThat("exceptionHolder should have been empty, but holds: " + exceptionHolder.toString(), exceptionHolder.get(), nullValue());
} }
@Override @Override