From b40c1deebe410eb5279579030493e5c63c02075d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 16 Oct 2015 15:36:55 +0200 Subject: [PATCH 1/3] Remove MetaDataSerivce and it's semaphores MetaDataSerivce tried to protect concurrent index creation/deletion from resulting in inconsistent indices. This was originally added a long time ago via #1296 which seems to be caused by several problems that we fixed already in 2.0 or even in late 1.x version. Indices where recreated without being deleted and shards where deleted while being used which is now prevented on several levels. We can safely remove the semaphores since we are already serializing the events on the cluster state threads. This commit also fixes some expception handling bugs exposed by the added test --- .../put/TransportPutMappingAction.java | 40 ++++++---- .../action/index/IndexRequest.java | 21 +++-- .../elasticsearch/cluster/ClusterModule.java | 2 - .../metadata/MetaDataCreateIndexService.java | 57 +------------ .../metadata/MetaDataDeleteIndexService.java | 44 +---------- .../cluster/metadata/MetaDataService.java | 48 ----------- .../admin/indices/create/CreateIndexIT.java | 79 +++++++++++++++++-- .../MetaDataIndexTemplateServiceTests.java | 3 - 8 files changed, 118 insertions(+), 176 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index 2d2df8e4cf0..4e4b8c539e5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -67,25 +68,30 @@ public class TransportPutMappingAction extends TransportMasterNodeAction listener) { - final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); - PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest() - .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) - .indices(concreteIndices).type(request.type()) - .updateAllTypes(request.updateAllTypes()) - .source(request.source()); + try { + final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest() + .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) + .indices(concreteIndices).type(request.type()) + .updateAllTypes(request.updateAllTypes()) + .source(request.source()); - metaDataMappingService.putMapping(updateRequest, new ActionListener() { + metaDataMappingService.putMapping(updateRequest, new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new PutMappingResponse(response.isAcknowledged())); - } + @Override + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new PutMappingResponse(response.isAcknowledged())); + } - @Override - public void onFailure(Throwable t) { - logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type()); - listener.onFailure(t); - } - }); + @Override + public void onFailure(Throwable t) { + logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type()); + listener.onFailure(t); + } + }); + } catch (IndexNotFoundException ex) { + logger.debug("failed to put mappings on indices [{}], type [{}]", ex, request.indices(), request.type()); + listener.onFailure(ex); + } } } diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 5d8915c6112..88ade451404 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.*; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; @@ -35,6 +36,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; @@ -561,15 +563,24 @@ public class IndexRequest extends ReplicationRequest implements Do return this.versionType; } + private Version getVersion(MetaData metaData, String concreteIndex) { + // this can go away in 3.0 but is here now for easy backporting - since in 2.x we need the version on the timestamp stuff + final IndexMetaData indexMetaData = metaData.getIndices().get(concreteIndex); + if (indexMetaData == null) { + throw new IndexNotFoundException(concreteIndex); + } + return Version.indexCreated(indexMetaData.getSettings()); + } + public void process(MetaData metaData, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration, String concreteIndex) { // resolve the routing if needed routing(metaData.resolveIndexRouting(routing, index)); + // resolve timestamp if provided externally if (timestamp != null) { - Version version = Version.indexCreated(metaData.getIndices().get(concreteIndex).getSettings()); timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd != null ? mappingMd.timestamp().dateTimeFormatter() : TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER, - version); + getVersion(metaData, concreteIndex)); } // extract values if needed if (mappingMd != null) { @@ -592,8 +603,7 @@ public class IndexRequest extends ReplicationRequest implements Do if (parseContext.shouldParseTimestamp()) { timestamp = parseContext.timestamp(); if (timestamp != null) { - Version version = Version.indexCreated(metaData.getIndices().get(concreteIndex).getSettings()); - timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter(), version); + timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter(), getVersion(metaData, concreteIndex)); } } } catch (MapperParsingException e) { @@ -642,8 +652,7 @@ public class IndexRequest extends ReplicationRequest implements Do if (defaultTimestamp.equals(TimestampFieldMapper.Defaults.DEFAULT_TIMESTAMP)) { timestamp = Long.toString(System.currentTimeMillis()); } else { - Version version = Version.indexCreated(metaData.getIndices().get(concreteIndex).getSettings()); - timestamp = MappingMetaData.Timestamp.parseStringTimestamp(defaultTimestamp, mappingMd.timestamp().dateTimeFormatter(), version); + timestamp = MappingMetaData.Timestamp.parseStringTimestamp(defaultTimestamp, mappingMd.timestamp().dateTimeFormatter(), getVersion(metaData, concreteIndex)); } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 581bee10369..640a21cd22d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -34,7 +34,6 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService; import org.elasticsearch.cluster.metadata.MetaDataMappingService; -import org.elasticsearch.cluster.metadata.MetaDataService; import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.routing.OperationRouting; @@ -309,7 +308,6 @@ public class ClusterModule extends AbstractModule { bind(DiscoveryNodeService.class).asEagerSingleton(); bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton(); bind(OperationRouting.class).asEagerSingleton(); - bind(MetaDataService.class).asEagerSingleton(); bind(MetaDataCreateIndexService.class).asEagerSingleton(); bind(MetaDataDeleteIndexService.class).asEagerSingleton(); bind(MetaDataIndexStateService.class).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index a34d6a36d3a..629e3c07427 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -106,32 +106,25 @@ public class MetaDataCreateIndexService extends AbstractComponent { public final static int MAX_INDEX_NAME_BYTES = 255; private static final DefaultIndexTemplateFilter DEFAULT_INDEX_TEMPLATE_FILTER = new DefaultIndexTemplateFilter(); - private final ThreadPool threadPool; private final ClusterService clusterService; private final IndicesService indicesService; private final AllocationService allocationService; - private final MetaDataService metaDataService; private final Version version; private final AliasValidator aliasValidator; private final IndexTemplateFilter indexTemplateFilter; - private final NodeEnvironment nodeEnv; private final Environment env; @Inject - public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, - IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService, + public MetaDataCreateIndexService(Settings settings, ClusterService clusterService, + IndicesService indicesService, AllocationService allocationService, Version version, AliasValidator aliasValidator, - Set indexTemplateFilters, Environment env, - NodeEnvironment nodeEnv) { + Set indexTemplateFilters, Environment env) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; this.allocationService = allocationService; - this.metaDataService = metaDataService; this.version = version; this.aliasValidator = aliasValidator; - this.nodeEnv = nodeEnv; this.env = env; if (indexTemplateFilters.isEmpty()) { @@ -147,29 +140,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } - public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener) { - - // we lock here, and not within the cluster service callback since we don't want to - // block the whole cluster state handling - final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index()); - - // quick check to see if we can acquire a lock, otherwise spawn to a thread pool - if (mdLock.tryAcquire()) { - createIndex(request, listener, mdLock); - return; - } - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) { - @Override - public void doRun() throws InterruptedException { - if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) { - listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock")); - return; - } - createIndex(request, listener, mdLock); - } - }); - } - public void validateIndexName(String index, ClusterState state) { if (state.routingTable().hasIndex(index)) { throw new IndexAlreadyExistsException(new Index(index)); @@ -209,8 +179,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } - private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener, final Semaphore mdLock) { - + public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener) { Settings.Builder updatedSettingsBuilder = Settings.settingsBuilder(); updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX); request.settings(updatedSettingsBuilder.build()); @@ -222,24 +191,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { return new ClusterStateUpdateResponse(acknowledged); } - @Override - public void onAllNodesAcked(@Nullable Throwable t) { - mdLock.release(); - super.onAllNodesAcked(t); - } - - @Override - public void onAckTimeout() { - mdLock.release(); - super.onAckTimeout(); - } - - @Override - public void onFailure(String source, Throwable t) { - mdLock.release(); - super.onFailure(source, t); - } - @Override public ClusterState execute(ClusterState currentState) throws Exception { boolean indexCreated = false; diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index 88e1aad5614..cab86b60d44 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -56,50 +56,18 @@ public class MetaDataDeleteIndexService extends AbstractComponent { private final NodeIndexDeletedAction nodeIndexDeletedAction; - private final MetaDataService metaDataService; - @Inject public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService, - NodeIndexDeletedAction nodeIndexDeletedAction, MetaDataService metaDataService) { + NodeIndexDeletedAction nodeIndexDeletedAction) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; this.allocationService = allocationService; this.nodeIndexDeletedAction = nodeIndexDeletedAction; - this.metaDataService = metaDataService; } public void deleteIndex(final Request request, final Listener userListener) { - // we lock here, and not within the cluster service callback since we don't want to - // block the whole cluster state handling - final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index); - - // quick check to see if we can acquire a lock, otherwise spawn to a thread pool - if (mdLock.tryAcquire()) { - deleteIndex(request, userListener, mdLock); - return; - } - - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { - @Override - public void run() { - try { - if (!mdLock.tryAcquire(request.masterTimeout.nanos(), TimeUnit.NANOSECONDS)) { - userListener.onFailure(new ProcessClusterEventTimeoutException(request.masterTimeout, "acquire index lock")); - return; - } - } catch (InterruptedException e) { - userListener.onFailure(e); - return; - } - - deleteIndex(request, userListener, mdLock); - } - }); - } - - private void deleteIndex(final Request request, final Listener userListener, Semaphore mdLock) { - final DeleteIndexListener listener = new DeleteIndexListener(mdLock, userListener); + final DeleteIndexListener listener = new DeleteIndexListener(userListener); clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new ClusterStateUpdateTask() { @Override @@ -181,19 +149,16 @@ public class MetaDataDeleteIndexService extends AbstractComponent { class DeleteIndexListener implements Listener { private final AtomicBoolean notified = new AtomicBoolean(); - private final Semaphore mdLock; private final Listener listener; volatile ScheduledFuture future; - private DeleteIndexListener(Semaphore mdLock, Listener listener) { - this.mdLock = mdLock; + private DeleteIndexListener(Listener listener) { this.listener = listener; } @Override public void onResponse(final Response response) { if (notified.compareAndSet(false, true)) { - mdLock.release(); FutureUtils.cancel(future); listener.onResponse(response); } @@ -202,7 +167,6 @@ public class MetaDataDeleteIndexService extends AbstractComponent { @Override public void onFailure(Throwable t) { if (notified.compareAndSet(false, true)) { - mdLock.release(); FutureUtils.cancel(future); listener.onFailure(t); } @@ -210,7 +174,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { } - public static interface Listener { + public interface Listener { void onResponse(Response response); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java deleted file mode 100644 index ca482ea604f..00000000000 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.metadata; - -import org.elasticsearch.cluster.routing.Murmur3HashFunction; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.math.MathUtils; -import org.elasticsearch.common.settings.Settings; - -import java.util.concurrent.Semaphore; - -/** - */ -public class MetaDataService extends AbstractComponent { - - private final Semaphore[] indexMdLocks; - - @Inject - public MetaDataService(Settings settings) { - super(settings); - indexMdLocks = new Semaphore[500]; - for (int i = 0; i < indexMdLocks.length; i++) { - indexMdLocks[i] = new Semaphore(1); - } - } - - public Semaphore indexMetaDataLock(String index) { - return indexMdLocks[MathUtils.mod(Murmur3HashFunction.hash(index), indexMdLocks.length)]; - } -} diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 7418cefa56e..4886bece909 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -19,18 +19,28 @@ package org.elasticsearch.action.admin.indices.create; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.junit.Test; import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -107,8 +117,8 @@ public class CreateIndexIT extends ESIntegTestCase { public void testInvalidShardCountSettings() throws Exception { try { prepareCreate("test").setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(-10, 0)) - .build()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(-10, 0)) + .build()) .get(); fail("should have thrown an exception about the primary shard count"); } catch (IllegalArgumentException e) { @@ -118,8 +128,8 @@ public class CreateIndexIT extends ESIntegTestCase { try { prepareCreate("test").setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(-10, -1)) - .build()) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(-10, -1)) + .build()) .get(); fail("should have thrown an exception about the replica shard count"); } catch (IllegalArgumentException e) { @@ -129,9 +139,9 @@ public class CreateIndexIT extends ESIntegTestCase { try { prepareCreate("test").setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(-10, 0)) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(-10, -1)) - .build()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(-10, 0)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(-10, -1)) + .build()) .get(); fail("should have thrown an exception about the shard count"); } catch (IllegalArgumentException e) { @@ -196,4 +206,59 @@ public class CreateIndexIT extends ESIntegTestCase { } } + public void testCreateAndDeleteIndexConcurrently() throws InterruptedException { + createIndex("test"); + final AtomicInteger indexDeleteAction = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + int numDocs = randomIntBetween(1, 10); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get(); + } + synchronized (indexDeleteAction) { // not necessarily needed here + indexDeleteAction.incrementAndGet(); + } + client().admin().indices().prepareDelete("test").execute(new ActionListener() { // this happens async!!! + @Override + public void onResponse(DeleteIndexResponse deleteIndexResponse) { + Thread thread = new Thread() { + public void run() { + try { + client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get(); // recreate that index + synchronized (indexDeleteAction) { + indexDeleteAction.incrementAndGet(); + } + client().admin().indices().prepareDelete("test").get(); // from here on all docs with index_version == 0|1 must be gone!!!! only 2 are ok; + } finally { + latch.countDown(); + } + } + }; + thread.start(); + } + + @Override + public void onFailure(Throwable e) { + ExceptionsHelper.reThrowIfNotNull(e); + } + } + ); + numDocs = randomIntBetween(100, 200); + for (int i = 0; i < numDocs; i++) { + try { + synchronized (indexDeleteAction) { + client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get(); + } + } catch (IndexNotFoundException inf) { + // fine + } + } + latch.await(); + refresh(); + SearchResponse expected = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(new RangeQueryBuilder("index_version").from(indexDeleteAction.get(), true)).get(); + SearchResponse all = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).get(); + assertEquals(expected + " vs. " + all, expected.getHits().getTotalHits(), all.getHits().getTotalHits() + ); + logger.info("total: {}", expected.getHits().getTotalHits()); + } + } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java index 86a9bbc1f3f..9c9802dbb5f 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java @@ -77,12 +77,9 @@ public class MetaDataIndexTemplateServiceTests extends ESTestCase { null, null, null, - null, - null, Version.CURRENT, null, new HashSet<>(), - null, null ); MetaDataIndexTemplateService service = new MetaDataIndexTemplateService(Settings.EMPTY, null, createIndexService, null); From 9487a8e1fd8d330275260ff885688f3ffe11902e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 16 Oct 2015 20:32:57 +0200 Subject: [PATCH 2/3] apply review comments --- .../admin/indices/mapping/put/TransportPutMappingAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index 4e4b8c539e5..b82c5d3a626 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -91,7 +91,7 @@ public class TransportPutMappingAction extends TransportMasterNodeAction Date: Mon, 19 Oct 2015 13:32:32 +0200 Subject: [PATCH 3/3] add several code comments and apply review comments --- .../admin/indices/create/CreateIndexIT.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 4886bece909..cbdb8644b5d 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -208,14 +208,15 @@ public class CreateIndexIT extends ESIntegTestCase { public void testCreateAndDeleteIndexConcurrently() throws InterruptedException { createIndex("test"); - final AtomicInteger indexDeleteAction = new AtomicInteger(0); + final AtomicInteger indexVersion = new AtomicInteger(0); + final Object indexVersionLock = new Object(); final CountDownLatch latch = new CountDownLatch(1); int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { - client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get(); + client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get(); } - synchronized (indexDeleteAction) { // not necessarily needed here - indexDeleteAction.incrementAndGet(); + synchronized (indexVersionLock) { // not necessarily needed here but for completeness we lock here too + indexVersion.incrementAndGet(); } client().admin().indices().prepareDelete("test").execute(new ActionListener() { // this happens async!!! @Override @@ -223,11 +224,13 @@ public class CreateIndexIT extends ESIntegTestCase { Thread thread = new Thread() { public void run() { try { - client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get(); // recreate that index - synchronized (indexDeleteAction) { - indexDeleteAction.incrementAndGet(); + client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get(); // recreate that index + synchronized (indexVersionLock) { + // we sync here since we have to ensure that all indexing operations below for a given ID are done before we increment the + // index version otherwise a doc that is in-flight could make it into an index that it was supposed to be deleted for and our assertion fail... + indexVersion.incrementAndGet(); } - client().admin().indices().prepareDelete("test").get(); // from here on all docs with index_version == 0|1 must be gone!!!! only 2 are ok; + assertAcked(client().admin().indices().prepareDelete("test").get()); // from here on all docs with index_version == 0|1 must be gone!!!! only 2 are ok; } finally { latch.countDown(); } @@ -238,15 +241,15 @@ public class CreateIndexIT extends ESIntegTestCase { @Override public void onFailure(Throwable e) { - ExceptionsHelper.reThrowIfNotNull(e); + throw new RuntimeException(e); } } ); numDocs = randomIntBetween(100, 200); for (int i = 0; i < numDocs; i++) { try { - synchronized (indexDeleteAction) { - client().prepareIndex("test", "test").setSource("index_version", indexDeleteAction.get()).get(); + synchronized (indexVersionLock) { + client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get(); } } catch (IndexNotFoundException inf) { // fine @@ -254,10 +257,11 @@ public class CreateIndexIT extends ESIntegTestCase { } latch.await(); refresh(); - SearchResponse expected = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(new RangeQueryBuilder("index_version").from(indexDeleteAction.get(), true)).get(); + + // we only really assert that we never reuse segments of old indices or anything like this here and that nothing fails with crazy exceptions + SearchResponse expected = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(new RangeQueryBuilder("index_version").from(indexVersion.get(), true)).get(); SearchResponse all = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).get(); - assertEquals(expected + " vs. " + all, expected.getHits().getTotalHits(), all.getHits().getTotalHits() - ); + assertEquals(expected + " vs. " + all, expected.getHits().getTotalHits(), all.getHits().getTotalHits()); logger.info("total: {}", expected.getHits().getTotalHits()); }