refactor meta data opertions into discrete services that do all operations within the cluster update process

This commit is contained in:
kimchy 2010-07-13 18:01:57 +03:00
parent ed04721f08
commit 892dadca59
19 changed files with 1170 additions and 591 deletions

View File

@ -26,23 +26,26 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author kimchy (shay.banon)
*/
public class TransportIndicesAliasesAction extends TransportMasterNodeOperationAction<IndicesAliasesRequest, IndicesAliasesResponse> {
private final MetaDataService metaDataService;
private final MetaDataIndexAliasesService indexAliasesService;
@Inject public TransportIndicesAliasesAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataService metaDataService) {
ThreadPool threadPool, MetaDataIndexAliasesService indexAliasesService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataService = metaDataService;
this.indexAliasesService = indexAliasesService;
}
@Override protected String transportAction() {
@ -64,7 +67,35 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
}
@Override protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request, ClusterState state) throws ElasticSearchException {
MetaDataService.IndicesAliasesResult indicesAliasesResult = metaDataService.indicesAliases(request.aliasActions());
return new IndicesAliasesResponse();
final AtomicReference<IndicesAliasesResponse> responseRef = new AtomicReference<IndicesAliasesResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()])), new MetaDataIndexAliasesService.Listener() {
@Override public void onResponse(MetaDataIndexAliasesService.Response response) {
responseRef.set(new IndicesAliasesResponse());
latch.countDown();
}
@Override public void onFailure(Throwable t) {
failureRef.set(t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
failureRef.set(e);
}
if (failureRef.get() != null) {
if (failureRef.get() instanceof ElasticSearchException) {
throw (ElasticSearchException) failureRef.get();
} else {
throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get());
}
}
return responseRef.get();
}
}

View File

@ -25,12 +25,15 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/**
* Create index action.
*
@ -38,12 +41,12 @@ import org.elasticsearch.transport.TransportService;
*/
public class TransportCreateIndexAction extends TransportMasterNodeOperationAction<CreateIndexRequest, CreateIndexResponse> {
private final MetaDataService metaDataService;
private final MetaDataCreateIndexService createIndexService;
@Inject public TransportCreateIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataService metaDataService) {
ThreadPool threadPool, MetaDataCreateIndexService createIndexService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataService = metaDataService;
this.createIndexService = createIndexService;
}
@Override protected String transportAction() {
@ -67,7 +70,36 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
if (cause.length() == 0) {
cause = "api";
}
MetaDataService.CreateIndexResult createIndexResult = metaDataService.createIndex(cause, request.index(), request.settings(), request.mappings(), request.timeout());
return new CreateIndexResponse(createIndexResult.acknowledged());
final AtomicReference<CreateIndexResponse> responseRef = new AtomicReference<CreateIndexResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()).mappings(request.mappings()).timeout(request.timeout()), new MetaDataCreateIndexService.Listener() {
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
responseRef.set(new CreateIndexResponse(response.acknowledged()));
latch.countDown();
}
@Override public void onFailure(Throwable t) {
failureRef.set(t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
failureRef.set(e);
}
if (failureRef.get() != null) {
if (failureRef.get() instanceof ElasticSearchException) {
throw (ElasticSearchException) failureRef.get();
} else {
throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get());
}
}
return responseRef.get();
}
}

View File

@ -25,12 +25,15 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/**
* Delete index action.
*
@ -38,12 +41,12 @@ import org.elasticsearch.transport.TransportService;
*/
public class TransportDeleteIndexAction extends TransportMasterNodeOperationAction<DeleteIndexRequest, DeleteIndexResponse> {
private final MetaDataService metaDataService;
private final MetaDataDeleteIndexService deleteIndexService;
@Inject public TransportDeleteIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataService metaDataService) {
ThreadPool threadPool, MetaDataDeleteIndexService deleteIndexService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataService = metaDataService;
this.deleteIndexService = deleteIndexService;
}
@Override protected String transportAction() {
@ -63,7 +66,35 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
}
@Override protected DeleteIndexResponse masterOperation(DeleteIndexRequest request, ClusterState state) throws ElasticSearchException {
MetaDataService.DeleteIndexResult deleteIndexResult = metaDataService.deleteIndex(request.index(), request.timeout());
return new DeleteIndexResponse(deleteIndexResult.acknowledged());
final AtomicReference<DeleteIndexResponse> responseRef = new AtomicReference<DeleteIndexResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(request.index()).timeout(request.timeout()), new MetaDataDeleteIndexService.Listener() {
@Override public void onResponse(MetaDataDeleteIndexService.Response response) {
responseRef.set(new DeleteIndexResponse(response.acknowledged()));
latch.countDown();
}
@Override public void onFailure(Throwable t) {
failureRef.set(t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
failureRef.set(e);
}
if (failureRef.get() != null) {
if (failureRef.get() instanceof ElasticSearchException) {
throw (ElasticSearchException) failureRef.get();
} else {
throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get());
}
}
return responseRef.get();
}
}

View File

@ -25,12 +25,15 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
/**
* Put mapping action.
*
@ -38,12 +41,12 @@ import org.elasticsearch.transport.TransportService;
*/
public class TransportPutMappingAction extends TransportMasterNodeOperationAction<PutMappingRequest, PutMappingResponse> {
private final MetaDataService metaDataService;
private final MetaDataMappingService metaDataMappingService;
@Inject public TransportPutMappingAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataService metaDataService) {
ThreadPool threadPool, MetaDataMappingService metaDataMappingService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataService = metaDataService;
this.metaDataMappingService = metaDataMappingService;
}
@ -75,7 +78,35 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
request.indices(clusterState.metaData().concreteIndices(request.indices()));
final String[] indices = request.indices();
MetaDataService.PutMappingResult result = metaDataService.putMapping(indices, request.type(), request.source(), request.ignoreConflicts(), request.timeout());
return new PutMappingResponse(result.acknowledged());
final AtomicReference<PutMappingResponse> responseRef = new AtomicReference<PutMappingResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
metaDataMappingService.putMapping(new MetaDataMappingService.Request(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()), new MetaDataMappingService.Listener() {
@Override public void onResponse(MetaDataMappingService.Response response) {
responseRef.set(new PutMappingResponse(response.acknowledged()));
latch.countDown();
}
@Override public void onFailure(Throwable t) {
failureRef.set(t);
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
failureRef.set(e);
}
if (failureRef.get() != null) {
if (failureRef.get() instanceof ElasticSearchException) {
throw (ElasticSearchException) failureRef.get();
} else {
throw new ElasticSearchException(failureRef.get().getMessage(), failureRef.get());
}
}
return responseRef.get();
}
}

View File

@ -24,7 +24,10 @@ import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.strategy.PreferUnallocatedShardUnassignedStrategy;
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
@ -33,7 +36,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ClusterModule extends AbstractModule {
@ -49,7 +52,11 @@ public class ClusterModule extends AbstractModule {
bind(ShardsRoutingStrategy.class).asEagerSingleton();
bind(ClusterService.class).to(InternalClusterService.class).asEagerSingleton();
bind(MetaDataService.class).asEagerSingleton();
bind(MetaDataCreateIndexService.class).asEagerSingleton();
bind(MetaDataDeleteIndexService.class).asEagerSingleton();
bind(MetaDataMappingService.class).asEagerSingleton();
bind(MetaDataIndexAliasesService.class).asEagerSingleton();
bind(RoutingService.class).asEagerSingleton();
bind(ShardStateAction.class).asEagerSingleton();

View File

@ -26,7 +26,7 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -40,16 +40,16 @@ import java.io.IOException;
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
*
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class MappingUpdatedAction extends TransportMasterNodeOperationAction<MappingUpdatedAction.MappingUpdatedRequest, MappingUpdatedAction.MappingUpdatedResponse> {
private final MetaDataService metaDataService;
private final MetaDataMappingService metaDataMappingService;
@Inject public MappingUpdatedAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
MetaDataService metaDataService) {
MetaDataMappingService metaDataMappingService) {
super(settings, transportService, clusterService, threadPool);
this.metaDataService = metaDataService;
this.metaDataMappingService = metaDataMappingService;
}
@Override protected String transportAction() {
@ -65,7 +65,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
}
@Override protected MappingUpdatedResponse masterOperation(MappingUpdatedRequest request, ClusterState state) throws ElasticSearchException {
metaDataService.updateMapping(request.index(), request.type(), request.mappingSource());
metaDataMappingService.updateMapping(request.index(), request.type(), request.mappingSource());
return new MappingUpdatedResponse();
}

View File

@ -0,0 +1,312 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.timer.TimerService;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
/**
* @author kimchy (shay.banon)
*/
public class MetaDataCreateIndexService extends AbstractComponent {
private final Environment environment;
private final TimerService timerService;
private final ClusterService clusterService;
private final ShardsRoutingStrategy shardsRoutingStrategy;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
@Inject public MetaDataCreateIndexService(Settings settings, Environment environment, TimerService timerService, ClusterService clusterService, ShardsRoutingStrategy shardsRoutingStrategy,
NodeIndexCreatedAction nodeIndexCreatedAction) {
super(settings);
this.environment = environment;
this.timerService = timerService;
this.clusterService = clusterService;
this.shardsRoutingStrategy = shardsRoutingStrategy;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
}
public void createIndex(final Request request, final Listener userListener) {
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
final CreateIndexListener listener = new CreateIndexListener(request, userListener);
try {
if (currentState.routingTable().hasIndex(request.index)) {
listener.onFailure(new IndexAlreadyExistsException(new Index(request.index)));
return currentState;
}
if (currentState.metaData().hasIndex(request.index)) {
listener.onFailure(new IndexAlreadyExistsException(new Index(request.index)));
return currentState;
}
if (request.index.contains(" ")) {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain whitespace"));
return currentState;
}
if (request.index.contains(",")) {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain ',"));
return currentState;
}
if (request.index.contains("#")) {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain '#"));
return currentState;
}
if (request.index.charAt(0) == '_') {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not start with '_'"));
return currentState;
}
if (!request.index.toLowerCase().equals(request.index)) {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must be lowercase"));
return currentState;
}
if (!Strings.validFileName(request.index)) {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS));
return currentState;
}
if (currentState.metaData().aliases().contains(request.index)) {
listener.onFailure(new InvalidIndexNameException(new Index(request.index), request.index, "an alias with the same name already exists"));
return currentState;
}
// add to the mappings files that exists within the config/mappings location
Map<String, String> mappings = Maps.newHashMap();
File mappingsDir = new File(environment.configFile(), "mappings");
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
File defaultMappingsDir = new File(mappingsDir, "_default");
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
addMappings(mappings, defaultMappingsDir);
}
File indexMappingsDir = new File(mappingsDir, request.index);
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
addMappings(mappings, indexMappingsDir);
}
}
// TODO add basic mapping validation
// put this last so index level mappings can override default mappings
mappings.putAll(request.mappings);
ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder().put(request.settings);
if (request.settings.get(SETTING_NUMBER_OF_SHARDS) == null) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
if (request.settings.get(SETTING_NUMBER_OF_REPLICAS) == null) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
Settings actualIndexSettings = indexSettingsBuilder.build();
IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(request.index).settings(actualIndexSettings);
for (Map.Entry<String, String> entry : mappings.entrySet()) {
indexMetaData.putMapping(entry.getKey(), entry.getValue());
}
MetaData newMetaData = newMetaDataBuilder()
.metaData(currentState.metaData())
.put(indexMetaData)
.build();
logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());
final NodeIndexCreatedAction.Listener nodeIndexCreateListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexCreatedAction.remove(this);
}
}
}
};
nodeIndexCreatedAction.add(nodeIndexCreateListener);
Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
listener.onResponse(new Response(false));
nodeIndexCreatedAction.remove(nodeIndexCreateListener);
}
}, request.timeout, TimerService.ExecutionType.THREADED);
listener.timeout = timeoutTask;
return newClusterStateBuilder().state(currentState).metaData(newMetaData).build();
} catch (Exception e) {
listener.onFailure(e);
return currentState;
}
}
});
}
private void addMappings(Map<String, String> mappings, File mappingsDir) {
File[] mappingsFiles = mappingsDir.listFiles();
for (File mappingFile : mappingsFiles) {
String fileNameNoSuffix = mappingFile.getName().substring(0, mappingFile.getName().lastIndexOf('.'));
if (mappings.containsKey(fileNameNoSuffix)) {
// if we have the mapping defined, ignore it
continue;
}
try {
mappings.put(fileNameNoSuffix, Streams.copyToString(new FileReader(mappingFile)));
} catch (IOException e) {
logger.warn("failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e);
}
}
}
class CreateIndexListener implements Listener {
private AtomicBoolean notified = new AtomicBoolean();
private final Request request;
private final Listener listener;
volatile Timeout timeout;
private CreateIndexListener(Request request, Listener listener) {
this.request = request;
this.listener = listener;
}
@Override public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
if (timeout != null) {
timeout.cancel();
}
// do the reroute after indices have been created on all the other nodes so we can query them for some info (like shard allocation)
clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder();
for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) {
routingTableBuilder.add(indexRoutingTable);
}
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(request.index)
.initializeEmpty(currentState.metaData().index(request.index));
routingTableBuilder.add(indexRoutingBuilder);
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
listener.onResponse(response);
}
});
}
}
@Override public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
if (timeout != null) {
timeout.cancel();
}
listener.onFailure(t);
}
}
}
public static interface Listener {
void onResponse(Response response);
void onFailure(Throwable t);
}
public static class Request {
final String cause;
final String index;
Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
Map<String, String> mappings = Maps.newHashMap();
TimeValue timeout = TimeValue.timeValueSeconds(5);
public Request(String cause, String index) {
this.cause = cause;
this.index = index;
}
public Request settings(Settings settings) {
this.settings = settings;
return this;
}
public Request mappings(Map<String, String> mappings) {
this.mappings.putAll(mappings);
return this;
}
public Request timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
}
public static class Response {
private final boolean acknowledged;
public Response(boolean acknowledged) {
this.acknowledged = acknowledged;
}
public boolean acknowledged() {
return acknowledged;
}
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.timer.TimerService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
/**
* @author kimchy (shay.banon)
*/
public class MetaDataDeleteIndexService extends AbstractComponent {
private final TimerService timerService;
private final ClusterService clusterService;
private final ShardsRoutingStrategy shardsRoutingStrategy;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
@Inject public MetaDataDeleteIndexService(Settings settings, TimerService timerService, ClusterService clusterService, ShardsRoutingStrategy shardsRoutingStrategy,
NodeIndexDeletedAction nodeIndexDeletedAction) {
super(settings);
this.timerService = timerService;
this.clusterService = clusterService;
this.shardsRoutingStrategy = shardsRoutingStrategy;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
}
public void deleteIndex(final Request request, final Listener userListener) {
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
final DeleteIndexListener listener = new DeleteIndexListener(request, userListener);
try {
RoutingTable routingTable = currentState.routingTable();
if (!routingTable.hasIndex(request.index)) {
listener.onFailure(new IndexMissingException(new Index(request.index)));
return currentState;
}
logger.info("[{}] deleting index", request.index);
RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder();
for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) {
if (!indexRoutingTable.index().equals(request.index)) {
routingTableBuilder.add(indexRoutingTable);
}
}
MetaData newMetaData = newMetaDataBuilder()
.metaData(currentState.metaData())
.remove(request.index)
.build();
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(
newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build());
final AtomicInteger counter = new AtomicInteger(currentState.nodes().size());
final NodeIndexDeletedAction.Listener nodeIndexDeleteListener = new NodeIndexDeletedAction.Listener() {
@Override public void onNodeIndexDeleted(String index, String nodeId) {
if (index.equals(request.index)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeIndexDeletedAction.remove(this);
}
}
}
};
nodeIndexDeletedAction.add(nodeIndexDeleteListener);
Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
listener.onResponse(new Response(false));
nodeIndexDeletedAction.remove(nodeIndexDeleteListener);
}
}, request.timeout, TimerService.ExecutionType.THREADED);
listener.timeout = timeoutTask;
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build();
} catch (Exception e) {
listener.onFailure(e);
return currentState;
}
}
});
}
class DeleteIndexListener implements Listener {
private AtomicBoolean notified = new AtomicBoolean();
private final Request request;
private final Listener listener;
volatile Timeout timeout;
private DeleteIndexListener(Request request, Listener listener) {
this.request = request;
this.listener = listener;
}
@Override public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
if (timeout != null) {
timeout.cancel();
}
listener.onResponse(response);
}
}
@Override public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
if (timeout != null) {
timeout.cancel();
}
listener.onFailure(t);
}
}
}
public static interface Listener {
void onResponse(Response response);
void onFailure(Throwable t);
}
public static class Request {
final String index;
TimeValue timeout = TimeValue.timeValueSeconds(10);
public Request(String index) {
this.index = index;
}
public Request timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
}
public static class Response {
private final boolean acknowledged;
public Response(boolean acknowledged) {
this.acknowledged = acknowledged;
}
public boolean acknowledged() {
return acknowledged;
}
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import java.util.Set;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.collect.Sets.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
/**
* @author kimchy (shay.banon)
*/
public class MetaDataIndexAliasesService extends AbstractComponent {
private final ClusterService clusterService;
@Inject public MetaDataIndexAliasesService(Settings settings, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
}
public void indicesAliases(final Request request, final Listener listener) {
clusterService.submitStateUpdateTask("index-aliases", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
for (AliasAction aliasAction : request.actions) {
if (!currentState.metaData().hasIndex(aliasAction.index())) {
listener.onFailure(new IndexMissingException(new Index(aliasAction.index())));
return currentState;
}
}
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
for (AliasAction aliasAction : request.actions) {
IndexMetaData indexMetaData = builder.get(aliasAction.index());
if (indexMetaData == null) {
throw new IndexMissingException(new Index(aliasAction.index()));
}
Set<String> indexAliases = newHashSet(indexMetaData.settings().getAsArray("index.aliases"));
if (aliasAction.actionType() == AliasAction.Type.ADD) {
indexAliases.add(aliasAction.alias());
} else if (aliasAction.actionType() == AliasAction.Type.REMOVE) {
indexAliases.remove(aliasAction.alias());
}
Settings settings = settingsBuilder().put(indexMetaData.settings())
.putArray("index.aliases", indexAliases.toArray(new String[indexAliases.size()]))
.build();
builder.put(newIndexMetaDataBuilder(indexMetaData).settings(settings));
}
return newClusterStateBuilder().state(currentState).metaData(builder).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
listener.onResponse(new Response());
}
});
}
public static interface Listener {
void onResponse(Response response);
void onFailure(Throwable t);
}
public static class Request {
final AliasAction[] actions;
public Request(AliasAction[] actions) {
this.actions = actions;
}
}
public static class Response {
}
}

View File

@ -0,0 +1,301 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.timer.Timeout;
import org.elasticsearch.common.timer.TimerTask;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.InvalidTypeNameException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.timer.TimerService;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.collect.Maps.*;
import static org.elasticsearch.common.collect.Sets.*;
import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.*;
/**
* @author kimchy (shay.banon)
*/
public class MetaDataMappingService extends AbstractComponent {
private final ClusterService clusterService;
private final IndicesService indicesService;
private final TimerService timerService;
private final NodeMappingCreatedAction nodeMappingCreatedAction;
@Inject public MetaDataMappingService(Settings settings, ClusterService clusterService, IndicesService indicesService,
TimerService timerService, NodeMappingCreatedAction nodeMappingCreatedAction) {
super(settings);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.timerService = timerService;
this.nodeMappingCreatedAction = nodeMappingCreatedAction;
}
public void updateMapping(final String index, final String type, final String mappingSource) {
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
DocumentMapper existingMapper = mapperService.documentMapper(type);
// parse the updated one
DocumentMapper updatedMapper = mapperService.parse(type, mappingSource);
if (existingMapper == null) {
existingMapper = updatedMapper;
} else {
// merge from the updated into the existing, ignore conflicts (we know we have them, we just want the new ones)
existingMapper.merge(updatedMapper, mergeFlags().simulate(false));
}
// build the updated mapping source
final String updatedMappingSource = existingMapper.buildSource();
if (logger.isDebugEnabled()) {
logger.debug("[{}] update mapping [{}] (dynamic) with source [{}]", index, type, updatedMappingSource);
} else if (logger.isInfoEnabled()) {
logger.info("[{}] update mapping [{}] (dynamic)", index, type);
}
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
IndexMetaData indexMetaData = currentState.metaData().index(index);
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, updatedMappingSource));
return newClusterStateBuilder().state(currentState).metaData(builder).build();
}
});
}
public void putMapping(final Request request, final Listener userListener) {
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
final PutMappingListener listener = new PutMappingListener(request, userListener);
try {
if (request.indices.length == 0) {
throw new IndexMissingException(new Index("_all"));
}
for (String index : request.indices) {
if (!currentState.metaData().hasIndex(index)) {
listener.onFailure(new IndexMissingException(new Index(index)));
}
}
Map<String, DocumentMapper> newMappers = newHashMap();
Map<String, DocumentMapper> existingMappers = newHashMap();
for (String index : request.indices) {
IndexService indexService = indicesService.indexService(index);
if (indexService != null) {
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper newMapper = indexService.mapperService().parse(request.mappingType, request.mappingSource);
newMappers.put(index, newMapper);
DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.mappingType);
if (existingMapper != null) {
// first, simulate
DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true));
// if we have conflicts, and we are not supposed to ignore them, throw an exception
if (!request.ignoreConflicts && mergeResult.hasConflicts()) {
throw new MergeMappingException(mergeResult.conflicts());
}
existingMappers.put(index, existingMapper);
}
} else {
throw new IndexMissingException(new Index(index));
}
}
String mappingType = request.mappingType;
if (mappingType == null) {
mappingType = newMappers.values().iterator().next().type();
} else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition");
}
if (mappingType.charAt(0) == '_') {
throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
}
final Map<String, Tuple<String, String>> mappings = newHashMap();
for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
Tuple<String, String> mapping;
String index = entry.getKey();
// do the actual merge here on the master, and update the mapping source
DocumentMapper newMapper = entry.getValue();
if (existingMappers.containsKey(entry.getKey())) {
// we have an existing mapping, do the merge here (on the master), it will automatically update the mapping source
DocumentMapper existingMapper = existingMappers.get(entry.getKey());
existingMapper.merge(newMapper, mergeFlags().simulate(false));
// use the merged mapping source
mapping = new Tuple<String, String>(existingMapper.type(), existingMapper.buildSource());
} else {
mapping = new Tuple<String, String>(newMapper.type(), newMapper.buildSource());
}
mappings.put(index, mapping);
if (logger.isDebugEnabled()) {
logger.debug("[{}] put_mapping [{}] with source [{}]", index, mapping.v1(), mapping.v2());
} else if (logger.isInfoEnabled()) {
logger.info("[{}] put_mapping [{}]", index, mapping.v1());
}
}
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
for (String indexName : request.indices) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
if (indexMetaData == null) {
throw new IndexMissingException(new Index(indexName));
}
Tuple<String, String> mapping = mappings.get(indexName);
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2()));
}
final AtomicInteger counter = new AtomicInteger(clusterService.state().nodes().size() * request.indices.length);
final Set<String> indicesSet = newHashSet(request.indices);
final NodeMappingCreatedAction.Listener nodeMappingListener = new NodeMappingCreatedAction.Listener() {
@Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (indicesSet.contains(response.index()) && response.type().equals(request.mappingType)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(true));
nodeMappingCreatedAction.remove(this);
}
}
}
};
nodeMappingCreatedAction.add(nodeMappingListener);
Timeout timeoutTask = timerService.newTimeout(new TimerTask() {
@Override public void run(Timeout timeout) throws Exception {
listener.onResponse(new Response(false));
nodeMappingCreatedAction.remove(nodeMappingListener);
}
}, request.timeout, TimerService.ExecutionType.THREADED);
listener.timeout = timeoutTask;
return newClusterStateBuilder().state(currentState).metaData(builder).build();
} catch (Exception e) {
listener.onFailure(e);
return currentState;
}
}
});
}
class PutMappingListener implements Listener {
private AtomicBoolean notified = new AtomicBoolean();
private final Request request;
private final Listener listener;
volatile Timeout timeout;
private PutMappingListener(Request request, Listener listener) {
this.request = request;
this.listener = listener;
}
@Override public void onResponse(final Response response) {
if (notified.compareAndSet(false, true)) {
if (timeout != null) {
timeout.cancel();
}
listener.onResponse(response);
}
}
@Override public void onFailure(Throwable t) {
if (notified.compareAndSet(false, true)) {
if (timeout != null) {
timeout.cancel();
}
listener.onFailure(t);
}
}
}
public static interface Listener {
void onResponse(Response response);
void onFailure(Throwable t);
}
public static class Request {
final String[] indices;
final String mappingType;
final String mappingSource;
boolean ignoreConflicts = false;
TimeValue timeout = TimeValue.timeValueSeconds(10);
public Request(String[] indices, String mappingType, String mappingSource) {
this.indices = indices;
this.mappingType = mappingType;
this.mappingSource = mappingSource;
}
public Request ignoreConflicts(boolean ignoreConflicts) {
this.ignoreConflicts = ignoreConflicts;
return this;
}
public Request timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
}
public static class Response {
private final boolean acknowledged;
public Response(boolean acknowledged) {
this.acknowledged = acknowledged;
}
public boolean acknowledged() {
return acknowledged;
}
}
}

View File

@ -1,525 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.InvalidTypeNameException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.collect.Maps.*;
import static org.elasticsearch.common.collect.Sets.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.index.mapper.DocumentMapper.MergeFlags.*;
/**
* @author kimchy (shay.banon)
*/
public class MetaDataService extends AbstractComponent {
private final Environment environment;
private final ClusterService clusterService;
private final ShardsRoutingStrategy shardsRoutingStrategy;
private final IndicesService indicesService;
private final NodeIndexCreatedAction nodeIndexCreatedAction;
private final NodeIndexDeletedAction nodeIndexDeletedAction;
private final NodeMappingCreatedAction nodeMappingCreatedAction;
private final Object mutex = new Object();
@Inject public MetaDataService(Settings settings, Environment environment, ClusterService clusterService, IndicesService indicesService, ShardsRoutingStrategy shardsRoutingStrategy,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingCreatedAction nodeMappingCreatedAction) {
super(settings);
this.environment = environment;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardsRoutingStrategy = shardsRoutingStrategy;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
this.nodeIndexDeletedAction = nodeIndexDeletedAction;
this.nodeMappingCreatedAction = nodeMappingCreatedAction;
}
// TODO should find nicer solution than sync here, since we block for timeout (same for other ops)
public IndicesAliasesResult indicesAliases(final List<AliasAction> aliasActions) {
synchronized (mutex) {
ClusterState clusterState = clusterService.state();
for (AliasAction aliasAction : aliasActions) {
if (!clusterState.metaData().hasIndex(aliasAction.index())) {
throw new IndexMissingException(new Index(aliasAction.index()));
}
}
clusterService.submitStateUpdateTask("index-aliases", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
for (AliasAction aliasAction : aliasActions) {
IndexMetaData indexMetaData = builder.get(aliasAction.index());
if (indexMetaData == null) {
throw new IndexMissingException(new Index(aliasAction.index()));
}
Set<String> indexAliases = newHashSet(indexMetaData.settings().getAsArray("index.aliases"));
if (aliasAction.actionType() == AliasAction.Type.ADD) {
indexAliases.add(aliasAction.alias());
} else if (aliasAction.actionType() == AliasAction.Type.REMOVE) {
indexAliases.remove(aliasAction.alias());
}
Settings settings = settingsBuilder().put(indexMetaData.settings())
.putArray("index.aliases", indexAliases.toArray(new String[indexAliases.size()]))
.build();
builder.put(newIndexMetaDataBuilder(indexMetaData).settings(settings));
}
return newClusterStateBuilder().state(currentState).metaData(builder).build();
}
});
return new IndicesAliasesResult();
}
}
public CreateIndexResult createIndex(final String cause, final String index, final Settings indexSettings, Map<String, String> mappings, TimeValue timeout) throws IndexAlreadyExistsException {
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size());
NodeIndexCreatedAction.Listener nodeCreatedListener = new NodeIndexCreatedAction.Listener() {
@Override public void onNodeIndexCreated(String mIndex, String nodeId) {
if (index.equals(mIndex)) {
latch.countDown();
}
}
};
synchronized (mutex) {
ClusterState clusterState = clusterService.state();
if (clusterState.routingTable().hasIndex(index)) {
throw new IndexAlreadyExistsException(new Index(index));
}
if (clusterState.metaData().hasIndex(index)) {
throw new IndexAlreadyExistsException(new Index(index));
}
if (index.contains(" ")) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain whitespace");
}
if (index.contains(",")) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain ',");
}
if (index.contains("#")) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain '#");
}
if (index.charAt(0) == '_') {
throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'");
}
if (!index.toLowerCase().equals(index)) {
throw new InvalidIndexNameException(new Index(index), index, "must be lowercase");
}
if (!Strings.validFileName(index)) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain the following characters " + Strings.INVALID_FILENAME_CHARS);
}
if (clusterState.metaData().aliases().contains(index)) {
throw new InvalidIndexNameException(new Index(index), index, "an alias with the same name already exists");
}
// add to the mappings files that exists within the config/mappings location
if (mappings == null) {
mappings = Maps.newHashMap();
} else {
mappings = Maps.newHashMap(mappings);
}
File mappingsDir = new File(environment.configFile(), "mappings");
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
File defaultMappingsDir = new File(mappingsDir, "_default");
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
addMappings(mappings, defaultMappingsDir);
}
File indexMappingsDir = new File(mappingsDir, index);
if (mappingsDir.exists() && mappingsDir.isDirectory()) {
addMappings(mappings, indexMappingsDir);
}
}
final Map<String, String> fMappings = mappings;
nodeIndexCreatedAction.add(nodeCreatedListener);
clusterService.submitStateUpdateTask("create-index [" + index + "], cause [" + cause + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder().put(indexSettings);
if (indexSettings.get(SETTING_NUMBER_OF_SHARDS) == null) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
if (indexSettings.get(SETTING_NUMBER_OF_REPLICAS) == null) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
Settings actualIndexSettings = indexSettingsBuilder.build();
IndexMetaData.Builder indexMetaData = newIndexMetaDataBuilder(index).settings(actualIndexSettings);
for (Map.Entry<String, String> entry : fMappings.entrySet()) {
indexMetaData.putMapping(entry.getKey(), entry.getValue());
}
MetaData newMetaData = newMetaDataBuilder()
.metaData(currentState.metaData())
.put(indexMetaData)
.build();
logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", index, cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), fMappings.keySet());
return newClusterStateBuilder().state(currentState).metaData(newMetaData).build();
}
});
}
boolean acknowledged;
try {
acknowledged = latch.await(timeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
acknowledged = false;
} finally {
nodeIndexCreatedAction.remove(nodeCreatedListener);
}
final CountDownLatch latch2 = new CountDownLatch(1);
// do the reroute after indices have been created on all the other nodes so we can query them for some info
clusterService.submitStateUpdateTask("reroute after index [" + index + "] creation", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder();
for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) {
routingTableBuilder.add(indexRoutingTable);
}
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(index)
.initializeEmpty(currentState.metaData().index(index));
routingTableBuilder.add(indexRoutingBuilder);
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).build());
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
latch2.countDown();
}
});
// wait till it got processed (on the master, we are the master)
try {
latch2.await(timeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
return new CreateIndexResult(acknowledged);
}
private void addMappings(Map<String, String> mappings, File mappingsDir) {
File[] mappingsFiles = mappingsDir.listFiles();
for (File mappingFile : mappingsFiles) {
String fileNameNoSuffix = mappingFile.getName().substring(0, mappingFile.getName().lastIndexOf('.'));
if (mappings.containsKey(fileNameNoSuffix)) {
// if we have the mapping defined, ignore it
continue;
}
try {
mappings.put(fileNameNoSuffix, Streams.copyToString(new FileReader(mappingFile)));
} catch (IOException e) {
logger.warn("failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e);
}
}
}
public DeleteIndexResult deleteIndex(final String index, TimeValue timeout) throws IndexMissingException {
ClusterState clusterState = clusterService.state();
final CountDownLatch latch = new CountDownLatch(clusterState.nodes().size());
NodeIndexDeletedAction.Listener listener = new NodeIndexDeletedAction.Listener() {
@Override public void onNodeIndexDeleted(String fIndex, String nodeId) {
if (fIndex.equals(index)) {
latch.countDown();
}
}
};
nodeIndexDeletedAction.add(listener);
synchronized (mutex) {
RoutingTable routingTable = clusterState.routingTable();
if (!routingTable.hasIndex(index)) {
throw new IndexMissingException(new Index(index));
}
logger.info("[{}] deleting index", index);
clusterService.submitStateUpdateTask("delete-index [" + index + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder routingTableBuilder = new RoutingTable.Builder();
for (IndexRoutingTable indexRoutingTable : currentState.routingTable().indicesRouting().values()) {
if (!indexRoutingTable.index().equals(index)) {
routingTableBuilder.add(indexRoutingTable);
}
}
MetaData newMetaData = newMetaDataBuilder()
.metaData(currentState.metaData())
.remove(index)
.build();
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(
newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build());
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build();
}
});
}
boolean acknowledged;
try {
acknowledged = latch.await(timeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
acknowledged = false;
} finally {
nodeIndexDeletedAction.remove(listener);
}
return new DeleteIndexResult(acknowledged);
}
public void updateMapping(final String index, final String type, final String mappingSource) {
synchronized (mutex) {
MapperService mapperService = indicesService.indexServiceSafe(index).mapperService();
DocumentMapper existingMapper = mapperService.documentMapper(type);
// parse the updated one
DocumentMapper updatedMapper = mapperService.parse(type, mappingSource);
if (existingMapper == null) {
existingMapper = updatedMapper;
} else {
// merge from the updated into the existing, ignore conflicts (we know we have them, we just want the new ones)
existingMapper.merge(updatedMapper, mergeFlags().simulate(false));
}
// build the updated mapping source
final String updatedMappingSource = existingMapper.buildSource();
if (logger.isDebugEnabled()) {
logger.debug("[{}] update mapping [{}] (dynamic) with source [{}]", index, type, updatedMappingSource);
} else if (logger.isInfoEnabled()) {
logger.info("[{}] update mapping [{}] (dynamic)", index, type);
}
// publish the new mapping
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
IndexMetaData indexMetaData = currentState.metaData().index(index);
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(type, updatedMappingSource));
return newClusterStateBuilder().state(currentState).metaData(builder).build();
}
});
}
}
public PutMappingResult putMapping(final String[] indices, String mappingType, final String mappingSource, boolean ignoreConflicts, TimeValue timeout) throws ElasticSearchException {
ClusterState clusterState = clusterService.state();
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length);
final Set<String> indicesSet = newHashSet(indices);
final String fMappingType = mappingType;
NodeMappingCreatedAction.Listener listener = new NodeMappingCreatedAction.Listener() {
@Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) {
if (indicesSet.contains(response.index()) && response.type().equals(fMappingType)) {
latch.countDown();
}
}
};
synchronized (mutex) {
if (indices.length == 0) {
throw new IndexMissingException(new Index("_all"));
}
for (String index : indices) {
if (!clusterState.metaData().hasIndex(index)) {
throw new IndexMissingException(new Index(index));
}
}
Map<String, DocumentMapper> newMappers = newHashMap();
Map<String, DocumentMapper> existingMappers = newHashMap();
for (String index : indices) {
IndexService indexService = indicesService.indexService(index);
if (indexService != null) {
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper newMapper = indexService.mapperService().parse(mappingType, mappingSource);
newMappers.put(index, newMapper);
DocumentMapper existingMapper = indexService.mapperService().documentMapper(mappingType);
if (existingMapper != null) {
// first, simulate
DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true));
// if we have conflicts, and we are not supposed to ignore them, throw an exception
if (!ignoreConflicts && mergeResult.hasConflicts()) {
throw new MergeMappingException(mergeResult.conflicts());
}
existingMappers.put(index, existingMapper);
}
} else {
throw new IndexMissingException(new Index(index));
}
}
if (mappingType == null) {
mappingType = newMappers.values().iterator().next().type();
} else if (!mappingType.equals(newMappers.values().iterator().next().type())) {
throw new InvalidTypeNameException("Type name provided does not match type name within mapping definition");
}
if (mappingType.charAt(0) == '_') {
throw new InvalidTypeNameException("Document mapping type name can't start with '_'");
}
final Map<String, Tuple<String, String>> mappings = newHashMap();
for (Map.Entry<String, DocumentMapper> entry : newMappers.entrySet()) {
Tuple<String, String> mapping;
String index = entry.getKey();
// do the actual merge here on the master, and update the mapping source
DocumentMapper newMapper = entry.getValue();
if (existingMappers.containsKey(entry.getKey())) {
// we have an existing mapping, do the merge here (on the master), it will automatically update the mapping source
DocumentMapper existingMapper = existingMappers.get(entry.getKey());
existingMapper.merge(newMapper, mergeFlags().simulate(false));
// use the merged mapping source
mapping = new Tuple<String, String>(existingMapper.type(), existingMapper.buildSource());
} else {
mapping = new Tuple<String, String>(newMapper.type(), newMapper.buildSource());
}
mappings.put(index, mapping);
if (logger.isDebugEnabled()) {
logger.debug("[{}] put_mapping [{}] with source [{}]", index, mapping.v1(), mapping.v2());
} else if (logger.isInfoEnabled()) {
logger.info("[{}] put_mapping [{}]", index, mapping.v1());
}
}
nodeMappingCreatedAction.add(listener);
clusterService.submitStateUpdateTask("put-mapping [" + mappingType + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder builder = newMetaDataBuilder().metaData(currentState.metaData());
for (String indexName : indices) {
IndexMetaData indexMetaData = currentState.metaData().index(indexName);
if (indexMetaData == null) {
throw new IndexMissingException(new Index(indexName));
}
Tuple<String, String> mapping = mappings.get(indexName);
builder.put(newIndexMetaDataBuilder(indexMetaData).putMapping(mapping.v1(), mapping.v2()));
}
return newClusterStateBuilder().state(currentState).metaData(builder).build();
}
});
}
boolean acknowledged;
try {
acknowledged = latch.await(timeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
acknowledged = false;
} finally {
nodeMappingCreatedAction.remove(listener);
}
return new PutMappingResult(acknowledged);
}
/**
* The result of a putting mapping.
*/
public static class PutMappingResult {
private final boolean acknowledged;
public PutMappingResult(boolean acknowledged) {
this.acknowledged = acknowledged;
}
public boolean acknowledged() {
return acknowledged;
}
}
public static class CreateIndexResult {
private final boolean acknowledged;
public CreateIndexResult(boolean acknowledged) {
this.acknowledged = acknowledged;
}
public boolean acknowledged() {
return acknowledged;
}
}
public static class DeleteIndexResult {
private final boolean acknowledged;
public DeleteIndexResult(boolean acknowledged) {
this.acknowledged = acknowledged;
}
public boolean acknowledged() {
return acknowledged;
}
}
public static class IndicesAliasesResult {
public IndicesAliasesResult() {
}
}
}

View File

@ -26,7 +26,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -63,7 +63,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final DiscoveryService discoveryService;
private final MetaDataService metaDataService;
private final MetaDataCreateIndexService createIndexService;
private final TimeValue initialStateTimeout;
@ -75,13 +75,13 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
private final AtomicBoolean readFromGateway = new AtomicBoolean();
@Inject public GatewayService(Settings settings, Gateway gateway, ClusterService clusterService, DiscoveryService discoveryService,
ThreadPool threadPool, MetaDataService metaDataService) {
ThreadPool threadPool, MetaDataCreateIndexService createIndexService) {
super(settings);
this.gateway = gateway;
this.clusterService = clusterService;
this.discoveryService = discoveryService;
this.threadPool = threadPool;
this.metaDataService = metaDataService;
this.createIndexService = createIndexService;
this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
// allow to control a delay of when indices will get created
this.recoverAfterTime = componentSettings.getAsTime("recover_after_time", null);
@ -247,15 +247,13 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
// go over the meta data and create indices, we don't really need to copy over
// the meta data per index, since we create the index and it will be added automatically
for (final IndexMetaData indexMetaData : fMetaData) {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
metaDataService.createIndex("gateway", indexMetaData.index(), indexMetaData.settings(), indexMetaData.mappings(), timeValueSeconds(30));
} catch (Exception e) {
logger.error("failed to create index [" + indexMetaData.index() + "]", e);
} finally {
latch.countDown();
}
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index()).settings(indexMetaData.settings()).mappings(indexMetaData.mappings()).timeout(timeValueSeconds(30)), new MetaDataCreateIndexService.Listener() {
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
latch.countDown();
}
@Override public void onFailure(Throwable t) {
logger.error("failed to create index [{}]", indexMetaData.index(), t);
}
});
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
@ -241,43 +242,67 @@ public final class InternalNode implements Node {
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("{{}}[{}]: closing ...", Version.full(), JvmInfo.jvmInfo().pid());
StopWatch stopWatch = new StopWatch("node_close");
stopWatch.start("http");
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close();
}
stopWatch.stop().start("client");
injector.getInstance(Client.class).close();
stopWatch.stop().start("routing");
injector.getInstance(RoutingService.class).close();
stopWatch.stop().start("cluster");
injector.getInstance(ClusterService.class).close();
stopWatch.stop().start("discovery");
injector.getInstance(DiscoveryService.class).close();
stopWatch.stop().start("monitor");
injector.getInstance(MonitorService.class).close();
stopWatch.stop().start("gateway");
injector.getInstance(GatewayService.class).close();
stopWatch.stop().start("search");
injector.getInstance(SearchService.class).close();
stopWatch.stop().start("indices_cluster");
injector.getInstance(IndicesClusterStateService.class).close();
stopWatch.stop().start("indices");
injector.getInstance(IndicesService.class).close();
stopWatch.stop().start("rest");
injector.getInstance(RestController.class).close();
stopWatch.stop().start("transport");
injector.getInstance(TransportService.class).close();
stopWatch.stop().start("http_client");
injector.getInstance(HttpClientService.class).close();
for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
stopWatch.stop().start("plugin(" + plugin.getName() + ")");
injector.getInstance(plugin).close();
}
stopWatch.stop().start("node_cache");
injector.getInstance(NodeCache.class).close();
stopWatch.stop().start("timer");
injector.getInstance(TimerService.class).close();
stopWatch.stop().start("thread_pool");
injector.getInstance(ThreadPool.class).shutdown();
try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
stopWatch.stop().start("thread_pool_force_shutdown");
try {
injector.getInstance(ThreadPool.class).shutdownNow();
} catch (Exception e) {
// ignore
}
stopWatch.stop();
ThreadLocals.clearReferencesThreadLocals();
if (logger.isTraceEnabled()) {
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
}
logger.info("{{}}[{}]: closed", Version.full(), JvmInfo.jvmInfo().pid());
}

View File

@ -317,7 +317,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
}
context.keepAlive(keepAlive);
context.accessed(timerService.estimatedTimeInMillis());
context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), keepAlive));
context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), keepAlive, TimerService.ExecutionType.DEFAULT));
return context;
}
@ -424,7 +424,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
freeContext(context.id());
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
context.keepAliveTimeout(timerService.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS));
context.keepAliveTimeout(timerService.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS, TimerService.ExecutionType.DEFAULT));
}
}
}

View File

@ -74,17 +74,25 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
return scheduleWithFixedDelay(command, interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
}
@Override public void shutdown() {
started = false;
logger.debug("Shutting down {} thread pool", getType());
logger.debug("shutting down {} thread pool", getType());
executorService.shutdown();
scheduledExecutorService.shutdown();
}
@Override public void shutdownNow() {
started = false;
executorService.shutdownNow();
scheduledExecutorService.shutdownNow();
if (!executorService.isTerminated()) {
executorService.shutdownNow();
}
if (!executorService.isTerminated()) {
scheduledExecutorService.shutdownNow();
}
}
@Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
@ -121,10 +129,6 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
return schedule(command, delay.millis(), TimeUnit.MILLISECONDS);
}
@Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
return scheduleWithFixedDelay(command, interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
}
@Override public void execute(Runnable command) {
executorService.execute(command);
}

View File

@ -41,6 +41,11 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
*/
public class TimerService extends AbstractComponent {
public static enum ExecutionType {
DEFAULT,
THREADED
}
private final ThreadPool threadPool;
private final TimeEstimator timeEstimator;
@ -79,14 +84,41 @@ public class TimerService extends AbstractComponent {
return timeEstimator.time();
}
public Timeout newTimeout(TimerTask task, TimeValue delay) {
return newTimeout(task, delay.nanos(), TimeUnit.NANOSECONDS);
public Timeout newTimeout(TimerTask task, TimeValue delay, ExecutionType executionType) {
return newTimeout(task, delay.nanos(), TimeUnit.NANOSECONDS, executionType);
}
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit, ExecutionType executionType) {
if (executionType == ExecutionType.THREADED) {
task = new ThreadedTimerTask(threadPool, task);
}
return timer.newTimeout(task, delay, unit);
}
private class ThreadedTimerTask implements TimerTask {
private final ThreadPool threadPool;
private final TimerTask task;
private ThreadedTimerTask(ThreadPool threadPool, TimerTask task) {
this.threadPool = threadPool;
this.task = task;
}
@Override public void run(final Timeout timeout) throws Exception {
threadPool.execute(new Runnable() {
@Override public void run() {
try {
task.run(timeout);
} catch (Exception e) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", e);
}
}
});
}
}
private static class TimeEstimator implements Runnable {
private long time = System.currentTimeMillis();

View File

@ -177,7 +177,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
Timeout timeoutX = null;
try {
if (timeout != null) {
timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), timeout);
timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), timeout, TimerService.ExecutionType.THREADED);
}
clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutX));
transport.sendRequest(node, requestId, action, message);
@ -313,13 +313,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
if (holder != null) {
// add it to the timeout information holder, in case we are going to get a response later
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.node(), holder.action()));
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.execute(new Runnable() {
@Override public void run() {
holder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action()));
}
});
holder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action()));
}
}
}

View File

@ -31,6 +31,7 @@ import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
@Test(enabled = false)
public class BlockingThreadPoolTest {
@Test public void testBlocking() throws Exception {

View File

@ -31,6 +31,7 @@ import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
@Test(enabled = false)
public class ScalingThreadPoolTest {
@Test public void testScaleUp() throws Exception {