From 8facdb2e3dc49935a4c4b93967acd33882b113d3 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 2 Sep 2011 09:36:25 +0300 Subject: [PATCH] Rapidly concurrent deleting/creating an index leaves index inconsistent, closes #1296. --- .../elasticsearch/cluster/ClusterModule.java | 2 + .../metadata/MetaDataCreateIndexService.java | 26 ++++++-- .../metadata/MetaDataDeleteIndexService.java | 26 ++++++-- .../cluster/metadata/MetaDataService.java | 61 +++++++++++++++++++ 4 files changed, 107 insertions(+), 8 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 5779cacfdc4..79c1bea0667 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService; import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService; import org.elasticsearch.cluster.metadata.MetaDataMappingService; +import org.elasticsearch.cluster.metadata.MetaDataService; import org.elasticsearch.cluster.metadata.MetaDataStateIndexService; import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.cluster.routing.RoutingService; @@ -62,6 +63,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules { protected void configure() { bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton(); + bind(MetaDataService.class).asEagerSingleton(); bind(MetaDataCreateIndexService.class).asEagerSingleton(); bind(MetaDataDeleteIndexService.class).asEagerSingleton(); bind(MetaDataStateIndexService.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index d6d99b80aea..511e2f3d156 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -92,10 +92,12 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final NodeIndexCreatedAction nodeIndexCreatedAction; + private final MetaDataService metaDataService; + private final String riverIndexName; @Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, - ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, @RiverIndexName String riverIndexName) { + ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, @RiverIndexName String riverIndexName) { super(settings); this.environment = environment; this.threadPool = threadPool; @@ -103,6 +105,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { this.indicesService = indicesService; this.shardsAllocation = shardsAllocation; this.nodeIndexCreatedAction = nodeIndexCreatedAction; + this.metaDataService = metaDataService; this.riverIndexName = riverIndexName; } @@ -116,8 +119,18 @@ public class MetaDataCreateIndexService extends AbstractComponent { } } request.settings(updatedSettingsBuilder.build()); - final CreateIndexListener listener = new CreateIndexListener(request, userListener); + // we lock here, and not within the cluster service callback since we don't want to + // block the whole cluster state handling + MetaDataService.MdLock mdLock = metaDataService.indexMetaDataLock(request.index); + try { + mdLock.lock(); + } catch (InterruptedException e) { + userListener.onFailure(e); + return; + } + + final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener); clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -309,7 +322,9 @@ public class MetaDataCreateIndexService extends AbstractComponent { class CreateIndexListener implements Listener { - private AtomicBoolean notified = new AtomicBoolean(); + private final AtomicBoolean notified = new AtomicBoolean(); + + private final MetaDataService.MdLock mdLock; private final Request request; @@ -317,13 +332,15 @@ public class MetaDataCreateIndexService extends AbstractComponent { volatile ScheduledFuture future; - private CreateIndexListener(Request request, Listener listener) { + private CreateIndexListener(MetaDataService.MdLock mdLock, Request request, Listener listener) { + this.mdLock = mdLock; this.request = request; this.listener = listener; } @Override public void onResponse(final Response response) { if (notified.compareAndSet(false, true)) { + mdLock.unlock(); if (future != null) { future.cancel(false); } @@ -333,6 +350,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { @Override public void onFailure(Throwable t) { if (notified.compareAndSet(false, true)) { + mdLock.unlock(); if (future != null) { future.cancel(false); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index 698bdfd559f..493232ed380 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -55,19 +55,32 @@ public class MetaDataDeleteIndexService extends AbstractComponent { private final NodeIndexDeletedAction nodeIndexDeletedAction; + private final MetaDataService metaDataService; + @Inject public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsAllocation shardsAllocation, - NodeIndexDeletedAction nodeIndexDeletedAction) { + NodeIndexDeletedAction nodeIndexDeletedAction, MetaDataService metaDataService) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; this.shardsAllocation = shardsAllocation; this.nodeIndexDeletedAction = nodeIndexDeletedAction; + this.metaDataService = metaDataService; } public void deleteIndex(final Request request, final Listener userListener) { + // we lock here, and not within the cluster service callback since we don't want to + // block the whole cluster state handling + MetaDataService.MdLock mdLock = metaDataService.indexMetaDataLock(request.index); + try { + mdLock.lock(); + } catch (InterruptedException e) { + userListener.onFailure(e); + return; + } + + final DeleteIndexListener listener = new DeleteIndexListener(mdLock, request, userListener); clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final DeleteIndexListener listener = new DeleteIndexListener(request, userListener); try { if (!currentState.metaData().hasConcreteIndex(request.index)) { listener.onFailure(new IndexMissingException(new Index(request.index))); @@ -121,7 +134,9 @@ public class MetaDataDeleteIndexService extends AbstractComponent { class DeleteIndexListener implements Listener { - private AtomicBoolean notified = new AtomicBoolean(); + private final AtomicBoolean notified = new AtomicBoolean(); + + private final MetaDataService.MdLock mdLock; private final Request request; @@ -129,13 +144,15 @@ public class MetaDataDeleteIndexService extends AbstractComponent { volatile ScheduledFuture future; - private DeleteIndexListener(Request request, Listener listener) { + private DeleteIndexListener(MetaDataService.MdLock mdLock, Request request, Listener listener) { + this.mdLock = mdLock; this.request = request; this.listener = listener; } @Override public void onResponse(final Response response) { if (notified.compareAndSet(false, true)) { + mdLock.unlock(); if (future != null) { future.cancel(false); } @@ -145,6 +162,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { @Override public void onFailure(Throwable t) { if (notified.compareAndSet(false, true)) { + mdLock.unlock(); if (future != null) { future.cancel(false); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java new file mode 100644 index 00000000000..f582fc46273 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +/** + */ +public class MetaDataService extends AbstractComponent { + + private final MdLock[] indexMdLocks; + + @Inject public MetaDataService(Settings settings) { + super(settings); + indexMdLocks = new MdLock[500]; + for (int i = 0; i < indexMdLocks.length; i++) { + indexMdLocks[i] = new MdLock(); + } + } + + public MdLock indexMetaDataLock(String index) { + return indexMdLocks[Math.abs(DjbHashFunction.DJB_HASH(index) % indexMdLocks.length)]; + } + + public class MdLock { + + private boolean isLocked = false; + + public synchronized void lock() throws InterruptedException { + while (isLocked) { + wait(); + } + isLocked = true; + } + + public synchronized void unlock() { + isLocked = false; + notifyAll(); + } + } +}