From d12ae196afd555ce942bf9476b6935267117ade5 Mon Sep 17 00:00:00 2001 From: markharwood Date: Mon, 20 Oct 2014 12:06:20 +0100 Subject: [PATCH] Bulk indexing: Fix 8125 hanged request when auto create index is off. If a bulk request contains a mix of indexing requests for an existing index and one that needs to be auto-created but a cluster configuration prevents the auto-create of the new index the ingest process hangs. The exception for the failure to create an index was not caught or reported back properly. Added a Junit test to recreate the issue and the associated fix is in TransportBulkAction. Closes #8125 --- .../action/bulk/TransportBulkAction.java | 38 +++++++++---- .../BulkProcessorClusterSettingsTests.java | 54 +++++++++++++++++++ 2 files changed, 81 insertions(+), 11 deletions(-) create mode 100644 src/test/java/org/elasticsearch/action/bulk/BulkProcessorClusterSettingsTests.java diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 747d2e79ed6..35ce1b94880 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -53,11 +53,16 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; /** @@ -117,7 +122,11 @@ public class TransportBulkAction extends HandledTransportAction responses, int idx, + private boolean addFailureIfIndexIsUnavailable(DocumentRequest request, BulkRequest bulkRequest, AtomicArray responses, int idx, final ConcreteIndices concreteIndices, final MetaData metaData) { String concreteIndex = concreteIndices.getConcreteIndex(request.index()); - boolean isClosed = false; + Exception unavailableException = null; if (concreteIndex == null) { try { concreteIndex = concreteIndices.resolveIfAbsent(request.index(), request.indicesOptions()); } catch (IndexClosedException ice) { - isClosed = true; + unavailableException = ice; + } catch (IndexMissingException ime) { + // Fix for issue where bulk request references an index that + // cannot be auto-created see issue #8125 + unavailableException = ime; } } - if (!isClosed) { + if (unavailableException == null) { IndexMetaData indexMetaData = metaData.index(concreteIndex); - isClosed = indexMetaData.getState() == IndexMetaData.State.CLOSE; + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + unavailableException = new IndexClosedException(new Index(metaData.index(request.index()).getIndex())); + } } - if (isClosed) { + if (unavailableException != null) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), - new IndexClosedException(new Index(metaData.index(request.index()).getIndex()))); + unavailableException); BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure); responses.set(idx, bulkItemResponse); // make sure the request gets never processed again bulkRequest.requests.set(idx, null); + return true; } - return isClosed; + return false; } diff --git a/src/test/java/org/elasticsearch/action/bulk/BulkProcessorClusterSettingsTests.java b/src/test/java/org/elasticsearch/action/bulk/BulkProcessorClusterSettingsTests.java new file mode 100644 index 00000000000..6b9379fe052 --- /dev/null +++ b/src/test/java/org/elasticsearch/action/bulk/BulkProcessorClusterSettingsTests.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import org.junit.Test; + +@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +public class BulkProcessorClusterSettingsTests extends ElasticsearchIntegrationTest { + + @Test + public void testBulkProcessorAutoCreateRestrictions() throws Exception { + // See issue #8125 + Settings settings = ImmutableSettings.settingsBuilder().put("action.auto_create_index", false).build(); + + internalCluster().startNode(settings); + + createIndex("willwork"); + client().admin().cluster().prepareHealth("willwork").setWaitForGreenStatus().execute().actionGet(); + + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + bulkRequestBuilder.add(client().prepareIndex("willwork", "type1", "1").setSource("{\"foo\":1}")); + bulkRequestBuilder.add(client().prepareIndex("wontwork", "type1", "2").setSource("{\"foo\":2}")); + bulkRequestBuilder.add(client().prepareIndex("willwork", "type1", "3").setSource("{\"foo\":3}")); + BulkResponse br = bulkRequestBuilder.get(); + BulkItemResponse[] responses = br.getItems(); + assertEquals(3, responses.length); + assertFalse("Operation on existing index should succeed", responses[0].isFailed()); + assertTrue("Missing index should have been flagged", responses[1].isFailed()); + assertEquals("IndexMissingException[[wontwork] missing]", responses[1].getFailureMessage()); + assertFalse("Operation on existing index should succeed", responses[2].isFailed()); + } +}