diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java new file mode 100644 index 00000000000..4b1a4b689ed --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexerManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indexer.cluster.IndexerClusterService; + +/** + * @author kimchy (shay.banon) + */ +public class IndexerManager extends AbstractLifecycleComponent { + + private final IndexersService indexersService; + + private final IndexerClusterService clusterService; + + @Inject public IndexerManager(Settings settings, IndexersService indexersService, IndexerClusterService clusterService) { + super(settings); + this.indexersService = indexersService; + this.clusterService = clusterService; + } + + @Override protected void doStart() throws ElasticSearchException { + indexersService.start(); + clusterService.start(); + } + + @Override protected void doStop() throws ElasticSearchException { + clusterService.stop(); + indexersService.stop(); + } + + @Override protected void doClose() throws ElasticSearchException { + clusterService.close(); + indexersService.close(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java index c75d3c4f46c..250cb139923 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/IndexersModule.java @@ -21,6 +21,7 @@ package org.elasticsearch.indexer; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indexer.cluster.IndexerClusterService; /** * @author kimchy (shay.banon) @@ -35,5 +36,7 @@ public class IndexersModule extends AbstractModule { @Override protected void configure() { bind(IndexersService.class).asEagerSingleton(); + bind(IndexerClusterService.class).asEagerSingleton(); + bind(IndexerManager.class).asEagerSingleton(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterChangedEvent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterChangedEvent.java new file mode 100644 index 00000000000..1bcaacfd313 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterChangedEvent.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.cluster; + +/** + * @author kimchy (shay.banon) + */ +public class IndexerClusterChangedEvent { + + private final String source; + + private final IndexerClusterState previousState; + + private final IndexerClusterState state; + + public IndexerClusterChangedEvent(String source, IndexerClusterState state, IndexerClusterState previousState) { + this.source = source; + this.state = state; + this.previousState = previousState; + } + + /** + * The source that caused this cluster event to be raised. + */ + public String source() { + return this.source; + } + + public IndexerClusterState state() { + return this.state; + } + + public IndexerClusterState previousState() { + return this.previousState; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java new file mode 100644 index 00000000000..20988e54ec9 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterService.java @@ -0,0 +1,160 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.cluster; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.TransportService; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.Executors.*; +import static org.elasticsearch.common.util.concurrent.EsExecutors.*; + +/** + * @author kimchy (shay.banon) + */ +public class IndexerClusterService extends AbstractLifecycleComponent { + + private final TransportService transportService; + + private final ClusterService clusterService; + + private final PublishIndexerClusterStateAction publishAction; + + private final List clusterStateListeners = new CopyOnWriteArrayList(); + + private volatile ExecutorService updateTasksExecutor; + + private volatile IndexerClusterState clusterState = IndexerClusterState.builder().build(); + + @Inject public IndexerClusterService(Settings settings, TransportService transportService, ClusterService clusterService) { + super(settings); + this.transportService = transportService; + this.clusterService = clusterService; + + this.publishAction = new PublishIndexerClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener()); + } + + @Override protected void doStart() throws ElasticSearchException { + this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "indexerClusterService#updateTask")); + } + + @Override protected void doStop() throws ElasticSearchException { + updateTasksExecutor.shutdown(); + try { + updateTasksExecutor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // ignore + } + } + + @Override protected void doClose() throws ElasticSearchException { + } + + public void add(IndexerClusterStateListener listener) { + clusterStateListeners.add(listener); + } + + public void remove(IndexerClusterStateListener listener) { + clusterStateListeners.remove(listener); + } + + public void submitStateUpdateTask(final String source, final IndexerClusterStateUpdateTask updateTask) { + if (!lifecycle.started()) { + return; + } + updateTasksExecutor.execute(new Runnable() { + @Override public void run() { + if (!lifecycle.started()) { + logger.debug("processing [{}]: ignoring, cluster_service not started", source); + return; + } + logger.debug("processing [{}]: execute", source); + + IndexerClusterState previousClusterState = clusterState; + try { + clusterState = updateTask.execute(previousClusterState); + } catch (Exception e) { + StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); + logger.warn(sb.toString(), e); + return; + } + if (previousClusterState != clusterState) { + if (clusterService.state().nodes().localNodeMaster()) { + // only the master controls the version numbers + clusterState = new IndexerClusterState(clusterState.version() + 1, clusterState); + } else { + // we got this cluster state from the master, filter out based on versions (don't call listeners) + if (clusterState.version() < previousClusterState.version()) { + logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring"); + return; + } + } + + if (logger.isTraceEnabled()) { + StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); + logger.trace(sb.toString()); + } else if (logger.isDebugEnabled()) { + logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source); + } + + IndexerClusterChangedEvent clusterChangedEvent = new IndexerClusterChangedEvent(source, clusterState, previousClusterState); + + for (IndexerClusterStateListener listener : clusterStateListeners) { + listener.indexerClusterChanged(clusterChangedEvent); + } + + // if we are the master, publish the new state to all nodes + if (clusterService.state().nodes().localNodeMaster()) { + publishAction.publish(clusterState); + } + + logger.debug("processing [{}]: done applying updated cluster_state", source); + } else { + logger.debug("processing [{}]: no change in cluster_state", source); + } + } + }); + } + + private class UpdateClusterStateListener implements PublishIndexerClusterStateAction.NewClusterStateListener { + @Override public void onNewClusterState(final IndexerClusterState clusterState) { + ClusterState state = clusterService.state(); + if (!state.nodes().localNodeMaster()) { + logger.warn("master should not receive new cluster state from [{}]", state.nodes().masterNode()); + return; + } + + submitStateUpdateTask("received_state", new IndexerClusterStateUpdateTask() { + @Override public IndexerClusterState execute(IndexerClusterState currentState) { + return clusterState; + } + }); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterState.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterState.java new file mode 100644 index 00000000000..b73fd238c60 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterState.java @@ -0,0 +1,121 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.cluster; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indexer.metadata.IndexersMetaData; +import org.elasticsearch.indexer.routing.IndexersRouting; + +import javax.annotation.Nullable; +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public class IndexerClusterState { + + private final long version; + + private final IndexersMetaData metaData; + + private final IndexersRouting routing; + + public IndexerClusterState(long version, IndexerClusterState state) { + this.version = version; + this.metaData = state.metaData(); + this.routing = state.routing(); + } + + IndexerClusterState(long version, IndexersMetaData metaData, IndexersRouting routing) { + this.version = version; + this.metaData = metaData; + this.routing = routing; + } + + public long version() { + return this.version; + } + + public IndexersMetaData metaData() { + return metaData; + } + + public IndexersRouting routing() { + return routing; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private long version = 0; + + private IndexersMetaData metaData; + + private IndexersRouting routing; + + public Builder state(IndexerClusterState state) { + this.version = state.version(); + this.metaData = state.metaData(); + this.routing = state.routing(); + return this; + } + + public Builder metaData(IndexersMetaData.Builder builder) { + return metaData(builder.build()); + } + + public Builder metaData(IndexersMetaData metaData) { + this.metaData = metaData; + return this; + } + + public Builder routing(IndexersRouting.Builder builder) { + return routing(builder.build()); + } + + public Builder routing(IndexersRouting routing) { + this.routing = routing; + return this; + } + + public IndexerClusterState build() { + return new IndexerClusterState(version, metaData, routing); + } + + public static IndexerClusterState readFrom(StreamInput in, @Nullable Settings settings) throws IOException { + Builder builder = new Builder(); + builder.version = in.readVLong(); + builder.metaData = IndexersMetaData.Builder.readFrom(in, settings); + builder.routing = IndexersRouting.Builder.readFrom(in); + return builder.build(); + } + + public static void writeTo(IndexerClusterState clusterState, StreamOutput out) throws IOException { + out.writeVLong(clusterState.version); + IndexersMetaData.Builder.writeTo(clusterState.metaData, out); + IndexersRouting.Builder.writeTo(clusterState.routing, out); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterStateListener.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterStateListener.java new file mode 100644 index 00000000000..af5b962bc24 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterStateListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.cluster; + +/** + * @author kimchy (shay.banon) + */ +public interface IndexerClusterStateListener { + + void indexerClusterChanged(IndexerClusterChangedEvent event); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterStateUpdateTask.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterStateUpdateTask.java new file mode 100644 index 00000000000..8498cbe7da0 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerClusterStateUpdateTask.java @@ -0,0 +1,28 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.cluster; + +/** + * @author kimchy (shay.banon) + */ +public interface IndexerClusterStateUpdateTask { + + IndexerClusterState execute(IndexerClusterState currentState); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerNodeHelper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerNodeHelper.java new file mode 100644 index 00000000000..316723c8752 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/IndexerNodeHelper.java @@ -0,0 +1,44 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.cluster; + +import org.elasticsearch.cluster.node.DiscoveryNode; + +/** + * @author kimchy (shay.banon) + */ +public class IndexerNodeHelper { + + public static boolean isIndexerNode(DiscoveryNode node) { + if (node.clientNode()) { + return false; + } + String indexer = node.attributes().get("indexer"); + // by default, if not set, its an indexer node (better OOB exp) + if (indexer == null) { + return true; + } + if ("_none_".equals(indexer)) { + return false; + } + // there is at least one indexer settings, we need it + return true; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java new file mode 100644 index 00000000000..51bd89c6fd2 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/cluster/PublishIndexerClusterStateAction.java @@ -0,0 +1,131 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.cluster; + +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.VoidStreamable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.*; + +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public class PublishIndexerClusterStateAction extends AbstractComponent { + + public static interface NewClusterStateListener { + void onNewClusterState(IndexerClusterState clusterState); + } + + private final TransportService transportService; + + private final ClusterService clusterService; + + private final NewClusterStateListener listener; + + public PublishIndexerClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, + NewClusterStateListener listener) { + super(settings); + this.transportService = transportService; + this.clusterService = clusterService; + this.listener = listener; + transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler()); + } + + public void close() { + transportService.removeHandler(PublishClusterStateRequestHandler.ACTION); + } + + public void publish(IndexerClusterState clusterState) { + final DiscoveryNodes discoNodes = clusterService.state().nodes(); + for (final DiscoveryNode node : discoNodes) { + if (node.equals(discoNodes.localNode())) { + // no need to send to our self + continue; + } + + // we only want to send nodes that are either possible master nodes or indexer nodes + // master nodes because they will handle the state and the allocation of indexers + // and indexer nodes since they will end up creating indexes + + if (node.clientNode()) { + continue; + } + + if (!node.masterNode() && !IndexerNodeHelper.isIndexerNode(node)) { + continue; + } + + transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new VoidTransportResponseHandler(false) { + @Override public void handleException(RemoteTransportException exp) { + logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); + } + }); + } + } + + private class PublishClusterStateRequest implements Streamable { + + private IndexerClusterState clusterState; + + private PublishClusterStateRequest() { + } + + private PublishClusterStateRequest(IndexerClusterState clusterState) { + this.clusterState = clusterState; + } + + @Override public void readFrom(StreamInput in) throws IOException { + clusterState = IndexerClusterState.Builder.readFrom(in, settings); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + IndexerClusterState.Builder.writeTo(clusterState, out); + } + } + + private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler { + + static final String ACTION = "indexer/state/publish"; + + @Override public PublishClusterStateRequest newInstance() { + return new PublishClusterStateRequest(); + } + + @Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception { + listener.onNewClusterState(request.clusterState); + channel.sendResponse(VoidStreamable.INSTANCE); + } + + /** + * No need to spawn, we add submit a new cluster state directly. This allows for faster application. + */ + @Override public boolean spawn() { + return false; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexerMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexerMetaData.java similarity index 99% rename from modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexerMetaData.java rename to modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexerMetaData.java index 5a252c75e6d..6c8409bc1ff 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/IndexerMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexerMetaData.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.cluster.metadata; +package org.elasticsearch.indexer.metadata; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java new file mode 100644 index 00000000000..8036607407a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/metadata/IndexersMetaData.java @@ -0,0 +1,165 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.metadata; + +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.indexer.IndexerName; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Iterator; + +/** + * @author kimchy (shay.banon) + */ +public class IndexersMetaData implements Iterable { + + private final ImmutableMap indexers; + + private final boolean recoveredFromGateway; + + private IndexersMetaData(ImmutableMap indexers, boolean recoveredFromGateway) { + this.indexers = indexers; + this.recoveredFromGateway = recoveredFromGateway; + } + + @Override public Iterator iterator() { + return indexers.values().iterator(); + } + + public boolean recoveredFromGateway() { + return recoveredFromGateway; + } + + public static class Builder { + private MapBuilder indexers = MapBuilder.newMapBuilder(); + + private boolean recoveredFromGateway = false; + + public Builder put(IndexerMetaData.Builder builder) { + return put(builder.build()); + } + + public Builder put(IndexerMetaData indexerMetaData) { + indexers.put(indexerMetaData.indexerName(), indexerMetaData); + return this; + } + + public IndexerMetaData get(IndexerName indexerName) { + return indexers.get(indexerName); + } + + public Builder remove(IndexerName indexerName) { + indexers.remove(indexerName); + return this; + } + + public Builder metaData(IndexersMetaData metaData) { + this.indexers.putAll(metaData.indexers); + this.recoveredFromGateway = metaData.recoveredFromGateway; + return this; + } + + /** + * Indicates that this cluster state has been recovered from the gateawy. + */ + public Builder markAsRecoveredFromGateway() { + this.recoveredFromGateway = true; + return this; + } + + public IndexersMetaData build() { + return new IndexersMetaData(indexers.immutableMap(), recoveredFromGateway); + } + + public static String toXContent(IndexersMetaData metaData) throws IOException { + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + builder.startObject(); + toXContent(metaData, builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + return builder.string(); + } + + public static void toXContent(IndexersMetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject("meta-data"); + + builder.startObject("indexers"); + for (IndexerMetaData indexMetaData : metaData) { + IndexerMetaData.Builder.toXContent(indexMetaData, builder, params); + } + builder.endObject(); + + builder.endObject(); + } + + public static IndexersMetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException { + Builder builder = new Builder(); + + XContentParser.Token token = parser.currentToken(); + String currentFieldName = parser.currentName(); + if (!"meta-data".equals(currentFieldName)) { + token = parser.nextToken(); + currentFieldName = parser.currentName(); + if (token == null) { + // no data... + return builder.build(); + } + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("indexers".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + builder.put(IndexerMetaData.Builder.fromXContent(parser, globalSettings)); + } + } + } + } + return builder.build(); + } + + public static IndexersMetaData readFrom(StreamInput in, @Nullable Settings globalSettings) throws IOException { + Builder builder = new Builder(); + // we only serialize it using readFrom, not in to/from XContent + builder.recoveredFromGateway = in.readBoolean(); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + builder.put(IndexerMetaData.Builder.readFrom(in, globalSettings)); + } + return builder.build(); + } + + public static void writeTo(IndexersMetaData metaData, StreamOutput out) throws IOException { + out.writeBoolean(metaData.recoveredFromGateway()); + out.writeVInt(metaData.indexers.size()); + for (IndexerMetaData indexMetaData : metaData) { + IndexerMetaData.Builder.writeTo(indexMetaData, out); + } + } + + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java new file mode 100644 index 00000000000..4b454a6c877 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexerRouting.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.routing; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.indexer.IndexerName; + +/** + * @author kimchy (shay.banon) + */ +public class IndexerRouting { + + private IndexerName indexerName; + + private DiscoveryNode node; + + IndexerRouting(IndexerName indexerName, DiscoveryNode node) { + this.indexerName = indexerName; + this.node = node; + } + + public IndexerName indexerName() { + return indexerName; + } + + public DiscoveryNode node() { + return node; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java new file mode 100644 index 00000000000..cfd27343fd1 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouting.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indexer.routing; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.indexer.IndexerName; + +import java.io.IOException; +import java.util.Iterator; + +/** + * @author kimchy (shay.banon) + */ +public class IndexersRouting implements Iterable { + + private final ImmutableMap indexers; + + private IndexersRouting(ImmutableMap indexers) { + this.indexers = indexers; + } + + @Override public Iterator iterator() { + return indexers.values().iterator(); + } + + public static class Builder { + + private MapBuilder indexers = MapBuilder.newMapBuilder(); + + public Builder routing(IndexersRouting routing) { + indexers.putAll(routing.indexers); + return this; + } + + public Builder put(IndexerRouting routing) { + indexers.put(routing.indexerName(), routing); + return this; + } + + public Builder remove(IndexerRouting routing) { + indexers.remove(routing.indexerName()); + return this; + } + + public IndexersRouting build() { + return new IndexersRouting(indexers.immutableMap()); + } + + public static IndexersRouting readFrom(StreamInput in) throws IOException { + Builder builder = new Builder(); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + builder.put(new IndexerRouting(new IndexerName(in.readUTF(), in.readUTF()), DiscoveryNode.readNode(in))); + } + return builder.build(); + } + + public static void writeTo(IndexersRouting routing, StreamOutput out) throws IOException { + out.writeVInt(routing.indexers.size()); + for (IndexerRouting indexerRouting : routing) { + out.writeUTF(indexerRouting.indexerName().type()); + out.writeUTF(indexerRouting.indexerName().name()); + + indexerRouting.node().writeTo(out); + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java index c95d1920c94..91d82d754ed 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -54,8 +54,8 @@ import org.elasticsearch.gateway.GatewayModule; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.http.HttpServer; import org.elasticsearch.http.HttpServerModule; +import org.elasticsearch.indexer.IndexerManager; import org.elasticsearch.indexer.IndexersModule; -import org.elasticsearch.indexer.IndexersService; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; @@ -171,7 +171,7 @@ public final class InternalNode implements Node { injector.getInstance(IndicesService.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); - injector.getInstance(IndexersService.class).start(); + injector.getInstance(IndexerManager.class).start(); injector.getInstance(ClusterService.class).start(); injector.getInstance(RoutingService.class).start(); injector.getInstance(SearchService.class).start(); @@ -209,7 +209,7 @@ public final class InternalNode implements Node { injector.getInstance(MonitorService.class).stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); - injector.getInstance(IndexersService.class).stop(); + injector.getInstance(IndexerManager.class).stop(); injector.getInstance(IndicesClusterStateService.class).stop(); injector.getInstance(IndicesService.class).stop(); injector.getInstance(RestController.class).stop(); @@ -256,7 +256,7 @@ public final class InternalNode implements Node { stopWatch.stop().start("search"); injector.getInstance(SearchService.class).close(); stopWatch.stop().start("indexers"); - injector.getInstance(IndexersService.class).close(); + injector.getInstance(IndexerManager.class).close(); stopWatch.stop().start("indices_cluster"); injector.getInstance(IndicesClusterStateService.class).close(); stopWatch.stop().start("indices");