Merge branch 'master' into feature/query-refactoring

This commit is contained in:
Christoph Büscher 2015-06-17 11:54:00 +02:00
commit 9171ff0959
69 changed files with 955 additions and 2543 deletions

View File

@ -54,7 +54,6 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@ -64,7 +63,6 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -1068,34 +1066,18 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexShard.indexService().mapperService();
mapperService.merge(request.type(), new CompressedXContent(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
} else {
created = operation.execute(indexShard);
}
final boolean created = operation.execute(indexShard);
// update the version on request so it will happen on the replicas
final long version = operation.version();

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
@ -146,7 +147,7 @@ public class ShardStateAction extends AbstractComponent {
MetaData metaData = currentState.getMetaData();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
shardRoutingEntry.processed = true;
@ -163,7 +164,7 @@ public class ShardStateAction extends AbstractComponent {
}
logger.debug("{} will apply shard failed {}", shardRouting.shardId(), shardRoutingEntry);
shardRoutingsToBeApplied.add(shardRouting);
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(shardRouting, shardRoutingEntry.reason));
}
RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);

View File

@ -53,7 +53,6 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -62,7 +61,6 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.*;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
@ -93,7 +91,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final AllocationService allocationService;
private final MetaDataService metaDataService;
private final Version version;
private final String riverIndexName;
private final AliasValidator aliasValidator;
private final IndexTemplateFilter indexTemplateFilter;
private final NodeEnvironment nodeEnv;
@ -101,7 +98,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
@Inject
public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService,
Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator,
Version version, AliasValidator aliasValidator,
Set<IndexTemplateFilter> indexTemplateFilters, NodeEnvironment nodeEnv) {
super(settings);
this.threadPool = threadPool;
@ -110,7 +107,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
this.allocationService = allocationService;
this.metaDataService = metaDataService;
this.version = version;
this.riverIndexName = riverIndexName;
this.aliasValidator = aliasValidator;
this.nodeEnv = nodeEnv;
@ -163,7 +159,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (index.contains("#")) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain '#'");
}
if (!index.equals(riverIndexName) && index.charAt(0) == '_') {
if (index.charAt(0) == '_') {
throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'");
}
if (!index.toLowerCase(Locale.ROOT).equals(index)) {
@ -306,11 +302,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
}
if (request.index().equals(ScriptService.SCRIPT_INDEX)) {
@ -318,11 +310,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all");
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
}

View File

@ -180,7 +180,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
RoutingTable.Builder rtBuilder = RoutingTable.builder(updatedState.routingTable());
for (String index : indicesToOpen) {
rtBuilder.addAsRecovery(updatedState.metaData().index(index));
rtBuilder.addAsFromCloseToOpen(updatedState.metaData().index(index));
}
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());

View File

@ -20,6 +20,8 @@
package org.elasticsearch.cluster.routing;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -53,6 +55,8 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
protected RestoreSource restoreSource;
protected UnassignedInfo unassignedInfo;
private final transient ImmutableList<ShardRouting> asList;
ImmutableShardRouting() {
@ -64,7 +68,7 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
}
public ImmutableShardRouting(ShardRouting copy, long version) {
this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version);
this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version, copy.unassignedInfo());
}
public ImmutableShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
@ -78,6 +82,12 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
public ImmutableShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
this(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version, null);
}
public ImmutableShardRouting(String index, int shardId, String currentNodeId,
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version,
UnassignedInfo unassignedInfo) {
this.index = index;
this.shardId = shardId;
this.currentNodeId = currentNodeId;
@ -87,6 +97,8 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
this.asList = ImmutableList.of((ShardRouting) this);
this.version = version;
this.restoreSource = restoreSource;
this.unassignedInfo = unassignedInfo;
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
}
@Override
@ -167,6 +179,12 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
return restoreSource;
}
@Override
@Nullable
public UnassignedInfo unassignedInfo() {
return unassignedInfo;
}
@Override
public boolean primary() {
return this.primary;
@ -224,6 +242,11 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
state = ShardRoutingState.fromValue(in.readByte());
restoreSource = RestoreSource.readOptionalRestoreSource(in);
if (in.getVersion().onOrAfter(Version.V_1_7_0)) {
if (in.readBoolean()) {
unassignedInfo = new UnassignedInfo(in);
}
}
}
@Override
@ -263,6 +286,14 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
} else {
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_1_7_0)) {
if (unassignedInfo != null) {
out.writeBoolean(true);
unassignedInfo.writeTo(out);
} else {
out.writeBoolean(false);
}
}
}
@Override
@ -342,6 +373,9 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
sb.append(", restoring[" + restoreSource + "]");
}
sb.append(", s[").append(state).append("]");
if (this.unassignedInfo != null) {
sb.append(", ").append(unassignedInfo.toString());
}
return sb.toString();
}
@ -358,6 +392,9 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
builder.field("restore_source");
restoreSource().toXContent(builder, params);
}
if (unassignedInfo != null) {
unassignedInfo.toXContent(builder, params);
}
return builder.endObject();
}
}

View File

@ -37,7 +37,6 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
@ -394,34 +393,48 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
* Initializes a new empty index, as if it was created from an API.
*/
public Builder initializeAsNew(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, true);
return initializeEmpty(indexMetaData, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
}
/**
* Initializes a new empty index, as if it was created from an API.
*/
public Builder initializeAsRecovery(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, false);
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
}
/**
* Initializes a new index caused by dangling index imported.
*/
public Builder initializeAsFromDangling(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED, null));
}
/**
* Initializes a new empty index, as as a result of opening a closed index.
*/
public Builder initializeAsFromCloseToOpen(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, null));
}
/**
* Initializes a new empty index, to be restored from a snapshot
*/
public Builder initializeAsNewRestore(IndexMetaData indexMetaData, RestoreSource restoreSource, IntSet ignoreShards) {
return initializeAsRestore(indexMetaData, restoreSource, ignoreShards, true);
return initializeAsRestore(indexMetaData, restoreSource, ignoreShards, true, new UnassignedInfo(UnassignedInfo.Reason.NEW_INDEX_RESTORED, "restore_source[" + restoreSource.snapshotId().getRepository() + "/" + restoreSource.snapshotId().getSnapshot() + "]"));
}
/**
* Initializes an existing index, to be restored from a snapshot
*/
public Builder initializeAsRestore(IndexMetaData indexMetaData, RestoreSource restoreSource) {
return initializeAsRestore(indexMetaData, restoreSource, null, false);
return initializeAsRestore(indexMetaData, restoreSource, null, false, new UnassignedInfo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, "restore_source[" + restoreSource.snapshotId().getRepository() + "/" + restoreSource.snapshotId().getSnapshot() + "]"));
}
/**
* Initializes an index, to be restored from snapshot
*/
private Builder initializeAsRestore(IndexMetaData indexMetaData, RestoreSource restoreSource, IntSet ignoreShards, boolean asNew) {
private Builder initializeAsRestore(IndexMetaData indexMetaData, RestoreSource restoreSource, IntSet ignoreShards, boolean asNew, UnassignedInfo unassignedInfo) {
if (!shards.isEmpty()) {
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
}
@ -430,9 +443,9 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
for (int i = 0; i <= indexMetaData.numberOfReplicas(); i++) {
if (asNew && ignoreShards.contains(shardId)) {
// This shards wasn't completely snapshotted - restore it as new shard
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, i == 0, ShardRoutingState.UNASSIGNED, 0));
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, null, null, i == 0, ShardRoutingState.UNASSIGNED, 0, unassignedInfo));
} else {
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, null, i == 0 ? restoreSource : null, i == 0, ShardRoutingState.UNASSIGNED, 0));
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, null, i == 0 ? restoreSource : null, i == 0, ShardRoutingState.UNASSIGNED, 0, unassignedInfo));
}
}
shards.put(shardId, indexShardRoutingBuilder.build());
@ -443,14 +456,14 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
/**
* Initializes a new empty index, with an option to control if its from an API or not.
*/
private Builder initializeEmpty(IndexMetaData indexMetaData, boolean asNew) {
private Builder initializeEmpty(IndexMetaData indexMetaData, boolean asNew, UnassignedInfo unassignedInfo) {
if (!shards.isEmpty()) {
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
}
for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId), asNew ? false : true);
for (int i = 0; i <= indexMetaData.numberOfReplicas(); i++) {
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, i == 0, ShardRoutingState.UNASSIGNED, 0));
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, shardId, null, null, null, i == 0, ShardRoutingState.UNASSIGNED, 0, unassignedInfo));
}
shards.put(shardId, indexShardRoutingBuilder.build());
}
@ -461,7 +474,7 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
for (IntCursor cursor : shards.keys()) {
int shardId = cursor.value;
// version 0, will get updated when reroute will happen
ImmutableShardRouting shard = new ImmutableShardRouting(index, shardId, null, false, ShardRoutingState.UNASSIGNED, 0);
ImmutableShardRouting shard = new ImmutableShardRouting(index, shardId, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.REPLICA_ADDED, null));
shards.put(shardId,
new IndexShardRoutingTable.Builder(shards.get(shard.id())).addShard(shard).build()
);

View File

@ -44,12 +44,13 @@ public class MutableShardRouting extends ImmutableShardRouting {
/**
* Moves the shard to unassigned state.
*/
void moveToUnassigned() {
void moveToUnassigned(UnassignedInfo unassignedInfo) {
version++;
assert state != ShardRoutingState.UNASSIGNED;
state = ShardRoutingState.UNASSIGNED;
currentNodeId = null;
relocatingNodeId = null;
this.unassignedInfo = unassignedInfo;
}
/**
@ -120,6 +121,7 @@ public class MutableShardRouting extends ImmutableShardRouting {
relocatingNodeId = null;
restoreSource = null;
state = ShardRoutingState.STARTED;
unassignedInfo = null; // we keep the unassigned data until the shard is started
}
/**

View File

@ -795,10 +795,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return iterable.iterator();
}
public void moveToUnassigned() {
public void moveToUnassigned(UnassignedInfo unassignedInfo) {
remove();
MutableShardRouting unassigned = new MutableShardRouting(shard); // protective copy of the mutable shard
unassigned.moveToUnassigned();
unassigned.moveToUnassigned(unassignedInfo);
unassigned().add(unassigned);
}
}

View File

@ -440,6 +440,24 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
return this;
}
public Builder addAsFromDangling(IndexMetaData indexMetaData) {
if (indexMetaData.state() == IndexMetaData.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
.initializeAsFromDangling(indexMetaData);
add(indexRoutingBuilder);
}
return this;
}
public Builder addAsFromCloseToOpen(IndexMetaData indexMetaData) {
if (indexMetaData.state() == IndexMetaData.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
.initializeAsFromCloseToOpen(indexMetaData);
add(indexRoutingBuilder);
}
return this;
}
public Builder addAsRestore(IndexMetaData indexMetaData, RestoreSource restoreSource) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.index())
.initializeAsRestore(indexMetaData, restoreSource);

View File

@ -128,6 +128,12 @@ public interface ShardRouting extends Streamable, Serializable, ToXContent {
*/
RestoreSource restoreSource();
/**
* Additional metadata on why the shard is/was unassigned. The metadata is kept around
* until the shard moves to STARTED.
*/
UnassignedInfo unassignedInfo();
/**
* Returns <code>true</code> iff this shard is a primary.
*/

View File

@ -0,0 +1,167 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
* Holds additional information as to why the shard is in unassigned state.
*/
public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("dateOptionalTime");
/**
* Reason why the shard is in unassigned state.
* <p/>
* Note, ordering of the enum is important, make sure to add new values
* at the end and handle version serialization properly.
*/
public enum Reason {
/**
* Unassigned as a result of an API creation of an index.
*/
INDEX_CREATED,
/**
* Unassigned as a result of a full cluster recovery.
*/
CLUSTER_RECOVERED,
/**
* Unassigned as a result of opening a closed index.
*/
INDEX_REOPENED,
/**
* Unassigned as a result of importing a dangling index.
*/
DANGLING_INDEX_IMPORTED,
/**
* Unassigned as a result of restoring into a new index.
*/
NEW_INDEX_RESTORED,
/**
* Unassigned as a result of restoring into a closed index.
*/
EXISTING_INDEX_RESTORED,
/**
* Unassigned as a result of explicit addition of a replica.
*/
REPLICA_ADDED,
/**
* Unassigned as a result of a failed allocation of the shard.
*/
ALLOCATION_FAILED,
/**
* Unassigned as a result of the node hosting it leaving the cluster.
*/
NODE_LEFT,
/**
* Unassigned as a result of explicit cancel reroute command.
*/
REROUTE_CANCELLED;
}
private final Reason reason;
private final long timestamp;
private final String details;
public UnassignedInfo(Reason reason, String details) {
this(reason, System.currentTimeMillis(), details);
}
private UnassignedInfo(Reason reason, long timestamp, String details) {
this.reason = reason;
this.timestamp = timestamp;
this.details = details;
}
UnassignedInfo(StreamInput in) throws IOException {
this.reason = Reason.values()[(int) in.readByte()];
this.timestamp = in.readLong();
this.details = in.readOptionalString();
}
public void writeTo(StreamOutput out) throws IOException {
out.writeByte((byte) reason.ordinal());
out.writeLong(timestamp);
out.writeOptionalString(details);
}
public UnassignedInfo readFrom(StreamInput in) throws IOException {
return new UnassignedInfo(in);
}
/**
* The reason why the shard is unassigned.
*/
public Reason getReason() {
return this.reason;
}
/**
* The timestamp in milliseconds since epoch. Note, we use timestamp here since
* we want to make sure its preserved across node serializations. Extra care need
* to be made if its used to calculate diff (handle negative values) in case of
* time drift.
*/
public long getTimestampInMillis() {
return this.timestamp;
}
/**
* Returns optional details explaining the reasons.
*/
@Nullable
public String getDetails() {
return this.details;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("unassigned_info[[reason=").append(reason).append("]");
sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(timestamp)).append("]");
if (details != null) {
sb.append(", details[").append(details).append("]");
}
sb.append("]");
return sb.toString();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("unassigned_info");
builder.field("reason", reason);
builder.field("at", DATE_TIME_FORMATTER.printer().print(timestamp));
if (details != null) {
builder.field("details", details);
}
builder.endObject();
return builder;
}
}

View File

@ -88,7 +88,7 @@ public class AllocationService extends AbstractComponent {
}
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
return applyFailedShards(clusterState, ImmutableList.of(failedShard));
return applyFailedShards(clusterState, ImmutableList.of(new FailedRerouteAllocation.FailedShard(failedShard, null)));
}
/**
@ -96,14 +96,14 @@ public class AllocationService extends AbstractComponent {
* <p/>
* <p>If the same instance of the routing table is returned, then no change has been made.</p>
*/
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<ShardRouting> failedShards) {
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShards, clusterInfoService.getClusterInfo());
boolean changed = false;
for (ShardRouting failedShard : failedShards) {
changed |= applyFailedShard(allocation, failedShard, true);
for (FailedRerouteAllocation.FailedShard failedShard : failedShards) {
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.details));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
@ -288,7 +288,7 @@ public class AllocationService extends AbstractComponent {
}
}
for (ShardRouting shardToFail : shardsToFail) {
changed |= applyFailedShard(allocation, shardToFail, false);
changed |= applyFailedShard(allocation, shardToFail, false, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing"));
}
// now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
@ -348,8 +348,9 @@ public class AllocationService extends AbstractComponent {
}
changed = true;
// now, go over all the shards routing on the node, and fail them
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]");
for (MutableShardRouting shardRouting : node.copyShards()) {
applyFailedShard(allocation, shardRouting, false);
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
// since it relies on the fact that the RoutingNode exists in the list of nodes
@ -410,7 +411,7 @@ public class AllocationService extends AbstractComponent {
* Applies the relevant logic to handle a failed shard. Returns <tt>true</tt> if changes happened that
* require relocation.
*/
private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList) {
private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList, UnassignedInfo unassignedInfo) {
// create a copy of the failed shard, since we assume we can change possible references to it without
// changing the state of failed shard
failedShard = new ImmutableShardRouting(failedShard);
@ -474,7 +475,7 @@ public class AllocationService extends AbstractComponent {
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
}
relocatingFromNode.moveToUnassigned();
relocatingFromNode.moveToUnassigned(unassignedInfo);
break;
}
}
@ -525,7 +526,7 @@ public class AllocationService extends AbstractComponent {
routingNodes.unassigned().addAll(shardsToMove);
}
node.moveToUnassigned();
node.moveToUnassigned(unassignedInfo);
break;
}
}

View File

@ -33,14 +33,28 @@ import java.util.List;
*/
public class FailedRerouteAllocation extends RoutingAllocation {
private final List<ShardRouting> failedShards;
/**
* A failed shard with the shard routing itself and an optional
* details on why it failed.
*/
public static class FailedShard {
public final ShardRouting shard;
public final String details;
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<ShardRouting> failedShards, ClusterInfo clusterInfo) {
public FailedShard(ShardRouting shard, String details) {
this.shard = shard;
this.details = details;
}
}
private final List<FailedShard> failedShards;
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<FailedShard> failedShards, ClusterInfo clusterInfo) {
super(deciders, routingNodes, nodes, clusterInfo);
this.failedShards = failedShards;
}
public List<ShardRouting> failedShards() {
public List<FailedShard> failedShards() {
return failedShards;
}
}

View File

@ -19,13 +19,9 @@
package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@ -199,7 +195,7 @@ public class CancelAllocationCommand implements AllocationCommand {
throw new IllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " +
discoNode + ", shard is primary and initializing its state");
}
it.moveToUnassigned();
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null));
// now, go and find the shard that is initializing on the target node, and cancel it as well...
RoutingNodes.RoutingNodeIterator initializingNode = allocation.routingNodes().routingNodeIter(shardRouting.relocatingNodeId());
if (initializingNode != null) {
@ -222,7 +218,7 @@ public class CancelAllocationCommand implements AllocationCommand {
throw new IllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " +
discoNode + ", shard is primary and started");
}
it.moveToUnassigned();
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null));
}
}
if (!found) {

View File

@ -24,7 +24,6 @@ import org.elasticsearch.common.Classes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.river.RiverName;
import java.net.InetAddress;
import java.net.UnknownHostException;
@ -71,15 +70,6 @@ public class Loggers {
return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0]));
}
public static ESLogger getLogger(Class clazz, Settings settings, RiverName riverName, String... prefixes) {
List<String> l = Lists.newArrayList();
l.add(SPACE);
l.add(riverName.type());
l.add(riverName.name());
l.addAll(Lists.newArrayList(prefixes));
return getLogger(clazz, settings, l.toArray(new String[l.size()]));
}
public static ESLogger getLogger(Class clazz, Settings settings, String... prefixes) {
return getLogger(buildClassLoggerName(clazz), settings, prefixes);
}

View File

@ -130,9 +130,9 @@ public class GatewayAllocator extends AbstractComponent {
}
public void applyFailedShards(FailedRerouteAllocation allocation) {
for (ShardRouting shard : allocation.failedShards()) {
Releasables.close(asyncFetchStarted.remove(shard.shardId()));
Releasables.close(asyncFetchStore.remove(shard.shardId()));
for (FailedRerouteAllocation.FailedShard shard : allocation.failedShards()) {
Releasables.close(asyncFetchStarted.remove(shard.shard.shardId()));
Releasables.close(asyncFetchStore.remove(shard.shard.shardId()));
}
}

View File

@ -149,7 +149,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
metaData.put(upgradedIndexMetaData, false);
blocks.addBlocks(upgradedIndexMetaData);
if (upgradedIndexMetaData.getState() == IndexMetaData.State.OPEN) {
routingTableBuilder.addAsRecovery(upgradedIndexMetaData);
routingTableBuilder.addAsFromDangling(upgradedIndexMetaData);
}
sb.append("[").append(upgradedIndexMetaData.index()).append("/").append(upgradedIndexMetaData.state()).append("]");
}

View File

@ -104,11 +104,9 @@ public abstract class DocValuesIndexFieldData {
if (BINARY_INDEX_FIELD_NAMES.contains(fieldNames.indexName())) {
assert numericType == null;
return new BinaryDVIndexFieldData(index, fieldNames, fieldType.fieldDataType());
} else if (NUMERIC_INDEX_FIELD_NAMES.contains(fieldNames.indexName())) {
assert !numericType.isFloatingPoint();
return new NumericDVIndexFieldData(index, fieldNames, fieldType.fieldDataType());
} else if (numericType != null) {
if (Version.indexCreated(indexSettings).onOrAfter(Version.V_1_4_0_Beta1)) {
if (TimestampFieldMapper.NAME.equals(fieldNames.indexName())
|| Version.indexCreated(indexSettings).onOrAfter(Version.V_1_4_0_Beta1)) {
return new SortedNumericDVIndexFieldData(index, fieldNames, numericType, fieldType.fieldDataType());
} else {
// prior to ES 1.4: multi-valued numerics were boxed inside a byte[] as BINARY

View File

@ -1,85 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Bits;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.fieldcomparator.LongValuesComparatorSource;
import org.elasticsearch.index.mapper.MappedFieldType.Names;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
public class NumericDVIndexFieldData extends DocValuesIndexFieldData implements IndexNumericFieldData {
public NumericDVIndexFieldData(Index index, Names fieldNames, FieldDataType fieldDataType) {
super(index, fieldNames, fieldDataType);
}
@Override
public AtomicLongFieldData load(LeafReaderContext context) {
final LeafReader reader = context.reader();
final String field = fieldNames.indexName();
return new AtomicLongFieldData(0) {
@Override
public SortedNumericDocValues getLongValues() {
try {
final NumericDocValues values = DocValues.getNumeric(reader, field);
final Bits docsWithField = DocValues.getDocsWithField(reader, field);
return DocValues.singleton(values, docsWithField);
} catch (IOException e) {
throw new IllegalStateException("Cannot load doc values", e);
}
}
@Override
public Collection<Accountable> getChildResources() {
return Collections.emptyList();
}
};
}
@Override
public AtomicLongFieldData loadDirect(LeafReaderContext context) throws Exception {
return load(context);
}
@Override
public org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource comparatorSource(Object missingValue, MultiValueMode sortMode, Nested nested) {
return new LongValuesComparatorSource(this, missingValue, sortMode, nested);
}
@Override
public org.elasticsearch.index.fielddata.IndexNumericFieldData.NumericType getNumericType() {
return NumericType.LONG;
}
}

View File

@ -129,7 +129,7 @@ public class SortedNumericDVIndexFieldData extends DocValuesIndexFieldData imple
throw new IllegalStateException("Cannot load doc values", e);
}
}
@Override
public Collection<Accountable> getChildResources() {
return Collections.emptyList();

View File

@ -109,34 +109,30 @@ public class FuzzyQueryBuilder extends MultiTermQueryBuilder implements Boostabl
@Override
public void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(NAME);
if (boost == -1 && fuzziness == null && prefixLength == null && queryName != null) {
builder.field(name, value);
} else {
builder.startObject(name);
builder.field("value", value);
if (boost != -1) {
builder.field("boost", boost);
}
if (transpositions != null) {
builder.field("transpositions", transpositions);
}
if (fuzziness != null) {
fuzziness.toXContent(builder, params);
}
if (prefixLength != null) {
builder.field("prefix_length", prefixLength);
}
if (maxExpansions != null) {
builder.field("max_expansions", maxExpansions);
}
if (rewrite != null) {
builder.field("rewrite", rewrite);
}
if (queryName != null) {
builder.field("_name", queryName);
}
builder.endObject();
builder.startObject(name);
builder.field("value", value);
if (boost != -1) {
builder.field("boost", boost);
}
if (transpositions != null) {
builder.field("transpositions", transpositions);
}
if (fuzziness != null) {
fuzziness.toXContent(builder, params);
}
if (prefixLength != null) {
builder.field("prefix_length", prefixLength);
}
if (maxExpansions != null) {
builder.field("max_expansions", maxExpansions);
}
if (rewrite != null) {
builder.field("rewrite", rewrite);
}
if (queryName != null) {
builder.field("_name", queryName);
}
builder.endObject();
builder.endObject();
}

View File

@ -79,7 +79,7 @@ public class PrefixQueryBuilder extends MultiTermQueryBuilder implements Boostab
@Override
public void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(NAME);
if (boost == -1 && rewrite == null && queryName != null) {
if (boost == -1 && rewrite == null && queryName == null) {
builder.field(name, prefix);
} else {
builder.startObject(name);

View File

@ -100,28 +100,24 @@ public class RegexpQueryBuilder extends MultiTermQueryBuilder implements Boostab
@Override
public void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(NAME);
if (boost == -1 && rewrite == null && queryName != null) {
builder.field(name, regexp);
} else {
builder.startObject(name);
builder.field("value", regexp);
if (flags != -1) {
builder.field("flags_value", flags);
}
if (maxDetermizedStatesSet) {
builder.field("max_determinized_states", maxDeterminizedStates);
}
if (boost != -1) {
builder.field("boost", boost);
}
if (rewrite != null) {
builder.field("rewrite", rewrite);
}
if (queryName != null) {
builder.field("name", queryName);
}
builder.endObject();
builder.startObject(name);
builder.field("value", regexp);
if (flags != -1) {
builder.field("flags_value", flags);
}
if (maxDetermizedStatesSet) {
builder.field("max_determinized_states", maxDeterminizedStates);
}
if (boost != -1) {
builder.field("boost", boost);
}
if (rewrite != null) {
builder.field("rewrite", rewrite);
}
if (queryName != null) {
builder.field("_name", queryName);
}
builder.endObject();
builder.endObject();
}

View File

@ -27,7 +27,6 @@ import org.apache.lucene.util.automaton.Operations;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.support.QueryParsers;
@ -85,6 +84,8 @@ public class RegexpQueryParser extends BaseQueryParserTemp {
maxDeterminizedStates = parser.intValue();
} else if ("flags_value".equals(currentFieldName)) {
flagsValue = parser.intValue();
} else if ("_name".equals(currentFieldName)) {
queryName = parser.text();
} else {
throw new QueryParsingException(parseContext, "[regexp] query does not support [" + currentFieldName + "]");
}

View File

@ -66,7 +66,7 @@ public class SpanFirstQueryBuilder extends QueryBuilder implements SpanQueryBuil
builder.field("boost", boost);
}
if (queryName != null) {
builder.field("name", queryName);
builder.field("_name", queryName);
}
builder.endObject();
}

View File

@ -89,7 +89,7 @@ public class WildcardQueryBuilder extends MultiTermQueryBuilder implements Boost
@Override
public void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(NAME);
if (boost == -1 && rewrite == null && queryName != null) {
if (boost == -1 && rewrite == null && queryName == null) {
builder.field(name, wildcard);
} else {
builder.startObject(name);

View File

@ -35,7 +35,6 @@ import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.ShardId;
@ -222,15 +221,22 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
@Override
public void onClose(Object coreKey) {
cache.invalidate(new Key(this, coreKey));
// don't call cache.cleanUp here as it would have bad performance implications
}
@Override
public void onClose(IndexReader reader) {
cache.invalidate(new Key(this, reader.getCoreCacheKey()));
// don't call cache.cleanUp here as it would have bad performance implications
}
@Override
public void clear() {
for (Key key : cache.asMap().keySet()) {
if (key.indexCache.index.equals(index)) {
cache.invalidate(key);
}
}
// Note that cache invalidation in Guava does not immediately remove
// values from the cache. In the case of a cache with a rare write or
// read rate, it's possible for values to persist longer than desired.
@ -238,11 +244,11 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
// Note this is intended by the Guava developers, see:
// https://code.google.com/p/guava-libraries/wiki/CachesExplained#Eviction
// (the "When Does Cleanup Happen" section)
for (Key key : cache.asMap().keySet()) {
if (key.indexCache.index.equals(index)) {
cache.invalidate(key);
}
}
// We call it explicitly here since it should be a "rare" operation, and
// if a user runs it he probably wants to see memory returned as soon as
// possible
cache.cleanUp();
}
@Override
@ -254,11 +260,16 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
}
}
}
// we call cleanUp() because this is a manual operation, should happen
// rarely and probably means the user wants to see memory returned as
// soon as possible
cache.cleanUp();
}
@Override
public void clear(Object coreCacheKey) {
cache.invalidate(new Key(this, coreCacheKey));
// don't call cache.cleanUp here as it would have bad performance implications
}
}

View File

@ -81,8 +81,6 @@ import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestModule;
import org.elasticsearch.river.RiversManager;
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
@ -182,7 +180,6 @@ public class Node implements Releasable {
if (settings.getAsBoolean(HTTP_ENABLED, true)) {
modules.add(new HttpServerModule(settings));
}
modules.add(new RiversModule(settings));
modules.add(new IndicesModule(settings));
modules.add(new SearchModule(settings));
modules.add(new ActionModule(false));
@ -247,7 +244,6 @@ public class Node implements Releasable {
injector.getInstance(IndexingMemoryController.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(RiversManager.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(TransportService.class).start();
injector.getInstance(ClusterService.class).start();
@ -289,8 +285,6 @@ public class Node implements Releasable {
injector.getInstance(HttpServer.class).stop();
}
injector.getInstance(RiversManager.class).stop();
injector.getInstance(SnapshotsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
@ -339,10 +333,6 @@ public class Node implements Releasable {
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close();
}
stopWatch.stop().start("rivers");
injector.getInstance(RiversManager.class).close();
stopWatch.stop().start("snapshot_service");
injector.getInstance(SnapshotsService.class).close();
stopWatch.stop().start("client");

View File

@ -27,10 +27,12 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActionListener;
import org.elasticsearch.rest.action.support.RestResponseListener;
@ -89,6 +91,11 @@ public class RestShardsAction extends AbstractCatAction {
.addCell("id", "default:false;desc:unique id of node where it lives")
.addCell("node", "default:true;alias:n;desc:name of node where it lives");
table.addCell("unassigned.reason", "alias:ur;default:false;desc:reason shard is unassigned");
table.addCell("unassigned.at", "alias:ua;default:false;desc:time shard became unassigned (UTC)");
table.addCell("unassigned.for", "alias:uf;default:false;text-align:right;desc:time has been unassigned");
table.addCell("unassigned.details", "alias:ud;default:false;desc:additional details as to why the shard became unassigned");
table.addCell("completion.size", "alias:cs,completionSize;default:false;text-align:right;desc:size of completion");
table.addCell("fielddata.memory_size", "alias:fm,fielddataMemory;default:false;text-align:right;desc:used fielddata cache");
@ -208,6 +215,18 @@ public class RestShardsAction extends AbstractCatAction {
table.addCell(null);
}
if (shard.unassignedInfo() != null) {
table.addCell(shard.unassignedInfo().getReason());
table.addCell(UnassignedInfo.DATE_TIME_FORMATTER.printer().print(shard.unassignedInfo().getTimestampInMillis()));
table.addCell(TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().getTimestampInMillis()));
table.addCell(shard.unassignedInfo().getDetails());
} else {
table.addCell(null);
table.addCell(null);
table.addCell(null);
table.addCell(null);
}
table.addCell(shardStats == null ? null : shardStats.getCompletion().getSize());
table.addCell(shardStats == null ? null : shardStats.getFieldData().getMemorySize());

View File

@ -1,52 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
/**
* @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers
*/
@Deprecated
public class AbstractRiverComponent implements RiverComponent {
protected final ESLogger logger;
protected final RiverName riverName;
protected final RiverSettings settings;
protected AbstractRiverComponent(RiverName riverName, RiverSettings settings) {
this.riverName = riverName;
this.settings = settings;
this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
}
@Override
public RiverName riverName() {
return riverName;
}
public String nodeName() {
return settings.globalSettings().get("name", "");
}
}

View File

@ -1,43 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
/**
* Allows to import data into elasticsearch via plugin
* Gets allocated on a node and eventually automatically re-allocated if needed
* @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers
*/
@Deprecated
public interface River extends RiverComponent {
/**
* Called whenever the river is registered on a node, which can happen when:
* 1) the river _meta document gets indexed
* 2) an already registered river gets started on a node
*/
void start();
/**
* Called when the river is closed on a node, which can happen when:
* 1) the river is deleted by deleting its type through the delete mapping api
* 2) the node where the river is allocated is shut down or the river gets rerouted to another node
*/
void close();
}

View File

@ -1,29 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
/**
* @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers
*/
@Deprecated
public interface RiverComponent {
RiverName riverName();
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.ElasticsearchException;
/**
*
*/
public class RiverException extends ElasticsearchException {
private final RiverName river;
public RiverException(RiverName river, String msg) {
this(river, msg, null);
}
public RiverException(RiverName river, String msg, Throwable cause) {
this(river, true, msg, cause);
}
protected RiverException(RiverName river, boolean withSpace, String msg, Throwable cause) {
super("[" + river.type() + "][" + river.name() + "]" + (withSpace ? " " : "") + msg, cause);
this.river = river;
}
public RiverName riverName() {
return river;
}
}

View File

@ -1,50 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.inject.BindingAnnotation;
import org.elasticsearch.common.settings.Settings;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
*
*/
@BindingAnnotation
@Target({FIELD, PARAMETER})
@Retention(RUNTIME)
@Documented
public @interface RiverIndexName {
static class Conf {
public static final String DEFAULT_INDEX_NAME = "_river";
public static String indexName(Settings settings) {
return settings.get("river.index_name", DEFAULT_INDEX_NAME);
}
}
}

View File

@ -1,93 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.NoClassSettingsException;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.common.Strings.toCamelCase;
/**
*
*/
public class RiverModule extends AbstractModule implements SpawnModules {
private RiverName riverName;
private final Settings globalSettings;
private final Map<String, Object> settings;
private final RiversTypesRegistry typesRegistry;
public RiverModule(RiverName riverName, Map<String, Object> settings, Settings globalSettings, RiversTypesRegistry typesRegistry) {
this.riverName = riverName;
this.globalSettings = globalSettings;
this.settings = settings;
this.typesRegistry = typesRegistry;
}
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(loadTypeModule(riverName.type(), "org.elasticsearch.river.", "RiverModule"), globalSettings));
}
@Override
protected void configure() {
bind(RiverSettings.class).toInstance(new RiverSettings(globalSettings, settings));
}
private Class<? extends Module> loadTypeModule(String type, String prefixPackage, String suffixClassName) {
Class<? extends Module> registered = typesRegistry.type(type);
if (registered != null) {
return registered;
}
String fullClassName = type;
try {
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e) {
fullClassName = prefixPackage + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e1) {
fullClassName = prefixPackage + toCamelCase(type) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e2) {
fullClassName = prefixPackage + toCamelCase(type).toLowerCase(Locale.ROOT) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName;
try {
return (Class<? extends Module>) globalSettings.getClassLoader().loadClass(fullClassName);
} catch (ClassNotFoundException e3) {
throw new NoClassSettingsException("Failed to load class with value [" + type + "]", e);
}
}
}
}
}
}

View File

@ -1,74 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import java.io.Serializable;
/**
* @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers
*/
@Deprecated
public class RiverName implements Serializable {
private final String type;
private final String name;
public RiverName(String type, String name) {
this.type = type;
this.name = name;
}
public String type() {
return this.type;
}
public String getType() {
return type();
}
public String name() {
return this.name;
}
public String getName() {
return name();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RiverName that = (RiverName) o;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false;
return true;
}
@Override
public int hashCode() {
int result = type != null ? type.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.inject.AbstractModule;
/**
*
*/
public class RiverNameModule extends AbstractModule {
private final RiverName riverName;
public RiverNameModule(RiverName riverName) {
this.riverName = riverName;
}
@Override
protected void configure() {
bind(RiverName.class).toInstance(riverName);
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
/**
* (shayy.banon)
*/
public class RiverSettings {
private final Settings globalSettings;
private final Map<String, Object> settings;
public RiverSettings(Settings globalSettings, Map<String, Object> settings) {
this.globalSettings = globalSettings;
this.settings = settings;
}
public Settings globalSettings() {
return globalSettings;
}
public Map<String, Object> settings() {
return settings;
}
}

View File

@ -1,67 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.routing.RiversRouter;
/**
*
*/
public class RiversManager extends AbstractLifecycleComponent<RiversManager> {
private final RiversService riversService;
private final RiverClusterService clusterService;
private final RiversRouter riversRouter;
@Inject
public RiversManager(Settings settings, RiversService riversService, RiverClusterService clusterService, RiversRouter riversRouter) {
super(settings);
this.riversService = riversService;
this.clusterService = clusterService;
this.riversRouter = riversRouter;
}
@Override
protected void doStart() {
riversRouter.start();
riversService.start();
clusterService.start();
}
@Override
protected void doStop() {
riversRouter.stop();
clusterService.stop();
riversService.stop();
}
@Override
protected void doClose() {
riversRouter.close();
clusterService.close();
riversService.close();
}
}

View File

@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.routing.RiversRouter;
import java.util.Map;
/**
*
*/
public class RiversModule extends AbstractModule {
private final Settings settings;
private Map<String, Class<? extends Module>> riverTypes = Maps.newHashMap();
public RiversModule(Settings settings) {
this.settings = settings;
}
/**
* Registers a custom river type name against a module.
*
* @param type The type
* @param module The module
*/
public void registerRiver(String type, Class<? extends Module> module) {
riverTypes.put(type, module);
}
@Override
protected void configure() {
bind(String.class).annotatedWith(RiverIndexName.class).toInstance(RiverIndexName.Conf.indexName(settings));
bind(RiversService.class).asEagerSingleton();
bind(RiverClusterService.class).asEagerSingleton();
bind(RiversRouter.class).asEagerSingleton();
bind(RiversManager.class).asEagerSingleton();
bind(RiversTypesRegistry.class).toInstance(new RiversTypesRegistry(ImmutableMap.copyOf(riverTypes)));
}
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.PluginsService;
/**
* A module that simply calls the {@link PluginsService#processModule(org.elasticsearch.common.inject.Module)}
* in order to allow plugins to pre process specific river modules.
*/
public class RiversPluginsModule extends AbstractModule implements PreProcessModule {
private final Settings settings;
private final PluginsService pluginsService;
public RiversPluginsModule(Settings settings, PluginsService pluginsService) {
this.settings = settings;
this.pluginsService = pluginsService;
}
@Override
public void processModule(Module module) {
pluginsService.processModule(module);
}
@Override
protected void configure() {
}
}

View File

@ -1,279 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.river.cluster.RiverClusterChangedEvent;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
import org.elasticsearch.river.cluster.RiverClusterStateListener;
import org.elasticsearch.river.routing.RiverRouting;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
/**
*
*/
public class RiversService extends AbstractLifecycleComponent<RiversService> {
private final String riverIndexName;
private Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final RiversTypesRegistry typesRegistry;
private final Injector injector;
private final Map<RiverName, Injector> riversInjectors = Maps.newHashMap();
private volatile ImmutableMap<RiverName, River> rivers = ImmutableMap.of();
@Inject
public RiversService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, RiversTypesRegistry typesRegistry, RiverClusterService riverClusterService, Injector injector) {
super(settings);
this.riverIndexName = RiverIndexName.Conf.indexName(settings);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.typesRegistry = typesRegistry;
this.injector = injector;
riverClusterService.add(new ApplyRivers());
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
ImmutableSet<RiverName> indices = ImmutableSet.copyOf(this.rivers.keySet());
final CountDownLatch latch = new CountDownLatch(indices.size());
for (final RiverName riverName : indices) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {
closeRiver(riverName);
} catch (Exception e) {
logger.warn("failed to delete river on stop [{}]/[{}]", e, riverName.type(), riverName.name());
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
}
@Override
protected void doClose() {
}
public synchronized void createRiver(RiverName riverName, Map<String, Object> settings) {
if (riversInjectors.containsKey(riverName)) {
logger.warn("ignoring river [{}][{}] creation, already exists", riverName.type(), riverName.name());
return;
}
logger.info("rivers have been deprecated. Read https://www.elastic.co/blog/deprecating_rivers");
logger.debug("creating river [{}][{}]", riverName.type(), riverName.name());
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new RiverNameModule(riverName));
modules.add(new RiverModule(riverName, settings, this.settings, typesRegistry));
modules.add(new RiversPluginsModule(this.settings, injector.getInstance(PluginsService.class)));
Injector indexInjector = modules.createChildInjector(injector);
riversInjectors.put(riverName, indexInjector);
River river = indexInjector.getInstance(River.class);
rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap();
// we need this start so there can be operations done (like creating an index) which can't be
// done on create since Guice can't create two concurrent child injectors
river.start();
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.startObject("node");
builder.field("id", clusterService.localNode().id());
builder.field("name", clusterService.localNode().name());
builder.field("transport_address", clusterService.localNode().address().toString());
builder.endObject();
builder.endObject();
client.prepareIndex(riverIndexName, riverName.name(), "_status")
.setConsistencyLevel(WriteConsistencyLevel.ONE)
.setSource(builder).execute().actionGet();
} catch (Exception e) {
logger.warn("failed to create river [{}][{}]", e, riverName.type(), riverName.name());
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("error", ExceptionsHelper.detailedMessage(e));
builder.startObject("node");
builder.field("id", clusterService.localNode().id());
builder.field("name", clusterService.localNode().name());
builder.field("transport_address", clusterService.localNode().address().toString());
builder.endObject();
builder.endObject();
client.prepareIndex(riverIndexName, riverName.name(), "_status")
.setConsistencyLevel(WriteConsistencyLevel.ONE)
.setSource(builder).execute().actionGet();
} catch (Exception e1) {
logger.warn("failed to write failed status for river creation", e);
}
}
}
public synchronized void closeRiver(RiverName riverName) {
Injector riverInjector;
River river;
synchronized (this) {
riverInjector = riversInjectors.remove(riverName);
if (riverInjector == null) {
throw new RiverException(riverName, "missing");
}
logger.debug("closing river [{}][{}]", riverName.type(), riverName.name());
Map<RiverName, River> tmpMap = Maps.newHashMap(rivers);
river = tmpMap.remove(riverName);
rivers = ImmutableMap.copyOf(tmpMap);
}
river.close();
}
private class ApplyRivers implements RiverClusterStateListener {
@Override
public void riverClusterChanged(RiverClusterChangedEvent event) {
DiscoveryNode localNode = clusterService.localNode();
RiverClusterState state = event.state();
// first, go over and delete ones that either don't exists or are not allocated
for (final RiverName riverName : rivers.keySet()) {
RiverRouting routing = state.routing().routing(riverName);
if (routing == null || !localNode.equals(routing.node())) {
// not routed at all, and not allocated here, clean it (we delete the relevant ones before)
closeRiver(riverName);
}
}
for (final RiverRouting routing : state.routing()) {
// not allocated
if (routing.node() == null) {
logger.trace("river {} has no routing node", routing.riverName().getName());
continue;
}
// only apply changes to the local node
if (!routing.node().equals(localNode)) {
logger.trace("river {} belongs to node {}", routing.riverName().getName(), routing.node());
continue;
}
// if its already created, ignore it
if (rivers.containsKey(routing.riverName())) {
logger.trace("river {} is already allocated", routing.riverName().getName());
continue;
}
prepareGetMetaDocument(routing.riverName().name()).execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
if (!rivers.containsKey(routing.riverName())) {
if (getResponse.isExists()) {
// only create the river if it exists, otherwise, the indexing meta data has not been visible yet...
createRiver(routing.riverName(), getResponse.getSourceAsMap());
} else {
//this should never happen as we've just found the _meta document in RiversRouter
logger.warn("{}/{}/_meta document not found", riverIndexName, routing.riverName().getName());
}
}
}
@Override
public void onFailure(Throwable e) {
// if its this is a failure that need to be retried, then do it
// this might happen if the state of the river index has not been propagated yet to this node, which
// should happen pretty fast since we managed to get the _meta in the RiversRouter
Throwable failure = ExceptionsHelper.unwrapCause(e);
if (isShardNotAvailableException(failure)) {
logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name());
final ActionListener<GetResponse> listener = this;
try {
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.LISTENER, new Runnable() {
@Override
public void run() {
prepareGetMetaDocument(routing.riverName().name()).execute(listener);
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Couldn't schedule river start retry, node might be shutting down", ex);
}
} else {
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
}
}
}));
}
}
private GetRequestBuilder prepareGetMetaDocument(String riverName) {
return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary");
}
}
}

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Module;
/**
* A type registry for rivers
*/
public class RiversTypesRegistry {
private final ImmutableMap<String, Class<? extends Module>> riverTypes;
public RiversTypesRegistry(ImmutableMap<String, Class<? extends Module>> riverTypes) {
this.riverTypes = riverTypes;
}
public Class<? extends Module> type(String type) {
return riverTypes.get(type);
}
}

View File

@ -1,121 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
/**
*
*/
public class PublishRiverClusterStateAction extends AbstractComponent {
public static final String ACTION_NAME = "internal:river/state/publish";
public interface NewClusterStateListener {
void onNewClusterState(RiverClusterState clusterState);
}
private final TransportService transportService;
private final ClusterService clusterService;
private final NewClusterStateListener listener;
public PublishRiverClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService,
NewClusterStateListener listener) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.listener = listener;
transportService.registerRequestHandler(ACTION_NAME, PublishClusterStateRequest.class, ThreadPool.Names.SAME, new PublishClusterStateRequestHandler());
}
public void close() {
transportService.removeHandler(ACTION_NAME);
}
public void publish(RiverClusterState clusterState) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
final DiscoveryNode localNode = discoNodes.localNode();
for (final DiscoveryNode node : discoNodes) {
if (node.equals(localNode)) {
// no need to send to our self
continue;
}
// we only want to send nodes that are either possible master nodes or river nodes
// master nodes because they will handle the state and the allocation of rivers
// and river nodes since they will end up creating indexes
if (!node.masterNode() && !RiverNodeHelper.isRiverNode(node)) {
continue;
}
transportService.sendRequest(node, ACTION_NAME, new PublishClusterStateRequest(clusterState), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
}
});
}
}
static class PublishClusterStateRequest extends TransportRequest {
private RiverClusterState clusterState;
PublishClusterStateRequest() {
}
private PublishClusterStateRequest(RiverClusterState clusterState) {
this.clusterState = clusterState;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterState = RiverClusterState.Builder.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
RiverClusterState.Builder.writeTo(clusterState, out);
}
}
private class PublishClusterStateRequestHandler implements TransportRequestHandler<PublishClusterStateRequest> {
@Override
public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
listener.onNewClusterState(request.clusterState);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
}

View File

@ -1,53 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
/**
*
*/
public class RiverClusterChangedEvent {
private final String source;
private final RiverClusterState previousState;
private final RiverClusterState state;
public RiverClusterChangedEvent(String source, RiverClusterState state, RiverClusterState previousState) {
this.source = source;
this.state = state;
this.previousState = previousState;
}
/**
* The source that caused this cluster event to be raised.
*/
public String source() {
return this.source;
}
public RiverClusterState state() {
return this.state;
}
public RiverClusterState previousState() {
return this.previousState;
}
}

View File

@ -1,170 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
/**
*
*/
public class RiverClusterService extends AbstractLifecycleComponent<RiverClusterService> {
private final ClusterService clusterService;
private final PublishRiverClusterStateAction publishAction;
private final List<RiverClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private volatile ExecutorService updateTasksExecutor;
private volatile RiverClusterState clusterState = RiverClusterState.builder().build();
@Inject
public RiverClusterService(Settings settings, TransportService transportService, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
this.publishAction = new PublishRiverClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener());
}
@Override
protected void doStart() {
this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "riverClusterService#updateTask"));
}
@Override
protected void doStop() {
updateTasksExecutor.shutdown();
try {
updateTasksExecutor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
}
@Override
protected void doClose() {
}
public void add(RiverClusterStateListener listener) {
clusterStateListeners.add(listener);
}
public void remove(RiverClusterStateListener listener) {
clusterStateListeners.remove(listener);
}
/**
* The current state.
*/
public ClusterState state() {
return clusterService.state();
}
public void submitStateUpdateTask(final String source, final RiverClusterStateUpdateTask updateTask) {
if (!lifecycle.started()) {
return;
}
updateTasksExecutor.execute(new Runnable() {
@Override
public void run() {
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster_service not started", source);
return;
}
logger.debug("processing [{}]: execute", source);
RiverClusterState previousClusterState = clusterState;
try {
clusterState = updateTask.execute(previousClusterState);
} catch (Exception e) {
StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
logger.warn(sb.toString(), e);
return;
}
if (previousClusterState != clusterState) {
if (clusterService.state().nodes().localNodeMaster()) {
// only the master controls the version numbers
clusterState = new RiverClusterState(clusterState.version() + 1, clusterState);
} else {
// we got this cluster state from the master, filter out based on versions (don't call listeners)
if (clusterState.version() < previousClusterState.version()) {
logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
return;
}
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
logger.trace(sb.toString());
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source);
}
RiverClusterChangedEvent clusterChangedEvent = new RiverClusterChangedEvent(source, clusterState, previousClusterState);
for (RiverClusterStateListener listener : clusterStateListeners) {
listener.riverClusterChanged(clusterChangedEvent);
}
// if we are the master, publish the new state to all nodes
if (clusterService.state().nodes().localNodeMaster()) {
publishAction.publish(clusterState);
}
logger.debug("processing [{}]: done applying updated cluster_state", source);
} else {
logger.debug("processing [{}]: no change in cluster_state", source);
}
}
});
}
private class UpdateClusterStateListener implements PublishRiverClusterStateAction.NewClusterStateListener {
@Override
public void onNewClusterState(final RiverClusterState clusterState) {
ClusterState state = clusterService.state();
if (state.nodes().localNodeMaster()) {
logger.warn("master should not receive new cluster state from [{}]", state.nodes().masterNode());
return;
}
submitStateUpdateTask("received_state", new RiverClusterStateUpdateTask() {
@Override
public RiverClusterState execute(RiverClusterState currentState) {
return clusterState;
}
});
}
}
}

View File

@ -1,96 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.river.routing.RiversRouting;
import java.io.IOException;
/**
*
*/
public class RiverClusterState {
private final long version;
private final RiversRouting routing;
public RiverClusterState(long version, RiverClusterState state) {
this.version = version;
this.routing = state.routing();
}
RiverClusterState(long version, RiversRouting routing) {
this.version = version;
this.routing = routing;
}
public long version() {
return this.version;
}
public RiversRouting routing() {
return routing;
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private long version = 0;
private RiversRouting routing = RiversRouting.EMPTY;
public Builder state(RiverClusterState state) {
this.version = state.version();
this.routing = state.routing();
return this;
}
public Builder routing(RiversRouting.Builder builder) {
return routing(builder.build());
}
public Builder routing(RiversRouting routing) {
this.routing = routing;
return this;
}
public RiverClusterState build() {
return new RiverClusterState(version, routing);
}
public static RiverClusterState readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
builder.version = in.readVLong();
builder.routing = RiversRouting.Builder.readFrom(in);
return builder.build();
}
public static void writeTo(RiverClusterState clusterState, StreamOutput out) throws IOException {
out.writeVLong(clusterState.version);
RiversRouting.Builder.writeTo(clusterState.routing, out);
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
/**
*
*/
public interface RiverClusterStateListener {
void riverClusterChanged(RiverClusterChangedEvent event);
}

View File

@ -1,28 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
/**
*
*/
public interface RiverClusterStateUpdateTask {
RiverClusterState execute(RiverClusterState currentState);
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.cluster;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.river.RiverName;
/**
*
*/
public class RiverNodeHelper {
public static boolean isRiverNode(DiscoveryNode node) {
// we don't allocate rivers on client nodes
if (node.clientNode()) {
return false;
}
String river = node.attributes().get("river");
// by default, if not set, it's a river node (better OOB exp)
if (river == null) {
return true;
}
if ("_none_".equals(river)) {
return false;
}
// there is at least one river settings, we need it
return true;
}
public static boolean isRiverNode(DiscoveryNode node, RiverName riverName) {
if (!isRiverNode(node)) {
return false;
}
String river = node.attributes().get("river");
// by default, if not set, its an river node (better OOB exp)
return river == null || river.contains(riverName.type()) || river.contains(riverName.name());
}
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.dummy;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
/**
*
*/
public class DummyRiver extends AbstractRiverComponent implements River {
@Inject
public DummyRiver(RiverName riverName, RiverSettings settings) {
super(riverName, settings);
logger.info("create");
}
@Override
public void start() {
logger.info("start");
}
@Override
public void close() {
logger.info("close");
}
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.dummy;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.river.River;
/**
*
*/
public class DummyRiverModule extends AbstractModule {
@Override
protected void configure() {
bind(River.class).to(DummyRiver.class).asEagerSingleton();
}
}

View File

@ -1,87 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.routing;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.river.RiverName;
import java.io.IOException;
/**
*
*/
public class RiverRouting implements Streamable {
private RiverName riverName;
private DiscoveryNode node;
private RiverRouting() {
}
RiverRouting(RiverName riverName, DiscoveryNode node) {
this.riverName = riverName;
this.node = node;
}
public RiverName riverName() {
return riverName;
}
/**
* The node the river is allocated to, <tt>null</tt> if its not allocated.
*/
public DiscoveryNode node() {
return node;
}
void node(DiscoveryNode node) {
this.node = node;
}
public static RiverRouting readRiverRouting(StreamInput in) throws IOException {
RiverRouting routing = new RiverRouting();
routing.readFrom(in);
return routing;
}
@Override
public void readFrom(StreamInput in) throws IOException {
riverName = new RiverName(in.readString(), in.readString());
if (in.readBoolean()) {
node = DiscoveryNode.readNode(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(riverName.type());
out.writeString(riverName.name());
if (node == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
node.writeTo(out);
}
}
}

View File

@ -1,255 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.routing;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.cluster.RiverClusterService;
import org.elasticsearch.river.cluster.RiverClusterState;
import org.elasticsearch.river.cluster.RiverClusterStateUpdateTask;
import org.elasticsearch.river.cluster.RiverNodeHelper;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*
*/
public class RiversRouter extends AbstractLifecycleComponent<RiversRouter> implements ClusterStateListener {
private static final TimeValue RIVER_START_RETRY_INTERVAL = TimeValue.timeValueMillis(1000);
private static final int RIVER_START_MAX_RETRIES = 5;
private final String riverIndexName;
private final Client client;
private final RiverClusterService riverClusterService;
private final ThreadPool threadPool;
@Inject
public RiversRouter(Settings settings, Client client, ClusterService clusterService, RiverClusterService riverClusterService, ThreadPool threadPool) {
super(settings);
this.riverIndexName = RiverIndexName.Conf.indexName(settings);
this.riverClusterService = riverClusterService;
this.client = client;
this.threadPool = threadPool;
clusterService.add(this);
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
return;
}
final String source = "reroute_rivers_node_changed";
//we'll try again a few times if we don't find the river _meta document while the type is there
final CountDown countDown = new CountDown(RIVER_START_MAX_RETRIES);
riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() {
@Override
public RiverClusterState execute(RiverClusterState currentState) {
return updateRiverClusterState(source, currentState, event.state(), countDown);
}
});
}
protected RiverClusterState updateRiverClusterState(final String source, final RiverClusterState currentState,
ClusterState newClusterState, final CountDown countDown) {
if (!newClusterState.metaData().hasIndex(riverIndexName)) {
// if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
if (!currentState.routing().isEmpty()) {
return RiverClusterState.builder().state(currentState).routing(RiversRouting.builder()).build();
}
return currentState;
}
RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing());
boolean dirty = false;
IndexMetaData indexMetaData = newClusterState.metaData().index(riverIndexName);
boolean metaFound = true;
// go over and create new river routing (with no node) for new types (rivers names)
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.mappings().values()) {
String mappingType = cursor.value.type(); // mapping type is the name of the river
if (MapperService.DEFAULT_MAPPING.equals(mappingType)) {
continue;
}
if (!currentState.routing().hasRiverByName(mappingType)) {
// no river, we need to add it to the routing with no node allocation
try {
GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").setPreference("_primary").get();
if (getResponse.isExists()) {
logger.debug("{}/{}/_meta document found.", riverIndexName, mappingType);
String riverType = XContentMapValues.nodeStringValue(getResponse.getSourceAsMap().get("type"), null);
if (riverType == null) {
logger.warn("no river type provided for [{}], ignoring...", riverIndexName);
} else {
routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null));
dirty = true;
}
} else {
// At least one type does not have _meta
metaFound = false;
}
} catch (NoShardAvailableActionException e) {
// ignore, we will get it next time...
} catch (ClusterBlockException e) {
// ignore, we will get it next time
} catch (IndexMissingException e) {
// ignore, we will get it next time
} catch (IllegalIndexShardStateException e) {
// ignore, we will get it next time
} catch (Exception e) {
logger.warn("failed to get/parse _meta for [{}]", e, mappingType);
}
}
}
// At least one type does not have _meta, so we are
// going to reschedule some checks
if (!metaFound) {
if (countDown.countDown()) {
logger.warn("no river _meta document found after {} attempts", RIVER_START_MAX_RETRIES);
} else {
logger.debug("no river _meta document found retrying in {} ms", RIVER_START_RETRY_INTERVAL.millis());
try {
threadPool.schedule(RIVER_START_RETRY_INTERVAL, ThreadPool.Names.GENERIC, new Runnable() {
@Override
public void run() {
riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() {
@Override
public RiverClusterState execute(RiverClusterState currentState) {
return updateRiverClusterState(source, currentState, riverClusterService.state(), countDown);
}
});
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Couldn't schedule river start retry, node might be shutting down", ex);
}
}
}
// now, remove routings that were deleted
// also, apply nodes that were removed and rivers were running on
for (RiverRouting routing : currentState.routing()) {
if (!indexMetaData.mappings().containsKey(routing.riverName().name())) {
routingBuilder.remove(routing);
dirty = true;
} else if (routing.node() != null && !newClusterState.nodes().nodeExists(routing.node().id())) {
routingBuilder.remove(routing);
routingBuilder.put(new RiverRouting(routing.riverName(), null));
dirty = true;
}
}
// build a list from nodes to rivers
Map<DiscoveryNode, List<RiverRouting>> nodesToRivers = Maps.newHashMap();
for (DiscoveryNode node : newClusterState.nodes()) {
if (RiverNodeHelper.isRiverNode(node)) {
nodesToRivers.put(node, Lists.<RiverRouting>newArrayList());
}
}
List<RiverRouting> unassigned = Lists.newArrayList();
for (RiverRouting routing : routingBuilder.build()) {
if (routing.node() == null) {
unassigned.add(routing);
} else {
List<RiverRouting> l = nodesToRivers.get(routing.node());
if (l == null) {
l = Lists.newArrayList();
nodesToRivers.put(routing.node(), l);
}
l.add(routing);
}
}
for (Iterator<RiverRouting> it = unassigned.iterator(); it.hasNext(); ) {
RiverRouting routing = it.next();
DiscoveryNode smallest = null;
int smallestSize = Integer.MAX_VALUE;
for (Map.Entry<DiscoveryNode, List<RiverRouting>> entry : nodesToRivers.entrySet()) {
if (RiverNodeHelper.isRiverNode(entry.getKey(), routing.riverName())) {
if (entry.getValue().size() < smallestSize) {
smallestSize = entry.getValue().size();
smallest = entry.getKey();
}
}
}
if (smallest != null) {
dirty = true;
it.remove();
routing.node(smallest);
nodesToRivers.get(smallest).add(routing);
logger.debug("going to allocate river [{}] on node {}", routing.riverName().getName(), smallest);
}
}
// add relocation logic...
if (dirty) {
return RiverClusterState.builder().state(currentState).routing(routingBuilder).build();
}
return currentState;
}
}

View File

@ -1,123 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river.routing;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.river.RiverName;
import java.io.IOException;
import java.util.Iterator;
/**
*
*/
public class RiversRouting implements Iterable<RiverRouting> {
public static final RiversRouting EMPTY = RiversRouting.builder().build();
private final ImmutableMap<RiverName, RiverRouting> rivers;
private RiversRouting(ImmutableMap<RiverName, RiverRouting> rivers) {
this.rivers = rivers;
}
public boolean isEmpty() {
return rivers.isEmpty();
}
public RiverRouting routing(RiverName riverName) {
return rivers.get(riverName);
}
public boolean hasRiverByName(String name) {
for (RiverName riverName : rivers.keySet()) {
if (riverName.name().equals(name)) {
return true;
}
}
return false;
}
@Override
public Iterator<RiverRouting> iterator() {
return rivers.values().iterator();
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private MapBuilder<RiverName, RiverRouting> rivers = MapBuilder.newMapBuilder();
public Builder routing(RiversRouting routing) {
rivers.putAll(routing.rivers);
return this;
}
public Builder put(RiverRouting routing) {
rivers.put(routing.riverName(), routing);
return this;
}
public Builder remove(RiverRouting routing) {
rivers.remove(routing.riverName());
return this;
}
public Builder remove(RiverName riverName) {
rivers.remove(riverName);
return this;
}
public Builder remote(String riverName) {
for (RiverName name : rivers.map().keySet()) {
if (name.name().equals(riverName)) {
rivers.remove(name);
}
}
return this;
}
public RiversRouting build() {
return new RiversRouting(rivers.immutableMap());
}
public static RiversRouting readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.put(RiverRouting.readRiverRouting(in));
}
return builder.build();
}
public static void writeTo(RiversRouting routing, StreamOutput out) throws IOException {
out.writeVInt(routing.rivers.size());
for (RiverRouting riverRouting : routing) {
riverRouting.writeTo(out);
}
}
}
}

View File

@ -151,7 +151,14 @@ case "$1" in
# Prepare environment
mkdir -p "$LOG_DIR" "$DATA_DIR" && chown "$ES_USER":"$ES_GROUP" "$LOG_DIR" "$DATA_DIR"
touch "$PID_FILE" && chown "$ES_USER":"$ES_GROUP" "$PID_FILE"
# Ensure that the PID_DIR exists (it is cleaned at OS startup time)
if [ -n "$PID_DIR" ] && [ ! -e "$PID_DIR" ]; then
mkdir -p "$PID_DIR" && chown "$ES_USER":"$ES_GROUP" "$PID_DIR"
fi
if [ -n "$PID_FILE" ] && [ ! -e "$PID_FILE" ]; then
touch "$PID_FILE" && chown "$ES_USER":"$ES_GROUP" "$PID_FILE"
fi
if [ -n "$MAX_OPEN_FILES" ]; then
ulimit -n $MAX_OPEN_FILES

View File

@ -99,6 +99,14 @@ start() {
fi
export ES_GC_LOG_FILE
# Ensure that the PID_DIR exists (it is cleaned at OS startup time)
if [ -n "$PID_DIR" ] && [ ! -e "$PID_DIR" ]; then
mkdir -p "$PID_DIR" && chown "$ES_USER":"$ES_GROUP" "$PID_DIR"
fi
if [ -n "$pidfile" ] && [ ! -e "$pidfile" ]; then
touch "$pidfile" && chown "$ES_USER":"$ES_GROUP" "$pidfile"
fi
echo -n $"Starting $prog: "
# if not running, start it up here, usually something like "daemon $exec"
daemon --user $ES_USER --pidfile $pidfile $exec -p $pidfile -d -Des.default.path.home=$ES_HOME -Des.default.path.logs=$LOG_DIR -Des.default.path.data=$DATA_DIR -Des.default.path.conf=$CONF_DIR

View File

@ -211,6 +211,7 @@ public class ShardReplicationTests extends ElasticsearchTestCase {
String primaryNode = null;
String relocatingNode = null;
UnassignedInfo unassignedInfo = null;
if (primaryState != ShardRoutingState.UNASSIGNED) {
if (primaryLocal) {
primaryNode = newNode(0).id();
@ -221,21 +222,26 @@ public class ShardReplicationTests extends ElasticsearchTestCase {
if (primaryState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, 0, primaryNode, relocatingNode, true, primaryState, 0));
indexShardRoutingBuilder.addShard(new ImmutableShardRouting(index, 0, primaryNode, relocatingNode, null, true, primaryState, 0, unassignedInfo));
for (ShardRoutingState replicaState : replicaStates) {
String replicaNode = null;
relocatingNode = null;
unassignedInfo = null;
if (replicaState != ShardRoutingState.UNASSIGNED) {
assert primaryNode != null : "a replica is assigned but the primary isn't";
replicaNode = selectAndRemove(unassignedNodes);
if (replicaState == ShardRoutingState.RELOCATING) {
relocatingNode = selectAndRemove(unassignedNodes);
}
} else {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
indexShardRoutingBuilder.addShard(
new ImmutableShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState, 0));
new ImmutableShardRouting(index, shardId.id(), replicaNode, relocatingNode, null, false, replicaState, 0, unassignedInfo));
}
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));

View File

@ -0,0 +1,256 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.hamcrest.Matchers.*;
/**
*/
public class UnassignedInfoTests extends ElasticsearchAllocationTestCase {
@Test
public void testReasonOrdinalOrder() {
UnassignedInfo.Reason[] order = new UnassignedInfo.Reason[]{
UnassignedInfo.Reason.INDEX_CREATED,
UnassignedInfo.Reason.CLUSTER_RECOVERED,
UnassignedInfo.Reason.INDEX_REOPENED,
UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED,
UnassignedInfo.Reason.NEW_INDEX_RESTORED,
UnassignedInfo.Reason.EXISTING_INDEX_RESTORED,
UnassignedInfo.Reason.REPLICA_ADDED,
UnassignedInfo.Reason.ALLOCATION_FAILED,
UnassignedInfo.Reason.NODE_LEFT,
UnassignedInfo.Reason.REROUTE_CANCELLED};
for (int i = 0; i < order.length; i++) {
assertThat(order[i].ordinal(), equalTo(i));
}
assertThat(UnassignedInfo.Reason.values().length, equalTo(order.length));
}
@Test
public void testSerialization() throws Exception {
UnassignedInfo meta = new UnassignedInfo(RandomPicks.randomFrom(getRandom(), UnassignedInfo.Reason.values()), randomBoolean() ? randomAsciiOfLength(4) : null);
BytesStreamOutput out = new BytesStreamOutput();
meta.writeTo(out);
out.close();
UnassignedInfo read = new UnassignedInfo(StreamInput.wrap(out.bytes()));
assertThat(read.getReason(), equalTo(meta.getReason()));
assertThat(read.getTimestampInMillis(), equalTo(meta.getTimestampInMillis()));
assertThat(read.getDetails(), equalTo(meta.getDetails()));
}
@Test
public void testIndexCreated() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(randomIntBetween(0, 3)))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
for (MutableShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.INDEX_CREATED));
}
}
@Test
public void testClusterRecovered() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(randomIntBetween(0, 3)))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsRecovery(metaData.index("test"))).build();
for (MutableShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.CLUSTER_RECOVERED));
}
}
@Test
public void testIndexReopened() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(randomIntBetween(0, 3)))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsFromCloseToOpen(metaData.index("test"))).build();
for (MutableShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.INDEX_REOPENED));
}
}
@Test
public void testNewIndexRestored() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(randomIntBetween(0, 3)))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), "test"), new IntHashSet())).build();
for (MutableShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED));
}
}
@Test
public void testExistingIndexRestored() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(randomIntBetween(0, 3)))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), "test"))).build();
for (MutableShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED));
}
}
@Test
public void testDanglingIndexImported() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(randomIntBetween(0, 3)))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsFromDangling(metaData.index("test"))).build();
for (MutableShardRouting shard : clusterState.routingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED));
}
}
@Test
public void testReplicaAdded() {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
IndexRoutingTable.Builder builder = IndexRoutingTable.builder("test");
for (IndexShardRoutingTable indexShardRoutingTable : clusterState.routingTable().index("test")) {
builder.addIndexShard(indexShardRoutingTable);
}
builder.addReplica();
clusterState = ClusterState.builder(clusterState).routingTable(RoutingTable.builder(clusterState.routingTable()).add(builder)).build();
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo(), notNullValue());
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.REPLICA_ADDED));
}
/**
* The unassigned meta is kept when a shard goes to INITIALIZING, but cleared when it moves to STARTED.
*/
@Test
public void testStateTransitionMetaHandling() {
ImmutableShardRouting shard = new ImmutableShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
MutableShardRouting mutable = new MutableShardRouting(shard);
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.assignToNode("test_node");
assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING));
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.moveToStarted();
assertThat(mutable.state(), equalTo(ShardRoutingState.STARTED));
assertThat(mutable.unassignedInfo(), nullValue());
}
/**
* Tests that during reroute when a node is detected as leaving the cluster, the right unassigned meta is set
*/
@Test
public void testNodeLeave() {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// verify that NODE_LEAVE is the reason for meta
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(true));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo(), notNullValue());
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l));
}
/**
* Verifies that when a shard fails, reason is properly set and details are preserved.
*/
@Test
public void testFailedShard() {
AllocationService allocation = createAllocationService();
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false));
// fail shard
ShardRouting shardToFail = clusterState.routingNodes().shardsWithState(STARTED).get(0);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyFailedShards(clusterState, ImmutableList.of(new FailedRerouteAllocation.FailedShard(shardToFail, "test fail")))).build();
// verify the reason and details
assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(true));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo(), notNullValue());
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getDetails(), equalTo("test fail"));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l));
}
}

View File

@ -312,21 +312,21 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
}
int shardsToFail = randomIntBetween(1, numberOfReplicas);
ArrayList<ShardRouting> failedShards = new ArrayList<>();
ArrayList<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>();
RoutingNodes routingNodes = clusterState.routingNodes();
for (int i = 0; i < shardsToFail; i++) {
String n = "node" + Integer.toString(randomInt(numberOfReplicas));
logger.info("failing shard on node [{}]", n);
ShardRouting shardToFail = routingNodes.node(n).get(0);
failedShards.add(new MutableShardRouting(shardToFail));
failedShards.add(new FailedRerouteAllocation.FailedShard(new MutableShardRouting(shardToFail), null));
}
routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (ShardRouting failedShard : failedShards) {
if (!routingNodes.node(failedShard.currentNodeId()).isEmpty()) {
for (FailedRerouteAllocation.FailedShard failedShard : failedShards) {
if (!routingNodes.node(failedShard.shard.currentNodeId()).isEmpty()) {
fail("shard " + failedShard + " was re-assigned to it's node");
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
@ -95,7 +96,11 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
} else {
state = randomFrom(ShardRoutingState.values());
}
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", null, j == 0, state, 0));
UnassignedInfo unassignedInfo = null;
if (state == ShardRoutingState.UNASSIGNED) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
routingTable.addShard(new ImmutableShardRouting("test", i, "xyz", null, null, j == 0, state, 0, unassignedInfo));
}
}
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));

View File

@ -1,168 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.river;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.river.dummy.DummyRiverModule;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE)
@AwaitsFix(bugUrl="occasionally fails apparently due to synchronous mappings updates")
public class RiverTests extends ElasticsearchIntegrationTest {
@Override
protected void beforeIndexDeletion() {
}
@Test
public void testRiverStart() throws Exception {
startAndCheckRiverIsStarted("dummy-river-test");
}
@Test
public void testMultipleRiversStart() throws Exception {
int nbRivers = between(2,10);
logger.info("--> testing with {} rivers...", nbRivers);
Thread[] riverCreators = new Thread[nbRivers];
final CountDownLatch latch = new CountDownLatch(nbRivers);
final MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
for (int i = 0; i < nbRivers; i++) {
final String riverName = "dummy-river-test-" + i;
riverCreators[i] = new Thread() {
@Override
public void run() {
try {
startRiver(riverName);
} catch (Throwable t) {
logger.warn("failed to register river {}", t, riverName);
} finally {
latch.countDown();
}
}
};
riverCreators[i].start();
multiGetRequestBuilder.add(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_status");
}
latch.await();
logger.info("--> checking that all rivers were created");
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object obj) {
MultiGetResponse multiGetItemResponse = multiGetRequestBuilder.get();
for (MultiGetItemResponse getItemResponse : multiGetItemResponse) {
if (getItemResponse.isFailed() || !getItemResponse.getResponse().isExists()) {
return false;
}
}
return true;
}
}, 5, TimeUnit.SECONDS), equalTo(true));
}
/**
* Test case for https://github.com/elasticsearch/elasticsearch/issues/4577
* River does not start when using config/templates files
*/
@Test
public void startDummyRiverWithDefaultTemplate() throws Exception {
logger.info("--> create empty template");
client().admin().indices().preparePutTemplate("template_1")
.setTemplate("*")
.setOrder(0)
.addMapping(MapperService.DEFAULT_MAPPING,
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.endObject().endObject())
.get();
startAndCheckRiverIsStarted("dummy-river-default-template-test");
}
/**
* Test case for https://github.com/elasticsearch/elasticsearch/issues/4577
* River does not start when using config/templates files
*/
@Test
public void startDummyRiverWithSomeTemplates() throws Exception {
logger.info("--> create some templates");
client().admin().indices().preparePutTemplate("template_1")
.setTemplate("*")
.setOrder(0)
.addMapping(MapperService.DEFAULT_MAPPING,
JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING)
.endObject().endObject())
.get();
client().admin().indices().preparePutTemplate("template_2")
.setTemplate("*")
.setOrder(0)
.addMapping("atype",
JsonXContent.contentBuilder().startObject().startObject("atype")
.endObject().endObject())
.get();
startAndCheckRiverIsStarted("dummy-river-template-test");
}
/**
* Create a Dummy river then check it has been started. We will fail after 5 seconds.
* @param riverName Dummy river needed to be started
*/
private void startAndCheckRiverIsStarted(final String riverName) throws InterruptedException {
startRiver(riverName);
checkRiverIsStarted(riverName);
}
private void startRiver(final String riverName) {
logger.info("--> starting river [{}]", riverName);
IndexResponse indexResponse = client().prepareIndex(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_meta")
.setSource("type", DummyRiverModule.class.getCanonicalName()).get();
assertTrue(indexResponse.isCreated());
ensureGreen();
}
private void checkRiverIsStarted(final String riverName) throws InterruptedException {
logger.info("--> checking that river [{}] was created", riverName);
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object obj) {
GetResponse response = client().prepareGet(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_status").get();
return response.isExists();
}
}, 5, TimeUnit.SECONDS), equalTo(true));
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.matchedqueries;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -204,6 +205,116 @@ public class MatchedQueriesTests extends ElasticsearchIntegrationTest {
}
}
@Test
public void testRegExpQuerySupportsName() {
createIndex("test1");
ensureGreen();
client().prepareIndex("test1", "type1", "1").setSource("title", "title1").get();
refresh();
SearchResponse searchResponse = client().prepareSearch()
.setQuery(QueryBuilders.regexpQuery("title", "title1").queryName("regex")).get();
assertHitCount(searchResponse, 1l);
for (SearchHit hit : searchResponse.getHits()) {
if (hit.id().equals("1")) {
assertThat(hit.matchedQueries().length, equalTo(1));
assertThat(hit.matchedQueries(), hasItemInArray("regex"));
} else {
fail("Unexpected document returned with id " + hit.id());
}
}
}
@Test
public void testPrefixQuerySupportsName() {
createIndex("test1");
ensureGreen();
client().prepareIndex("test1", "type1", "1").setSource("title", "title1").get();
refresh();
SearchResponse searchResponse = client().prepareSearch()
.setQuery(QueryBuilders.prefixQuery("title", "title").queryName("prefix")).get();
assertHitCount(searchResponse, 1l);
for (SearchHit hit : searchResponse.getHits()) {
if (hit.id().equals("1")) {
assertThat(hit.matchedQueries().length, equalTo(1));
assertThat(hit.matchedQueries(), hasItemInArray("prefix"));
} else {
fail("Unexpected document returned with id " + hit.id());
}
}
}
@Test
public void testFuzzyQuerySupportsName() {
createIndex("test1");
ensureGreen();
client().prepareIndex("test1", "type1", "1").setSource("title", "title1").get();
refresh();
SearchResponse searchResponse = client().prepareSearch()
.setQuery(QueryBuilders.fuzzyQuery("title", "titel1").queryName("fuzzy")).get();
assertHitCount(searchResponse, 1l);
for (SearchHit hit : searchResponse.getHits()) {
if (hit.id().equals("1")) {
assertThat(hit.matchedQueries().length, equalTo(1));
assertThat(hit.matchedQueries(), hasItemInArray("fuzzy"));
} else {
fail("Unexpected document returned with id " + hit.id());
}
}
}
@Test
public void testWildcardQuerySupportsName() {
createIndex("test1");
ensureGreen();
client().prepareIndex("test1", "type1", "1").setSource("title", "title1").get();
refresh();
SearchResponse searchResponse = client().prepareSearch()
.setQuery(QueryBuilders.wildcardQuery("title", "titl*").queryName("wildcard")).get();
assertHitCount(searchResponse, 1l);
for (SearchHit hit : searchResponse.getHits()) {
if (hit.id().equals("1")) {
assertThat(hit.matchedQueries().length, equalTo(1));
assertThat(hit.matchedQueries(), hasItemInArray("wildcard"));
} else {
fail("Unexpected document returned with id " + hit.id());
}
}
}
@Test
public void testSpanFirstQuerySupportsName() {
createIndex("test1");
ensureGreen();
client().prepareIndex("test1", "type1", "1").setSource("title", "title1 title2").get();
refresh();
SearchResponse searchResponse = client().prepareSearch()
.setQuery(QueryBuilders.spanFirstQuery(QueryBuilders.spanTermQuery("title", "title1"), 10).queryName("span")).get();
assertHitCount(searchResponse, 1l);
for (SearchHit hit : searchResponse.getHits()) {
if (hit.id().equals("1")) {
assertThat(hit.matchedQueries().length, equalTo(1));
assertThat(hit.matchedQueries(), hasItemInArray("span"));
} else {
fail("Unexpected document returned with id " + hit.id());
}
}
}
/**
* Test case for issue #4361: https://github.com/elasticsearch/elasticsearch/issues/4361
*/

View File

@ -144,3 +144,27 @@ setup() {
run systemctl status elasticsearch.service
echo "$output" | grep "Active:" | grep "inactive"
}
# Simulates the behavior of a system restart:
# the PID directory is deleted by the operating system
# but it should not block ES from starting
# see https://github.com/elastic/elasticsearch/issues/11594
@test "[SYSTEMD] delete PID_DIR and restart" {
skip_not_systemd
run rm -rf /var/run/elasticsearch
[ "$status" -eq 0 ]
run systemd-tmpfiles --create
[ "$status" -eq 0 ]
run systemctl start elasticsearch.service
[ "$status" -eq 0 ]
wait_for_elasticsearch_status
assert_file_exist "/var/run/elasticsearch/elasticsearch.pid"
run systemctl stop elasticsearch.service
[ "$status" -eq 0 ]
}

View File

@ -0,0 +1,123 @@
#!/usr/bin/env bats
# This file is used to test the elasticsearch init.d scripts.
# WARNING: This testing file must be executed as root and can
# dramatically change your system. It removes the 'elasticsearch'
# user/group and also many directories. Do not execute this file
# unless you know exactly what you are doing.
# The test case can be executed with the Bash Automated
# Testing System tool available at https://github.com/sstephenson/bats
# Thanks to Sam Stephenson!
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Load test utilities
load packaging_test_utils
# Cleans everything for the 1st execution
setup() {
if [ "$BATS_TEST_NUMBER" -eq 1 ]; then
clean_before_test
fi
# Installs a package before test
if is_dpkg; then
dpkg -i elasticsearch*.deb >&2 || true
fi
if is_rpm; then
rpm -i elasticsearch*.rpm >&2 || true
fi
}
@test "[INIT.D] start" {
skip_not_sysvinit
run service elasticsearch start
[ "$status" -eq 0 ]
wait_for_elasticsearch_status
assert_file_exist "/var/run/elasticsearch/elasticsearch.pid"
}
@test "[INIT.D] status (running)" {
skip_not_sysvinit
run service elasticsearch status
[ "$status" -eq 0 ]
}
##################################
# Check that Elasticsearch is working
##################################
@test "[INIT.D] test elasticsearch" {
skip_not_sysvinit
run_elasticsearch_tests
}
@test "[INIT.D] restart" {
skip_not_sysvinit
run service elasticsearch restart
[ "$status" -eq 0 ]
wait_for_elasticsearch_status
run service elasticsearch status
[ "$status" -eq 0 ]
}
@test "[INIT.D] stop (running)" {
skip_not_sysvinit
run service elasticsearch stop
[ "$status" -eq 0 ]
}
@test "[INIT.D] status (stopped)" {
skip_not_sysvinit
run service elasticsearch status
[ "$status" -eq 3 ]
}
# Simulates the behavior of a system restart:
# the PID directory is deleted by the operating system
# but it should not block ES from starting
# see https://github.com/elastic/elasticsearch/issues/11594
@test "[INIT.D] delete PID_DIR and restart" {
skip_not_sysvinit
run rm -rf /var/run/elasticsearch
[ "$status" -eq 0 ]
run service elasticsearch start
[ "$status" -eq 0 ]
wait_for_elasticsearch_status
assert_file_exist "/var/run/elasticsearch/elasticsearch.pid"
run service elasticsearch stop
[ "$status" -eq 0 ]
}

View File

@ -167,7 +167,8 @@ The `refresh` parameter can be set to `true` in order to refresh the relevant
primary and replica shards immediately after the bulk operation has occurred
and make it searchable, instead of waiting for the normal refresh interval to
expire. Setting it to `true` can trigger additional load, and may slow down
indexing.
indexing. Due to its costly nature, the `refresh` parameter is set on the bulk request level
and is not supported on each individual bulk item.
[float]
[[bulk-update]]

View File

@ -9,6 +9,16 @@ your application to Elasticsearch 2.0.
Elasticsearch now binds to the loopback interface by default (usually 127.0.0.1
or ::1), the setting `network.host` can be specified to change this behavior.
=== Rivers removal
Elasticsearch does not support rivers anymore. While we had first planned to
keep them around to ease migration, keeping support for rivers proved to be
challenging as it conflicted with other important changes that we wanted to
bring to 2.0 like synchronous dynamic mappings updates, so we eventually
decided to remove them entirely. See
https://www.elastic.co/blog/deprecating_rivers for more background about why
we are moving away from rivers.
=== Indices API
The <<alias-retrieving, get alias api>> will, by default produce an error response

View File

@ -214,49 +214,6 @@ You can disable that check using `plugins.check_lucene: false`.
* https://github.com/shikhar/eskka[eskka Discovery Plugin] (by Shikhar Bhushan)
* https://github.com/grantr/elasticsearch-srv-discovery[DNS SRV Discovery Plugin] (by Grant Rodgers)
[float]
[[river]]
==== River Plugins
deprecated[1.5.0,Rivers have been deprecated. See https://www.elastic.co/blog/deprecating_rivers for more details]
.Supported by Elasticsearch
* https://github.com/elasticsearch/elasticsearch-river-couchdb[CouchDB River Plugin]
* https://github.com/elasticsearch/elasticsearch-river-rabbitmq[RabbitMQ River Plugin]
* https://github.com/elasticsearch/elasticsearch-river-twitter[Twitter River Plugin]
* https://github.com/elasticsearch/elasticsearch-river-wikipedia[Wikipedia River Plugin]
.Supported by the community
* https://github.com/domdorn/elasticsearch-river-activemq/[ActiveMQ River Plugin] (by Dominik Dorn)
* https://github.com/albogdano/elasticsearch-river-amazonsqs[Amazon SQS River Plugin] (by Alex Bogdanovski)
* https://github.com/xxBedy/elasticsearch-river-csv[CSV River Plugin] (by Martin Bednar)
* http://www.pilato.fr/dropbox/[Dropbox River Plugin] (by David Pilato)
* http://www.pilato.fr/fsriver/[FileSystem River Plugin] (by David Pilato)
* https://github.com/obazoud/elasticsearch-river-git[Git River Plugin] (by Olivier Bazoud)
* https://github.com/uberVU/elasticsearch-river-github[GitHub River Plugin] (by uberVU)
* https://github.com/sksamuel/elasticsearch-river-hazelcast[Hazelcast River Plugin] (by Steve Samuel)
* https://github.com/jprante/elasticsearch-river-jdbc[JDBC River Plugin] (by Jörg Prante)
* https://github.com/qotho/elasticsearch-river-jms[JMS River Plugin] (by Steve Sarandos)
* https://github.com/endgameinc/elasticsearch-river-kafka[Kafka River Plugin] (by Endgame Inc.)
* https://github.com/mariamhakobyan/elasticsearch-river-kafka[Kafka River Plugin 2] (by Mariam Hakobyan)
* https://github.com/tlrx/elasticsearch-river-ldap[LDAP River Plugin] (by Tanguy Leroux)
* https://github.com/richardwilly98/elasticsearch-river-mongodb/[MongoDB River Plugin] (by Richard Louapre)
* https://github.com/sksamuel/elasticsearch-river-neo4j[Neo4j River Plugin] (by Steve Samuel)
* https://github.com/jprante/elasticsearch-river-oai/[Open Archives Initiative (OAI) River Plugin] (by Jörg Prante)
* https://github.com/sksamuel/elasticsearch-river-redis[Redis River Plugin] (by Steve Samuel)
* https://github.com/rethinkdb/elasticsearch-river-rethinkdb[RethinkDB River Plugin] (by RethinkDB)
* http://dadoonet.github.com/rssriver/[RSS River Plugin] (by David Pilato)
* https://github.com/adamlofts/elasticsearch-river-sofa[Sofa River Plugin] (by adamlofts)
* https://github.com/javanna/elasticsearch-river-solr/[Solr River Plugin] (by Luca Cavanna)
* https://github.com/sunnygleason/elasticsearch-river-st9[St9 River Plugin] (by Sunny Gleason)
* https://github.com/plombard/SubversionRiver[Subversion River Plugin] (by Pascal Lombard)
* https://github.com/kzwang/elasticsearch-river-dynamodb[DynamoDB River Plugin] (by Kevin Wang)
* https://github.com/salyh/elasticsearch-river-imap[IMAP/POP3 Email River Plugin] (by Hendrik Saly)
* https://github.com/codelibs/elasticsearch-river-web[Web River Plugin] (by CodeLibs Project)
* https://github.com/eea/eea.elasticsearch.river.rdf[EEA ElasticSearch RDF River Plugin] (by the European Environment Agency)
* https://github.com/lbroudoux/es-amazon-s3-river[Amazon S3 River Plugin] (by Laurent Broudoux)
* https://github.com/lbroudoux/es-google-drive-river[Google Drive River Plugin] (by Laurent Broudoux)
[float]
[[transport]]
==== Transport Plugins

View File

@ -15,6 +15,10 @@
ip .+ \n
id .+ \n
node .+ \n
unassigned.reason .+ \n
unassigned.at .+ \n
unassigned.for .+ \n
unassigned.details .+ \n
completion.size .+ \n
fielddata.memory_size .+ \n
fielddata.evictions .+ \n