initial work on reusing work node level data when recovering from gateway

This commit is contained in:
kimchy 2010-06-24 00:32:05 +03:00
parent 0d7cebbaf0
commit 57169d4233
28 changed files with 288 additions and 138 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@ -262,18 +263,19 @@ public class SimpleStoreBenchmark {
public static void main(String[] args) throws Exception {
Environment environment = new Environment();
Settings settings = EMPTY_SETTINGS;
String localNodeId = "nodeId";
NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment);
ShardId shardId = new ShardId(new Index("index"), 1);
String type = args.length > 0 ? args[0] : "ram";
Store store;
if (type.equalsIgnoreCase("ram")) {
store = new RamStore(shardId, settings);
} else if (type.equalsIgnoreCase("simple-fs")) {
store = new SimpleFsStore(shardId, settings, new SimpleFsIndexStore(shardId.index(), settings, environment, localNodeId));
store = new SimpleFsStore(shardId, settings, new SimpleFsIndexStore(shardId.index(), settings, nodeEnvironment));
} else if (type.equalsIgnoreCase("mmap-fs")) {
store = new NioFsStore(shardId, settings, new NioFsIndexStore(shardId.index(), settings, environment, localNodeId));
store = new NioFsStore(shardId, settings, new NioFsIndexStore(shardId.index(), settings, nodeEnvironment));
} else if (type.equalsIgnoreCase("nio-fs")) {
store = new MmapFsStore(shardId, settings, new MmapFsIndexStore(shardId.index(), settings, environment, localNodeId));
store = new MmapFsStore(shardId, settings, new MmapFsIndexStore(shardId.index(), settings, nodeEnvironment));
} else if (type.equalsIgnoreCase("memory-direct")) {
Settings byteBufferSettings = settingsBuilder()
.put(settings)

View File

@ -67,7 +67,7 @@ public class Version {
}
public static String full() {
StringBuilder sb = new StringBuilder("ElasticSearch/");
StringBuilder sb = new StringBuilder("elasticsearch/");
sb.append(number);
if (snapshotBuild) {
sb.append("/").append(date);

View File

@ -226,7 +226,7 @@ public class MetaDataService extends AbstractComponent {
.initializeEmpty(newMetaData.index(index));
routingTableBuilder.add(indexRoutingBuilder);
logger.info("Creating Index [{}], cause [{}], shards [{}]/[{}], mappings {}", index, cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), fMappings.keySet());
logger.info("creating Index [{}], cause [{}], shards [{}]/[{}], mappings {}", index, cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), fMappings.keySet());
RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build());
return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build();
}
@ -254,7 +254,7 @@ public class MetaDataService extends AbstractComponent {
try {
mappings.put(fileNameNoSuffix, Streams.copyToString(new FileReader(mappingFile)));
} catch (IOException e) {
logger.warn("Failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e);
logger.warn("failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e);
}
}
}
@ -265,7 +265,7 @@ public class MetaDataService extends AbstractComponent {
throw new IndexMissingException(new Index(index));
}
logger.info("Deleting index [{}]", index);
logger.info("deleting index [{}]", index);
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size());
NodeIndexDeletedAction.Listener listener = new NodeIndexDeletedAction.Listener() {
@ -320,9 +320,9 @@ public class MetaDataService extends AbstractComponent {
// build the updated mapping source
final String updatedMappingSource = existingMapper.buildSource();
if (logger.isDebugEnabled()) {
logger.debug("Index [" + index + "]: Update mapping [" + type + "] (dynamic) with source [" + updatedMappingSource + "]");
logger.debug("index [" + index + "]: Update mapping [" + type + "] (dynamic) with source [" + updatedMappingSource + "]");
} else if (logger.isInfoEnabled()) {
logger.info("Index [" + index + "]: Update mapping [" + type + "] (dynamic)");
logger.info("index [" + index + "]: Update mapping [" + type + "] (dynamic)");
}
// publish the new mapping
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() {
@ -396,9 +396,9 @@ public class MetaDataService extends AbstractComponent {
}
mappings.put(index, mapping);
if (logger.isDebugEnabled()) {
logger.debug("Index [" + index + "]: Put mapping [" + mapping.v1() + "] with source [" + mapping.v2() + "]");
logger.debug("index [" + index + "]: Put mapping [" + mapping.v1() + "] with source [" + mapping.v2() + "]");
} else if (logger.isInfoEnabled()) {
logger.info("Index [" + index + "]: Put mapping [" + mapping.v1() + "]");
logger.info("index [" + index + "]: Put mapping [" + mapping.v1() + "]");
}
}

View File

@ -71,6 +71,6 @@ public class BoundTransportAddress implements Streamable {
}
@Override public String toString() {
return "bound_address[" + boundAddress + "], publish_address[" + publishAddress + "]";
return "bound_address {" + boundAddress + "}, publish_address {" + publishAddress + "}";
}
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.env;
import org.elasticsearch.common.inject.AbstractModule;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class EnvironmentModule extends AbstractModule {

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.env;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NativeFSLockFactory;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.io.File;
import java.io.IOException;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
/**
* @author kimchy (shay.banon)
*/
public class NodeEnvironment extends AbstractComponent {
private final File nodeFile;
private final Lock lock;
public NodeEnvironment(File nodeFile) {
super(EMPTY_SETTINGS);
this.nodeFile = nodeFile;
this.lock = null;
}
@Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException {
super(settings);
Lock lock = null;
File dir = null;
for (int i = 0; i < 100; i++) {
dir = new File(new File(environment.workWithClusterFile(), "nodes"), Integer.toString(i));
if (!dir.exists()) {
dir.mkdirs();
}
try {
NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir);
Lock tmpLock = lockFactory.makeLock("node.lock");
boolean obtained = tmpLock.obtain();
if (obtained) {
lock = tmpLock;
break;
}
} catch (IOException e) {
// ignore
}
}
if (lock == null) {
throw new IOException("Failed to obtain node lock");
}
this.lock = lock;
this.nodeFile = dir;
if (logger.isDebugEnabled()) {
logger.debug("using node location [{}]", dir);
}
}
public File nodeFile() {
return nodeFile;
}
public void close() {
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
// ignore
}
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.env;
import org.elasticsearch.common.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public class NodeEnvironmentModule extends AbstractModule {
public NodeEnvironmentModule() {
}
@Override protected void configure() {
bind(NodeEnvironment.class).asEagerSingleton();
}
}

View File

@ -124,11 +124,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
logger.trace("Shard is locked, releasing lock");
store.directory().clearLock(IndexWriter.WRITE_LOCK_NAME);
}
IndexWriter writer = new IndexWriter(store.directory(), analysisService.defaultIndexAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED);
writer.commit();
writer.close();
} catch (IOException e) {
logger.warn("Failed to clean the index", e);
logger.warn("Failed to check if index is locked", e);
}
}
@ -454,11 +451,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
this.nrtResource.forceClose();
}
try {
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
if (indexWriter != null) {
indexWriter.close();
indexWriter.rollback();
}
} catch (IOException e) {
throw new CloseEngineException(shardId, "Failed to close engine", e);
logger.debug("failed to rollback writer on close", e);
} finally {
indexWriter = null;
rwl.writeLock().unlock();

View File

@ -234,15 +234,23 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
}
public static class Index {
public static final Index EMPTY = new Index(-1, 0, new ByteSizeValue(0), 0, new ByteSizeValue(0), timeValueMillis(0));
private long version;
private int numberOfFiles;
private ByteSizeValue totalSize;
private int numberOfExistingFiles;
private ByteSizeValue existingTotalSize;
private TimeValue throttlingWaitTime;
public Index(long version, int numberOfFiles, ByteSizeValue totalSize, TimeValue throttlingWaitTime) {
public Index(long version, int numberOfFiles, ByteSizeValue totalSize,
int numberOfExistingFiles, ByteSizeValue existingTotalSize,
TimeValue throttlingWaitTime) {
this.version = version;
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
this.numberOfExistingFiles = numberOfExistingFiles;
this.existingTotalSize = existingTotalSize;
this.throttlingWaitTime = throttlingWaitTime;
}
@ -258,6 +266,14 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
return totalSize;
}
public int numberOfExistingFiles() {
return numberOfExistingFiles;
}
public ByteSizeValue existingTotalSize() {
return existingTotalSize;
}
public TimeValue throttlingWaitTime() {
return throttlingWaitTime;
}

View File

@ -40,7 +40,6 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@ -104,12 +103,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
throw new ElasticSearchIllegalStateException("Trying to recover when the shard is in backup state");
}
// clear the store, we are going to recover into it
try {
store.deleteContent();
} catch (IOException e) {
logger.debug("Failed to delete store before recovery from gateway", e);
}
indexShard.recovering();
StopWatch throttlingWaitTime = new StopWatch().start();
@ -128,7 +121,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
throttlingWaitTime.stop();
try {
logger.debug("Starting recovery from {}", shardGateway);
logger.debug("starting recovery from {}", shardGateway);
StopWatch stopWatch = new StopWatch().start();
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
@ -143,9 +136,10 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" Index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" Translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
sb.append(" translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
logger.debug(sb.toString());
}
// refresh the shard
@ -190,9 +184,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (snapshotStatus != IndexShardGateway.SnapshotStatus.NA) {
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Snapshot completed to ").append(shardGateway).append(", took [").append(snapshotStatus.totalTime()).append("]\n");
sb.append(" Index : number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(snapshotStatus.index().totalSize()).append("], took [").append(snapshotStatus.index().time()).append("]\n");
sb.append(" Translog : number_of_operations [").append(snapshotStatus.translog().numberOfOperations()).append("], took [").append(snapshotStatus.translog().time()).append("]");
sb.append("snapshot completed to ").append(shardGateway).append(", took [").append(snapshotStatus.totalTime()).append("]\n");
sb.append(" index : number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(snapshotStatus.index().totalSize()).append("], took [").append(snapshotStatus.index().time()).append("]\n");
sb.append(" translog : number_of_operations [").append(snapshotStatus.translog().numberOfOperations()).append("], took [").append(snapshotStatus.translog().time()).append("]");
logger.debug(sb.toString());
}
}
@ -211,11 +205,11 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
snapshotScheduleFuture = null;
}
if (!delete && snapshotOnClose) {
logger.debug("Snapshotting on close ...");
logger.debug("snapshotting on close ...");
try {
snapshot();
} catch (Exception e) {
logger.warn("Failed to snapshot on close", e);
logger.warn("failed to snapshot on close", e);
}
}
// don't really delete the shard gateway if we are *not* primary,
@ -245,7 +239,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (snapshotInterval.millis() != -1) {
// we need to schedule snapshot
if (logger.isDebugEnabled()) {
logger.debug("Scheduling snapshot every [{}]", snapshotInterval);
logger.debug("scheduling snapshot every [{}]", snapshotInterval);
}
snapshotScheduleFuture = threadPool.scheduleWithFixedDelay(new SnapshotRunnable(), snapshotInterval);
}
@ -256,7 +250,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
try {
snapshot();
} catch (Exception e) {
logger.warn("Failed to snapshot (scheduled)", e);
logger.warn("failed to snapshot (scheduled)", e);
}
}
}

View File

@ -403,7 +403,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
return new RecoveryStatus.Translog(lastTranslogId, operations.size(), new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES));
} catch (Exception e) {
lastException = e;
logger.debug("Failed to read translog, will try the next one", e);
logger.debug("failed to read translog, will try the next one", e);
}
}
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", lastException);
@ -418,6 +418,11 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
TObjectLongHashMap<String> combinedBlobs = buildCombinedPartsBlobs(blobs);
int numberOfFiles = 0;
long totalSize = 0;
int numberOfExistingFiles = 0;
long existingTotalSize = 0;
// filter out only the files that we need to recover, and reuse ones that exists in the store
List<String> filesToRecover = new ArrayList<String>();
for (TObjectLongIterator<String> it = combinedBlobs.iterator(); it.hasNext();) {
@ -425,10 +430,14 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// if the store has the file, and it has the same length, don't recover it
try {
if (store.directory().fileExists(it.key()) && store.directory().fileLength(it.key()) == it.value()) {
numberOfExistingFiles++;
existingTotalSize += it.value();
if (logger.isTraceEnabled()) {
logger.trace("not recovering [{}], exists in local store and has same size [{}]", it.key(), new ByteSizeValue(it.value()));
}
} else {
numberOfFiles++;
totalSize += it.value();
filesToRecover.add(it.key());
}
} catch (Exception e) {
@ -437,12 +446,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
long totalSize = 0;
final AtomicLong throttlingWaitTime = new AtomicLong();
final CountDownLatch latch = new CountDownLatch(filesToRecover.size());
final CopyOnWriteArrayList<Throwable> failures = new CopyOnWriteArrayList<Throwable>();
for (final String fileToRecover : filesToRecover) {
totalSize += combinedBlobs.get(fileToRecover);
if (recoveryThrottler.tryStream(shardId, fileToRecover)) {
// we managed to get a recovery going
recoverFile(fileToRecover, blobs, latch, failures);
@ -497,7 +504,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
// ignore
}
return new RecoveryStatus.Index(version, filesToRecover.size(), new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
return new RecoveryStatus.Index(version, numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize), TimeValue.timeValueMillis(throttlingWaitTime.get()));
}
private void recoverFile(final String fileToRecover, final ImmutableMap<String, BlobMetaData> blobs, final CountDownLatch latch, final List<Throwable> failures) {

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
@ -48,7 +47,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
// in the none case, we simply start the shard
indexShard.start();
return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(0)), new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)));
return new RecoveryStatus(RecoveryStatus.Index.EMPTY, new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)));
}
@Override public String type() {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
@ -94,6 +95,10 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final IndexEngine indexEngine;
private final IndexGateway indexGateway;
private final IndexStore indexStore;
private final OperationRouting operationRouting;
private volatile ImmutableMap<Integer, Injector> shardsInjectors = ImmutableMap.of();
@ -104,7 +109,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
@Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings,
MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService,
IndexCache indexCache, IndexEngine indexEngine, OperationRouting operationRouting) {
IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, OperationRouting operationRouting) {
super(index, indexSettings);
this.injector = injector;
this.indexSettings = indexSettings;
@ -113,6 +118,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
this.similarityService = similarityService;
this.indexCache = indexCache;
this.indexEngine = indexEngine;
this.indexGateway = indexGateway;
this.indexStore = indexStore;
this.operationRouting = operationRouting;
this.pluginsService = injector.getInstance(PluginsService.class);
@ -207,7 +214,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
indicesLifecycle.beforeIndexShardCreated(shardId);
logger.debug("Creating shard_id[{}]", shardId.id());
logger.debug("creating shard_id [{}]", shardId.id());
List<Module> modules = Lists.newArrayList();
modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
@ -228,12 +235,14 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
// clean the store
Store store = shardInjector.getInstance(Store.class);
try {
store.deleteContent();
} catch (IOException e) {
logger.warn("Failed to clean store on shard creation", e);
// if there is no gateway, clean the store, since we won't recover into it
if (indexGateway.type().equals(NoneGateway.TYPE)) {
Store store = shardInjector.getInstance(Store.class);
try {
store.deleteContent();
} catch (IOException e) {
logger.warn("failed to clean store on shard creation", e);
}
}
indicesLifecycle.afterIndexShardCreated(indexShard);
@ -258,7 +267,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
}
shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors);
if (delete) {
logger.debug("Deleting shard_id[{}]", shardId);
logger.debug("deleting shard_id [{}]", shardId);
}
Map<Integer, IndexShard> tmpShardsMap = newHashMap(shards);
@ -287,16 +296,20 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
// call this before we close the store, so we can release resources for it
indicesLifecycle.afterIndexShardClosed(indexShard.shardId(), delete);
// if we delete or have no gateway or the store is not persistent, clean the store...
Store store = shardInjector.getInstance(Store.class);
try {
store.fullDelete();
} catch (IOException e) {
logger.warn("Failed to clean store on shard deletion", e);
if (delete || indexGateway.type().equals(NoneGateway.TYPE) || !indexStore.persistent()) {
try {
store.fullDelete();
} catch (IOException e) {
logger.warn("failed to clean store on shard deletion", e);
}
}
// and close it
try {
store.close();
} catch (IOException e) {
logger.warn("Failed to close store on shard deletion", e);
logger.warn("failed to close store on shard deletion", e);
}
Injectors.close(injector);

View File

@ -20,10 +20,9 @@
package org.elasticsearch.index.store.fs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.LocalNodeId;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
@ -37,15 +36,13 @@ public abstract class FsIndexStore extends AbstractIndexComponent implements Ind
private final File location;
public FsIndexStore(Index index, @IndexSettings Settings indexSettings, Environment environment, @LocalNodeId String localNodeId) {
public FsIndexStore(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
super(index, indexSettings);
this.location = new File(new File(new File(environment.workWithClusterFile(), "indices"), localNodeId), index.name());
this.location = new File(new File(nodeEnv.nodeFile(), "indices"), index.name());
if (!location.exists()) {
boolean result = false;
for (int i = 0; i < 5; i++) {
result = location.mkdirs();
if (result) {
if (location.mkdirs()) {
break;
}
}

View File

@ -19,8 +19,7 @@
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.*;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.SwitchDirectory;
@ -55,6 +54,17 @@ public abstract class FsStore<T extends Directory> extends AbstractStore<T> {
public abstract FSDirectory fsDirectory();
protected LockFactory buildLockFactory() throws IOException {
String fsLock = componentSettings.get("fs_lock", "native");
LockFactory lockFactory = new NoLockFactory();
if (fsLock.equals("native")) {
lockFactory = new NativeFSLockFactory();
} else if (fsLock.equals("simple")) {
lockFactory = new SimpleFSLockFactory();
}
return lockFactory;
}
protected SwitchDirectory buildSwitchDirectoryIfNeeded(Directory fsDirectory) {
boolean cache = componentSettings.getAsBoolean("memory.enabled", false);
if (!cache) {

View File

@ -21,9 +21,8 @@ package org.elasticsearch.index.store.fs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.LocalNodeId;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
@ -32,8 +31,8 @@ import org.elasticsearch.index.store.Store;
*/
public class MmapFsIndexStore extends FsIndexStore {
@Inject public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, Environment environment, @LocalNodeId String localNodeId) {
super(index, indexSettings, environment, localNodeId);
@Inject public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
super(index, indexSettings, nodeEnv);
}
@Override public Class<? extends Store> shardStoreClass() {

View File

@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.store.IndexStore;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class MmapFsIndexStoreModule extends AbstractModule {

View File

@ -19,7 +19,10 @@
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings;
@ -48,14 +51,9 @@ public class MmapFsStore extends FsStore<Directory> {
super(shardId, indexSettings);
// by default, we don't need to sync to disk, since we use the gateway
this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false);
String fsLock = componentSettings.get("use_fs_lock", "none");
LockFactory lockFactory = new NoLockFactory();
if (fsLock.equals("native")) {
lockFactory = new NativeFSLockFactory();
} else if (fsLock.equals("simple")) {
lockFactory = new SimpleFSLockFactory();
}
LockFactory lockFactory = buildLockFactory();
File location = ((FsIndexStore) indexStore).shardLocation(shardId);
location.mkdirs();
this.fsDirectory = new CustomMMapDirectory(location, lockFactory, syncToDisk);
SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory);

View File

@ -21,9 +21,8 @@ package org.elasticsearch.index.store.fs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.LocalNodeId;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
@ -32,8 +31,8 @@ import org.elasticsearch.index.store.Store;
*/
public class NioFsIndexStore extends FsIndexStore {
@Inject public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, Environment environment, @LocalNodeId String localNodeId) {
super(index, indexSettings, environment, localNodeId);
@Inject public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
super(index, indexSettings, nodeEnv);
}
@Override public Class<? extends Store> shardStoreClass() {

View File

@ -19,7 +19,10 @@
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings;
@ -32,7 +35,7 @@ import java.io.File;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class NioFsStore extends FsStore<Directory> {
@ -48,14 +51,9 @@ public class NioFsStore extends FsStore<Directory> {
super(shardId, indexSettings);
// by default, we don't need to sync to disk, since we use the gateway
this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false);
String fsLock = componentSettings.get("use_fs_lock", "none");
LockFactory lockFactory = new NoLockFactory();
if (fsLock.equals("native")) {
lockFactory = new NativeFSLockFactory();
} else if (fsLock.equals("simple")) {
lockFactory = new SimpleFSLockFactory();
}
LockFactory lockFactory = buildLockFactory();
File location = ((FsIndexStore) indexStore).shardLocation(shardId);
location.mkdirs();
this.fsDirectory = new CustomNioFSDirectory(location, lockFactory, syncToDisk);
SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory);

View File

@ -21,9 +21,8 @@ package org.elasticsearch.index.store.fs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.LocalNodeId;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.Store;
@ -32,8 +31,8 @@ import org.elasticsearch.index.store.Store;
*/
public class SimpleFsIndexStore extends FsIndexStore {
@Inject public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, Environment environment, @LocalNodeId String localNodeId) {
super(index, indexSettings, environment, localNodeId);
@Inject public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) {
super(index, indexSettings, nodeEnv);
}
@Override public Class<? extends Store> shardStoreClass() {

View File

@ -19,7 +19,10 @@
package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.*;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.store.SwitchDirectory;
import org.elasticsearch.common.settings.Settings;
@ -32,7 +35,7 @@ import java.io.File;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class SimpleFsStore extends FsStore<Directory> {
@ -48,14 +51,9 @@ public class SimpleFsStore extends FsStore<Directory> {
super(shardId, indexSettings);
// by default, we don't need to sync to disk, since we use the gateway
this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false);
String fsLock = componentSettings.get("use_fs_lock", "none");
LockFactory lockFactory = new NoLockFactory();
if (fsLock.equals("native")) {
lockFactory = new NativeFSLockFactory();
} else if (fsLock.equals("simple")) {
lockFactory = new SimpleFSLockFactory();
}
LockFactory lockFactory = buildLockFactory();
File location = ((FsIndexStore) indexStore).shardLocation(shardId);
location.mkdirs();
this.fsDirectory = new CustomSimpleFSDirectory(location, lockFactory, syncToDisk);
SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory);

View File

@ -31,7 +31,7 @@ import org.elasticsearch.index.store.Store;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public abstract class AbstractStore<T extends Directory> extends AbstractIndexShardComponent implements Store<T> {

View File

@ -168,7 +168,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
indicesLifecycle.beforeIndexCreated(index);
logger.debug("Creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS));
logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS));
Settings indexSettings = settingsBuilder()
.put("settingsType", "index")
@ -226,7 +226,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
throw new IndexMissingException(new Index(index));
}
if (delete) {
logger.debug("Deleting Index [{}]", index);
logger.debug("deleting Index [{}]", index);
}
Map<String, IndexService> tmpMap = newHashMap(indices);

View File

@ -126,7 +126,7 @@ public class JmxService {
if (!success) {
throw new JmxConnectorCreationException("Failed to bind to [" + port + "]", lastException.get());
}
logger.info("bound_address[{}], publish_address[{}]", serviceUrl, publishUrl);
logger.info("bound_address {{}}, publish_address {{}}", serviceUrl, publishUrl);
}
for (ResourceDMBean resource : constructionMBeans) {

View File

@ -38,7 +38,6 @@ import org.elasticsearch.common.inject.Guice;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkModule;
@ -50,6 +49,8 @@ import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.EnvironmentModule;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.NodeEnvironmentModule;
import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.http.HttpServer;
@ -76,7 +77,6 @@ import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import java.io.File;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
@ -107,7 +107,7 @@ public final class InternalNode implements Node {
Tuple<Settings, Environment> tuple = InternalSettingsPerparer.prepareSettings(pSettings, loadConfigSettings);
ESLogger logger = Loggers.getLogger(Node.class, tuple.v1().get("name"));
logger.info("{{}}[{}]: Initializing ...", Version.full(), JvmInfo.jvmInfo().pid());
logger.info("{{}}[{}]: initializing ...", Version.full(), JvmInfo.jvmInfo().pid());
this.pluginsService = new PluginsService(tuple.v1(), tuple.v2());
this.settings = pluginsService.updatedSettings();
@ -122,6 +122,7 @@ public final class InternalNode implements Node {
modules.add(new ScriptModule());
modules.add(new JmxModule(settings));
modules.add(new EnvironmentModule(environment));
modules.add(new NodeEnvironmentModule());
modules.add(new ClusterNameModule(settings));
modules.add(new ThreadPoolModule(settings));
modules.add(new TimerModule());
@ -146,7 +147,7 @@ public final class InternalNode implements Node {
client = injector.getInstance(Client.class);
logger.info("{{}}[{}]: Initialized", Version.full(), JvmInfo.jvmInfo().pid());
logger.info("{{}}[{}]: initialized", Version.full(), JvmInfo.jvmInfo().pid());
}
@Override public Settings settings() {
@ -163,7 +164,7 @@ public final class InternalNode implements Node {
}
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("{{}}[{}]: Starting ...", Version.full(), JvmInfo.jvmInfo().pid());
logger.info("{{}}[{}]: starting ...", Version.full(), JvmInfo.jvmInfo().pid());
for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
injector.getInstance(plugin).start();
@ -186,7 +187,7 @@ public final class InternalNode implements Node {
}
injector.getInstance(JmxService.class).connectAndRegister(discoService.nodeDescription(), injector.getInstance(NetworkService.class));
logger.info("{{}}[{}]: Started", Version.full(), JvmInfo.jvmInfo().pid());
logger.info("{{}}[{}]: started", Version.full(), JvmInfo.jvmInfo().pid());
return this;
}
@ -196,7 +197,7 @@ public final class InternalNode implements Node {
return this;
}
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("{{}}[{}]: Stopping ...", Version.full(), JvmInfo.jvmInfo().pid());
logger.info("{{}}[{}]: stopping ...", Version.full(), JvmInfo.jvmInfo().pid());
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).stop();
@ -216,17 +217,11 @@ public final class InternalNode implements Node {
injector.getInstance(plugin).stop();
}
// Not pretty, but here we go
try {
FileSystemUtils.deleteRecursively(new File(new File(environment.workWithClusterFile(), "indices"),
injector.getInstance(ClusterService.class).state().nodes().localNodeId()));
} catch (Exception e) {
// ignore
}
injector.getInstance(NodeEnvironment.class).close();
Injectors.close(injector);
logger.info("{{}}[{}]: Stopped", Version.full(), JvmInfo.jvmInfo().pid());
logger.info("{{}}[{}]: stopped", Version.full(), JvmInfo.jvmInfo().pid());
return this;
}
@ -240,7 +235,7 @@ public final class InternalNode implements Node {
}
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("{{}}[{}]: Closing ...", Version.full(), JvmInfo.jvmInfo().pid());
logger.info("{{}}[{}]: closing ...", Version.full(), JvmInfo.jvmInfo().pid());
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close();
@ -276,7 +271,7 @@ public final class InternalNode implements Node {
ThreadLocals.clearReferencesThreadLocals();
logger.info("{{}}[{}]: Closed", Version.full(), JvmInfo.jvmInfo().pid());
logger.info("{{}}[{}]: closed", Version.full(), JvmInfo.jvmInfo().pid());
}
public Injector injector() {

View File

@ -61,7 +61,7 @@ public class PluginsService extends AbstractComponent {
Map<String, Plugin> plugins = Maps.newHashMap();
plugins.putAll(loadPluginsFromClasspath(settings));
logger.info("Loaded {}", plugins.keySet());
logger.info("loaded {}", plugins.keySet());
this.plugins = ImmutableMap.copyOf(plugins);
}
@ -149,7 +149,7 @@ public class PluginsService extends AbstractComponent {
}
}
if (addURL == null) {
logger.debug("Failed to find addURL method on classLoader [" + classLoader + "] to add methods");
logger.debug("failed to find addURL method on classLoader [" + classLoader + "] to add methods");
return;
}
@ -159,7 +159,7 @@ public class PluginsService extends AbstractComponent {
continue;
}
if (logger.isTraceEnabled()) {
logger.trace("Processing [{}]", pluginFile);
logger.trace("processing [{}]", pluginFile);
}
String pluginNameNoExtension = pluginFile.getName().substring(0, pluginFile.getName().lastIndexOf('.'));
@ -180,7 +180,7 @@ public class PluginsService extends AbstractComponent {
if (size == pluginFile.length()) {
extractPlugin = false;
if (logger.isTraceEnabled()) {
logger.trace("--- No need to extract plugin, same size [" + size + "]");
logger.trace("--- no need to extract plugin, same size [" + size + "]");
}
}
} catch (Exception e) {
@ -198,7 +198,7 @@ public class PluginsService extends AbstractComponent {
if (extractPlugin) {
if (logger.isTraceEnabled()) {
logger.trace("--- Extracting plugin to [" + extractedPluginDir + "]");
logger.trace("--- extracting plugin to [" + extractedPluginDir + "]");
}
deleteRecursively(extractedPluginDir, false);
@ -216,7 +216,7 @@ public class PluginsService extends AbstractComponent {
Streams.copy(zipFile.getInputStream(zipEntry), new FileOutputStream(target));
}
} catch (Exception e) {
logger.warn("Failed to extract plugin [" + pluginFile + "], ignoring...", e);
logger.warn("failed to extract plugin [" + pluginFile + "], ignoring...", e);
continue;
} finally {
if (zipFile != null) {
@ -246,7 +246,7 @@ public class PluginsService extends AbstractComponent {
addURL.invoke(classLoader, jarToAdd.toURI().toURL());
}
} catch (Exception e) {
logger.warn("Failed to add plugin [" + pluginFile + "]", e);
logger.warn("failed to add plugin [" + pluginFile + "]", e);
}
}
}
@ -257,7 +257,7 @@ public class PluginsService extends AbstractComponent {
try {
pluginUrls = settings.getClassLoader().getResources("es-plugin.properties");
} catch (IOException e) {
logger.warn("Failed to find plugins from classpath", e);
logger.warn("failed to find plugins from classpath", e);
}
while (pluginUrls.hasMoreElements()) {
URL pluginUrl = pluginUrls.nextElement();
@ -280,7 +280,7 @@ public class PluginsService extends AbstractComponent {
}
plugins.put(plugin.name(), plugin);
} catch (Exception e) {
logger.warn("Failed to load plugin from [" + pluginUrl + "]", e);
logger.warn("failed to load plugin from [" + pluginUrl + "]", e);
} finally {
if (is != null) {
try {

View File

@ -415,7 +415,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
return new RecoveryStatus.Index(version, filesMetaDatas.size(), new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
return new RecoveryStatus.Index(version, filesMetaDatas.size(), new ByteSizeValue(totalSize), 0, new ByteSizeValue(0), TimeValue.timeValueMillis(throttlingWaitTime.get()));
}
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {