Fix ConcurrentPercolatorTests replaces CountDownLatches array by a Semaphore.

The writes to the CountDownLatches array wasn't visible by all threads when the countdown latch array slots were re-initialized.
This commit is contained in:
Martijn van Groningen 2013-07-22 11:32:40 +02:00
parent 4da7086df8
commit d310b94904

View File

@ -40,6 +40,7 @@ import org.testng.annotations.Test;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -336,14 +337,11 @@ public class ConcurrentPercolatorTests extends AbstractNodesTests {
final int numberPercolateOperation = 100; final int numberPercolateOperation = 100;
final AtomicBoolean assertionFailure = new AtomicBoolean(false); final AtomicBoolean assertionFailure = new AtomicBoolean(false);
final CountDownLatch start = new CountDownLatch(1);
final AtomicInteger indexOperations = new AtomicInteger(); final AtomicInteger indexOperations = new AtomicInteger();
final AtomicInteger deleteOperations = new AtomicInteger(); final AtomicInteger deleteOperations = new AtomicInteger();
final AtomicBoolean run = new AtomicBoolean(true); final AtomicBoolean run = new AtomicBoolean(true);
final AtomicBoolean freeze = new AtomicBoolean(false);
final CountDownLatch[] latches = new CountDownLatch[2];
Thread[] indexThreads = new Thread[numIndexThreads]; Thread[] indexThreads = new Thread[numIndexThreads];
final Semaphore semaphore = new Semaphore(numIndexThreads, true);
for (int i = 0; i < indexThreads.length; i++) { for (int i = 0; i < indexThreads.length; i++) {
Runnable r = new Runnable() { Runnable r = new Runnable() {
@Override @Override
@ -352,37 +350,32 @@ public class ConcurrentPercolatorTests extends AbstractNodesTests {
Random r = new Random(); Random r = new Random();
XContentBuilder doc = XContentFactory.jsonBuilder().startObject() XContentBuilder doc = XContentFactory.jsonBuilder().startObject()
.field("query", termQuery("field1", "value")).endObject(); .field("query", termQuery("field1", "value")).endObject();
start.await();
while (run.get()) { while (run.get()) {
if (freeze.get()) { semaphore.acquire();
latches[0].countDown(); try {
latches[1].await(); if ((indexOperations.get() - deleteOperations.get()) > 0 && r.nextInt(100) < 19) {
} String id = Integer.toString(deleteOperations.incrementAndGet());
DeleteResponse response = client().prepareDelete("index", "_percolator", id)
if ((indexOperations.get() - deleteOperations.get()) > 0 && r.nextInt(100) < 19) { .execute().actionGet();
String id = Integer.toString(deleteOperations.incrementAndGet()); assertThat(response.getId(), equalTo(id));
DeleteResponse response = client().prepareDelete("index", "_percolator", id) assertThat(response.isNotFound(), equalTo(false));
.execute().actionGet(); } else {
assertThat(response.getId(), equalTo(id)); String id = Integer.toString(indexOperations.incrementAndGet());
assertThat(response.isNotFound(), equalTo(false)); IndexResponse response = client().prepareIndex("index", "_percolator", id)
} else { .setSource(doc)
String id = Integer.toString(indexOperations.incrementAndGet()); .execute().actionGet();
IndexResponse response = client().prepareIndex("index", "_percolator", id) assertThat(response.getId(), equalTo(id));
.setSource(doc) }
.execute().actionGet(); } finally {
assertThat(response.getId(), equalTo(id)); semaphore.release();
} }
} }
} catch (InterruptedException iex) {
logger.error("indexing thread was interrupted...", iex);
run.set(false);
} catch (Throwable t) { } catch (Throwable t) {
run.set(false); run.set(false);
assertionFailure.set(true); assertionFailure.set(true);
// If percolate is locked, then make sure isn't unlocked, otherwise we hang forever
CountDownLatch percolateLatch = latches[0];
latches[0] = null;
if (percolateLatch != null) {
percolateLatch.countDown();
}
logger.error("Error in indexing thread...", t); logger.error("Error in indexing thread...", t);
} }
} }
@ -394,22 +387,22 @@ public class ConcurrentPercolatorTests extends AbstractNodesTests {
XContentBuilder percolateDoc = XContentFactory.jsonBuilder().startObject().startObject("doc") XContentBuilder percolateDoc = XContentFactory.jsonBuilder().startObject().startObject("doc")
.field("field1", "value") .field("field1", "value")
.endObject().endObject(); .endObject().endObject();
start.countDown();
for (int counter = 0; counter < numberPercolateOperation; counter++) { for (int counter = 0; counter < numberPercolateOperation; counter++) {
Thread.sleep(100); Thread.sleep(5);
latches[0] = new CountDownLatch(numIndexThreads); // Locks percolating until all indexing threads have been blocked semaphore.acquire(numIndexThreads);
latches[1] = new CountDownLatch(1); // Locks indexing threads until percolate is done. try {
freeze.set(true); if (!run.get()) {
latches[0].await(); break;
}
int atLeastExpected = indexOperations.get() - deleteOperations.get(); int atLeastExpected = indexOperations.get() - deleteOperations.get();
PercolateResponse response = client().preparePercolate("index", "type") PercolateResponse response = client().preparePercolate("index", "type")
.setSource(percolateDoc).execute().actionGet(); .setSource(percolateDoc).execute().actionGet();
assertThat(response.getShardFailures(), emptyArray()); assertThat(response.getShardFailures(), emptyArray());
assertThat(response.getSuccessfulShards(), equalTo(response.getTotalShards())); assertThat(response.getSuccessfulShards(), equalTo(response.getTotalShards()));
assertThat(response.getMatches().length, equalTo(atLeastExpected)); assertThat(response.getMatches().length, equalTo(atLeastExpected));
freeze.set(false); } finally {
latches[1].countDown(); semaphore.release(numIndexThreads);
}
} }
run.set(false); run.set(false);
for (Thread thread : indexThreads) { for (Thread thread : indexThreads) {