more work on indexer

This commit is contained in:
kimchy 2010-09-17 00:22:42 +02:00
parent 4b87f63bed
commit 08d7125cd5
18 changed files with 328 additions and 428 deletions

View File

@ -35,6 +35,13 @@ public class XContentMapValues {
return node instanceof List;
}
public static String nodeStringValue(Object node, String defaultValue) {
if (node == null) {
return defaultValue;
}
return node.toString();
}
public static float nodeFloatValue(Object node) {
if (node instanceof Number) {
return ((Number) node).floatValue();

View File

@ -67,13 +67,17 @@ public abstract class AbstractConcurrentMapFieldDataCache extends AbstractIndexC
return cache(type.fieldDataClass(), reader, fieldName);
}
protected ConcurrentMap<String, FieldData> buildFilterMap() {
return ConcurrentCollections.newConcurrentMap();
}
@Override public <T extends FieldData> T cache(Class<T> type, IndexReader reader, String fieldName) throws IOException {
ConcurrentMap<String, FieldData> fieldDataCache = cache.get(reader.getFieldCacheKey());
if (fieldDataCache == null) {
synchronized (creationMutex) {
fieldDataCache = cache.get(reader.getFieldCacheKey());
if (fieldDataCache == null) {
fieldDataCache = ConcurrentCollections.newConcurrentMap();
fieldDataCache = buildFilterMap();
cache.put(reader.getFieldCacheKey(), fieldDataCache);
}
}

View File

@ -22,7 +22,8 @@ package org.elasticsearch.indexer;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.settings.IndexerSettings;
import java.util.Map;
/**
* @author kimchy (shay.banon)
@ -33,24 +34,16 @@ public class AbstractIndexerComponent implements IndexerComponent {
protected final IndexerName indexerName;
protected final Settings indexSettings;
protected final Settings settings;
protected final Settings componentSettings;
protected final Map<String, Object> indexerSettings;
protected AbstractIndexerComponent(IndexerName indexerName, @IndexerSettings Settings indexSettings) {
protected AbstractIndexerComponent(IndexerName indexerName, Settings settings, @IndexerSettings Map<String, Object> indexerSettings) {
this.indexerName = indexerName;
this.indexSettings = indexSettings;
this.componentSettings = indexSettings.getComponentSettings(getClass());
this.settings = settings;
this.indexerSettings = indexerSettings;
this.logger = Loggers.getLogger(getClass(), indexSettings, indexerName);
}
protected AbstractIndexerComponent(IndexerName indexerName, @IndexerSettings Settings indexSettings, String prefixSettings) {
this.indexerName = indexerName;
this.indexSettings = indexSettings;
this.componentSettings = indexSettings.getComponentSettings(prefixSettings, getClass());
this.logger = Loggers.getLogger(getClass(), indexSettings, indexerName);
this.logger = Loggers.getLogger(getClass(), settings, indexerName);
}
@Override public IndexerName indexerName() {
@ -58,6 +51,6 @@ public class AbstractIndexerComponent implements IndexerComponent {
}
public String nodeName() {
return indexSettings.get("name", "");
return settings.get("name", "");
}
}

View File

@ -17,23 +17,24 @@
* under the License.
*/
package org.elasticsearch.indexer.settings;
package org.elasticsearch.indexer;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.inject.BindingAnnotation;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.*;
/**
* @author kimchy (shay.banon)
* @author kimchy (Shay Banon)
*/
public class IndexerSettingsModule extends AbstractModule {
private final Settings settings;
public IndexerSettingsModule(Settings settings) {
this.settings = settings;
}
@Override protected void configure() {
bind(Settings.class).annotatedWith(IndexerSettings.class).toInstance(settings);
}
@BindingAnnotation
@Target({FIELD, PARAMETER})
@Retention(RUNTIME)
@Documented
public @interface IndexerIndexName {
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.routing.IndexersRouter;
/**
* @author kimchy (shay.banon)
@ -34,23 +35,29 @@ public class IndexerManager extends AbstractLifecycleComponent<IndexerManager> {
private final IndexerClusterService clusterService;
@Inject public IndexerManager(Settings settings, IndexersService indexersService, IndexerClusterService clusterService) {
private final IndexersRouter indexersRouter;
@Inject public IndexerManager(Settings settings, IndexersService indexersService, IndexerClusterService clusterService, IndexersRouter indexersRouter) {
super(settings);
this.indexersService = indexersService;
this.clusterService = clusterService;
this.indexersRouter = indexersRouter;
}
@Override protected void doStart() throws ElasticSearchException {
indexersRouter.start();
indexersService.start();
clusterService.start();
}
@Override protected void doStop() throws ElasticSearchException {
indexersRouter.stop();
clusterService.stop();
indexersService.stop();
}
@Override protected void doClose() throws ElasticSearchException {
indexersRouter.close();
clusterService.close();
indexersService.close();
}

View File

@ -28,6 +28,8 @@ import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.NoClassSettingsException;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
import static org.elasticsearch.common.Strings.*;
/**
@ -37,42 +39,40 @@ public class IndexerModule extends AbstractModule implements SpawnModules {
private IndexerName indexerName;
private final Settings settings;
private final Settings globalSettings;
public IndexerModule(IndexerName indexerName, Settings settings) {
private final Map<String, Object> settings;
public IndexerModule(IndexerName indexerName, Map<String, Object> settings, Settings globalSettings) {
this.indexerName = indexerName;
this.globalSettings = globalSettings;
this.settings = settings;
}
@Override public Iterable<? extends Module> spawnModules() {
String type = settings.get("indexer.type");
if (type == null) {
return ImmutableList.of(Modules.createModule(loadTypeModule(indexerName.type(), "org.elasticsearch.indexer.", "IndexerModule"), settings));
} else {
return ImmutableList.of(Modules.createModule(settings.getAsClass("indexer.type", Module.class, "org.elasticsearch.indexer.", "IndexerModule"), settings));
}
return ImmutableList.of(Modules.createModule(loadTypeModule(indexerName.type(), "org.elasticsearch.indexer.", "IndexerModule"), globalSettings));
}
@Override protected void configure() {
bind(Map.class).annotatedWith(IndexerSettings.class).toInstance(settings);
}
private Class<? extends Module> loadTypeModule(String type, String prefixPackage, String suffixClassName) {
String fullClassName = type;
try {
return (Class<? extends Module>) settings.getClassLoader().loadClass(fullClassName);
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e) {
fullClassName = prefixPackage + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) settings.getClassLoader().loadClass(fullClassName);
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e1) {
fullClassName = prefixPackage + toCamelCase(type) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) settings.getClassLoader().loadClass(fullClassName);
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e2) {
fullClassName = prefixPackage + toCamelCase(type).toLowerCase() + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) settings.getClassLoader().loadClass(fullClassName);
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e3) {
throw new NoClassSettingsException("Failed to load class with value [" + type + "]", e);
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.indexer.settings;
package org.elasticsearch.indexer;
import org.elasticsearch.common.inject.BindingAnnotation;
@ -29,7 +29,7 @@ import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.*;
/**
* @author kimchy (shay.banon)
* @author kimchy (Shay Banon)
*/
@BindingAnnotation

View File

@ -22,6 +22,7 @@ package org.elasticsearch.indexer;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.routing.IndexersRouter;
/**
* @author kimchy (shay.banon)
@ -35,8 +36,10 @@ public class IndexersModule extends AbstractModule {
}
@Override protected void configure() {
bind(String.class).annotatedWith(IndexerIndexName.class).toInstance(settings.get("indexer.index_name", "indexer"));
bind(IndexersService.class).asEagerSingleton();
bind(IndexerClusterService.class).asEagerSingleton();
bind(IndexersRouter.class).asEagerSingleton();
bind(IndexerManager.class).asEagerSingleton();
}
}

View File

@ -20,6 +20,9 @@
package org.elasticsearch.indexer;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableMap;
@ -36,21 +39,21 @@ import org.elasticsearch.indexer.cluster.IndexerClusterChangedEvent;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.cluster.IndexerClusterState;
import org.elasticsearch.indexer.cluster.IndexerClusterStateListener;
import org.elasticsearch.indexer.metadata.IndexerMetaData;
import org.elasticsearch.indexer.routing.IndexerRouting;
import org.elasticsearch.indexer.settings.IndexerSettingsModule;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
/**
* @author kimchy (shay.banon)
*/
public class IndexersService extends AbstractLifecycleComponent<IndexersService> {
private final String indexerIndexName;
private Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
@ -61,8 +64,10 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
private volatile ImmutableMap<IndexerName, Indexer> indexers = ImmutableMap.of();
@Inject public IndexersService(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) {
@Inject public IndexersService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, IndexerClusterService indexerClusterService, Injector injector) {
super(settings);
this.indexerIndexName = settings.get("indexer.index_name", "indexer");
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.injector = injector;
@ -98,24 +103,16 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
@Override protected void doClose() throws ElasticSearchException {
}
public synchronized Indexer createIndexer(IndexerName indexerName, Settings settings) throws ElasticSearchException {
public synchronized Indexer createIndexer(IndexerName indexerName, Map<String, Object> settings) throws ElasticSearchException {
if (indexersInjectors.containsKey(indexerName)) {
throw new IndexerException(indexerName, "indexer already exists");
}
logger.debug("creating indexer [{}][{}]", indexerName.type(), indexerName.name());
Settings indexerSettings = settingsBuilder()
.put(this.settings)
.put(settings)
.classLoader(settings.getClassLoader())
.globalSettings(settings.getGlobalSettings())
.build();
ModulesBuilder modules = new ModulesBuilder();
modules.add(new IndexerNameModule(indexerName));
modules.add(new IndexerSettingsModule(indexerSettings));
modules.add(new IndexerModule(indexerName, indexerSettings));
modules.add(new IndexerModule(indexerName, settings, this.settings));
Injector indexInjector = modules.createChildInjector(injector);
indexersInjectors.put(indexerName, indexInjector);
@ -170,12 +167,6 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
// first, go over and delete ones that either don't exists or are not allocated
for (IndexerName indexerName : indexers.keySet()) {
// if its not on the metadata, it was deleted, delete it
IndexerMetaData indexerMetaData = state.metaData().indexer(indexerName);
if (indexerMetaData == null) {
deleteIndexer(indexerName);
}
IndexerRouting routing = state.routing().routing(indexerName);
if (routing == null || !localNode.equals(routing.node())) {
// not routed at all, and not allocated here, clean it (we delete the relevant ones before)
@ -183,15 +174,27 @@ public class IndexersService extends AbstractLifecycleComponent<IndexersService>
}
}
for (IndexerRouting routing : state.routing()) {
for (final IndexerRouting routing : state.routing()) {
// not allocated
if (routing.node() == null) {
continue;
}
// only apply changes to the local node
if (!routing.node().equals(localNode)) {
continue;
}
client.prepareGet(indexerIndexName, routing.indexerName().name(), "_meta").execute(new ActionListener<GetResponse>() {
@Override public void onResponse(GetResponse getResponse) {
if (getResponse.exists()) {
// only create the indexer if it exists, otherwise, the indexing meta data has not been visible yet...
createIndexer(routing.indexerName(), getResponse.sourceAsMap());
}
}
IndexerMetaData indexerMetaData = state.metaData().indexer(routing.indexerName());
createIndexer(indexerMetaData.indexerName(), indexerMetaData.settings());
@Override public void onFailure(Throwable e) {
logger.warn("failed to get _meta from [{}]/[{}]", routing.indexerName().type(), routing.indexerName().name());
}
});
}
}
}

View File

@ -21,11 +21,8 @@ package org.elasticsearch.indexer.cluster;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.metadata.IndexersMetaData;
import org.elasticsearch.indexer.routing.IndexersRouting;
import javax.annotation.Nullable;
import java.io.IOException;
/**
@ -35,19 +32,15 @@ public class IndexerClusterState {
private final long version;
private final IndexersMetaData metaData;
private final IndexersRouting routing;
public IndexerClusterState(long version, IndexerClusterState state) {
this.version = version;
this.metaData = state.metaData();
this.routing = state.routing();
}
IndexerClusterState(long version, IndexersMetaData metaData, IndexersRouting routing) {
IndexerClusterState(long version, IndexersRouting routing) {
this.version = version;
this.metaData = metaData;
this.routing = routing;
}
@ -55,10 +48,6 @@ public class IndexerClusterState {
return this.version;
}
public IndexersMetaData metaData() {
return metaData;
}
public IndexersRouting routing() {
return routing;
}
@ -71,26 +60,14 @@ public class IndexerClusterState {
private long version = 0;
private IndexersMetaData metaData;
private IndexersRouting routing;
private IndexersRouting routing = IndexersRouting.EMPTY;
public Builder state(IndexerClusterState state) {
this.version = state.version();
this.metaData = state.metaData();
this.routing = state.routing();
return this;
}
public Builder metaData(IndexersMetaData.Builder builder) {
return metaData(builder.build());
}
public Builder metaData(IndexersMetaData metaData) {
this.metaData = metaData;
return this;
}
public Builder routing(IndexersRouting.Builder builder) {
return routing(builder.build());
}
@ -101,20 +78,18 @@ public class IndexerClusterState {
}
public IndexerClusterState build() {
return new IndexerClusterState(version, metaData, routing);
return new IndexerClusterState(version, routing);
}
public static IndexerClusterState readFrom(StreamInput in, @Nullable Settings settings) throws IOException {
public static IndexerClusterState readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
builder.version = in.readVLong();
builder.metaData = IndexersMetaData.Builder.readFrom(in, settings);
builder.routing = IndexersRouting.Builder.readFrom(in);
return builder.build();
}
public static void writeTo(IndexerClusterState clusterState, StreamOutput out) throws IOException {
out.writeVLong(clusterState.version);
IndexersMetaData.Builder.writeTo(clusterState.metaData, out);
IndexersRouting.Builder.writeTo(clusterState.routing, out);
}
}

View File

@ -100,7 +100,7 @@ public class PublishIndexerClusterStateAction extends AbstractComponent {
}
@Override public void readFrom(StreamInput in) throws IOException {
clusterState = IndexerClusterState.Builder.readFrom(in, settings);
clusterState = IndexerClusterState.Builder.readFrom(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {

View File

@ -19,20 +19,21 @@
package org.elasticsearch.indexer.dummy;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.AbstractIndexerComponent;
import org.elasticsearch.indexer.Indexer;
import org.elasticsearch.indexer.IndexerName;
import org.elasticsearch.indexer.settings.IndexerSettings;
import org.elasticsearch.indexer.IndexerSettings;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class DummyIndexer extends AbstractIndexerComponent implements Indexer {
@Inject public DummyIndexer(IndexerName indexerName, @IndexerSettings Settings indexSettings) {
super(indexerName, indexSettings);
public DummyIndexer(IndexerName indexerName, Settings settings, @IndexerSettings Map<String, Object> indexerSettings) {
super(indexerName, settings, indexerSettings);
logger.info("created");
}

View File

@ -1,140 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.metadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.indexer.IndexerName;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class IndexerMetaData {
private final IndexerName indexerName;
private final Settings settings;
private IndexerMetaData(IndexerName indexerName, Settings settings) {
this.indexerName = indexerName;
this.settings = settings;
}
public IndexerName indexerName() {
return indexerName;
}
public Settings settings() {
return settings;
}
public static class Builder {
private IndexerName indexerName;
private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
public Builder(IndexerName indexerName) {
this.indexerName = indexerName;
}
public Builder settings(Settings.Builder settings) {
this.settings = settings.build();
return this;
}
public Builder settings(Settings settings) {
this.settings = settings;
return this;
}
public IndexerMetaData build() {
return new IndexerMetaData(indexerName, settings);
}
public static void toXContent(IndexerMetaData indexerMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(indexerMetaData.indexerName().name());
builder.field("type", indexerMetaData.indexerName().type());
builder.startObject("settings");
for (Map.Entry<String, String> entry : indexerMetaData.settings().getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.endObject();
}
public static IndexerMetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException {
String name = parser.currentName();
ImmutableSettings.Builder settingsBuilder = null;
String type = null;
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("settings".equals(currentFieldName)) {
settingsBuilder = ImmutableSettings.settingsBuilder().globalSettings(globalSettings);
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
String key = parser.currentName();
token = parser.nextToken();
String value = parser.text();
settingsBuilder.put(key, value);
}
}
} else if (token.isValue()) {
if ("type".equals(currentFieldName)) {
type = parser.text();
}
}
}
Builder builder = new Builder(new IndexerName(name, type));
if (settingsBuilder != null) {
builder.settings(settingsBuilder);
}
return builder.build();
}
public static IndexerMetaData readFrom(StreamInput in, Settings globalSettings) throws IOException {
Builder builder = new Builder(new IndexerName(in.readUTF(), in.readUTF()));
builder.settings(ImmutableSettings.readSettingsFromStream(in, globalSettings));
return builder.build();
}
public static void writeTo(IndexerMetaData indexerMetaData, StreamOutput out) throws IOException {
out.writeUTF(indexerMetaData.indexerName().type());
out.writeUTF(indexerMetaData.indexerName().name());
ImmutableSettings.writeSettingsToStream(indexerMetaData.settings(), out);
}
}
}

View File

@ -1,169 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.metadata;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.indexer.IndexerName;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
/**
* @author kimchy (shay.banon)
*/
public class IndexersMetaData implements Iterable<IndexerMetaData> {
private final ImmutableMap<IndexerName, IndexerMetaData> indexers;
private final boolean recoveredFromGateway;
private IndexersMetaData(ImmutableMap<IndexerName, IndexerMetaData> indexers, boolean recoveredFromGateway) {
this.indexers = indexers;
this.recoveredFromGateway = recoveredFromGateway;
}
@Override public Iterator<IndexerMetaData> iterator() {
return indexers.values().iterator();
}
public IndexerMetaData indexer(IndexerName indexerName) {
return indexers.get(indexerName);
}
public boolean recoveredFromGateway() {
return recoveredFromGateway;
}
public static class Builder {
private MapBuilder<IndexerName, IndexerMetaData> indexers = MapBuilder.newMapBuilder();
private boolean recoveredFromGateway = false;
public Builder put(IndexerMetaData.Builder builder) {
return put(builder.build());
}
public Builder put(IndexerMetaData indexerMetaData) {
indexers.put(indexerMetaData.indexerName(), indexerMetaData);
return this;
}
public IndexerMetaData get(IndexerName indexerName) {
return indexers.get(indexerName);
}
public Builder remove(IndexerName indexerName) {
indexers.remove(indexerName);
return this;
}
public Builder metaData(IndexersMetaData metaData) {
this.indexers.putAll(metaData.indexers);
this.recoveredFromGateway = metaData.recoveredFromGateway;
return this;
}
/**
* Indicates that this cluster state has been recovered from the gateawy.
*/
public Builder markAsRecoveredFromGateway() {
this.recoveredFromGateway = true;
return this;
}
public IndexersMetaData build() {
return new IndexersMetaData(indexers.immutableMap(), recoveredFromGateway);
}
public static String toXContent(IndexersMetaData metaData) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject();
toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
return builder.string();
}
public static void toXContent(IndexersMetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("meta-data");
builder.startObject("indexers");
for (IndexerMetaData indexMetaData : metaData) {
IndexerMetaData.Builder.toXContent(indexMetaData, builder, params);
}
builder.endObject();
builder.endObject();
}
public static IndexersMetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException {
Builder builder = new Builder();
XContentParser.Token token = parser.currentToken();
String currentFieldName = parser.currentName();
if (!"meta-data".equals(currentFieldName)) {
token = parser.nextToken();
currentFieldName = parser.currentName();
if (token == null) {
// no data...
return builder.build();
}
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("indexers".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
builder.put(IndexerMetaData.Builder.fromXContent(parser, globalSettings));
}
}
}
}
return builder.build();
}
public static IndexersMetaData readFrom(StreamInput in, @Nullable Settings globalSettings) throws IOException {
Builder builder = new Builder();
// we only serialize it using readFrom, not in to/from XContent
builder.recoveredFromGateway = in.readBoolean();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(IndexerMetaData.Builder.readFrom(in, globalSettings));
}
return builder.build();
}
public static void writeTo(IndexersMetaData metaData, StreamOutput out) throws IOException {
out.writeBoolean(metaData.recoveredFromGateway());
out.writeVInt(metaData.indexers.size());
for (IndexerMetaData indexMetaData : metaData) {
IndexerMetaData.Builder.writeTo(indexMetaData, out);
}
}
}
}

View File

@ -20,19 +20,30 @@
package org.elasticsearch.indexer.routing;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.indexer.IndexerName;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class IndexerRouting {
public class IndexerRouting implements Streamable {
private IndexerName indexerName;
private IndexerRoutingState state;
private DiscoveryNode node;
IndexerRouting(IndexerName indexerName, DiscoveryNode node) {
private IndexerRouting() {
}
IndexerRouting(IndexerName indexerName, IndexerRoutingState state, DiscoveryNode node) {
this.indexerName = indexerName;
this.state = state;
this.node = node;
}
@ -40,7 +51,40 @@ public class IndexerRouting {
return indexerName;
}
/**
* The node the indexer is allocated to, <tt>null</tt> if its not allocated.
*/
public DiscoveryNode node() {
return node;
}
public IndexerRoutingState state() {
return this.state;
}
public static IndexerRouting readIndexerRouting(StreamInput in) throws IOException {
IndexerRouting routing = new IndexerRouting();
routing.readFrom(in);
return routing;
}
@Override public void readFrom(StreamInput in) throws IOException {
indexerName = new IndexerName(in.readUTF(), in.readUTF());
state = IndexerRoutingState.fromValue(in.readByte());
if (in.readBoolean()) {
node = DiscoveryNode.readNode(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeUTF(indexerName.type());
out.writeUTF(indexerName.name());
out.writeByte(state.value());
if (node == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
node.writeTo(out);
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.routing;
import org.elasticsearch.ElasticSearchIllegalStateException;
/**
* The state of the indexer as defined by the cluster.
*
* @author kimchy (shay.banon)
*/
public enum IndexerRoutingState {
/**
* The indexer is not assigned to any node.
*/
UNASSIGNED((byte) 1),
/**
* The indexer is initializing.
*/
INITIALIZING((byte) 2),
/**
* The indexer is started.
*/
STARTED((byte) 3);
private byte value;
IndexerRoutingState(byte value) {
this.value = value;
}
public byte value() {
return this.value;
}
public static IndexerRoutingState fromValue(byte value) {
switch (value) {
case 1:
return UNASSIGNED;
case 2:
return INITIALIZING;
case 3:
return STARTED;
default:
throw new ElasticSearchIllegalStateException("No should routing state mapped for [" + value + "]");
}
}
}

View File

@ -19,34 +19,111 @@
package org.elasticsearch.indexer.routing;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indexer.IndexerName;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
import org.elasticsearch.indexer.cluster.IndexerClusterState;
import org.elasticsearch.indexer.cluster.IndexerClusterStateUpdateTask;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class IndexersRouter extends AbstractComponent implements ClusterStateListener {
public class IndexersRouter extends AbstractLifecycleComponent<IndexersRouter> implements ClusterStateListener {
private final String indexerIndexName;
private final Client client;
private final IndexerClusterService indexerClusterService;
@Inject public IndexersRouter(Settings settings, ClusterService clusterService, IndexerClusterService indexerClusterService) {
@Inject public IndexersRouter(Settings settings, Client client, ClusterService clusterService, IndexerClusterService indexerClusterService) {
super(settings);
this.indexerIndexName = settings.get("indexer.index_name", "indexer");
this.indexerClusterService = indexerClusterService;
this.client = client;
clusterService.add(this);
}
@Override protected void doStart() throws ElasticSearchException {
}
@Override protected void doStop() throws ElasticSearchException {
}
@Override protected void doClose() throws ElasticSearchException {
}
@Override public void clusterChanged(final ClusterChangedEvent event) {
if (event.nodesChanged()) {
if (!event.localNodeMaster()) {
return;
}
if (event.nodesChanged() || event.metaDataChanged()) {
indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() {
@Override public IndexerClusterState execute(IndexerClusterState currentState) {
return null; //To change body of implemented methods use File | Settings | File Templates.
if (!event.state().metaData().hasIndex(indexerIndexName)) {
// if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
if (!currentState.routing().isEmpty()) {
return IndexerClusterState.builder().state(currentState).routing(IndexersRouting.builder()).build();
}
return currentState;
}
IndexersRouting.Builder routingBuilder = IndexersRouting.builder().routing(currentState.routing());
boolean dirty = false;
IndexMetaData indexMetaData = event.state().metaData().index(indexerIndexName);
// go over and create new indexer routing (with no node) for new types (indexers names)
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
String mappingType = entry.getKey(); // mapping type is the name of the indexer
if (!currentState.routing().hasIndexerByName(mappingType)) {
// no indexer, we need to add it to the routing with no node allocation
try {
GetResponse getResponse = client.prepareGet(indexerIndexName, mappingType, "_meta").execute().actionGet();
if (getResponse.exists()) {
String indexerType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null);
if (indexerType == null) {
logger.warn("no indexer type provided for [{}], ignoring...", indexerIndexName);
} else {
routingBuilder.put(new IndexerRouting(new IndexerName(mappingType, indexerType), IndexerRoutingState.UNASSIGNED, null));
dirty = true;
}
}
} catch (Exception e) {
logger.warn("failed to get/parse _meta for [{}]", mappingType);
}
}
}
// now, remove routings that were deleted
for (IndexerRouting routing : currentState.routing()) {
if (!indexMetaData.mappings().containsKey(routing.indexerName().name())) {
routingBuilder.remove(routing);
dirty = true;
}
}
// now, allocate indexers
// see if we can relocate indexers (we can simply first unassign then, then publish) and then, next round, they will be assigned
// but, we need to make sure that there will *be* next round of this is the logic
if (dirty) {
return IndexerClusterState.builder().state(currentState).routing(routingBuilder).build();
}
return currentState;
}
});
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.indexer.routing;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
@ -34,20 +33,39 @@ import java.util.Iterator;
*/
public class IndexersRouting implements Iterable<IndexerRouting> {
public static final IndexersRouting EMPTY = IndexersRouting.builder().build();
private final ImmutableMap<IndexerName, IndexerRouting> indexers;
private IndexersRouting(ImmutableMap<IndexerName, IndexerRouting> indexers) {
this.indexers = indexers;
}
public boolean isEmpty() {
return indexers.isEmpty();
}
public IndexerRouting routing(IndexerName indexerName) {
return indexers.get(indexerName);
}
public boolean hasIndexerByName(String name) {
for (IndexerName indexerName : indexers.keySet()) {
if (indexerName.name().equals(name)) {
return true;
}
}
return false;
}
@Override public Iterator<IndexerRouting> iterator() {
return indexers.values().iterator();
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private MapBuilder<IndexerName, IndexerRouting> indexers = MapBuilder.newMapBuilder();
@ -67,6 +85,20 @@ public class IndexersRouting implements Iterable<IndexerRouting> {
return this;
}
public Builder remove(IndexerName indexerName) {
indexers.remove(indexerName);
return this;
}
public Builder remote(String indexerName) {
for (IndexerName name : indexers.map().keySet()) {
if (name.name().equals(indexerName)) {
indexers.remove(name);
}
}
return this;
}
public IndexersRouting build() {
return new IndexersRouting(indexers.immutableMap());
}
@ -75,7 +107,7 @@ public class IndexersRouting implements Iterable<IndexerRouting> {
Builder builder = new Builder();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(new IndexerRouting(new IndexerName(in.readUTF(), in.readUTF()), DiscoveryNode.readNode(in)));
builder.put(IndexerRouting.readIndexerRouting(in));
}
return builder.build();
}
@ -83,10 +115,7 @@ public class IndexersRouting implements Iterable<IndexerRouting> {
public static void writeTo(IndexersRouting routing, StreamOutput out) throws IOException {
out.writeVInt(routing.indexers.size());
for (IndexerRouting indexerRouting : routing) {
out.writeUTF(indexerRouting.indexerName().type());
out.writeUTF(indexerRouting.indexerName().name());
indexerRouting.node().writeTo(out);
indexerRouting.writeTo(out);
}
}
}