more work on getting local gateway to work, apply transaction log operations when primary "recovering"

This commit is contained in:
kimchy 2010-08-29 16:56:04 +03:00
parent 4f4471483d
commit 269616f35e
7 changed files with 336 additions and 6 deletions

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
/**
* @author kimchy (shay.banon)
*/
public class InputStreamStreamInput extends StreamInput {
private final InputStream is;
public InputStreamStreamInput(InputStream is) {
this.is = is;
}
@Override public byte readByte() throws IOException {
int ch = is.read();
if (ch < 0)
throw new EOFException();
return (byte) (ch);
}
@Override public void readBytes(byte[] b, int offset, int len) throws IOException {
if (len < 0)
throw new IndexOutOfBoundsException();
int n = 0;
while (n < len) {
int count = is.read(b, offset + n, len - n);
if (count < 0)
throw new EOFException();
n += count;
}
}
@Override public void reset() throws IOException {
is.reset();
}
@Override public void close() throws IOException {
is.close();
}
@Override public int read() throws IOException {
return is.read();
}
@Override public int read(byte[] b) throws IOException {
return is.read(b);
}
@Override public int read(byte[] b, int off, int len) throws IOException {
return is.read(b, off, len);
}
@Override public long skip(long n) throws IOException {
return is.skip(n);
}
}

View File

@ -47,7 +47,7 @@ import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule;
import org.elasticsearch.index.gateway.local.LocalIndexGatewayModule;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.*;
@ -195,7 +195,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
@Override public Class<? extends Module> suggestIndexGateway() {
return NoneIndexGatewayModule.class;
return LocalIndexGatewayModule.class;
}
@Override public void reset() throws Exception {

View File

@ -21,10 +21,7 @@ package org.elasticsearch.gateway.local;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.common.collect.Maps;
@ -35,6 +32,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -50,6 +48,10 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
this.listGatewayState = listGatewayState;
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
// TODO when a shard failed and we in the initial allocation, find an existing one
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false;
@ -105,6 +107,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
}
}
// TODO optimize replica allocation to existing work locations
return changed;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.local;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.settings.IndexSettings;
/**
* @author kimchy (shay.banon)
*/
public class LocalIndexGateway extends AbstractIndexComponent implements IndexGateway {
@Inject public LocalIndexGateway(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
}
@Override public String type() {
return "local";
}
@Override public Class<? extends IndexShardGateway> shardGatewayClass() {
return LocalIndexShardGateway.class;
}
@Override public String toString() {
return "local";
}
@Override public void close(boolean delete) {
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.local;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.gateway.IndexGateway;
/**
* @author kimchy (shay.banon)
*/
public class LocalIndexGatewayModule extends AbstractModule {
@Override protected void configure() {
bind(IndexGateway.class).to(LocalIndexGateway.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway.local;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.index.gateway.SnapshotStatus;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.fs.FsTranslog;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class LocalIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
private final InternalIndexShard indexShard;
private final RecoveryStatus recoveryStatus = new RecoveryStatus();
@Inject public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard) {
super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard;
}
@Override public String toString() {
return "local";
}
@Override public RecoveryStatus recoveryStatus() {
return recoveryStatus;
}
@Override public void recover(RecoveryStatus recoveryStatus) throws IndexShardGatewayRecoveryException {
recoveryStatus().index().startTime(System.currentTimeMillis());
// read the gateway data persisted
long version = -1;
try {
if (IndexReader.indexExists(indexShard.store().directory())) {
version = IndexReader.getCurrentVersion(indexShard.store().directory());
}
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
recoveryStatus.index().updateVersion(version);
recoveryStatus.index().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().startTime(System.currentTimeMillis());
if (version == -1) {
// no translog files, bail
indexShard.start();
// no index, just start the shard and bail
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
return;
}
// move an existing translog, if exists, to "recovering" state, and start reading from it
FsTranslog translog = (FsTranslog) indexShard.translog();
File recoveringTranslogFile = new File(translog.location(), "translog-" + version + ".recovering");
if (!recoveringTranslogFile.exists()) {
File translogFile = new File(translog.location(), "translog-" + version);
if (translogFile.exists()) {
for (int i = 0; i < 3; i++) {
if (translogFile.renameTo(recoveringTranslogFile)) {
break;
}
}
}
}
if (!recoveringTranslogFile.exists()) {
// no translog to recovery from, start and bail
// no translog files, bail
indexShard.start();
// no index, just start the shard and bail
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
return;
}
// recover from the translog file
indexShard.performRecoveryPrepareForTranslog();
try {
InputStreamStreamInput si = new InputStreamStreamInput(new FileInputStream(recoveringTranslogFile));
while (true) {
int opSize = si.readInt();
Translog.Operation operation = TranslogStreams.readTranslogOperation(si);
recoveryStatus.translog().addTranslogOperations(1);
indexShard.performRecoveryOperation(operation);
}
} catch (EOFException e) {
// ignore this exception, its fine
} catch (IOException e) {
// ignore this as well
}
indexShard.performRecoveryFinalization(true);
recoveringTranslogFile.delete();
recoveryStatus.translog().time(System.currentTimeMillis() - recoveryStatus.index().startTime());
}
@Override public String type() {
return NoneGateway.TYPE;
}
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
return null;
}
@Override public SnapshotStatus lastSnapshotStatus() {
return null;
}
@Override public SnapshotStatus currentSnapshotStatus() {
return null;
}
@Override public boolean requiresSnapshotScheduling() {
return false;
}
@Override public void close(boolean delete) {
}
}

View File

@ -75,6 +75,10 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
this.useStream = useStream;
}
public File location() {
return location;
}
@Override public long currentId() {
return this.id;
}