diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java new file mode 100644 index 00000000000..056bad1af15 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java @@ -0,0 +1,79 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.io.stream; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +/** + * @author kimchy (shay.banon) + */ +public class InputStreamStreamInput extends StreamInput { + + private final InputStream is; + + public InputStreamStreamInput(InputStream is) { + this.is = is; + } + + @Override public byte readByte() throws IOException { + int ch = is.read(); + if (ch < 0) + throw new EOFException(); + return (byte) (ch); + } + + @Override public void readBytes(byte[] b, int offset, int len) throws IOException { + if (len < 0) + throw new IndexOutOfBoundsException(); + int n = 0; + while (n < len) { + int count = is.read(b, offset + n, len - n); + if (count < 0) + throw new EOFException(); + n += count; + } + } + + @Override public void reset() throws IOException { + is.reset(); + } + + @Override public void close() throws IOException { + is.close(); + } + + @Override public int read() throws IOException { + return is.read(); + } + + @Override public int read(byte[] b) throws IOException { + return is.read(b); + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + return is.read(b, off, len); + } + + @Override public long skip(long n) throws IOException { + return is.skip(n); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 6929517ea89..a9fa680fcef 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -47,7 +47,7 @@ import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.gateway.GatewayException; -import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule; +import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule; import org.elasticsearch.threadpool.ThreadPool; import java.io.*; @@ -195,7 +195,7 @@ public class LocalGateway extends AbstractLifecycleComponent implements } @Override public Class suggestIndexGateway() { - return NoneIndexGatewayModule.class; + return LocalIndexGatewayModule.class; } @Override public void reset() throws Exception { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index 01eb0dbc49e..40e5c712246 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -21,10 +21,7 @@ package org.elasticsearch.gateway.local; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.MutableShardRouting; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.NodeAllocation; import org.elasticsearch.cluster.routing.allocation.NodeAllocations; import org.elasticsearch.common.collect.Maps; @@ -35,6 +32,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; @@ -50,6 +48,10 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { this.listGatewayState = listGatewayState; } + @Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards) { + // TODO when a shard failed and we in the initial allocation, find an existing one + } + @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) { boolean changed = false; @@ -105,6 +107,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } } + // TODO optimize replica allocation to existing work locations + return changed; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexGateway.java new file mode 100644 index 00000000000..e93ea309b63 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexGateway.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.gateway.local; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.AbstractIndexComponent; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.gateway.IndexGateway; +import org.elasticsearch.index.gateway.IndexShardGateway; +import org.elasticsearch.index.settings.IndexSettings; + +/** + * @author kimchy (shay.banon) + */ +public class LocalIndexGateway extends AbstractIndexComponent implements IndexGateway { + + @Inject public LocalIndexGateway(Index index, @IndexSettings Settings indexSettings) { + super(index, indexSettings); + } + + @Override public String type() { + return "local"; + } + + @Override public Class shardGatewayClass() { + return LocalIndexShardGateway.class; + } + + @Override public String toString() { + return "local"; + } + + @Override public void close(boolean delete) { + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexGatewayModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexGatewayModule.java new file mode 100644 index 00000000000..b353aa94281 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexGatewayModule.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.gateway.local; + +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.index.gateway.IndexGateway; + +/** + * @author kimchy (shay.banon) + */ +public class LocalIndexGatewayModule extends AbstractModule { + + @Override protected void configure() { + bind(IndexGateway.class).to(LocalIndexGateway.class).asEagerSingleton(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java new file mode 100644 index 00000000000..01c02e734c0 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -0,0 +1,157 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.gateway.local; + +import org.apache.lucene.index.IndexReader; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.InputStreamStreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.none.NoneGateway; +import org.elasticsearch.index.gateway.IndexShardGateway; +import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; +import org.elasticsearch.index.gateway.RecoveryStatus; +import org.elasticsearch.index.gateway.SnapshotStatus; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogStreams; +import org.elasticsearch.index.translog.fs.FsTranslog; + +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public class LocalIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway { + + private final InternalIndexShard indexShard; + + private final RecoveryStatus recoveryStatus = new RecoveryStatus(); + + @Inject public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard) { + super(shardId, indexSettings); + this.indexShard = (InternalIndexShard) indexShard; + } + + @Override public String toString() { + return "local"; + } + + @Override public RecoveryStatus recoveryStatus() { + return recoveryStatus; + } + + @Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException { + recoveryStatus().index().startTime(System.currentTimeMillis()); + // read the gateway data persisted + long version = -1; + try { + if (IndexReader.indexExists(indexShard.store().directory())) { + version = IndexReader.getCurrentVersion(indexShard.store().directory()); + } + } catch (IOException e) { + throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); + } + recoveryStatus.index().updateVersion(version); + recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); + + recoveryStatus.translog().startTime(System.currentTimeMillis()); + if (version == -1) { + // no translog files, bail + indexShard.start(); + // no index, just start the shard and bail + recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); + return; + } + + // move an existing translog, if exists, to "recovering" state, and start reading from it + FsTranslog translog = (FsTranslog) indexShard.translog(); + File recoveringTranslogFile = new File(translog.location(), "translog-" + version + ".recovering"); + if (!recoveringTranslogFile.exists()) { + File translogFile = new File(translog.location(), "translog-" + version); + if (translogFile.exists()) { + for (int i = 0; i < 3; i++) { + if (translogFile.renameTo(recoveringTranslogFile)) { + break; + } + } + } + } + + if (!recoveringTranslogFile.exists()) { + // no translog to recovery from, start and bail + // no translog files, bail + indexShard.start(); + // no index, just start the shard and bail + recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); + return; + } + + // recover from the translog file + indexShard.performRecoveryPrepareForTranslog(); + try { + InputStreamStreamInput si = new InputStreamStreamInput(new FileInputStream(recoveringTranslogFile)); + while (true) { + int opSize = si.readInt(); + Translog.Operation operation = TranslogStreams.readTranslogOperation(si); + recoveryStatus.translog().addTranslogOperations(1); + indexShard.performRecoveryOperation(operation); + } + } catch (EOFException e) { + // ignore this exception, its fine + } catch (IOException e) { + // ignore this as well + } + indexShard.performRecoveryFinalization(true); + + recoveringTranslogFile.delete(); + + recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime()); + } + + @Override public String type() { + return NoneGateway.TYPE; + } + + @Override public SnapshotStatus snapshot(Snapshot snapshot) { + return null; + } + + @Override public SnapshotStatus lastSnapshotStatus() { + return null; + } + + @Override public SnapshotStatus currentSnapshotStatus() { + return null; + } + + @Override public boolean requiresSnapshotScheduling() { + return false; + } + + @Override public void close(boolean delete) { + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 03d951dc8b8..89e2f70153b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -75,6 +75,10 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog this.useStream = useStream; } + public File location() { + return location; + } + @Override public long currentId() { return this.id; }