Local Gateway: Store specific index metadata under dedicated index locations, closes #1631.
This commit is contained in:
parent
62809bb62a
commit
942b427940
|
@ -234,7 +234,8 @@
|
|||
# gateway.type: local
|
||||
|
||||
# Settings below control how and when to start the initial recovery process on
|
||||
# a full cluster restart (to reuse as much local data as possible).
|
||||
# a full cluster restart (to reuse as much local data as possible when using shared
|
||||
# gateway).
|
||||
|
||||
# Allow recovery process after N nodes in a cluster are up:
|
||||
#
|
||||
|
@ -246,7 +247,8 @@
|
|||
# gateway.recover_after_time: 5m
|
||||
|
||||
# Set how many nodes are expected in this cluster. Once these N nodes
|
||||
# are up, begin recovery process immediately:
|
||||
# are up (and recover_after_nodes is met), begin recovery process immediately
|
||||
# (without waiting for recover_after_time to expire):
|
||||
#
|
||||
# gateway.expected_nodes: 2
|
||||
|
||||
|
@ -284,7 +286,7 @@
|
|||
|
||||
# Set to ensure a node sees N other master eligible nodes to be considered
|
||||
# operational within the cluster. Set this option to a higher value (2-4)
|
||||
# for large clusters:
|
||||
# for large clusters (>3 nodes):
|
||||
#
|
||||
# discovery.zen.minimum_master_nodes: 1
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct
|
|||
for (String filteredIndex : indices) {
|
||||
IndexMetaData indexMetaData = currentState.metaData().index(filteredIndex);
|
||||
if (indexMetaData != null) {
|
||||
mdBuilder.put(indexMetaData);
|
||||
mdBuilder.put(indexMetaData, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,6 +117,7 @@ public class IndexMetaData {
|
|||
public static final String SETTING_READ_ONLY = "index.blocks.read_only";
|
||||
|
||||
private final String index;
|
||||
private final long version;
|
||||
|
||||
private final State state;
|
||||
|
||||
|
@ -131,10 +132,11 @@ public class IndexMetaData {
|
|||
private final DiscoveryNodeFilters includeFilters;
|
||||
private final DiscoveryNodeFilters excludeFilters;
|
||||
|
||||
private IndexMetaData(String index, State state, Settings settings, ImmutableMap<String, MappingMetaData> mappings, ImmutableMap<String, AliasMetaData> aliases) {
|
||||
private IndexMetaData(String index, long version, State state, Settings settings, ImmutableMap<String, MappingMetaData> mappings, ImmutableMap<String, AliasMetaData> aliases) {
|
||||
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1) != -1, "must specify numberOfShards for index [" + index + "]");
|
||||
Preconditions.checkArgument(settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1) != -1, "must specify numberOfReplicas for index [" + index + "]");
|
||||
this.index = index;
|
||||
this.version = version;
|
||||
this.state = state;
|
||||
this.settings = settings;
|
||||
this.mappings = mappings;
|
||||
|
@ -164,6 +166,14 @@ public class IndexMetaData {
|
|||
return index();
|
||||
}
|
||||
|
||||
public long version() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
public long getVersion() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
public State state() {
|
||||
return this.state;
|
||||
}
|
||||
|
@ -274,6 +284,8 @@ public class IndexMetaData {
|
|||
|
||||
private State state = State.OPEN;
|
||||
|
||||
private long version = 1;
|
||||
|
||||
private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
|
||||
|
||||
private MapBuilder<String, MappingMetaData> mappings = MapBuilder.newMapBuilder();
|
||||
|
@ -290,6 +302,7 @@ public class IndexMetaData {
|
|||
mappings.putAll(indexMetaData.mappings);
|
||||
aliases.putAll(indexMetaData.aliases);
|
||||
this.state = indexMetaData.state;
|
||||
this.version = indexMetaData.version;
|
||||
}
|
||||
|
||||
public String index() {
|
||||
|
@ -364,6 +377,15 @@ public class IndexMetaData {
|
|||
return this;
|
||||
}
|
||||
|
||||
public long version() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
public Builder version(long version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
public IndexMetaData build() {
|
||||
MapBuilder<String, AliasMetaData> tmpAliases = aliases;
|
||||
Settings tmpSettings = settings;
|
||||
|
@ -381,12 +403,13 @@ public class IndexMetaData {
|
|||
tmpSettings = ImmutableSettings.settingsBuilder().put(settings).putArray("index.aliases").build();
|
||||
}
|
||||
|
||||
return new IndexMetaData(index, state, tmpSettings, mappings.immutableMap(), tmpAliases.immutableMap());
|
||||
return new IndexMetaData(index, version, state, tmpSettings, mappings.immutableMap(), tmpAliases.immutableMap());
|
||||
}
|
||||
|
||||
public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject(indexMetaData.index(), XContentBuilder.FieldCaseConversion.NONE);
|
||||
|
||||
builder.field("version", indexMetaData.version());
|
||||
builder.field("state", indexMetaData.state().toString().toLowerCase());
|
||||
|
||||
builder.startObject("settings");
|
||||
|
@ -416,6 +439,9 @@ public class IndexMetaData {
|
|||
}
|
||||
|
||||
public static IndexMetaData fromXContent(XContentParser parser) throws IOException {
|
||||
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
|
||||
parser.nextToken();
|
||||
}
|
||||
Builder builder = new Builder(parser.currentName());
|
||||
|
||||
String currentFieldName = null;
|
||||
|
@ -449,6 +475,8 @@ public class IndexMetaData {
|
|||
} else if (token.isValue()) {
|
||||
if ("state".equals(currentFieldName)) {
|
||||
builder.state(State.fromString(parser.text()));
|
||||
} else if ("version".equals(currentFieldName)) {
|
||||
builder.version(parser.longValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -457,6 +485,7 @@ public class IndexMetaData {
|
|||
|
||||
public static IndexMetaData readFrom(StreamInput in) throws IOException {
|
||||
Builder builder = new Builder(in.readUTF());
|
||||
builder.version(in.readLong());
|
||||
builder.state(State.fromId(in.readByte()));
|
||||
builder.settings(readSettingsFromStream(in));
|
||||
int mappingsSize = in.readVInt();
|
||||
|
@ -474,6 +503,7 @@ public class IndexMetaData {
|
|||
|
||||
public static void writeTo(IndexMetaData indexMetaData, StreamOutput out) throws IOException {
|
||||
out.writeUTF(indexMetaData.index());
|
||||
out.writeLong(indexMetaData.version());
|
||||
out.writeByte(indexMetaData.state().id());
|
||||
writeSettingsToStream(indexMetaData.settings(), out);
|
||||
out.writeVInt(indexMetaData.mappings().size());
|
||||
|
|
|
@ -101,6 +101,32 @@ public class IndexTemplateMetaData {
|
|||
return new Builder(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
IndexTemplateMetaData that = (IndexTemplateMetaData) o;
|
||||
|
||||
if (order != that.order) return false;
|
||||
if (!mappings.equals(that.mappings)) return false;
|
||||
if (!name.equals(that.name)) return false;
|
||||
if (!settings.equals(that.settings)) return false;
|
||||
if (!template.equals(that.template)) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = name.hashCode();
|
||||
result = 31 * result + order;
|
||||
result = 31 * result + template.hashCode();
|
||||
result = 31 * result + settings.hashCode();
|
||||
result = 31 * result + mappings.hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
private String name;
|
||||
|
|
|
@ -628,6 +628,12 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
return indices.values().iterator();
|
||||
}
|
||||
|
||||
public static boolean isGlobalStateEquals(MetaData metaData1, MetaData metaData2) {
|
||||
if (!metaData1.persistentSettings.equals(metaData2.persistentSettings)) return false;
|
||||
if (!metaData1.templates.equals(metaData2.templates())) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
@ -658,10 +664,21 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
}
|
||||
|
||||
public Builder put(IndexMetaData.Builder indexMetaDataBuilder) {
|
||||
return put(indexMetaDataBuilder.build());
|
||||
// we know its a new one, increment the version and store
|
||||
indexMetaDataBuilder.version(indexMetaDataBuilder.version() + 1);
|
||||
IndexMetaData indexMetaData = indexMetaDataBuilder.build();
|
||||
indices.put(indexMetaData.index(), indexMetaData);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder put(IndexMetaData indexMetaData) {
|
||||
public Builder put(IndexMetaData indexMetaData, boolean incrementVersion) {
|
||||
if (indices.get(indexMetaData.index()) == indexMetaData) {
|
||||
return this;
|
||||
}
|
||||
// if we put a new index metadata, increment its version
|
||||
if (incrementVersion) {
|
||||
indexMetaData = IndexMetaData.newIndexMetaDataBuilder(indexMetaData).version(indexMetaData.version() + 1).build();
|
||||
}
|
||||
indices.put(indexMetaData.index(), indexMetaData);
|
||||
return this;
|
||||
}
|
||||
|
@ -675,6 +692,11 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder removeAllIndices() {
|
||||
indices.clear();
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder put(IndexTemplateMetaData.Builder template) {
|
||||
return put(template.build());
|
||||
}
|
||||
|
@ -699,8 +721,7 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
throw new IndexMissingException(new Index(index));
|
||||
}
|
||||
put(IndexMetaData.newIndexMetaDataBuilder(indexMetaData)
|
||||
.settings(settingsBuilder().put(indexMetaData.settings()).put(settings))
|
||||
.build());
|
||||
.settings(settingsBuilder().put(indexMetaData.settings()).put(settings)));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -714,7 +735,7 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
if (indexMetaData == null) {
|
||||
throw new IndexMissingException(new Index(index));
|
||||
}
|
||||
put(IndexMetaData.newIndexMetaDataBuilder(indexMetaData).numberOfReplicas(numberOfReplicas).build());
|
||||
put(IndexMetaData.newIndexMetaDataBuilder(indexMetaData).numberOfReplicas(numberOfReplicas));
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
@ -757,6 +778,8 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
public static void toXContent(MetaData metaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject("meta-data");
|
||||
|
||||
builder.field("version", metaData.version());
|
||||
|
||||
if (!metaData.persistentSettings().getAsMap().isEmpty()) {
|
||||
builder.startObject("settings");
|
||||
for (Map.Entry<String, String> entry : metaData.persistentSettings().getAsMap().entrySet()) {
|
||||
|
@ -771,11 +794,13 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.startObject("indices");
|
||||
for (IndexMetaData indexMetaData : metaData) {
|
||||
IndexMetaData.Builder.toXContent(indexMetaData, builder, params);
|
||||
if (!metaData.indices().isEmpty()) {
|
||||
builder.startObject("indices");
|
||||
for (IndexMetaData indexMetaData : metaData) {
|
||||
IndexMetaData.Builder.toXContent(indexMetaData, builder, params);
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.endObject();
|
||||
}
|
||||
|
@ -809,13 +834,17 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
builder.persistentSettings(settingsBuilder.build());
|
||||
} else if ("indices".equals(currentFieldName)) {
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
builder.put(IndexMetaData.Builder.fromXContent(parser));
|
||||
builder.put(IndexMetaData.Builder.fromXContent(parser), false);
|
||||
}
|
||||
} else if ("templates".equals(currentFieldName)) {
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
builder.put(IndexTemplateMetaData.Builder.fromXContent(parser));
|
||||
}
|
||||
}
|
||||
} else if (token.isValue()) {
|
||||
if ("version".equals(currentFieldName)) {
|
||||
builder.version = parser.longValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
|
@ -828,7 +857,7 @@ public class MetaData implements Iterable<IndexMetaData> {
|
|||
builder.persistentSettings(readSettingsFromStream(in));
|
||||
int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
builder.put(IndexMetaData.Builder.readFrom(in));
|
||||
builder.put(IndexMetaData.Builder.readFrom(in), false);
|
||||
}
|
||||
size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
|
|
@ -256,7 +256,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
MetaData newMetaData = newMetaDataBuilder()
|
||||
.metaData(currentState.metaData())
|
||||
.put(indexMetaData)
|
||||
.put(indexMetaData, false)
|
||||
.build();
|
||||
|
||||
logger.info("[{}] creating index, cause [{}], shards [{}]/[{}], mappings {}", request.index, request.cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());
|
||||
|
|
|
@ -264,6 +264,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
}
|
||||
|
||||
public void putMapping(final PutRequest request, final Listener listener) {
|
||||
final AtomicBoolean notifyOnPostProcess = new AtomicBoolean();
|
||||
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", new ProcessedClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
|
@ -394,7 +395,7 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
}
|
||||
|
||||
if (counter == 0) {
|
||||
listener.onResponse(new Response(true));
|
||||
notifyOnPostProcess.set(true);
|
||||
return updatedState;
|
||||
}
|
||||
mappingCreatedAction.add(new CountDownListener(counter, listener), request.timeout);
|
||||
|
@ -411,6 +412,9 @@ public class MetaDataMappingService extends AbstractComponent {
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(ClusterState clusterState) {
|
||||
if (notifyOnPostProcess.get()) {
|
||||
listener.onResponse(new Response(true));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -61,15 +61,19 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
|||
this.clusterService = clusterService;
|
||||
this.allocationService = allocationService;
|
||||
this.schedule = componentSettings.getAsTime("schedule", timeValueSeconds(10));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws ElasticSearchException {
|
||||
clusterService.addPriority(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ElasticSearchException {
|
||||
if (scheduledRoutingTableFuture != null) {
|
||||
scheduledRoutingTableFuture.cancel(true);
|
||||
scheduledRoutingTableFuture = null;
|
||||
|
@ -77,10 +81,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
|
|||
clusterService.remove(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
|
||||
|
|
|
@ -164,6 +164,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||
|
||||
public void remove(ClusterStateListener listener) {
|
||||
clusterStateListeners.remove(listener);
|
||||
priorityClusterStateListeners.remove(listener);
|
||||
lastClusterStateListeners.remove(listener);
|
||||
for (Iterator<NotifyTimeout> it = onGoingTimeouts.iterator(); it.hasNext(); ) {
|
||||
NotifyTimeout timeout = it.next();
|
||||
if (timeout.listener.equals(listener)) {
|
||||
|
@ -204,109 +206,126 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
|
|||
}
|
||||
logger.debug("processing [{}]: execute", source);
|
||||
ClusterState previousClusterState = clusterState;
|
||||
ClusterState newClusterState;
|
||||
try {
|
||||
clusterState = updateTask.execute(previousClusterState);
|
||||
newClusterState = updateTask.execute(previousClusterState);
|
||||
} catch (Exception e) {
|
||||
StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
|
||||
sb.append(clusterState.nodes().prettyPrint());
|
||||
sb.append(clusterState.routingTable().prettyPrint());
|
||||
sb.append(clusterState.readOnlyRoutingNodes().prettyPrint());
|
||||
StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
|
||||
sb.append(previousClusterState.nodes().prettyPrint());
|
||||
sb.append(previousClusterState.routingTable().prettyPrint());
|
||||
sb.append(previousClusterState.readOnlyRoutingNodes().prettyPrint());
|
||||
logger.warn(sb.toString(), e);
|
||||
return;
|
||||
}
|
||||
if (previousClusterState != clusterState) {
|
||||
try {
|
||||
if (clusterState.nodes().localNodeMaster()) {
|
||||
// only the master controls the version numbers
|
||||
Builder builder = ClusterState.builder().state(clusterState).version(clusterState.version() + 1);
|
||||
if (previousClusterState.routingTable() != clusterState.routingTable()) {
|
||||
builder.routingTable(RoutingTable.builder().routingTable(clusterState.routingTable()).version(clusterState.routingTable().version() + 1));
|
||||
}
|
||||
if (previousClusterState.metaData() != clusterState.metaData()) {
|
||||
builder.metaData(MetaData.builder().metaData(clusterState.metaData()).version(clusterState.metaData().version() + 1));
|
||||
}
|
||||
clusterState = builder.build();
|
||||
} else {
|
||||
// we got this cluster state from the master, filter out based on versions (don't call listeners)
|
||||
if (clusterState.version() < previousClusterState.version()) {
|
||||
logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
|
||||
sb.append(clusterState.nodes().prettyPrint());
|
||||
sb.append(clusterState.routingTable().prettyPrint());
|
||||
sb.append(clusterState.readOnlyRoutingNodes().prettyPrint());
|
||||
logger.trace(sb.toString());
|
||||
} else if (logger.isDebugEnabled()) {
|
||||
logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source);
|
||||
}
|
||||
|
||||
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, clusterState, previousClusterState);
|
||||
// new cluster state, notify all listeners
|
||||
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
|
||||
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
|
||||
String summary = nodesDelta.shortSummary();
|
||||
if (summary.length() > 0) {
|
||||
logger.info("{}, reason: {}", summary, source);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO, do this in parallel (and wait)
|
||||
for (DiscoveryNode node : nodesDelta.addedNodes()) {
|
||||
if (!nodeRequiresConnection(node)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
transportService.connectToNode(node);
|
||||
} catch (Exception e) {
|
||||
// the fault detection will detect it as failed as well
|
||||
logger.warn("failed to connect to node [" + node + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
for (ClusterStateListener listener : priorityClusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : clusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : lastClusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
|
||||
if (!nodesDelta.removedNodes().isEmpty()) {
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (DiscoveryNode node : nodesDelta.removedNodes()) {
|
||||
transportService.disconnectFromNode(node);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// if we are the master, publish the new state to all nodes
|
||||
if (clusterState.nodes().localNodeMaster()) {
|
||||
discoveryService.publish(clusterState);
|
||||
}
|
||||
|
||||
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
|
||||
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(clusterState);
|
||||
}
|
||||
|
||||
logger.debug("processing [{}]: done applying updated cluster_state", source);
|
||||
} catch (Exception e) {
|
||||
StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n");
|
||||
sb.append(clusterState.nodes().prettyPrint());
|
||||
sb.append(clusterState.routingTable().prettyPrint());
|
||||
sb.append(clusterState.readOnlyRoutingNodes().prettyPrint());
|
||||
logger.warn(sb.toString(), e);
|
||||
}
|
||||
} else {
|
||||
if (previousClusterState == newClusterState) {
|
||||
logger.debug("processing [{}]: no change in cluster_state", source);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (newClusterState.nodes().localNodeMaster()) {
|
||||
// only the master controls the version numbers
|
||||
Builder builder = ClusterState.builder().state(newClusterState).version(newClusterState.version() + 1);
|
||||
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
|
||||
builder.routingTable(RoutingTable.builder().routingTable(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1));
|
||||
}
|
||||
if (previousClusterState.metaData() != newClusterState.metaData()) {
|
||||
builder.metaData(MetaData.builder().metaData(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
|
||||
}
|
||||
newClusterState = builder.build();
|
||||
} else {
|
||||
if (previousClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK) && !newClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) {
|
||||
// force an update, its a fresh update from the master as we transition from a start of not having a master to having one
|
||||
// have a fresh instances of routing and metadata to remove the chance that version might be the same
|
||||
Builder builder = ClusterState.builder().state(newClusterState);
|
||||
builder.routingTable(RoutingTable.builder().routingTable(newClusterState.routingTable()));
|
||||
builder.metaData(MetaData.builder().metaData(newClusterState.metaData()));
|
||||
newClusterState = builder.build();
|
||||
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId());
|
||||
} else if (newClusterState.version() < previousClusterState.version()) {
|
||||
// we got this cluster state from the master, filter out based on versions (don't call listeners)
|
||||
logger.debug("got old cluster state [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
|
||||
sb.append(newClusterState.nodes().prettyPrint());
|
||||
sb.append(newClusterState.routingTable().prettyPrint());
|
||||
sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
|
||||
logger.trace(sb.toString());
|
||||
} else if (logger.isDebugEnabled()) {
|
||||
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
|
||||
}
|
||||
|
||||
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
|
||||
// new cluster state, notify all listeners
|
||||
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
|
||||
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
|
||||
String summary = nodesDelta.shortSummary();
|
||||
if (summary.length() > 0) {
|
||||
logger.info("{}, reason: {}", summary, source);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO, do this in parallel (and wait)
|
||||
for (DiscoveryNode node : nodesDelta.addedNodes()) {
|
||||
if (!nodeRequiresConnection(node)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
transportService.connectToNode(node);
|
||||
} catch (Exception e) {
|
||||
// the fault detection will detect it as failed as well
|
||||
logger.warn("failed to connect to node [" + node + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
// if we are the master, publish the new state to all nodes
|
||||
// we publish here before we send a notification to all the listeners, since if it fails
|
||||
// we don't want to notify
|
||||
if (newClusterState.nodes().localNodeMaster()) {
|
||||
discoveryService.publish(newClusterState);
|
||||
}
|
||||
|
||||
// update the current cluster state
|
||||
clusterState = newClusterState;
|
||||
|
||||
for (ClusterStateListener listener : priorityClusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : clusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
for (ClusterStateListener listener : lastClusterStateListeners) {
|
||||
listener.clusterChanged(clusterChangedEvent);
|
||||
}
|
||||
|
||||
if (!nodesDelta.removedNodes().isEmpty()) {
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (DiscoveryNode node : nodesDelta.removedNodes()) {
|
||||
transportService.disconnectFromNode(node);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
|
||||
((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(newClusterState);
|
||||
}
|
||||
|
||||
logger.debug("processing [{}]: done applying updated cluster_state", source);
|
||||
} catch (Exception e) {
|
||||
StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
|
||||
sb.append(newClusterState.nodes().prettyPrint());
|
||||
sb.append(newClusterState.routingTable().prettyPrint());
|
||||
sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
|
||||
logger.warn(sb.toString(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -59,6 +59,11 @@ public class MapBuilder<K, V> {
|
|||
return this;
|
||||
}
|
||||
|
||||
public MapBuilder<K, V> clear() {
|
||||
this.map.clear();
|
||||
return this;
|
||||
}
|
||||
|
||||
public V get(K key) {
|
||||
return map.get(key);
|
||||
}
|
||||
|
|
|
@ -315,9 +315,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
continue;
|
||||
}
|
||||
// send join request
|
||||
ClusterState clusterState;
|
||||
ClusterState joinClusterStateX;
|
||||
try {
|
||||
clusterState = membership.sendJoinRequestBlocking(masterNode, localNode, pingTimeout);
|
||||
joinClusterStateX = membership.sendJoinRequestBlocking(masterNode, localNode, pingTimeout);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof ElasticSearchException) {
|
||||
logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ((ElasticSearchException) e).getDetailedMessage());
|
||||
|
@ -332,26 +332,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
continue;
|
||||
}
|
||||
masterFD.start(masterNode, "initial_join");
|
||||
|
||||
// we update the metadata once we managed to join, so we pre-create indices and so on (no shards allocation)
|
||||
final MetaData metaData = clusterState.metaData();
|
||||
// sync also the version with the version the master currently has, so the next update will be applied
|
||||
final long version = clusterState.version();
|
||||
clusterService.submitStateUpdateTask("zen-disco-join (detected master)", new ProcessedClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(NO_MASTER_BLOCK).build();
|
||||
// make sure we have the local node id set, we might need it as a result of the new metadata
|
||||
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.newNodesBuilder().putAll(currentState.nodes()).put(localNode).localNodeId(localNode.id());
|
||||
return newClusterStateBuilder().state(currentState).nodes(nodesBuilder).blocks(clusterBlocks).metaData(metaData).version(version).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(ClusterState clusterState) {
|
||||
// don't send initial state event, since we want to get the cluster state from the master that includes us first
|
||||
// sendInitialStateEventIfNeeded();
|
||||
}
|
||||
});
|
||||
// no need to submit the received cluster state, we will get it from the master when it publishes
|
||||
// the fact that we joined
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.env;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.store.NativeFSLockFactory;
|
||||
import org.elasticsearch.ElasticSearchIllegalStateException;
|
||||
|
@ -177,6 +178,23 @@ public class NodeEnvironment extends AbstractComponent {
|
|||
return shardLocations;
|
||||
}
|
||||
|
||||
public Set<String> finalAllIndices() throws Exception {
|
||||
if (nodeFiles == null || locks == null) {
|
||||
throw new ElasticSearchIllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
Set<String> indices = Sets.newHashSet();
|
||||
for (File indicesLocation : nodeIndicesLocations) {
|
||||
File[] indicesList = indicesLocation.listFiles();
|
||||
if (indicesList == null) {
|
||||
continue;
|
||||
}
|
||||
for (File indexLocation : indicesList) {
|
||||
indices.add(indexLocation.getName());
|
||||
}
|
||||
}
|
||||
return indices;
|
||||
}
|
||||
|
||||
public Set<ShardId> findAllShardIds() throws Exception {
|
||||
if (nodeFiles == null || locks == null) {
|
||||
throw new ElasticSearchIllegalStateException("node is not configured to store local location");
|
||||
|
@ -200,7 +218,10 @@ public class NodeEnvironment extends AbstractComponent {
|
|||
if (!shardLocation.isDirectory()) {
|
||||
continue;
|
||||
}
|
||||
shardIds.add(new ShardId(indexName, Integer.parseInt(shardLocation.getName())));
|
||||
Integer shardId = Ints.tryParse(shardLocation.getName());
|
||||
if (shardId != null) {
|
||||
shardIds.add(new ShardId(indexName, shardId));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,6 @@ public interface Gateway extends LifecycleComponent<Gateway> {
|
|||
interface GatewayStateRecoveredListener {
|
||||
void onSuccess(ClusterState recoveredState);
|
||||
|
||||
void onFailure(Throwable t);
|
||||
void onFailure(String message);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.cluster.block.ClusterBlock;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataStateIndexService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -42,7 +41,6 @@ import org.elasticsearch.discovery.DiscoveryService;
|
|||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -92,7 +90,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
this.expectedNodes = componentSettings.getAsInt("expected_nodes", -1);
|
||||
this.recoverAfterDataNodes = componentSettings.getAsInt("recover_after_data_nodes", -1);
|
||||
this.expectedDataNodes = componentSettings.getAsInt("expected_data_nodes", -1);
|
||||
this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", -1);
|
||||
// default the recover after master nodes to the minimum master nodes in the discovery
|
||||
this.recoverAfterMasterNodes = componentSettings.getAsInt("recover_after_master_nodes", settings.getAsInt("discovery.zen.minimum_master_nodes", -1));
|
||||
this.expectedMasterNodes = componentSettings.getAsInt("expected_master_nodes", -1);
|
||||
|
||||
// Add the not recovered as initial state block, we don't allow anything until
|
||||
|
@ -140,7 +139,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
} else {
|
||||
logger.debug("can't wait on start for (possibly) reading state from gateway, will do it asynchronously");
|
||||
}
|
||||
clusterService.add(this);
|
||||
clusterService.addLast(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -249,23 +248,14 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
.removeGlobalBlock(STATE_NOT_RECOVERED_BLOCK);
|
||||
|
||||
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
|
||||
.metaData(currentState.metaData());
|
||||
metaDataBuilder.version(recoveredState.version());
|
||||
|
||||
metaDataBuilder.persistentSettings(recoveredState.metaData().persistentSettings());
|
||||
|
||||
// add the index templates
|
||||
for (Map.Entry<String, IndexTemplateMetaData> entry : recoveredState.metaData().templates().entrySet()) {
|
||||
metaDataBuilder.put(entry.getValue());
|
||||
}
|
||||
|
||||
.metaData(recoveredState.metaData());
|
||||
|
||||
if (recoveredState.metaData().settings().getAsBoolean(MetaData.SETTING_READ_ONLY, false) || currentState.metaData().settings().getAsBoolean(MetaData.SETTING_READ_ONLY, false)) {
|
||||
blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK);
|
||||
}
|
||||
|
||||
for (IndexMetaData indexMetaData : recoveredState.metaData()) {
|
||||
metaDataBuilder.put(indexMetaData);
|
||||
metaDataBuilder.put(indexMetaData, false);
|
||||
if (indexMetaData.state() == IndexMetaData.State.CLOSE) {
|
||||
blocks.addIndexBlock(indexMetaData.index(), MetaDataStateIndexService.INDEX_CLOSED_BLOCK);
|
||||
}
|
||||
|
@ -276,7 +266,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
|
||||
// update the state to reflect the new metadata and routing
|
||||
ClusterState updatedState = newClusterStateBuilder().state(currentState)
|
||||
.version(recoveredState.version())
|
||||
.blocks(blocks)
|
||||
.metaData(metaDataBuilder)
|
||||
.build();
|
||||
|
@ -290,7 +279,8 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
routingTableBuilder.add(indexRoutingBuilder);
|
||||
}
|
||||
}
|
||||
routingTableBuilder.version(recoveredState.version());
|
||||
// start with 0 based versions for routing table
|
||||
routingTableBuilder.version(0);
|
||||
|
||||
// now, reroute
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).routingTable(routingTableBuilder).build());
|
||||
|
@ -307,9 +297,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
public void onFailure(String message) {
|
||||
recovered.set(false);
|
||||
scheduledRecovery.set(false);
|
||||
// don't remove the block here, we don't want to allow anything in such a case
|
||||
logger.error("failed recover state, blocking...", t);
|
||||
logger.info("metadata state not restored, reason: {}", message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,37 +20,29 @@
|
|||
package org.elasticsearch.gateway.local;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.io.Closeables;
|
||||
import gnu.trove.map.hash.TObjectIntHashMap;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.compress.lzf.LZF;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.io.stream.*;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.thread.LoggingRunnable;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.gateway.GatewayException;
|
||||
import org.elasticsearch.gateway.local.state.meta.LocalGatewayMetaState;
|
||||
import org.elasticsearch.gateway.local.state.meta.TransportNodesListGatewayMetaState;
|
||||
import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState;
|
||||
import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.util.concurrent.Executors.newSingleThreadExecutor;
|
||||
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -62,32 +54,28 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
|
|||
private final NodeEnvironment nodeEnv;
|
||||
|
||||
private final LocalGatewayShardsState shardsState;
|
||||
private final LocalGatewayMetaState metaState;
|
||||
|
||||
private final TransportNodesListGatewayMetaState listGatewayMetaState;
|
||||
|
||||
private final boolean compress;
|
||||
private final boolean prettyPrint;
|
||||
|
||||
private volatile LocalGatewayMetaState currentMetaState;
|
||||
|
||||
private volatile ExecutorService executor;
|
||||
|
||||
private volatile boolean initialized = false;
|
||||
|
||||
private volatile boolean metaDataPersistedAtLeastOnce = false;
|
||||
private final String initialMeta;
|
||||
|
||||
@Inject
|
||||
public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, LocalGatewayShardsState shardsState,
|
||||
public LocalGateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv,
|
||||
LocalGatewayShardsState shardsState, LocalGatewayMetaState metaState,
|
||||
TransportNodesListGatewayMetaState listGatewayMetaState) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.listGatewayMetaState = listGatewayMetaState.initGateway(this);
|
||||
this.metaState = metaState;
|
||||
this.listGatewayMetaState = listGatewayMetaState;
|
||||
|
||||
this.shardsState = shardsState;
|
||||
|
||||
this.compress = componentSettings.getAsBoolean("compress", true);
|
||||
this.prettyPrint = componentSettings.getAsBoolean("pretty", false);
|
||||
clusterService.addLast(this);
|
||||
|
||||
// we define what is our minimum "master" nodes, use that to allow for recovery
|
||||
this.initialMeta = componentSettings.get("initial_meta", settings.get("discovery.zen.minimum_master_nodes", "1"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -95,31 +83,17 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
|
|||
return "local";
|
||||
}
|
||||
|
||||
public LocalGatewayMetaState currentMetaState() {
|
||||
lazyInitialize();
|
||||
return this.currentMetaState;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws ElasticSearchException {
|
||||
this.executor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway"));
|
||||
lazyInitialize();
|
||||
clusterService.addLast(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws ElasticSearchException {
|
||||
clusterService.remove(this);
|
||||
executor.shutdown();
|
||||
try {
|
||||
executor.awaitTermination(10, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ElasticSearchException {
|
||||
clusterService.remove(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -128,32 +102,90 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
|
|||
nodesIds.addAll(clusterService.state().nodes().masterNodes().keySet());
|
||||
TransportNodesListGatewayMetaState.NodesLocalGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
|
||||
|
||||
|
||||
int requiredAllocation = 1;
|
||||
try {
|
||||
if ("quorum".equals(initialMeta)) {
|
||||
if (nodesIds.size() > 2) {
|
||||
requiredAllocation = (nodesIds.size() / 2) + 1;
|
||||
}
|
||||
} else if ("quorum-1".equals(initialMeta) || "half".equals(initialMeta)) {
|
||||
if (nodesIds.size() > 2) {
|
||||
requiredAllocation = ((1 + nodesIds.size()) / 2);
|
||||
}
|
||||
} else if ("one".equals(initialMeta)) {
|
||||
requiredAllocation = 1;
|
||||
} else if ("full".equals(initialMeta) || "all".equals(initialMeta)) {
|
||||
requiredAllocation = nodesIds.size();
|
||||
} else if ("full-1".equals(initialMeta) || "all-1".equals(initialMeta)) {
|
||||
if (nodesIds.size() > 1) {
|
||||
requiredAllocation = nodesIds.size() - 1;
|
||||
}
|
||||
} else {
|
||||
requiredAllocation = Integer.parseInt(initialMeta);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to derived initial_meta from value {}", initialMeta);
|
||||
}
|
||||
|
||||
if (nodesState.failures().length > 0) {
|
||||
for (FailedNodeException failedNodeException : nodesState.failures()) {
|
||||
logger.warn("failed to fetch state from node", failedNodeException);
|
||||
}
|
||||
}
|
||||
|
||||
TransportNodesListGatewayMetaState.NodeLocalGatewayMetaState electedState = null;
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||
TObjectIntHashMap<String> indices = new TObjectIntHashMap<String>();
|
||||
MetaData electedGlobalState = null;
|
||||
int found = 0;
|
||||
for (TransportNodesListGatewayMetaState.NodeLocalGatewayMetaState nodeState : nodesState) {
|
||||
if (nodeState.state() == null) {
|
||||
if (nodeState.metaData() == null) {
|
||||
continue;
|
||||
}
|
||||
if (electedState == null) {
|
||||
electedState = nodeState;
|
||||
} else if (nodeState.state().version() > electedState.state().version()) {
|
||||
electedState = nodeState;
|
||||
found++;
|
||||
if (electedGlobalState == null) {
|
||||
electedGlobalState = nodeState.metaData();
|
||||
} else if (nodeState.metaData().version() > electedGlobalState.version()) {
|
||||
electedGlobalState = nodeState.metaData();
|
||||
}
|
||||
for (IndexMetaData indexMetaData : nodeState.metaData().indices().values()) {
|
||||
indices.adjustOrPutValue(indexMetaData.index(), 1, 1);
|
||||
}
|
||||
}
|
||||
if (electedState == null) {
|
||||
logger.debug("no state elected");
|
||||
listener.onSuccess(ClusterState.builder().build());
|
||||
} else {
|
||||
logger.debug("elected state from [{}]", electedState.node());
|
||||
ClusterState.Builder builder = ClusterState.builder().version(electedState.state().version());
|
||||
builder.metaData(MetaData.builder().metaData(electedState.state().metaData()).version(electedState.state().version()));
|
||||
listener.onSuccess(builder.build());
|
||||
if (found < requiredAllocation) {
|
||||
listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");
|
||||
return;
|
||||
}
|
||||
// update the global state, and clean the indices, we elect them in the next phase
|
||||
metaDataBuilder.metaData(electedGlobalState).removeAllIndices();
|
||||
for (String index : indices.keySet()) {
|
||||
IndexMetaData electedIndexMetaData = null;
|
||||
int indexMetaDataCount = 0;
|
||||
for (TransportNodesListGatewayMetaState.NodeLocalGatewayMetaState nodeState : nodesState) {
|
||||
if (nodeState.metaData() == null) {
|
||||
continue;
|
||||
}
|
||||
IndexMetaData indexMetaData = nodeState.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
continue;
|
||||
}
|
||||
if (electedIndexMetaData == null) {
|
||||
electedIndexMetaData = indexMetaData;
|
||||
} else if (indexMetaData.version() > electedIndexMetaData.version()) {
|
||||
electedIndexMetaData = indexMetaData;
|
||||
}
|
||||
indexMetaDataCount++;
|
||||
}
|
||||
if (electedIndexMetaData != null) {
|
||||
if (indexMetaDataCount < requiredAllocation) {
|
||||
logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation);
|
||||
}
|
||||
metaDataBuilder.put(electedIndexMetaData, false);
|
||||
}
|
||||
}
|
||||
ClusterState.Builder builder = ClusterState.builder();
|
||||
builder.metaData(metaDataBuilder);
|
||||
listener.onSuccess(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -172,187 +204,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
|
|||
if (event.state().blocks().disableStatePersistence()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// we only write the local metadata if this is a possible master node
|
||||
if (event.state().nodes().localNode().masterNode() && (event.metaDataChanged() || !metaDataPersistedAtLeastOnce)) {
|
||||
executor.execute(new LoggingRunnable(logger, new PersistMetaData(event)));
|
||||
}
|
||||
|
||||
metaState.clusterChanged(event);
|
||||
shardsState.clusterChanged(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* We do here lazy initialization on not only on start(), since we might be called before start by another node (really will
|
||||
* happen in term of timing in testing, but still), and we want to return the cluster state when we can.
|
||||
* <p/>
|
||||
* It is synchronized since we want to wait for it to be loaded if called concurrently. There should really be a nicer
|
||||
* solution here, but for now, its good enough.
|
||||
*/
|
||||
private synchronized void lazyInitialize() {
|
||||
if (initialized) {
|
||||
return;
|
||||
}
|
||||
initialized = true;
|
||||
|
||||
if (clusterService.localNode().masterNode()) {
|
||||
try {
|
||||
File latest = findLatestMetaStateVersion();
|
||||
if (latest != null) {
|
||||
logger.debug("[find_latest_state]: loading metadata from [{}]", latest.getAbsolutePath());
|
||||
this.currentMetaState = readMetaState(Streams.copyToByteArray(new FileInputStream(latest)));
|
||||
} else {
|
||||
logger.debug("[find_latest_state]: no metadata state loaded");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to read local state (metadata)", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private File findLatestMetaStateVersion() throws IOException {
|
||||
long index = -1;
|
||||
File latest = null;
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateLocation = new File(dataLocation, "_state");
|
||||
if (!stateLocation.exists()) {
|
||||
continue;
|
||||
}
|
||||
File[] stateFiles = stateLocation.listFiles();
|
||||
if (stateFiles == null) {
|
||||
continue;
|
||||
}
|
||||
for (File stateFile : stateFiles) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]");
|
||||
}
|
||||
String name = stateFile.getName();
|
||||
if (!name.startsWith("metadata-")) {
|
||||
continue;
|
||||
}
|
||||
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
|
||||
if (fileIndex >= index) {
|
||||
// try and read the meta data
|
||||
try {
|
||||
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
|
||||
if (data.length == 0) {
|
||||
logger.debug("[find_latest_state]: not data for [" + name + "], ignoring...");
|
||||
continue;
|
||||
}
|
||||
readMetaState(data);
|
||||
index = fileIndex;
|
||||
latest = stateFile;
|
||||
} catch (IOException e) {
|
||||
logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return latest;
|
||||
}
|
||||
|
||||
private LocalGatewayMetaState readMetaState(byte[] data) throws IOException {
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
if (LZF.isCompressed(data)) {
|
||||
BytesStreamInput siBytes = new BytesStreamInput(data, false);
|
||||
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
|
||||
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
|
||||
} else {
|
||||
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
|
||||
}
|
||||
return LocalGatewayMetaState.Builder.fromXContent(parser);
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class PersistMetaData implements Runnable {
|
||||
private final ClusterChangedEvent event;
|
||||
|
||||
public PersistMetaData(ClusterChangedEvent event) {
|
||||
this.event = event;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LocalGatewayMetaState.Builder builder = LocalGatewayMetaState.builder();
|
||||
if (currentMetaState != null) {
|
||||
builder.state(currentMetaState);
|
||||
}
|
||||
final long version = event.state().metaData().version();
|
||||
builder.version(version);
|
||||
builder.metaData(event.state().metaData());
|
||||
LocalGatewayMetaState stateToWrite = builder.build();
|
||||
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
StreamOutput streamOutput;
|
||||
try {
|
||||
try {
|
||||
if (compress) {
|
||||
streamOutput = cachedEntry.cachedLZFBytes();
|
||||
} else {
|
||||
streamOutput = cachedEntry.cachedBytes();
|
||||
}
|
||||
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput);
|
||||
if (prettyPrint) {
|
||||
xContentBuilder.prettyPrint();
|
||||
}
|
||||
xContentBuilder.startObject();
|
||||
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
|
||||
xContentBuilder.endObject();
|
||||
xContentBuilder.close();
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to serialize local gateway state", e);
|
||||
return;
|
||||
}
|
||||
|
||||
boolean serializedAtLeastOnce = false;
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateLocation = new File(dataLocation, "_state");
|
||||
if (!stateLocation.exists()) {
|
||||
FileSystemUtils.mkdirs(stateLocation);
|
||||
}
|
||||
File stateFile = new File(stateLocation, "metadata-" + version);
|
||||
FileOutputStream fos = null;
|
||||
try {
|
||||
fos = new FileOutputStream(stateFile);
|
||||
fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size());
|
||||
fos.getChannel().force(true);
|
||||
serializedAtLeastOnce = true;
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed to write local gateway state to {}", e, stateFile);
|
||||
} finally {
|
||||
Closeables.closeQuietly(fos);
|
||||
}
|
||||
}
|
||||
if (serializedAtLeastOnce) {
|
||||
currentMetaState = stateToWrite;
|
||||
metaDataPersistedAtLeastOnce = true;
|
||||
|
||||
// delete all the other files
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateLocation = new File(dataLocation, "_state");
|
||||
if (!stateLocation.exists()) {
|
||||
continue;
|
||||
}
|
||||
File[] files = stateLocation.listFiles(new FilenameFilter() {
|
||||
@Override
|
||||
public boolean accept(File dir, String name) {
|
||||
return name.startsWith("metadata-") && !name.equals("metadata-" + version);
|
||||
}
|
||||
});
|
||||
if (files != null) {
|
||||
for (File file : files) {
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,131 +0,0 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.gateway.local;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class LocalGatewayMetaState {
|
||||
|
||||
private final long version;
|
||||
|
||||
private final MetaData metaData;
|
||||
|
||||
public LocalGatewayMetaState(long version, MetaData metaData) {
|
||||
this.version = version;
|
||||
this.metaData = metaData;
|
||||
}
|
||||
|
||||
public long version() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public MetaData metaData() {
|
||||
return metaData;
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
private long version;
|
||||
|
||||
private MetaData metaData;
|
||||
|
||||
public Builder state(LocalGatewayMetaState state) {
|
||||
this.version = state.version();
|
||||
this.metaData = state.metaData();
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder version(long version) {
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder metaData(MetaData metaData) {
|
||||
this.metaData = metaData;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalGatewayMetaState build() {
|
||||
return new LocalGatewayMetaState(version, metaData);
|
||||
}
|
||||
|
||||
public static void toXContent(LocalGatewayMetaState state, XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject("state");
|
||||
|
||||
builder.field("version", state.version());
|
||||
MetaData.Builder.toXContent(state.metaData(), builder, params);
|
||||
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
public static LocalGatewayMetaState fromXContent(XContentParser parser) throws IOException {
|
||||
Builder builder = new Builder();
|
||||
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token == null) {
|
||||
// no data...
|
||||
return builder.build();
|
||||
}
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("meta-data".equals(currentFieldName)) {
|
||||
builder.metaData = MetaData.Builder.fromXContent(parser);
|
||||
}
|
||||
} else if (token.isValue()) {
|
||||
if ("version".equals(currentFieldName)) {
|
||||
builder.version = parser.longValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static LocalGatewayMetaState readFrom(StreamInput in) throws IOException {
|
||||
LocalGatewayMetaState.Builder builder = new Builder();
|
||||
builder.version = in.readLong();
|
||||
builder.metaData = MetaData.Builder.readFrom(in);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public static void writeTo(LocalGatewayMetaState state, StreamOutput out) throws IOException {
|
||||
out.writeLong(state.version());
|
||||
MetaData.Builder.writeTo(state.metaData(), out);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.common.inject.AbstractModule;
|
|||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.inject.PreProcessModule;
|
||||
import org.elasticsearch.gateway.Gateway;
|
||||
import org.elasticsearch.gateway.local.state.meta.LocalGatewayMetaState;
|
||||
import org.elasticsearch.gateway.local.state.meta.TransportNodesListGatewayMetaState;
|
||||
import org.elasticsearch.gateway.local.state.shards.LocalGatewayShardsState;
|
||||
import org.elasticsearch.gateway.local.state.shards.TransportNodesListGatewayStartedShards;
|
||||
|
||||
|
@ -37,6 +39,7 @@ public class LocalGatewayModule extends AbstractModule implements PreProcessModu
|
|||
bind(Gateway.class).to(LocalGateway.class).asEagerSingleton();
|
||||
bind(LocalGatewayShardsState.class).asEagerSingleton();
|
||||
bind(TransportNodesListGatewayMetaState.class).asEagerSingleton();
|
||||
bind(LocalGatewayMetaState.class).asEagerSingleton();
|
||||
bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,454 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.gateway.local.state.meta;
|
||||
|
||||
import com.google.common.io.Closeables;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.io.stream.CachedStreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class LocalGatewayMetaState extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
private final NodeEnvironment nodeEnv;
|
||||
|
||||
private volatile MetaData currentMetaData;
|
||||
|
||||
@Inject
|
||||
public LocalGatewayMetaState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayMetaState nodesListGatewayMetaState) throws Exception {
|
||||
super(settings);
|
||||
this.nodeEnv = nodeEnv;
|
||||
|
||||
nodesListGatewayMetaState.init(this);
|
||||
|
||||
try {
|
||||
pre019Upgrade();
|
||||
long start = System.currentTimeMillis();
|
||||
loadState();
|
||||
logger.debug("took {} to load state", TimeValue.timeValueMillis(System.currentTimeMillis() - start));
|
||||
} catch (Exception e) {
|
||||
logger.error("failed to read local state, exiting...", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public MetaData currentMetaData() {
|
||||
return currentMetaData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.state().blocks().disableStatePersistence()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!event.state().nodes().localNode().masterNode()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!event.metaDataChanged()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// check if the global state changed?
|
||||
boolean success = true;
|
||||
if (currentMetaData == null || !MetaData.isGlobalStateEquals(currentMetaData, event.state().metaData())) {
|
||||
try {
|
||||
writeGlobalState("changed", event.state().metaData(), currentMetaData);
|
||||
} catch (Exception e) {
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
|
||||
// check and write changes in indices
|
||||
for (IndexMetaData indexMetaData : event.state().metaData()) {
|
||||
String writeReason = null;
|
||||
IndexMetaData currentIndexMetaData = currentMetaData == null ? null : currentMetaData.index(indexMetaData.index());
|
||||
if (currentIndexMetaData == null) {
|
||||
writeReason = "freshly created";
|
||||
} else if (currentIndexMetaData.version() != indexMetaData.version()) {
|
||||
writeReason = "version changed from [" + currentIndexMetaData.version() + "] to [" + indexMetaData.version() + "]";
|
||||
}
|
||||
|
||||
// we update the writeReason only if we really need to write it
|
||||
if (writeReason == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
writeIndex(writeReason, indexMetaData, currentIndexMetaData);
|
||||
} catch (Exception e) {
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
|
||||
// delete indices that are no longer there...
|
||||
if (currentMetaData != null) {
|
||||
for (IndexMetaData current : currentMetaData) {
|
||||
if (event.state().metaData().index(current.index()) == null) {
|
||||
deleteIndex(current.index());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (success) {
|
||||
currentMetaData = event.state().metaData();
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteIndex(String index) {
|
||||
logger.trace("[{}] delete index state", index);
|
||||
File[] indexLocations = nodeEnv.indexLocations(new Index(index));
|
||||
for (File indexLocation : indexLocations) {
|
||||
if (!indexLocation.exists()) {
|
||||
continue;
|
||||
}
|
||||
FileSystemUtils.deleteRecursively(new File(indexLocation, "_state"));
|
||||
}
|
||||
}
|
||||
|
||||
private void writeIndex(String reason, IndexMetaData indexMetaData, @Nullable IndexMetaData previousIndexMetaData) throws Exception {
|
||||
logger.trace("[{}] writing state, reason [{}]", indexMetaData.index(), reason);
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.cachedBytes());
|
||||
builder.startObject();
|
||||
IndexMetaData.Builder.toXContent(indexMetaData, builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.endObject();
|
||||
builder.flush();
|
||||
|
||||
Exception lastFailure = null;
|
||||
boolean wroteAtLeastOnce = false;
|
||||
for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) {
|
||||
File stateLocation = new File(indexLocation, "_state");
|
||||
FileSystemUtils.mkdirs(stateLocation);
|
||||
File stateFile = new File(stateLocation, "state-" + indexMetaData.version());
|
||||
|
||||
FileOutputStream fos = null;
|
||||
try {
|
||||
fos = new FileOutputStream(stateFile);
|
||||
fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size());
|
||||
fos.getChannel().force(true);
|
||||
Closeables.closeQuietly(fos);
|
||||
wroteAtLeastOnce = true;
|
||||
} catch (Exception e) {
|
||||
lastFailure = e;
|
||||
} finally {
|
||||
Closeables.closeQuietly(fos);
|
||||
}
|
||||
}
|
||||
|
||||
if (!wroteAtLeastOnce) {
|
||||
logger.warn("[{}]: failed to state", lastFailure, indexMetaData.index());
|
||||
throw new IOException("failed to write state for [" + indexMetaData.index() + "]", lastFailure);
|
||||
}
|
||||
|
||||
// delete the old files
|
||||
if (previousIndexMetaData != null && previousIndexMetaData.version() != indexMetaData.version()) {
|
||||
for (File indexLocation : nodeEnv.indexLocations(new Index(indexMetaData.index()))) {
|
||||
File stateFile = new File(new File(indexLocation, "_state"), "state-" + previousIndexMetaData.version());
|
||||
stateFile.delete();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeGlobalState(String reason, MetaData metaData, @Nullable MetaData previousMetaData) throws Exception {
|
||||
logger.trace("[_global] writing state, reason [{}]", reason);
|
||||
// create metadata to write with just the global state
|
||||
MetaData globalMetaData = MetaData.builder().metaData(metaData).removeAllIndices().build();
|
||||
|
||||
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
|
||||
try {
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, cachedEntry.cachedBytes());
|
||||
builder.startObject();
|
||||
MetaData.Builder.toXContent(globalMetaData, builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.endObject();
|
||||
builder.flush();
|
||||
|
||||
Exception lastFailure = null;
|
||||
boolean wroteAtLeastOnce = false;
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateLocation = new File(dataLocation, "_state");
|
||||
FileSystemUtils.mkdirs(stateLocation);
|
||||
File stateFile = new File(stateLocation, "global-" + globalMetaData.version());
|
||||
|
||||
FileOutputStream fos = null;
|
||||
try {
|
||||
fos = new FileOutputStream(stateFile);
|
||||
fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size());
|
||||
fos.getChannel().force(true);
|
||||
Closeables.closeQuietly(fos);
|
||||
wroteAtLeastOnce = true;
|
||||
} catch (Exception e) {
|
||||
lastFailure = e;
|
||||
} finally {
|
||||
Closeables.closeQuietly(fos);
|
||||
}
|
||||
}
|
||||
|
||||
if (!wroteAtLeastOnce) {
|
||||
logger.warn("[_global]: failed to write global state", lastFailure);
|
||||
throw new IOException("failed to write global state", lastFailure);
|
||||
}
|
||||
|
||||
// delete the old files
|
||||
if (previousMetaData != null && previousMetaData.version() != currentMetaData.version()) {
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateFile = new File(new File(dataLocation, "_state"), "global-" + previousMetaData.version());
|
||||
stateFile.delete();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
CachedStreamOutput.pushEntry(cachedEntry);
|
||||
}
|
||||
}
|
||||
|
||||
private void loadState() throws Exception {
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||
MetaData globalMetaData = loadGlobalState();
|
||||
if (globalMetaData != null) {
|
||||
metaDataBuilder.metaData(globalMetaData);
|
||||
}
|
||||
|
||||
Set<String> indices = nodeEnv.finalAllIndices();
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = loadIndex(index);
|
||||
if (indexMetaData == null) {
|
||||
logger.debug("[{}] failed to find metadata for existing index location", index);
|
||||
} else {
|
||||
metaDataBuilder.put(indexMetaData, false);
|
||||
}
|
||||
}
|
||||
currentMetaData = metaDataBuilder.build();
|
||||
}
|
||||
|
||||
private IndexMetaData loadIndex(String index) {
|
||||
long highestVersion = -1;
|
||||
IndexMetaData indexMetaData = null;
|
||||
for (File indexLocation : nodeEnv.indexLocations(new Index(index))) {
|
||||
File stateDir = new File(indexLocation, "_state");
|
||||
if (!stateDir.exists() || !stateDir.isDirectory()) {
|
||||
continue;
|
||||
}
|
||||
// now, iterate over the current versions, and find latest one
|
||||
File[] stateFiles = stateDir.listFiles();
|
||||
if (stateFiles == null) {
|
||||
continue;
|
||||
}
|
||||
for (File stateFile : stateFiles) {
|
||||
if (!stateFile.getName().startsWith("state-")) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
long version = Long.parseLong(stateFile.getName().substring("state-".length()));
|
||||
if (version > highestVersion) {
|
||||
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
|
||||
if (data.length == 0) {
|
||||
logger.debug("[{}]: no data for [" + stateFile.getAbsolutePath() + "], ignoring...", index);
|
||||
continue;
|
||||
}
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentHelper.createParser(data, 0, data.length);
|
||||
parser.nextToken(); // move to START_OBJECT
|
||||
indexMetaData = IndexMetaData.Builder.fromXContent(parser);
|
||||
highestVersion = version;
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.debug("[{}]: failed to read [" + stateFile.getAbsolutePath() + "], ignoring...", e, index);
|
||||
}
|
||||
}
|
||||
}
|
||||
return indexMetaData;
|
||||
}
|
||||
|
||||
private MetaData loadGlobalState() {
|
||||
long highestVersion = -1;
|
||||
MetaData metaData = null;
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateLocation = new File(dataLocation, "_state");
|
||||
if (!stateLocation.exists()) {
|
||||
continue;
|
||||
}
|
||||
File[] stateFiles = stateLocation.listFiles();
|
||||
if (stateFiles == null) {
|
||||
continue;
|
||||
}
|
||||
for (File stateFile : stateFiles) {
|
||||
String name = stateFile.getName();
|
||||
if (!name.startsWith("global-")) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
long version = Long.parseLong(stateFile.getName().substring("global-".length()));
|
||||
if (version > highestVersion) {
|
||||
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
|
||||
if (data.length == 0) {
|
||||
logger.debug("[_global] no data for [" + stateFile.getAbsolutePath() + "], ignoring...");
|
||||
continue;
|
||||
}
|
||||
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentHelper.createParser(data, 0, data.length);
|
||||
metaData = MetaData.Builder.fromXContent(parser);
|
||||
highestVersion = version;
|
||||
} finally {
|
||||
if (parser != null) {
|
||||
parser.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.debug("");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return metaData;
|
||||
}
|
||||
|
||||
private void pre019Upgrade() throws Exception {
|
||||
long index = -1;
|
||||
File metaDataFile = null;
|
||||
MetaData metaData = null;
|
||||
long version = -1;
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateLocation = new File(dataLocation, "_state");
|
||||
if (!stateLocation.exists()) {
|
||||
continue;
|
||||
}
|
||||
File[] stateFiles = stateLocation.listFiles();
|
||||
if (stateFiles == null) {
|
||||
continue;
|
||||
}
|
||||
for (File stateFile : stateFiles) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[upgrade]: processing [" + stateFile.getName() + "]");
|
||||
}
|
||||
String name = stateFile.getName();
|
||||
if (!name.startsWith("metadata-")) {
|
||||
continue;
|
||||
}
|
||||
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
|
||||
if (fileIndex >= index) {
|
||||
// try and read the meta data
|
||||
try {
|
||||
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
|
||||
if (data.length == 0) {
|
||||
continue;
|
||||
}
|
||||
XContentParser parser = XContentHelper.createParser(data, 0, data.length);
|
||||
try {
|
||||
String currentFieldName = null;
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token != null) {
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("meta-data".equals(currentFieldName)) {
|
||||
metaData = MetaData.Builder.fromXContent(parser);
|
||||
}
|
||||
} else if (token.isValue()) {
|
||||
if ("version".equals(currentFieldName)) {
|
||||
version = parser.longValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
parser.close();
|
||||
}
|
||||
index = fileIndex;
|
||||
metaDataFile = stateFile;
|
||||
} catch (IOException e) {
|
||||
logger.warn("failed to read pre 0.19 state from [" + name + "], ignoring...", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (metaData == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("found old metadata state, loading metadata from [{}] and converting to new metadata location and strucutre...", metaDataFile.getAbsolutePath());
|
||||
|
||||
writeGlobalState("upgrade", MetaData.builder().metaData(metaData).version(version).build(), null);
|
||||
for (IndexMetaData indexMetaData : metaData) {
|
||||
writeIndex("upgrade", IndexMetaData.newIndexMetaDataBuilder(indexMetaData).version(version).build(), null);
|
||||
}
|
||||
|
||||
// rename shards state to backup state
|
||||
File backupFile = new File(metaDataFile.getParentFile(), "backup-" + metaDataFile.getName());
|
||||
if (!metaDataFile.renameTo(backupFile)) {
|
||||
throw new IOException("failed to rename old state to backup state [" + metaDataFile.getAbsolutePath() + "]");
|
||||
}
|
||||
|
||||
// delete all other shards state files
|
||||
for (File dataLocation : nodeEnv.nodeDataLocations()) {
|
||||
File stateLocation = new File(dataLocation, "_state");
|
||||
if (!stateLocation.exists()) {
|
||||
continue;
|
||||
}
|
||||
File[] stateFiles = stateLocation.listFiles();
|
||||
if (stateFiles == null) {
|
||||
continue;
|
||||
}
|
||||
for (File stateFile : stateFiles) {
|
||||
String name = stateFile.getName();
|
||||
if (!name.startsWith("metadata-")) {
|
||||
continue;
|
||||
}
|
||||
stateFile.delete();
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("conversion to new metadata location and format done, backup create at [{}]", backupFile.getAbsolutePath());
|
||||
}
|
||||
}
|
|
@ -17,7 +17,7 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.gateway.local;
|
||||
package org.elasticsearch.gateway.local.state.meta;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
|
@ -26,6 +26,7 @@ import org.elasticsearch.action.FailedNodeException;
|
|||
import org.elasticsearch.action.support.nodes.*;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -46,15 +47,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
|
|||
*/
|
||||
public class TransportNodesListGatewayMetaState extends TransportNodesOperationAction<TransportNodesListGatewayMetaState.Request, TransportNodesListGatewayMetaState.NodesLocalGatewayMetaState, TransportNodesListGatewayMetaState.NodeRequest, TransportNodesListGatewayMetaState.NodeLocalGatewayMetaState> {
|
||||
|
||||
private LocalGateway gateway;
|
||||
private LocalGatewayMetaState metaState;
|
||||
|
||||
@Inject
|
||||
public TransportNodesListGatewayMetaState(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
|
||||
super(settings, clusterName, threadPool, clusterService, transportService);
|
||||
}
|
||||
|
||||
TransportNodesListGatewayMetaState initGateway(LocalGateway gateway) {
|
||||
this.gateway = gateway;
|
||||
TransportNodesListGatewayMetaState init(LocalGatewayMetaState metaState) {
|
||||
this.metaState = metaState;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -115,7 +116,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
|
|||
|
||||
@Override
|
||||
protected NodeLocalGatewayMetaState nodeOperation(NodeRequest request) throws ElasticSearchException {
|
||||
return new NodeLocalGatewayMetaState(clusterService.localNode(), gateway.currentMetaState());
|
||||
return new NodeLocalGatewayMetaState(clusterService.localNode(), metaState.currentMetaData());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -208,36 +209,36 @@ public class TransportNodesListGatewayMetaState extends TransportNodesOperationA
|
|||
|
||||
public static class NodeLocalGatewayMetaState extends NodeOperationResponse {
|
||||
|
||||
private LocalGatewayMetaState state;
|
||||
private MetaData metaData;
|
||||
|
||||
NodeLocalGatewayMetaState() {
|
||||
}
|
||||
|
||||
public NodeLocalGatewayMetaState(DiscoveryNode node, LocalGatewayMetaState state) {
|
||||
public NodeLocalGatewayMetaState(DiscoveryNode node, MetaData metaData) {
|
||||
super(node);
|
||||
this.state = state;
|
||||
this.metaData = metaData;
|
||||
}
|
||||
|
||||
public LocalGatewayMetaState state() {
|
||||
return state;
|
||||
public MetaData metaData() {
|
||||
return metaData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
if (in.readBoolean()) {
|
||||
state = LocalGatewayMetaState.Builder.readFrom(in);
|
||||
metaData = MetaData.Builder.readFrom(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
if (state == null) {
|
||||
if (metaData == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
LocalGatewayMetaState.Builder.writeTo(state, out);
|
||||
MetaData.Builder.writeTo(metaData, out);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.gateway.local.state.shards;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.Closeables;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -37,10 +36,7 @@ import org.elasticsearch.common.io.stream.CachedStreamOutput;
|
|||
import org.elasticsearch.common.io.stream.LZFStreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
|
@ -57,21 +53,27 @@ import java.util.Set;
|
|||
public class LocalGatewayShardsState extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
private final NodeEnvironment nodeEnv;
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private volatile boolean initialized = false;
|
||||
private volatile Map<ShardId, ShardStateInfo> currentState = Maps.newHashMap();
|
||||
|
||||
@Inject
|
||||
public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, ClusterService clusterService, TransportNodesListGatewayStartedShards listGatewayStartedShards) {
|
||||
public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception {
|
||||
super(settings);
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.clusterService = clusterService;
|
||||
listGatewayStartedShards.initGateway(this);
|
||||
|
||||
try {
|
||||
pre019Upgrade();
|
||||
long start = System.currentTimeMillis();
|
||||
loadStartedShards();
|
||||
logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start));
|
||||
} catch (Exception e) {
|
||||
logger.error("failed to read local state (started shards), exiting...", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public Map<ShardId, ShardStateInfo> currentStartedShards() {
|
||||
lazyInitialize();
|
||||
return this.currentState;
|
||||
}
|
||||
|
||||
|
@ -160,30 +162,6 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
|
|||
this.currentState = newState;
|
||||
}
|
||||
|
||||
private synchronized void lazyInitialize() {
|
||||
if (initialized) {
|
||||
return;
|
||||
}
|
||||
initialized = true;
|
||||
|
||||
// we only persist shards state for data nodes
|
||||
if (!clusterService.localNode().dataNode()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
pre019Upgrade();
|
||||
long start = System.currentTimeMillis();
|
||||
loadStartedShards();
|
||||
logger.debug("took {} to load started shards state", TimeValue.timeValueMillis(System.currentTimeMillis() - start));
|
||||
} catch (Exception e) {
|
||||
logger.error("failed to read local state (started shards), exiting...", e);
|
||||
// ugly, but, if we fail to read it, bail completely so we don't have any node corrupting the cluster
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void loadStartedShards() throws Exception {
|
||||
Set<ShardId> shardIds = nodeEnv.findAllShardIds();
|
||||
long highestVersion = -1;
|
||||
|
@ -244,13 +222,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
|
|||
private long readShardState(byte[] data) throws Exception {
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
if (LZF.isCompressed(data)) {
|
||||
BytesStreamInput siBytes = new BytesStreamInput(data, false);
|
||||
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
|
||||
parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
|
||||
} else {
|
||||
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
|
||||
}
|
||||
parser = XContentHelper.createParser(data, 0, data.length);
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token == null) {
|
||||
return -1;
|
||||
|
@ -313,7 +285,7 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
|
|||
}
|
||||
|
||||
// delete the old files
|
||||
if (previousStateInfo != null) {
|
||||
if (previousStateInfo != null && previousStateInfo.version != shardStateInfo.version) {
|
||||
for (File shardLocation : nodeEnv.shardLocations(shardId)) {
|
||||
File stateFile = new File(new File(shardLocation, "_state"), "state-" + previousStateInfo.version);
|
||||
stateFile.delete();
|
||||
|
@ -361,13 +333,13 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
|
|||
try {
|
||||
byte[] data = Streams.copyToByteArray(new FileInputStream(stateFile));
|
||||
if (data.length == 0) {
|
||||
logger.debug("[find_latest_state]: not data for [" + name + "], ignoring...");
|
||||
logger.debug("[upgrade]: not data for [" + name + "], ignoring...");
|
||||
}
|
||||
pre09ReadState(data);
|
||||
index = fileIndex;
|
||||
latest = stateFile;
|
||||
} catch (IOException e) {
|
||||
logger.warn("[find_latest_state]: failed to read state from [" + name + "], ignoring...", e);
|
||||
logger.warn("[upgrade]: failed to read state from [" + name + "], ignoring...", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -400,9 +372,6 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
|
|||
continue;
|
||||
}
|
||||
for (File stateFile : stateFiles) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[find_latest_state]: processing [" + stateFile.getName() + "]");
|
||||
}
|
||||
String name = stateFile.getName();
|
||||
if (!name.startsWith("shards-")) {
|
||||
continue;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.gateway.shared;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -53,16 +54,20 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
|
|||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.writeStateExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway#writeMetaData"));
|
||||
clusterService.add(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws ElasticSearchException {
|
||||
clusterService.add(this);
|
||||
this.writeStateExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "gateway#writeMetaData"));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ElasticSearchException {
|
||||
clusterService.remove(this);
|
||||
writeStateExecutor.shutdown();
|
||||
try {
|
||||
|
@ -72,10 +77,6 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
|
||||
threadPool.cached().execute(new Runnable() {
|
||||
|
@ -95,7 +96,7 @@ public abstract class SharedStorageGateway extends AbstractLifecycleComponent<Ga
|
|||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("failed to read from gateway", e);
|
||||
listener.onFailure(e);
|
||||
listener.onFailure(ExceptionsHelper.detailedMessage(e));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -88,6 +88,10 @@ public abstract class AbstractNodesTests {
|
|||
// default to non gateway
|
||||
finalSettings = settingsBuilder().put(finalSettings).put("gateway.type", "none").build();
|
||||
}
|
||||
if (finalSettings.get("cluster.routing.schedule") != null) {
|
||||
// decrease the routing schedule so new nodes will be added quickly
|
||||
finalSettings = settingsBuilder().put(finalSettings).put("cluster.routing.schedule", "50ms").build();
|
||||
}
|
||||
|
||||
Node node = nodeBuilder()
|
||||
.settings(finalSettings)
|
||||
|
|
|
@ -124,7 +124,7 @@ public class MinimumMasterNodesTests extends AbstractZenNodesTests {
|
|||
logger.info("--> starting the previous master node again...");
|
||||
startNode(masterNodeName, settings);
|
||||
|
||||
clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
|
||||
clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForYellowStatus().setWaitForNodes("2").execute().actionGet();
|
||||
assertThat(clusterHealthResponse.timedOut(), equalTo(false));
|
||||
|
||||
state = client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet().state();
|
||||
|
|
Loading…
Reference in New Issue