From 9dc59e29fabcc1f8d936bc9299776bf9d2d875fc Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 14 Oct 2013 09:50:34 +0200 Subject: [PATCH] Retry blocking if async indexing is rejected due to queue size Some tests use AbstractIntegrationTest#indexRandom which sometimes uses async indexing. This can easily run into queue size based rejections on a slow box. In that case we should retry blocked indexing. --- .../elasticsearch/common/collect/Tuple.java | 5 ++ .../test/AbstractIntegrationTest.java | 64 ++++++++++++++----- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/collect/Tuple.java b/src/main/java/org/elasticsearch/common/collect/Tuple.java index 02043689537..3b4b2344da1 100644 --- a/src/main/java/org/elasticsearch/common/collect/Tuple.java +++ b/src/main/java/org/elasticsearch/common/collect/Tuple.java @@ -63,4 +63,9 @@ public class Tuple { result = 31 * result + (v2 != null ? v2.hashCode() : 0); return result; } + + @Override + public String toString() { + return "Tuple [v1=" + v1 + ", v2=" + v2 + "]"; + } } diff --git a/src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java b/src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java index 5e91cef5e08..bff9896b8a7 100644 --- a/src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/AbstractIntegrationTest.java @@ -23,6 +23,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.Iterators; import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; @@ -52,8 +53,10 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.merge.policy.*; import org.elasticsearch.indices.IndexAlreadyExistsException; @@ -495,6 +498,7 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase { if (builders.length == 0) { return; } + Random random = getRandom(); Set indicesSet = new HashSet(); for (int i = 0; i < builders.length; i++) { @@ -503,21 +507,21 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase { final String[] indices = indicesSet.toArray(new String[0]); List list = Arrays.asList(builders); Collections.shuffle(list, random); - final CopyOnWriteArrayList errors = new CopyOnWriteArrayList(); + final CopyOnWriteArrayList> errors = new CopyOnWriteArrayList>(); List latches = new ArrayList(); if (frequently()) { logger.info("Index [{}] docs async: [{}]", list.size(), true); final CountDownLatch latch = new CountDownLatch(list.size()); latches.add(latch); for (IndexRequestBuilder indexRequestBuilder : list) { - indexRequestBuilder.execute(new LatchedActionListener(latch, errors)); + indexRequestBuilder.execute(new PayloadLatchedActionListener(indexRequestBuilder, latch, errors)); if (rarely()) { if (rarely()) { - client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener(newLatch(latches), errors)); + client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener(newLatch(latches))); } else if (rarely()) { - client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener(newLatch(latches), errors)); + client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener(newLatch(latches))); } else if (rarely()) { - client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener(newLatch(latches), errors)); + client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener(newLatch(latches))); } } } @@ -528,11 +532,11 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase { indexRequestBuilder.execute().actionGet(); if (rarely()) { if (rarely()) { - client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener(newLatch(latches), errors)); + client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener(newLatch(latches))); } else if (rarely()) { - client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener(newLatch(latches), errors)); + client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener(newLatch(latches))); } else if (rarely()) { - client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener(newLatch(latches), errors)); + client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener(newLatch(latches))); } } } @@ -540,7 +544,15 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase { for (CountDownLatch countDownLatch : latches) { countDownLatch.await(); } - assertThat(errors, emptyIterable()); + final List actualErrors = new ArrayList(); + for (Tuple tuple : errors) { + if (ExceptionsHelper.unwrapCause(tuple.v2()) instanceof EsRejectedExecutionException) { + tuple.v1().execute().actionGet(); // re-index if rejected + } else { + actualErrors.add(tuple.v2()); + } + } + assertThat(actualErrors, emptyIterable()); if (forceRefresh) { assertNoFailures(client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute().get()); } @@ -551,29 +563,49 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase { latches.add(l); return l; } + + - private static class LatchedActionListener implements ActionListener { + private class LatchedActionListener implements ActionListener { private final CountDownLatch latch; - private final CopyOnWriteArrayList errors; - public LatchedActionListener(CountDownLatch latch, CopyOnWriteArrayList errors) { + public LatchedActionListener(CountDownLatch latch) { this.latch = latch; - this.errors = errors; } @Override - public void onResponse(Response response) { + public final void onResponse(Response response) { latch.countDown(); } @Override - public void onFailure(Throwable e) { + public final void onFailure(Throwable t) { try { - errors.add(e); + logger.info("Action Failed", t); + addError(t); } finally { latch.countDown(); } } + + protected void addError(Throwable t) { + } + + } + + private class PayloadLatchedActionListener extends LatchedActionListener { + private final CopyOnWriteArrayList> errors; + private final T builder; + + public PayloadLatchedActionListener(T builder, CountDownLatch latch, CopyOnWriteArrayList> errors) { + super(latch); + this.errors = errors; + this.builder = builder; + } + + protected void addError(Throwable t) { + errors.add(new Tuple(builder, t)); + } }