Merge pull request #14159 from s1monw/remove_meta_data_service

Remove MetaDataSerivce and it's semaphores
This commit is contained in:
Simon Willnauer 2015-10-19 13:45:42 +02:00
commit 7dd35c5a12
8 changed files with 122 additions and 176 deletions

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -67,25 +68,30 @@ public class TransportPutMappingAction extends TransportMasterNodeAction<PutMapp
@Override
protected void masterOperation(final PutMappingRequest request, final ClusterState state, final ActionListener<PutMappingResponse> listener) {
final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices).type(request.type())
.updateAllTypes(request.updateAllTypes())
.source(request.source());
try {
final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices).type(request.type())
.updateAllTypes(request.updateAllTypes())
.source(request.source());
metaDataMappingService.putMapping(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
metaDataMappingService.putMapping(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new PutMappingResponse(response.isAcknowledged()));
}
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new PutMappingResponse(response.isAcknowledged()));
}
@Override
public void onFailure(Throwable t) {
logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type());
listener.onFailure(t);
}
});
@Override
public void onFailure(Throwable t) {
logger.debug("failed to put mappings on indices [{}], type [{}]", t, concreteIndices, request.type());
listener.onFailure(t);
}
});
} catch (IndexNotFoundException ex) {
logger.debug("failed to put mappings on indices [{}], type [{}]", ex, request.indices(), request.type());
throw ex;
}
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
@ -35,6 +36,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
@ -561,15 +563,24 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
return this.versionType;
}
private Version getVersion(MetaData metaData, String concreteIndex) {
// this can go away in 3.0 but is here now for easy backporting - since in 2.x we need the version on the timestamp stuff
final IndexMetaData indexMetaData = metaData.getIndices().get(concreteIndex);
if (indexMetaData == null) {
throw new IndexNotFoundException(concreteIndex);
}
return Version.indexCreated(indexMetaData.getSettings());
}
public void process(MetaData metaData, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration, String concreteIndex) {
// resolve the routing if needed
routing(metaData.resolveIndexRouting(routing, index));
// resolve timestamp if provided externally
if (timestamp != null) {
Version version = Version.indexCreated(metaData.getIndices().get(concreteIndex).getSettings());
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp,
mappingMd != null ? mappingMd.timestamp().dateTimeFormatter() : TimestampFieldMapper.Defaults.DATE_TIME_FORMATTER,
version);
getVersion(metaData, concreteIndex));
}
// extract values if needed
if (mappingMd != null) {
@ -592,8 +603,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
if (parseContext.shouldParseTimestamp()) {
timestamp = parseContext.timestamp();
if (timestamp != null) {
Version version = Version.indexCreated(metaData.getIndices().get(concreteIndex).getSettings());
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter(), version);
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, mappingMd.timestamp().dateTimeFormatter(), getVersion(metaData, concreteIndex));
}
}
} catch (MapperParsingException e) {
@ -642,8 +652,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
if (defaultTimestamp.equals(TimestampFieldMapper.Defaults.DEFAULT_TIMESTAMP)) {
timestamp = Long.toString(System.currentTimeMillis());
} else {
Version version = Version.indexCreated(metaData.getIndices().get(concreteIndex).getSettings());
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(defaultTimestamp, mappingMd.timestamp().dateTimeFormatter(), version);
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(defaultTimestamp, mappingMd.timestamp().dateTimeFormatter(), getVersion(metaData, concreteIndex));
}
}
}

View File

@ -34,7 +34,6 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.OperationRouting;
@ -309,7 +308,6 @@ public class ClusterModule extends AbstractModule {
bind(DiscoveryNodeService.class).asEagerSingleton();
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
bind(OperationRouting.class).asEagerSingleton();
bind(MetaDataService.class).asEagerSingleton();
bind(MetaDataCreateIndexService.class).asEagerSingleton();
bind(MetaDataDeleteIndexService.class).asEagerSingleton();
bind(MetaDataIndexStateService.class).asEagerSingleton();

View File

@ -106,32 +106,25 @@ public class MetaDataCreateIndexService extends AbstractComponent {
public final static int MAX_INDEX_NAME_BYTES = 255;
private static final DefaultIndexTemplateFilter DEFAULT_INDEX_TEMPLATE_FILTER = new DefaultIndexTemplateFilter();
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final AllocationService allocationService;
private final MetaDataService metaDataService;
private final Version version;
private final AliasValidator aliasValidator;
private final IndexTemplateFilter indexTemplateFilter;
private final NodeEnvironment nodeEnv;
private final Environment env;
@Inject
public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService,
public MetaDataCreateIndexService(Settings settings, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService,
Version version, AliasValidator aliasValidator,
Set<IndexTemplateFilter> indexTemplateFilters, Environment env,
NodeEnvironment nodeEnv) {
Set<IndexTemplateFilter> indexTemplateFilters, Environment env) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.allocationService = allocationService;
this.metaDataService = metaDataService;
this.version = version;
this.aliasValidator = aliasValidator;
this.nodeEnv = nodeEnv;
this.env = env;
if (indexTemplateFilters.isEmpty()) {
@ -147,29 +140,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
}
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
// we lock here, and not within the cluster service callback since we don't want to
// block the whole cluster state handling
final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());
// quick check to see if we can acquire a lock, otherwise spawn to a thread pool
if (mdLock.tryAcquire()) {
createIndex(request, listener, mdLock);
return;
}
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
@Override
public void doRun() throws InterruptedException {
if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
return;
}
createIndex(request, listener, mdLock);
}
});
}
public void validateIndexName(String index, ClusterState state) {
if (state.routingTable().hasIndex(index)) {
throw new IndexAlreadyExistsException(new Index(index));
@ -209,8 +179,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
}
}
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
Settings.Builder updatedSettingsBuilder = Settings.settingsBuilder();
updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
request.settings(updatedSettingsBuilder.build());
@ -222,24 +191,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
return new ClusterStateUpdateResponse(acknowledged);
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
mdLock.release();
super.onAllNodesAcked(t);
}
@Override
public void onAckTimeout() {
mdLock.release();
super.onAckTimeout();
}
@Override
public void onFailure(String source, Throwable t) {
mdLock.release();
super.onFailure(source, t);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
boolean indexCreated = false;

View File

@ -56,50 +56,18 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final MetaDataService metaDataService;
@Inject
public MetaDataDeleteIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService,
NodeIndexDeletedAction nodeIndexDeletedAction, MetaDataService metaDataService) {
NodeIndexDeletedAction nodeIndexDeletedAction) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.metaDataService = metaDataService;
}
public void deleteIndex(final Request request, final Listener userListener) {
// we lock here, and not within the cluster service callback since we don't want to
// block the whole cluster state handling
final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index);
// quick check to see if we can acquire a lock, otherwise spawn to a thread pool
if (mdLock.tryAcquire()) {
deleteIndex(request, userListener, mdLock);
return;
}
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override
public void run() {
try {
if (!mdLock.tryAcquire(request.masterTimeout.nanos(), TimeUnit.NANOSECONDS)) {
userListener.onFailure(new ProcessClusterEventTimeoutException(request.masterTimeout, "acquire index lock"));
return;
}
} catch (InterruptedException e) {
userListener.onFailure(e);
return;
}
deleteIndex(request, userListener, mdLock);
}
});
}
private void deleteIndex(final Request request, final Listener userListener, Semaphore mdLock) {
final DeleteIndexListener listener = new DeleteIndexListener(mdLock, userListener);
final DeleteIndexListener listener = new DeleteIndexListener(userListener);
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
@ -181,19 +149,16 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
class DeleteIndexListener implements Listener {
private final AtomicBoolean notified = new AtomicBoolean();
private final Semaphore mdLock;
private final Listener listener;
volatile ScheduledFuture<?> future;
private DeleteIndexListener(Semaphore mdLock, Listener listener) {
this.mdLock = mdLock;
private DeleteIndexListener(Listener listener) {
this.listener = listener;
}
@Override
public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
mdLock.release();
FutureUtils.cancel(future);
listener.onResponse(response);
}
@ -202,7 +167,6 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
@Override
public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
mdLock.release();
FutureUtils.cancel(future);
listener.onFailure(t);
}
@ -210,7 +174,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
}
public static interface Listener {
public interface Listener {
void onResponse(Response response);

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.Semaphore;
/**
*/
public class MetaDataService extends AbstractComponent {
private final Semaphore[] indexMdLocks;
@Inject
public MetaDataService(Settings settings) {
super(settings);
indexMdLocks = new Semaphore[500];
for (int i = 0; i < indexMdLocks.length; i++) {
indexMdLocks[i] = new Semaphore(1);
}
}
public Semaphore indexMetaDataLock(String index) {
return indexMdLocks[MathUtils.mod(Murmur3HashFunction.hash(index), indexMdLocks.length)];
}
}

View File

@ -19,18 +19,28 @@
package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.junit.Test;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -107,8 +117,8 @@ public class CreateIndexIT extends ESIntegTestCase {
public void testInvalidShardCountSettings() throws Exception {
try {
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(-10, 0))
.build())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(-10, 0))
.build())
.get();
fail("should have thrown an exception about the primary shard count");
} catch (IllegalArgumentException e) {
@ -118,8 +128,8 @@ public class CreateIndexIT extends ESIntegTestCase {
try {
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(-10, -1))
.build())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(-10, -1))
.build())
.get();
fail("should have thrown an exception about the replica shard count");
} catch (IllegalArgumentException e) {
@ -129,9 +139,9 @@ public class CreateIndexIT extends ESIntegTestCase {
try {
prepareCreate("test").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(-10, 0))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(-10, -1))
.build())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, randomIntBetween(-10, 0))
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(-10, -1))
.build())
.get();
fail("should have thrown an exception about the shard count");
} catch (IllegalArgumentException e) {
@ -196,4 +206,63 @@ public class CreateIndexIT extends ESIntegTestCase {
}
}
public void testCreateAndDeleteIndexConcurrently() throws InterruptedException {
createIndex("test");
final AtomicInteger indexVersion = new AtomicInteger(0);
final Object indexVersionLock = new Object();
final CountDownLatch latch = new CountDownLatch(1);
int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get();
}
synchronized (indexVersionLock) { // not necessarily needed here but for completeness we lock here too
indexVersion.incrementAndGet();
}
client().admin().indices().prepareDelete("test").execute(new ActionListener<DeleteIndexResponse>() { // this happens async!!!
@Override
public void onResponse(DeleteIndexResponse deleteIndexResponse) {
Thread thread = new Thread() {
public void run() {
try {
client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get(); // recreate that index
synchronized (indexVersionLock) {
// we sync here since we have to ensure that all indexing operations below for a given ID are done before we increment the
// index version otherwise a doc that is in-flight could make it into an index that it was supposed to be deleted for and our assertion fail...
indexVersion.incrementAndGet();
}
assertAcked(client().admin().indices().prepareDelete("test").get()); // from here on all docs with index_version == 0|1 must be gone!!!! only 2 are ok;
} finally {
latch.countDown();
}
}
};
thread.start();
}
@Override
public void onFailure(Throwable e) {
throw new RuntimeException(e);
}
}
);
numDocs = randomIntBetween(100, 200);
for (int i = 0; i < numDocs; i++) {
try {
synchronized (indexVersionLock) {
client().prepareIndex("test", "test").setSource("index_version", indexVersion.get()).get();
}
} catch (IndexNotFoundException inf) {
// fine
}
}
latch.await();
refresh();
// we only really assert that we never reuse segments of old indices or anything like this here and that nothing fails with crazy exceptions
SearchResponse expected = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(new RangeQueryBuilder("index_version").from(indexVersion.get(), true)).get();
SearchResponse all = client().prepareSearch("test").setIndicesOptions(IndicesOptions.lenientExpandOpen()).get();
assertEquals(expected + " vs. " + all, expected.getHits().getTotalHits(), all.getHits().getTotalHits());
logger.info("total: {}", expected.getHits().getTotalHits());
}
}

View File

@ -77,12 +77,9 @@ public class MetaDataIndexTemplateServiceTests extends ESTestCase {
null,
null,
null,
null,
null,
Version.CURRENT,
null,
new HashSet<>(),
null,
null
);
MetaDataIndexTemplateService service = new MetaDataIndexTemplateService(Settings.EMPTY, null, createIndexService, null);