Rapidly concurrent deleting/creating an index leaves index inconsistent, closes #1296.
This commit is contained in:
parent
a8baec6960
commit
8facdb2e3d
|
@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
|
|||
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
|
@ -62,6 +63,7 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
|
|||
protected void configure() {
|
||||
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
|
||||
|
||||
bind(MetaDataService.class).asEagerSingleton();
|
||||
bind(MetaDataCreateIndexService.class).asEagerSingleton();
|
||||
bind(MetaDataDeleteIndexService.class).asEagerSingleton();
|
||||
bind(MetaDataStateIndexService.class).asEagerSingleton();
|
||||
|
|
|
@ -92,10 +92,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
private final NodeIndexCreatedAction nodeIndexCreatedAction;
|
||||
|
||||
private final MetaDataService metaDataService;
|
||||
|
||||
private final String riverIndexName;
|
||||
|
||||
@Inject public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
|
||||
ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, @RiverIndexName String riverIndexName) {
|
||||
ShardsAllocation shardsAllocation, NodeIndexCreatedAction nodeIndexCreatedAction, MetaDataService metaDataService, @RiverIndexName String riverIndexName) {
|
||||
super(settings);
|
||||
this.environment = environment;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -103,6 +105,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
this.indicesService = indicesService;
|
||||
this.shardsAllocation = shardsAllocation;
|
||||
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
|
||||
this.metaDataService = metaDataService;
|
||||
this.riverIndexName = riverIndexName;
|
||||
}
|
||||
|
||||
|
@ -116,8 +119,18 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
request.settings(updatedSettingsBuilder.build());
|
||||
final CreateIndexListener listener = new CreateIndexListener(request, userListener);
|
||||
|
||||
// we lock here, and not within the cluster service callback since we don't want to
|
||||
// block the whole cluster state handling
|
||||
MetaDataService.MdLock mdLock = metaDataService.indexMetaDataLock(request.index);
|
||||
try {
|
||||
mdLock.lock();
|
||||
} catch (InterruptedException e) {
|
||||
userListener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
|
||||
final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener);
|
||||
|
||||
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ProcessedClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
|
@ -309,7 +322,9 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
class CreateIndexListener implements Listener {
|
||||
|
||||
private AtomicBoolean notified = new AtomicBoolean();
|
||||
private final AtomicBoolean notified = new AtomicBoolean();
|
||||
|
||||
private final MetaDataService.MdLock mdLock;
|
||||
|
||||
private final Request request;
|
||||
|
||||
|
@ -317,13 +332,15 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
volatile ScheduledFuture future;
|
||||
|
||||
private CreateIndexListener(Request request, Listener listener) {
|
||||
private CreateIndexListener(MetaDataService.MdLock mdLock, Request request, Listener listener) {
|
||||
this.mdLock = mdLock;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override public void onResponse(final Response response) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
mdLock.unlock();
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
@ -333,6 +350,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
@Override public void onFailure(Throwable t) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
mdLock.unlock();
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
|
|
@ -55,19 +55,32 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
private final NodeIndexDeletedAction nodeIndexDeletedAction;
|
||||
|
||||
private final MetaDataService metaDataService;
|
||||
|
||||
@Inject public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, ShardsAllocation shardsAllocation,
|
||||
NodeIndexDeletedAction nodeIndexDeletedAction) {
|
||||
NodeIndexDeletedAction nodeIndexDeletedAction, MetaDataService metaDataService) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.shardsAllocation = shardsAllocation;
|
||||
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
|
||||
this.metaDataService = metaDataService;
|
||||
}
|
||||
|
||||
public void deleteIndex(final Request request, final Listener userListener) {
|
||||
// we lock here, and not within the cluster service callback since we don't want to
|
||||
// block the whole cluster state handling
|
||||
MetaDataService.MdLock mdLock = metaDataService.indexMetaDataLock(request.index);
|
||||
try {
|
||||
mdLock.lock();
|
||||
} catch (InterruptedException e) {
|
||||
userListener.onFailure(e);
|
||||
return;
|
||||
}
|
||||
|
||||
final DeleteIndexListener listener = new DeleteIndexListener(mdLock, request, userListener);
|
||||
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
final DeleteIndexListener listener = new DeleteIndexListener(request, userListener);
|
||||
try {
|
||||
if (!currentState.metaData().hasConcreteIndex(request.index)) {
|
||||
listener.onFailure(new IndexMissingException(new Index(request.index)));
|
||||
|
@ -121,7 +134,9 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
class DeleteIndexListener implements Listener {
|
||||
|
||||
private AtomicBoolean notified = new AtomicBoolean();
|
||||
private final AtomicBoolean notified = new AtomicBoolean();
|
||||
|
||||
private final MetaDataService.MdLock mdLock;
|
||||
|
||||
private final Request request;
|
||||
|
||||
|
@ -129,13 +144,15 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
volatile ScheduledFuture future;
|
||||
|
||||
private DeleteIndexListener(Request request, Listener listener) {
|
||||
private DeleteIndexListener(MetaDataService.MdLock mdLock, Request request, Listener listener) {
|
||||
this.mdLock = mdLock;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override public void onResponse(final Response response) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
mdLock.unlock();
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
@ -145,6 +162,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
|
||||
@Override public void onFailure(Throwable t) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
mdLock.unlock();
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class MetaDataService extends AbstractComponent {
|
||||
|
||||
private final MdLock[] indexMdLocks;
|
||||
|
||||
@Inject public MetaDataService(Settings settings) {
|
||||
super(settings);
|
||||
indexMdLocks = new MdLock[500];
|
||||
for (int i = 0; i < indexMdLocks.length; i++) {
|
||||
indexMdLocks[i] = new MdLock();
|
||||
}
|
||||
}
|
||||
|
||||
public MdLock indexMetaDataLock(String index) {
|
||||
return indexMdLocks[Math.abs(DjbHashFunction.DJB_HASH(index) % indexMdLocks.length)];
|
||||
}
|
||||
|
||||
public class MdLock {
|
||||
|
||||
private boolean isLocked = false;
|
||||
|
||||
public synchronized void lock() throws InterruptedException {
|
||||
while (isLocked) {
|
||||
wait();
|
||||
}
|
||||
isLocked = true;
|
||||
}
|
||||
|
||||
public synchronized void unlock() {
|
||||
isLocked = false;
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue