mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 14:26:27 +00:00
Retry blocking if async indexing is rejected due to queue size
Some tests use AbstractIntegrationTest#indexRandom which sometimes uses async indexing. This can easily run into queue size based rejections on a slow box. In that case we should retry blocked indexing.
This commit is contained in:
parent
df95453430
commit
9dc59e29fa
@ -63,4 +63,9 @@ public class Tuple<V1, V2> {
|
|||||||
result = 31 * result + (v2 != null ? v2.hashCode() : 0);
|
result = 31 * result + (v2 != null ? v2.hashCode() : 0);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Tuple [v1=" + v1 + ", v2=" + v2 + "]";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ import com.google.common.base.Joiner;
|
|||||||
import com.google.common.collect.Iterators;
|
import com.google.common.collect.Iterators;
|
||||||
import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
|
import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.ActionRequestBuilder;
|
import org.elasticsearch.action.ActionRequestBuilder;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
@ -52,8 +53,10 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
|||||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.index.merge.policy.*;
|
import org.elasticsearch.index.merge.policy.*;
|
||||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||||
@ -495,6 +498,7 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase {
|
|||||||
if (builders.length == 0) {
|
if (builders.length == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Random random = getRandom();
|
Random random = getRandom();
|
||||||
Set<String> indicesSet = new HashSet<String>();
|
Set<String> indicesSet = new HashSet<String>();
|
||||||
for (int i = 0; i < builders.length; i++) {
|
for (int i = 0; i < builders.length; i++) {
|
||||||
@ -503,21 +507,21 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase {
|
|||||||
final String[] indices = indicesSet.toArray(new String[0]);
|
final String[] indices = indicesSet.toArray(new String[0]);
|
||||||
List<IndexRequestBuilder> list = Arrays.asList(builders);
|
List<IndexRequestBuilder> list = Arrays.asList(builders);
|
||||||
Collections.shuffle(list, random);
|
Collections.shuffle(list, random);
|
||||||
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
|
final CopyOnWriteArrayList<Tuple<IndexRequestBuilder, Throwable>> errors = new CopyOnWriteArrayList<Tuple<IndexRequestBuilder, Throwable>>();
|
||||||
List<CountDownLatch> latches = new ArrayList<CountDownLatch>();
|
List<CountDownLatch> latches = new ArrayList<CountDownLatch>();
|
||||||
if (frequently()) {
|
if (frequently()) {
|
||||||
logger.info("Index [{}] docs async: [{}]", list.size(), true);
|
logger.info("Index [{}] docs async: [{}]", list.size(), true);
|
||||||
final CountDownLatch latch = new CountDownLatch(list.size());
|
final CountDownLatch latch = new CountDownLatch(list.size());
|
||||||
latches.add(latch);
|
latches.add(latch);
|
||||||
for (IndexRequestBuilder indexRequestBuilder : list) {
|
for (IndexRequestBuilder indexRequestBuilder : list) {
|
||||||
indexRequestBuilder.execute(new LatchedActionListener<IndexResponse>(latch, errors));
|
indexRequestBuilder.execute(new PayloadLatchedActionListener<IndexResponse, IndexRequestBuilder>(indexRequestBuilder, latch, errors));
|
||||||
if (rarely()) {
|
if (rarely()) {
|
||||||
if (rarely()) {
|
if (rarely()) {
|
||||||
client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches), errors));
|
client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches)));
|
||||||
} else if (rarely()) {
|
} else if (rarely()) {
|
||||||
client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<FlushResponse>(newLatch(latches), errors));
|
client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<FlushResponse>(newLatch(latches)));
|
||||||
} else if (rarely()) {
|
} else if (rarely()) {
|
||||||
client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches), errors));
|
client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -528,11 +532,11 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase {
|
|||||||
indexRequestBuilder.execute().actionGet();
|
indexRequestBuilder.execute().actionGet();
|
||||||
if (rarely()) {
|
if (rarely()) {
|
||||||
if (rarely()) {
|
if (rarely()) {
|
||||||
client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches), errors));
|
client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches)));
|
||||||
} else if (rarely()) {
|
} else if (rarely()) {
|
||||||
client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<FlushResponse>(newLatch(latches), errors));
|
client().admin().indices().prepareFlush(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute(new LatchedActionListener<FlushResponse>(newLatch(latches)));
|
||||||
} else if (rarely()) {
|
} else if (rarely()) {
|
||||||
client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches), errors));
|
client().admin().indices().prepareOptimize(indices).setIgnoreIndices(IgnoreIndices.MISSING).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -540,7 +544,15 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase {
|
|||||||
for (CountDownLatch countDownLatch : latches) {
|
for (CountDownLatch countDownLatch : latches) {
|
||||||
countDownLatch.await();
|
countDownLatch.await();
|
||||||
}
|
}
|
||||||
assertThat(errors, emptyIterable());
|
final List<Throwable> actualErrors = new ArrayList<Throwable>();
|
||||||
|
for (Tuple<IndexRequestBuilder, Throwable> tuple : errors) {
|
||||||
|
if (ExceptionsHelper.unwrapCause(tuple.v2()) instanceof EsRejectedExecutionException) {
|
||||||
|
tuple.v1().execute().actionGet(); // re-index if rejected
|
||||||
|
} else {
|
||||||
|
actualErrors.add(tuple.v2());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertThat(actualErrors, emptyIterable());
|
||||||
if (forceRefresh) {
|
if (forceRefresh) {
|
||||||
assertNoFailures(client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute().get());
|
assertNoFailures(client().admin().indices().prepareRefresh(indices).setIgnoreIndices(IgnoreIndices.MISSING).execute().get());
|
||||||
}
|
}
|
||||||
@ -551,29 +563,49 @@ public abstract class AbstractIntegrationTest extends ElasticsearchTestCase {
|
|||||||
latches.add(l);
|
latches.add(l);
|
||||||
return l;
|
return l;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private static class LatchedActionListener<Response> implements ActionListener<Response> {
|
private class LatchedActionListener<Response> implements ActionListener<Response> {
|
||||||
private final CountDownLatch latch;
|
private final CountDownLatch latch;
|
||||||
private final CopyOnWriteArrayList<Throwable> errors;
|
|
||||||
|
|
||||||
public LatchedActionListener(CountDownLatch latch, CopyOnWriteArrayList<Throwable> errors) {
|
public LatchedActionListener(CountDownLatch latch) {
|
||||||
this.latch = latch;
|
this.latch = latch;
|
||||||
this.errors = errors;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(Response response) {
|
public final void onResponse(Response response) {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable e) {
|
public final void onFailure(Throwable t) {
|
||||||
try {
|
try {
|
||||||
errors.add(e);
|
logger.info("Action Failed", t);
|
||||||
|
addError(t);
|
||||||
} finally {
|
} finally {
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void addError(Throwable t) {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private class PayloadLatchedActionListener<Response, T> extends LatchedActionListener<Response> {
|
||||||
|
private final CopyOnWriteArrayList<Tuple<T, Throwable>> errors;
|
||||||
|
private final T builder;
|
||||||
|
|
||||||
|
public PayloadLatchedActionListener(T builder, CountDownLatch latch, CopyOnWriteArrayList<Tuple<T, Throwable>> errors) {
|
||||||
|
super(latch);
|
||||||
|
this.errors = errors;
|
||||||
|
this.builder = builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addError(Throwable t) {
|
||||||
|
errors.add(new Tuple<T, Throwable>(builder, t));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user