more work on indexer

This commit is contained in:
kimchy 2010-09-13 23:00:34 +02:00
parent c69e77d2c4
commit 686b59c33f
14 changed files with 930 additions and 5 deletions

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
/**
* @author kimchy (shay.banon)
*/
public class IndexerManager extends AbstractLifecycleComponent<IndexerManager> {
private final IndexersService indexersService;
private final IndexerClusterService clusterService;
@Inject public IndexerManager(Settings settings, IndexersService indexersService, IndexerClusterService clusterService) {
super(settings);
this.indexersService = indexersService;
this.clusterService = clusterService;
}
@Override protected void doStart() throws ElasticSearchException {
indexersService.start();
clusterService.start();
}
@Override protected void doStop() throws ElasticSearchException {
clusterService.stop();
indexersService.stop();
}
@Override protected void doClose() throws ElasticSearchException {
clusterService.close();
indexersService.close();
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.indexer;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.cluster.IndexerClusterService;
/**
* @author kimchy (shay.banon)
@ -35,5 +36,7 @@ public class IndexersModule extends AbstractModule {
@Override protected void configure() {
bind(IndexersService.class).asEagerSingleton();
bind(IndexerClusterService.class).asEagerSingleton();
bind(IndexerManager.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.cluster;
/**
* @author kimchy (shay.banon)
*/
public class IndexerClusterChangedEvent {
private final String source;
private final IndexerClusterState previousState;
private final IndexerClusterState state;
public IndexerClusterChangedEvent(String source, IndexerClusterState state, IndexerClusterState previousState) {
this.source = source;
this.state = state;
this.previousState = previousState;
}
/**
* The source that caused this cluster event to be raised.
*/
public String source() {
return this.source;
}
public IndexerClusterState state() {
return this.state;
}
public IndexerClusterState previousState() {
return this.previousState;
}
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.cluster;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
/**
* @author kimchy (shay.banon)
*/
public class IndexerClusterService extends AbstractLifecycleComponent<IndexerClusterService> {
private final TransportService transportService;
private final ClusterService clusterService;
private final PublishIndexerClusterStateAction publishAction;
private final List<IndexerClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<IndexerClusterStateListener>();
private volatile ExecutorService updateTasksExecutor;
private volatile IndexerClusterState clusterState = IndexerClusterState.builder().build();
@Inject public IndexerClusterService(Settings settings, TransportService transportService, ClusterService clusterService) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.publishAction = new PublishIndexerClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener());
}
@Override protected void doStart() throws ElasticSearchException {
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "indexerClusterService#updateTask"));
}
@Override protected void doStop() throws ElasticSearchException {
updateTasksExecutor.shutdown();
try {
updateTasksExecutor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
}
@Override protected void doClose() throws ElasticSearchException {
}
public void add(IndexerClusterStateListener listener) {
clusterStateListeners.add(listener);
}
public void remove(IndexerClusterStateListener listener) {
clusterStateListeners.remove(listener);
}
public void submitStateUpdateTask(final String source, final IndexerClusterStateUpdateTask updateTask) {
if (!lifecycle.started()) {
return;
}
updateTasksExecutor.execute(new Runnable() {
@Override public void run() {
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster_service not started", source);
return;
}
logger.debug("processing [{}]: execute", source);
IndexerClusterState previousClusterState = clusterState;
try {
clusterState = updateTask.execute(previousClusterState);
} catch (Exception e) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
logger.warn(sb.toString(), e);
return;
}
if (previousClusterState != clusterState) {
if (clusterService.state().nodes().localNodeMaster()) {
// only the master controls the version numbers
clusterState = new IndexerClusterState(clusterState.version() + 1, clusterState);
} else {
// we got this cluster state from the master, filter out based on versions (don't call listeners)
if (clusterState.version() < previousClusterState.version()) {
logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
return;
}
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source);
}
IndexerClusterChangedEvent clusterChangedEvent = new IndexerClusterChangedEvent(source, clusterState, previousClusterState);
for (IndexerClusterStateListener listener : clusterStateListeners) {
listener.indexerClusterChanged(clusterChangedEvent);
}
// if we are the master, publish the new state to all nodes
if (clusterService.state().nodes().localNodeMaster()) {
publishAction.publish(clusterState);
}
logger.debug("processing [{}]: done applying updated cluster_state", source);
} else {
logger.debug("processing [{}]: no change in cluster_state", source);
}
}
});
}
private class UpdateClusterStateListener implements PublishIndexerClusterStateAction.NewClusterStateListener {
@Override public void onNewClusterState(final IndexerClusterState clusterState) {
ClusterState state = clusterService.state();
if (!state.nodes().localNodeMaster()) {
logger.warn("master should not receive new cluster state from [{}]", state.nodes().masterNode());
return;
}
submitStateUpdateTask("received_state", new IndexerClusterStateUpdateTask() {
@Override public IndexerClusterState execute(IndexerClusterState currentState) {
return clusterState;
}
});
}
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.cluster;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indexer.metadata.IndexersMetaData;
import org.elasticsearch.indexer.routing.IndexersRouting;
import javax.annotation.Nullable;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class IndexerClusterState {
private final long version;
private final IndexersMetaData metaData;
private final IndexersRouting routing;
public IndexerClusterState(long version, IndexerClusterState state) {
this.version = version;
this.metaData = state.metaData();
this.routing = state.routing();
}
IndexerClusterState(long version, IndexersMetaData metaData, IndexersRouting routing) {
this.version = version;
this.metaData = metaData;
this.routing = routing;
}
public long version() {
return this.version;
}
public IndexersMetaData metaData() {
return metaData;
}
public IndexersRouting routing() {
return routing;
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private long version = 0;
private IndexersMetaData metaData;
private IndexersRouting routing;
public Builder state(IndexerClusterState state) {
this.version = state.version();
this.metaData = state.metaData();
this.routing = state.routing();
return this;
}
public Builder metaData(IndexersMetaData.Builder builder) {
return metaData(builder.build());
}
public Builder metaData(IndexersMetaData metaData) {
this.metaData = metaData;
return this;
}
public Builder routing(IndexersRouting.Builder builder) {
return routing(builder.build());
}
public Builder routing(IndexersRouting routing) {
this.routing = routing;
return this;
}
public IndexerClusterState build() {
return new IndexerClusterState(version, metaData, routing);
}
public static IndexerClusterState readFrom(StreamInput in, @Nullable Settings settings) throws IOException {
Builder builder = new Builder();
builder.version = in.readVLong();
builder.metaData = IndexersMetaData.Builder.readFrom(in, settings);
builder.routing = IndexersRouting.Builder.readFrom(in);
return builder.build();
}
public static void writeTo(IndexerClusterState clusterState, StreamOutput out) throws IOException {
out.writeVLong(clusterState.version);
IndexersMetaData.Builder.writeTo(clusterState.metaData, out);
IndexersRouting.Builder.writeTo(clusterState.routing, out);
}
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.cluster;
/**
* @author kimchy (shay.banon)
*/
public interface IndexerClusterStateListener {
void indexerClusterChanged(IndexerClusterChangedEvent event);
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.cluster;
/**
* @author kimchy (shay.banon)
*/
public interface IndexerClusterStateUpdateTask {
IndexerClusterState execute(IndexerClusterState currentState);
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.cluster;
import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* @author kimchy (shay.banon)
*/
public class IndexerNodeHelper {
public static boolean isIndexerNode(DiscoveryNode node) {
if (node.clientNode()) {
return false;
}
String indexer = node.attributes().get("indexer");
// by default, if not set, its an indexer node (better OOB exp)
if (indexer == null) {
return true;
}
if ("_none_".equals(indexer)) {
return false;
}
// there is at least one indexer settings, we need it
return true;
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.cluster;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.*;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class PublishIndexerClusterStateAction extends AbstractComponent {
public static interface NewClusterStateListener {
void onNewClusterState(IndexerClusterState clusterState);
}
private final TransportService transportService;
private final ClusterService clusterService;
private final NewClusterStateListener listener;
public PublishIndexerClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService,
NewClusterStateListener listener) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.listener = listener;
transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler());
}
public void close() {
transportService.removeHandler(PublishClusterStateRequestHandler.ACTION);
}
public void publish(IndexerClusterState clusterState) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
for (final DiscoveryNode node : discoNodes) {
if (node.equals(discoNodes.localNode())) {
// no need to send to our self
continue;
}
// we only want to send nodes that are either possible master nodes or indexer nodes
// master nodes because they will handle the state and the allocation of indexers
// and indexer nodes since they will end up creating indexes
if (node.clientNode()) {
continue;
}
if (!node.masterNode() && !IndexerNodeHelper.isIndexerNode(node)) {
continue;
}
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new VoidTransportResponseHandler(false) {
@Override public void handleException(RemoteTransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
}
});
}
}
private class PublishClusterStateRequest implements Streamable {
private IndexerClusterState clusterState;
private PublishClusterStateRequest() {
}
private PublishClusterStateRequest(IndexerClusterState clusterState) {
this.clusterState = clusterState;
}
@Override public void readFrom(StreamInput in) throws IOException {
clusterState = IndexerClusterState.Builder.readFrom(in, settings);
}
@Override public void writeTo(StreamOutput out) throws IOException {
IndexerClusterState.Builder.writeTo(clusterState, out);
}
}
private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler<PublishClusterStateRequest> {
static final String ACTION = "indexer/state/publish";
@Override public PublishClusterStateRequest newInstance() {
return new PublishClusterStateRequest();
}
@Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
listener.onNewClusterState(request.clusterState);
channel.sendResponse(VoidStreamable.INSTANCE);
}
/**
* No need to spawn, we add submit a new cluster state directly. This allows for faster application.
*/
@Override public boolean spawn() {
return false;
}
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.cluster.metadata;
package org.elasticsearch.indexer.metadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

View File

@ -0,0 +1,165 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.metadata;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.indexer.IndexerName;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
/**
* @author kimchy (shay.banon)
*/
public class IndexersMetaData implements Iterable<IndexerMetaData> {
private final ImmutableMap<IndexerName, IndexerMetaData> indexers;
private final boolean recoveredFromGateway;
private IndexersMetaData(ImmutableMap<IndexerName, IndexerMetaData> indexers, boolean recoveredFromGateway) {
this.indexers = indexers;
this.recoveredFromGateway = recoveredFromGateway;
}
@Override public Iterator<IndexerMetaData> iterator() {
return indexers.values().iterator();
}
public boolean recoveredFromGateway() {
return recoveredFromGateway;
}
public static class Builder {
private MapBuilder<IndexerName, IndexerMetaData> indexers = MapBuilder.newMapBuilder();
private boolean recoveredFromGateway = false;
public Builder put(IndexerMetaData.Builder builder) {
return put(builder.build());
}
public Builder put(IndexerMetaData indexerMetaData) {
indexers.put(indexerMetaData.indexerName(), indexerMetaData);
return this;
}
public IndexerMetaData get(IndexerName indexerName) {
return indexers.get(indexerName);
}
public Builder remove(IndexerName indexerName) {
indexers.remove(indexerName);
return this;
}
public Builder metaData(IndexersMetaData metaData) {
this.indexers.putAll(metaData.indexers);
this.recoveredFromGateway = metaData.recoveredFromGateway;
return this;
}
/**
* Indicates that this cluster state has been recovered from the gateawy.
*/
public Builder markAsRecoveredFromGateway() {
this.recoveredFromGateway = true;
return this;
}
public IndexersMetaData build() {
return new IndexersMetaData(indexers.immutableMap(), recoveredFromGateway);
}
public static String toXContent(IndexersMetaData metaData) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject();
toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
return builder.string();
}
public static void toXContent(IndexersMetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("meta-data");
builder.startObject("indexers");
for (IndexerMetaData indexMetaData : metaData) {
IndexerMetaData.Builder.toXContent(indexMetaData, builder, params);
}
builder.endObject();
builder.endObject();
}
public static IndexersMetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException {
Builder builder = new Builder();
XContentParser.Token token = parser.currentToken();
String currentFieldName = parser.currentName();
if (!"meta-data".equals(currentFieldName)) {
token = parser.nextToken();
currentFieldName = parser.currentName();
if (token == null) {
// no data...
return builder.build();
}
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("indexers".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
builder.put(IndexerMetaData.Builder.fromXContent(parser, globalSettings));
}
}
}
}
return builder.build();
}
public static IndexersMetaData readFrom(StreamInput in, @Nullable Settings globalSettings) throws IOException {
Builder builder = new Builder();
// we only serialize it using readFrom, not in to/from XContent
builder.recoveredFromGateway = in.readBoolean();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(IndexerMetaData.Builder.readFrom(in, globalSettings));
}
return builder.build();
}
public static void writeTo(IndexersMetaData metaData, StreamOutput out) throws IOException {
out.writeBoolean(metaData.recoveredFromGateway());
out.writeVInt(metaData.indexers.size());
for (IndexerMetaData indexMetaData : metaData) {
IndexerMetaData.Builder.writeTo(indexMetaData, out);
}
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.routing;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.indexer.IndexerName;
/**
* @author kimchy (shay.banon)
*/
public class IndexerRouting {
private IndexerName indexerName;
private DiscoveryNode node;
IndexerRouting(IndexerName indexerName, DiscoveryNode node) {
this.indexerName = indexerName;
this.node = node;
}
public IndexerName indexerName() {
return indexerName;
}
public DiscoveryNode node() {
return node;
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indexer.routing;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.indexer.IndexerName;
import java.io.IOException;
import java.util.Iterator;
/**
* @author kimchy (shay.banon)
*/
public class IndexersRouting implements Iterable<IndexerRouting> {
private final ImmutableMap<IndexerName, IndexerRouting> indexers;
private IndexersRouting(ImmutableMap<IndexerName, IndexerRouting> indexers) {
this.indexers = indexers;
}
@Override public Iterator<IndexerRouting> iterator() {
return indexers.values().iterator();
}
public static class Builder {
private MapBuilder<IndexerName, IndexerRouting> indexers = MapBuilder.newMapBuilder();
public Builder routing(IndexersRouting routing) {
indexers.putAll(routing.indexers);
return this;
}
public Builder put(IndexerRouting routing) {
indexers.put(routing.indexerName(), routing);
return this;
}
public Builder remove(IndexerRouting routing) {
indexers.remove(routing.indexerName());
return this;
}
public IndexersRouting build() {
return new IndexersRouting(indexers.immutableMap());
}
public static IndexersRouting readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(new IndexerRouting(new IndexerName(in.readUTF(), in.readUTF()), DiscoveryNode.readNode(in)));
}
return builder.build();
}
public static void writeTo(IndexersRouting routing, StreamOutput out) throws IOException {
out.writeVInt(routing.indexers.size());
for (IndexerRouting indexerRouting : routing) {
out.writeUTF(indexerRouting.indexerName().type());
out.writeUTF(indexerRouting.indexerName().name());
indexerRouting.node().writeTo(out);
}
}
}
}

View File

@ -54,8 +54,8 @@ import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.indexer.IndexerManager;
import org.elasticsearch.indexer.IndexersModule;
import org.elasticsearch.indexer.IndexersService;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@ -171,7 +171,7 @@ public final class InternalNode implements Node {
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndexersService.class).start();
injector.getInstance(IndexerManager.class).start();
injector.getInstance(ClusterService.class).start();
injector.getInstance(RoutingService.class).start();
injector.getInstance(SearchService.class).start();
@ -209,7 +209,7 @@ public final class InternalNode implements Node {
injector.getInstance(MonitorService.class).stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(IndexersService.class).stop();
injector.getInstance(IndexerManager.class).stop();
injector.getInstance(IndicesClusterStateService.class).stop();
injector.getInstance(IndicesService.class).stop();
injector.getInstance(RestController.class).stop();
@ -256,7 +256,7 @@ public final class InternalNode implements Node {
stopWatch.stop().start("search");
injector.getInstance(SearchService.class).close();
stopWatch.stop().start("indexers");
injector.getInstance(IndexersService.class).close();
injector.getInstance(IndexerManager.class).close();
stopWatch.stop().start("indices_cluster");
injector.getInstance(IndicesClusterStateService.class).close();
stopWatch.stop().start("indices");