cluster block with auto create index bulk action can cause bulk execution to not return

when there is a cluster block (like no master yet discovered), the bulk action doesn't properly catch the exception of inner execute to notify the listener, causing the bulk operation to hang
closes #7086
This commit is contained in:
Shay Banon 2014-07-31 16:57:49 +02:00
parent 5ea6267883
commit 739e977aa7
2 changed files with 43 additions and 16 deletions

View File

@ -96,7 +96,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
@Override
protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTime = System.currentTimeMillis();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<BulkItemResponse>(bulkRequest.requests.size());
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
if (autoCreateIndex.needToCheck()) {
final Set<String> indices = Sets.newHashSet();
@ -125,7 +125,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
ClusterState state = clusterService.state();
for (final String index : indices) {
if (autoCreateIndex.shouldAutoCreate(index, state)) {
createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)"), new ActionListener<CreateIndexResponse>() {
createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)").masterNodeTimeout(bulkRequest.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
@ -145,7 +145,11 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, startTime, listener, responses);
try {
executeBulk(bulkRequest, startTime, listener, responses);
} catch (Throwable t) {
listener.onFailure(t);
}
}
}
});

View File

@ -19,13 +19,14 @@
package org.elasticsearch.cluster;
import com.google.common.base.Predicate;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -36,21 +37,24 @@ import org.junit.Test;
import java.util.HashMap;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.*;
/**
*/
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class NoMasterNodeTests extends ElasticsearchIntegrationTest {
@Test
@TestLogging("action:TRACE,cluster.service:TRACE")
public void testNoMasterActions() throws Exception {
// note, sometimes, we want to check with the fact that an index gets created, sometimes not...
boolean autoCreateIndex = randomBoolean();
logger.info("auto_create_index set to {}", autoCreateIndex);
Settings settings = settingsBuilder()
.put("discovery.type", "zen")
.put("action.auto_create_index", false)
.put("action.auto_create_index", autoCreateIndex)
.put("discovery.zen.minimum_master_nodes", 2)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
@ -75,14 +79,14 @@ public class NoMasterNodeTests extends ElasticsearchIntegrationTest {
try {
client().prepareGet("test", "type1", "1").execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
try {
client().prepareMultiGet().add("test", "type1", "1").execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
@ -93,7 +97,7 @@ public class NoMasterNodeTests extends ElasticsearchIntegrationTest {
.setIndices("test").setDocumentType("type1")
.setSource(percolateSource).execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
@ -101,7 +105,7 @@ public class NoMasterNodeTests extends ElasticsearchIntegrationTest {
try {
client().prepareUpdate("test", "type1", "1").setScript("test script", ScriptService.ScriptType.INLINE).setTimeout(timeout).execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
@ -109,14 +113,14 @@ public class NoMasterNodeTests extends ElasticsearchIntegrationTest {
try {
client().admin().indices().prepareAnalyze("test", "this is a test").execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
try {
client().prepareCount("test").execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
@ -124,11 +128,30 @@ public class NoMasterNodeTests extends ElasticsearchIntegrationTest {
try {
client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()).setTimeout(timeout).execute().actionGet();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException e) {
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
now = System.currentTimeMillis();
try {
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.add(client().prepareIndex("test", "type1", "1").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
bulkRequestBuilder.add(client().prepareIndex("test", "type1", "2").setSource(XContentFactory.jsonBuilder().startObject().endObject()));
bulkRequestBuilder.setTimeout(timeout);
bulkRequestBuilder.get();
fail("Expected ClusterBlockException");
} catch (ClusterBlockException | MasterNotDiscoveredException e) {
if (autoCreateIndex) {
// if its auto create index, the timeout will be based on the create index API
assertThat(System.currentTimeMillis() - now, greaterThan(timeout.millis() - 50));
} else {
// TODO note, today we don't retry on global block for bulk operations-Dtests.seed=80C397728140167
assertThat(System.currentTimeMillis() - now, lessThan(50l));
}
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
}
internalCluster().startNode(settings);
client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
}