Reduce multiple indices overhead, don't create Indices data on nodes that don't hold any shard for an index, closes #441.
This commit is contained in:
parent
1235358848
commit
ae5bc20959
|
@ -136,6 +136,20 @@ public class MetaDataMappingService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pre create indices here and add mappings to them so we can merge the mappings here if needed
|
||||||
|
for (String index : request.indices) {
|
||||||
|
if (indicesService.hasIndex(index)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final IndexMetaData indexMetaData = currentState.metaData().index(index);
|
||||||
|
IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
|
||||||
|
for (Map.Entry<String, CompressedString> mapping : indexMetaData.mappings().entrySet()) {
|
||||||
|
if (!indexService.mapperService().hasMapping(mapping.getKey())) {
|
||||||
|
indexService.mapperService().add(mapping.getKey(), mapping.getValue().string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Map<String, DocumentMapper> newMappers = newHashMap();
|
Map<String, DocumentMapper> newMappers = newHashMap();
|
||||||
Map<String, DocumentMapper> existingMappers = newHashMap();
|
Map<String, DocumentMapper> existingMappers = newHashMap();
|
||||||
for (String index : request.indices) {
|
for (String index : request.indices) {
|
||||||
|
|
|
@ -30,10 +30,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
import org.elasticsearch.cluster.routing.*;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
|
||||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
||||||
import org.elasticsearch.common.collect.ImmutableMap;
|
import org.elasticsearch.common.collect.ImmutableMap;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
|
@ -124,6 +121,19 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
applyNewOrUpdatedShards(event);
|
applyNewOrUpdatedShards(event);
|
||||||
applyDeletedIndices(event);
|
applyDeletedIndices(event);
|
||||||
applyDeletedShards(event);
|
applyDeletedShards(event);
|
||||||
|
applyCleanedIndices(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void applyCleanedIndices(final ClusterChangedEvent event) {
|
||||||
|
for (final String index : indicesService.indices()) {
|
||||||
|
if (indicesService.indexService(index).shardIds().isEmpty()) {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("[{}] cleaning index (no shards allocated)", index);
|
||||||
|
}
|
||||||
|
// clean the index
|
||||||
|
indicesService.cleanIndex(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applyDeletedIndices(final ClusterChangedEvent event) {
|
private void applyDeletedIndices(final ClusterChangedEvent event) {
|
||||||
|
@ -173,9 +183,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
}
|
}
|
||||||
|
|
||||||
private void applyNewIndices(final ClusterChangedEvent event) {
|
private void applyNewIndices(final ClusterChangedEvent event) {
|
||||||
// first, go over and create and indices that needs to be created
|
// we only create indices for shards that are allocated
|
||||||
for (final IndexMetaData indexMetaData : event.state().metaData()) {
|
RoutingNode routingNode = event.state().readOnlyRoutingNodes().nodesToShards().get(event.state().nodes().localNodeId());
|
||||||
if (!indicesService.hasIndex(indexMetaData.index())) {
|
if (routingNode == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (MutableShardRouting shard : routingNode) {
|
||||||
|
if (!indicesService.hasIndex(shard.index())) {
|
||||||
|
final IndexMetaData indexMetaData = event.state().metaData().index(shard.index());
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("[{}] creating index", indexMetaData.index());
|
logger.debug("[{}] creating index", indexMetaData.index());
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,8 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
|
||||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
||||||
}
|
}
|
||||||
logger.info("**** done indexing thread {}", indexerId);
|
logger.info("**** done indexing thread {}", indexerId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("**** failed indexing thread {}", e, indexerId);
|
||||||
} finally {
|
} finally {
|
||||||
stopLatch.countDown();
|
stopLatch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -149,6 +151,8 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
|
||||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
||||||
}
|
}
|
||||||
logger.info("**** done indexing thread {}", indexerId);
|
logger.info("**** done indexing thread {}", indexerId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("**** failed indexing thread {}", e, indexerId);
|
||||||
} finally {
|
} finally {
|
||||||
stopLatch.countDown();
|
stopLatch.countDown();
|
||||||
}
|
}
|
||||||
|
@ -236,6 +240,8 @@ public class RecoveryWhileUnderLoadTests extends AbstractNodesTests {
|
||||||
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
|
||||||
}
|
}
|
||||||
logger.info("**** done indexing thread {}", indexerId);
|
logger.info("**** done indexing thread {}", indexerId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("**** failed indexing thread {}", e, indexerId);
|
||||||
} finally {
|
} finally {
|
||||||
stopLatch.countDown();
|
stopLatch.countDown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,8 @@ public class SearchFieldsTests extends AbstractNodesTests {
|
||||||
|
|
||||||
client.admin().indices().preparePutMapping().setType("type1").setSource(mapping).execute().actionGet();
|
client.admin().indices().preparePutMapping().setType("type1").setSource(mapping).execute().actionGet();
|
||||||
|
|
||||||
|
Thread.sleep(100); // sleep a bit here..., so hte mappings get applied
|
||||||
|
|
||||||
client.prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject()
|
client.prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject()
|
||||||
.field("field1", "value1")
|
.field("field1", "value1")
|
||||||
.field("field2", "value2")
|
.field("field2", "value2")
|
||||||
|
|
Loading…
Reference in New Issue