diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/store/SimpleStoreBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/store/SimpleStoreBenchmark.java index c1a32829544..d5eb8034cb7 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/store/SimpleStoreBenchmark.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/store/SimpleStoreBenchmark.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -262,18 +263,19 @@ public class SimpleStoreBenchmark { public static void main(String[] args) throws Exception { Environment environment = new Environment(); Settings settings = EMPTY_SETTINGS; - String localNodeId = "nodeId"; + NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment); + ShardId shardId = new ShardId(new Index("index"), 1); String type = args.length > 0 ? args[0] : "ram"; Store store; if (type.equalsIgnoreCase("ram")) { store = new RamStore(shardId, settings); } else if (type.equalsIgnoreCase("simple-fs")) { - store = new SimpleFsStore(shardId, settings, new SimpleFsIndexStore(shardId.index(), settings, environment, localNodeId)); + store = new SimpleFsStore(shardId, settings, new SimpleFsIndexStore(shardId.index(), settings, nodeEnvironment)); } else if (type.equalsIgnoreCase("mmap-fs")) { - store = new NioFsStore(shardId, settings, new NioFsIndexStore(shardId.index(), settings, environment, localNodeId)); + store = new NioFsStore(shardId, settings, new NioFsIndexStore(shardId.index(), settings, nodeEnvironment)); } else if (type.equalsIgnoreCase("nio-fs")) { - store = new MmapFsStore(shardId, settings, new MmapFsIndexStore(shardId.index(), settings, environment, localNodeId)); + store = new MmapFsStore(shardId, settings, new MmapFsIndexStore(shardId.index(), settings, nodeEnvironment)); } else if (type.equalsIgnoreCase("memory-direct")) { Settings byteBufferSettings = settingsBuilder() .put(settings) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/Version.java b/modules/elasticsearch/src/main/java/org/elasticsearch/Version.java index 0086956de9f..7ab7fcf19b9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/Version.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/Version.java @@ -67,7 +67,7 @@ public class Version { } public static String full() { - StringBuilder sb = new StringBuilder("ElasticSearch/"); + StringBuilder sb = new StringBuilder("elasticsearch/"); sb.append(number); if (snapshotBuild) { sb.append("/").append(date); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java index 35d5d5d780e..71c7f50cb55 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java @@ -226,7 +226,7 @@ public class MetaDataService extends AbstractComponent { .initializeEmpty(newMetaData.index(index)); routingTableBuilder.add(indexRoutingBuilder); - logger.info("Creating Index [{}], cause [{}], shards [{}]/[{}], mappings {}", index, cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), fMappings.keySet()); + logger.info("creating Index [{}], cause [{}], shards [{}]/[{}], mappings {}", index, cause, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), fMappings.keySet()); RoutingTable newRoutingTable = shardsRoutingStrategy.reroute(newClusterStateBuilder().state(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build()); return newClusterStateBuilder().state(currentState).routingTable(newRoutingTable).metaData(newMetaData).build(); } @@ -254,7 +254,7 @@ public class MetaDataService extends AbstractComponent { try { mappings.put(fileNameNoSuffix, Streams.copyToString(new FileReader(mappingFile))); } catch (IOException e) { - logger.warn("Failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e); + logger.warn("failed to read mapping [" + fileNameNoSuffix + "] from location [" + mappingFile + "], ignoring...", e); } } } @@ -265,7 +265,7 @@ public class MetaDataService extends AbstractComponent { throw new IndexMissingException(new Index(index)); } - logger.info("Deleting index [{}]", index); + logger.info("deleting index [{}]", index); final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size()); NodeIndexDeletedAction.Listener listener = new NodeIndexDeletedAction.Listener() { @@ -320,9 +320,9 @@ public class MetaDataService extends AbstractComponent { // build the updated mapping source final String updatedMappingSource = existingMapper.buildSource(); if (logger.isDebugEnabled()) { - logger.debug("Index [" + index + "]: Update mapping [" + type + "] (dynamic) with source [" + updatedMappingSource + "]"); + logger.debug("index [" + index + "]: Update mapping [" + type + "] (dynamic) with source [" + updatedMappingSource + "]"); } else if (logger.isInfoEnabled()) { - logger.info("Index [" + index + "]: Update mapping [" + type + "] (dynamic)"); + logger.info("index [" + index + "]: Update mapping [" + type + "] (dynamic)"); } // publish the new mapping clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() { @@ -396,9 +396,9 @@ public class MetaDataService extends AbstractComponent { } mappings.put(index, mapping); if (logger.isDebugEnabled()) { - logger.debug("Index [" + index + "]: Put mapping [" + mapping.v1() + "] with source [" + mapping.v2() + "]"); + logger.debug("index [" + index + "]: Put mapping [" + mapping.v1() + "] with source [" + mapping.v2() + "]"); } else if (logger.isInfoEnabled()) { - logger.info("Index [" + index + "]: Put mapping [" + mapping.v1() + "]"); + logger.info("index [" + index + "]: Put mapping [" + mapping.v1() + "]"); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/transport/BoundTransportAddress.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/transport/BoundTransportAddress.java index 97e6ee5c33b..5ae75ca3509 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/transport/BoundTransportAddress.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/transport/BoundTransportAddress.java @@ -71,6 +71,6 @@ public class BoundTransportAddress implements Streamable { } @Override public String toString() { - return "bound_address[" + boundAddress + "], publish_address[" + publishAddress + "]"; + return "bound_address {" + boundAddress + "}, publish_address {" + publishAddress + "}"; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/EnvironmentModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/EnvironmentModule.java index 18b0bbd7376..bc04fe442a5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/env/EnvironmentModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/EnvironmentModule.java @@ -22,7 +22,7 @@ package org.elasticsearch.env; import org.elasticsearch.common.inject.AbstractModule; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class EnvironmentModule extends AbstractModule { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java new file mode 100644 index 00000000000..236cb794f8e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.env; + +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.NativeFSLockFactory; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +import java.io.File; +import java.io.IOException; + +import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*; + +/** + * @author kimchy (shay.banon) + */ +public class NodeEnvironment extends AbstractComponent { + + private final File nodeFile; + + private final Lock lock; + + public NodeEnvironment(File nodeFile) { + super(EMPTY_SETTINGS); + this.nodeFile = nodeFile; + this.lock = null; + } + + @Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException { + super(settings); + + Lock lock = null; + File dir = null; + for (int i = 0; i < 100; i++) { + dir = new File(new File(environment.workWithClusterFile(), "nodes"), Integer.toString(i)); + if (!dir.exists()) { + dir.mkdirs(); + } + try { + NativeFSLockFactory lockFactory = new NativeFSLockFactory(dir); + Lock tmpLock = lockFactory.makeLock("node.lock"); + boolean obtained = tmpLock.obtain(); + if (obtained) { + lock = tmpLock; + break; + } + } catch (IOException e) { + // ignore + } + } + if (lock == null) { + throw new IOException("Failed to obtain node lock"); + } + this.lock = lock; + this.nodeFile = dir; + if (logger.isDebugEnabled()) { + logger.debug("using node location [{}]", dir); + } + } + + public File nodeFile() { + return nodeFile; + } + + public void close() { + if (lock != null) { + try { + lock.release(); + } catch (IOException e) { + // ignore + } + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironmentModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironmentModule.java new file mode 100644 index 00000000000..3a5bdd9cfdf --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironmentModule.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.env; + +import org.elasticsearch.common.inject.AbstractModule; + +/** + * @author kimchy (shay.banon) + */ +public class NodeEnvironmentModule extends AbstractModule { + + public NodeEnvironmentModule() { + } + + @Override protected void configure() { + bind(NodeEnvironment.class).asEagerSingleton(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index a4eaba33be7..b43a1a5df44 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -124,11 +124,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, logger.trace("Shard is locked, releasing lock"); store.directory().clearLock(IndexWriter.WRITE_LOCK_NAME); } - IndexWriter writer = new IndexWriter(store.directory(), analysisService.defaultIndexAnalyzer(), true, IndexWriter.MaxFieldLength.UNLIMITED); - writer.commit(); - writer.close(); } catch (IOException e) { - logger.warn("Failed to clean the index", e); + logger.warn("Failed to check if index is locked", e); } } @@ -454,11 +451,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, this.nrtResource.forceClose(); } try { + // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed if (indexWriter != null) { - indexWriter.close(); + indexWriter.rollback(); } } catch (IOException e) { - throw new CloseEngineException(shardId, "Failed to close engine", e); + logger.debug("failed to rollback writer on close", e); } finally { indexWriter = null; rwl.writeLock().unlock(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index ea3f58599ad..dbf6481b1b1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -234,15 +234,23 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo } public static class Index { + public static final Index EMPTY = new Index(-1, 0, new ByteSizeValue(0), 0, new ByteSizeValue(0), timeValueMillis(0)); + private long version; private int numberOfFiles; private ByteSizeValue totalSize; + private int numberOfExistingFiles; + private ByteSizeValue existingTotalSize; private TimeValue throttlingWaitTime; - public Index(long version, int numberOfFiles, ByteSizeValue totalSize, TimeValue throttlingWaitTime) { + public Index(long version, int numberOfFiles, ByteSizeValue totalSize, + int numberOfExistingFiles, ByteSizeValue existingTotalSize, + TimeValue throttlingWaitTime) { this.version = version; this.numberOfFiles = numberOfFiles; this.totalSize = totalSize; + this.numberOfExistingFiles = numberOfExistingFiles; + this.existingTotalSize = existingTotalSize; this.throttlingWaitTime = throttlingWaitTime; } @@ -258,6 +266,14 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo return totalSize; } + public int numberOfExistingFiles() { + return numberOfExistingFiles; + } + + public ByteSizeValue existingTotalSize() { + return existingTotalSize; + } + public TimeValue throttlingWaitTime() { return throttlingWaitTime; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 377cb95a78f..822877c7b47 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -40,7 +40,6 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -104,12 +103,6 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem throw new ElasticSearchIllegalStateException("Trying to recover when the shard is in backup state"); } - // clear the store, we are going to recover into it - try { - store.deleteContent(); - } catch (IOException e) { - logger.debug("Failed to delete store before recovery from gateway", e); - } indexShard.recovering(); StopWatch throttlingWaitTime = new StopWatch().start(); @@ -128,7 +121,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem throttlingWaitTime.stop(); try { - logger.debug("Starting recovery from {}", shardGateway); + logger.debug("starting recovery from {}", shardGateway); StopWatch stopWatch = new StopWatch().start(); IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover(); @@ -143,9 +136,10 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem stopWatch.stop(); if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); - sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n"); - sb.append(" Index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n"); - sb.append(" Translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]"); + sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n"); + sb.append(" index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n"); + sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n"); + sb.append(" translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]"); logger.debug(sb.toString()); } // refresh the shard @@ -190,9 +184,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem if (snapshotStatus != IndexShardGateway.SnapshotStatus.NA) { if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); - sb.append("Snapshot completed to ").append(shardGateway).append(", took [").append(snapshotStatus.totalTime()).append("]\n"); - sb.append(" Index : number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(snapshotStatus.index().totalSize()).append("], took [").append(snapshotStatus.index().time()).append("]\n"); - sb.append(" Translog : number_of_operations [").append(snapshotStatus.translog().numberOfOperations()).append("], took [").append(snapshotStatus.translog().time()).append("]"); + sb.append("snapshot completed to ").append(shardGateway).append(", took [").append(snapshotStatus.totalTime()).append("]\n"); + sb.append(" index : number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(snapshotStatus.index().totalSize()).append("], took [").append(snapshotStatus.index().time()).append("]\n"); + sb.append(" translog : number_of_operations [").append(snapshotStatus.translog().numberOfOperations()).append("], took [").append(snapshotStatus.translog().time()).append("]"); logger.debug(sb.toString()); } } @@ -211,11 +205,11 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem snapshotScheduleFuture = null; } if (!delete && snapshotOnClose) { - logger.debug("Snapshotting on close ..."); + logger.debug("snapshotting on close ..."); try { snapshot(); } catch (Exception e) { - logger.warn("Failed to snapshot on close", e); + logger.warn("failed to snapshot on close", e); } } // don't really delete the shard gateway if we are *not* primary, @@ -245,7 +239,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem if (snapshotInterval.millis() != -1) { // we need to schedule snapshot if (logger.isDebugEnabled()) { - logger.debug("Scheduling snapshot every [{}]", snapshotInterval); + logger.debug("scheduling snapshot every [{}]", snapshotInterval); } snapshotScheduleFuture = threadPool.scheduleWithFixedDelay(new SnapshotRunnable(), snapshotInterval); } @@ -256,7 +250,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem try { snapshot(); } catch (Exception e) { - logger.warn("Failed to snapshot (scheduled)", e); + logger.warn("failed to snapshot (scheduled)", e); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index d7e18b23453..b6247d6cbbc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -403,7 +403,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo return new RecoveryStatus.Translog(lastTranslogId, operations.size(), new ByteSizeValue(translogData.length, ByteSizeUnit.BYTES)); } catch (Exception e) { lastException = e; - logger.debug("Failed to read translog, will try the next one", e); + logger.debug("failed to read translog, will try the next one", e); } } throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", lastException); @@ -418,6 +418,11 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } TObjectLongHashMap combinedBlobs = buildCombinedPartsBlobs(blobs); + int numberOfFiles = 0; + long totalSize = 0; + int numberOfExistingFiles = 0; + long existingTotalSize = 0; + // filter out only the files that we need to recover, and reuse ones that exists in the store List filesToRecover = new ArrayList(); for (TObjectLongIterator it = combinedBlobs.iterator(); it.hasNext();) { @@ -425,10 +430,14 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo // if the store has the file, and it has the same length, don't recover it try { if (store.directory().fileExists(it.key()) && store.directory().fileLength(it.key()) == it.value()) { + numberOfExistingFiles++; + existingTotalSize += it.value(); if (logger.isTraceEnabled()) { logger.trace("not recovering [{}], exists in local store and has same size [{}]", it.key(), new ByteSizeValue(it.value())); } } else { + numberOfFiles++; + totalSize += it.value(); filesToRecover.add(it.key()); } } catch (Exception e) { @@ -437,12 +446,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo } } - long totalSize = 0; final AtomicLong throttlingWaitTime = new AtomicLong(); final CountDownLatch latch = new CountDownLatch(filesToRecover.size()); final CopyOnWriteArrayList failures = new CopyOnWriteArrayList(); for (final String fileToRecover : filesToRecover) { - totalSize += combinedBlobs.get(fileToRecover); if (recoveryThrottler.tryStream(shardId, fileToRecover)) { // we managed to get a recovery going recoverFile(fileToRecover, blobs, latch, failures); @@ -497,7 +504,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo // ignore } - return new RecoveryStatus.Index(version, filesToRecover.size(), new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get())); + return new RecoveryStatus.Index(version, numberOfFiles, new ByteSizeValue(totalSize), numberOfExistingFiles, new ByteSizeValue(existingTotalSize), TimeValue.timeValueMillis(throttlingWaitTime.get())); } private void recoverFile(final String fileToRecover, final ImmutableMap blobs, final CountDownLatch latch, final List failures) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java index 56ab87e56d0..11cdd0247e5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.gateway.none.NoneGateway; import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; @@ -48,7 +47,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement @Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException { // in the none case, we simply start the shard indexShard.start(); - return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(0)), new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES))); + return new RecoveryStatus(RecoveryStatus.Index.EMPTY, new RecoveryStatus.Translog(-1, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES))); } @Override public String type() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 0b50a73dc71..d2d6a929a6d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.none.NoneGateway; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexShardAlreadyExistsException; @@ -94,6 +95,10 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde private final IndexEngine indexEngine; + private final IndexGateway indexGateway; + + private final IndexStore indexStore; + private final OperationRouting operationRouting; private volatile ImmutableMap shardsInjectors = ImmutableMap.of(); @@ -104,7 +109,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde @Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService, - IndexCache indexCache, IndexEngine indexEngine, OperationRouting operationRouting) { + IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, OperationRouting operationRouting) { super(index, indexSettings); this.injector = injector; this.indexSettings = indexSettings; @@ -113,6 +118,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde this.similarityService = similarityService; this.indexCache = indexCache; this.indexEngine = indexEngine; + this.indexGateway = indexGateway; + this.indexStore = indexStore; this.operationRouting = operationRouting; this.pluginsService = injector.getInstance(PluginsService.class); @@ -207,7 +214,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde indicesLifecycle.beforeIndexShardCreated(shardId); - logger.debug("Creating shard_id[{}]", shardId.id()); + logger.debug("creating shard_id [{}]", shardId.id()); List modules = Lists.newArrayList(); modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); @@ -228,12 +235,14 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde IndexShard indexShard = shardInjector.getInstance(IndexShard.class); - // clean the store - Store store = shardInjector.getInstance(Store.class); - try { - store.deleteContent(); - } catch (IOException e) { - logger.warn("Failed to clean store on shard creation", e); + // if there is no gateway, clean the store, since we won't recover into it + if (indexGateway.type().equals(NoneGateway.TYPE)) { + Store store = shardInjector.getInstance(Store.class); + try { + store.deleteContent(); + } catch (IOException e) { + logger.warn("failed to clean store on shard creation", e); + } } indicesLifecycle.afterIndexShardCreated(indexShard); @@ -258,7 +267,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde } shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors); if (delete) { - logger.debug("Deleting shard_id[{}]", shardId); + logger.debug("deleting shard_id [{}]", shardId); } Map tmpShardsMap = newHashMap(shards); @@ -287,16 +296,20 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde // call this before we close the store, so we can release resources for it indicesLifecycle.afterIndexShardClosed(indexShard.shardId(), delete); + // if we delete or have no gateway or the store is not persistent, clean the store... Store store = shardInjector.getInstance(Store.class); - try { - store.fullDelete(); - } catch (IOException e) { - logger.warn("Failed to clean store on shard deletion", e); + if (delete || indexGateway.type().equals(NoneGateway.TYPE) || !indexStore.persistent()) { + try { + store.fullDelete(); + } catch (IOException e) { + logger.warn("failed to clean store on shard deletion", e); + } } + // and close it try { store.close(); } catch (IOException e) { - logger.warn("Failed to close store on shard deletion", e); + logger.warn("failed to close store on shard deletion", e); } Injectors.close(injector); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java index 71cbae0d326..6303e8f458b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java @@ -20,10 +20,9 @@ package org.elasticsearch.index.store.fs; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; -import org.elasticsearch.index.LocalNodeId; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; @@ -37,15 +36,13 @@ public abstract class FsIndexStore extends AbstractIndexComponent implements Ind private final File location; - public FsIndexStore(Index index, @IndexSettings Settings indexSettings, Environment environment, @LocalNodeId String localNodeId) { + public FsIndexStore(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) { super(index, indexSettings); - this.location = new File(new File(new File(environment.workWithClusterFile(), "indices"), localNodeId), index.name()); + this.location = new File(new File(nodeEnv.nodeFile(), "indices"), index.name()); if (!location.exists()) { - boolean result = false; for (int i = 0; i < 5; i++) { - result = location.mkdirs(); - if (result) { + if (location.mkdirs()) { break; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java index 47761f5c875..2153b4d1a4d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/FsStore.java @@ -19,8 +19,7 @@ package org.elasticsearch.index.store.fs; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.*; import org.elasticsearch.common.collect.ImmutableSet; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.lucene.store.SwitchDirectory; @@ -55,6 +54,17 @@ public abstract class FsStore extends AbstractStore { public abstract FSDirectory fsDirectory(); + protected LockFactory buildLockFactory() throws IOException { + String fsLock = componentSettings.get("fs_lock", "native"); + LockFactory lockFactory = new NoLockFactory(); + if (fsLock.equals("native")) { + lockFactory = new NativeFSLockFactory(); + } else if (fsLock.equals("simple")) { + lockFactory = new SimpleFSLockFactory(); + } + return lockFactory; + } + protected SwitchDirectory buildSwitchDirectoryIfNeeded(Directory fsDirectory) { boolean cache = componentSettings.getAsBoolean("memory.enabled", false); if (!cache) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java index c77c2432499..b25652a78a4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java @@ -21,9 +21,8 @@ package org.elasticsearch.index.store.fs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; -import org.elasticsearch.index.LocalNodeId; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.store.Store; @@ -32,8 +31,8 @@ import org.elasticsearch.index.store.Store; */ public class MmapFsIndexStore extends FsIndexStore { - @Inject public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, Environment environment, @LocalNodeId String localNodeId) { - super(index, indexSettings, environment, localNodeId); + @Inject public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) { + super(index, indexSettings, nodeEnv); } @Override public Class shardStoreClass() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStoreModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStoreModule.java index b192e3305ed..7261bde83dd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStoreModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStoreModule.java @@ -23,7 +23,7 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.index.store.IndexStore; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class MmapFsIndexStoreModule extends AbstractModule { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsStore.java index 45f10949ec6..396f8252240 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/MmapFsStore.java @@ -19,7 +19,10 @@ package org.elasticsearch.index.store.fs; -import org.apache.lucene.store.*; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.MMapDirectory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.store.SwitchDirectory; import org.elasticsearch.common.settings.Settings; @@ -48,14 +51,9 @@ public class MmapFsStore extends FsStore { super(shardId, indexSettings); // by default, we don't need to sync to disk, since we use the gateway this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false); - String fsLock = componentSettings.get("use_fs_lock", "none"); - LockFactory lockFactory = new NoLockFactory(); - if (fsLock.equals("native")) { - lockFactory = new NativeFSLockFactory(); - } else if (fsLock.equals("simple")) { - lockFactory = new SimpleFSLockFactory(); - } + LockFactory lockFactory = buildLockFactory(); File location = ((FsIndexStore) indexStore).shardLocation(shardId); + location.mkdirs(); this.fsDirectory = new CustomMMapDirectory(location, lockFactory, syncToDisk); SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java index 9527b8405e2..ad4e569666c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java @@ -21,9 +21,8 @@ package org.elasticsearch.index.store.fs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; -import org.elasticsearch.index.LocalNodeId; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.store.Store; @@ -32,8 +31,8 @@ import org.elasticsearch.index.store.Store; */ public class NioFsIndexStore extends FsIndexStore { - @Inject public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, Environment environment, @LocalNodeId String localNodeId) { - super(index, indexSettings, environment, localNodeId); + @Inject public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) { + super(index, indexSettings, nodeEnv); } @Override public Class shardStoreClass() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsStore.java index 2c4db8a5d6e..58b7a04b75a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/NioFsStore.java @@ -19,7 +19,10 @@ package org.elasticsearch.index.store.fs; -import org.apache.lucene.store.*; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.NIOFSDirectory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.store.SwitchDirectory; import org.elasticsearch.common.settings.Settings; @@ -32,7 +35,7 @@ import java.io.File; import java.io.IOException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class NioFsStore extends FsStore { @@ -48,14 +51,9 @@ public class NioFsStore extends FsStore { super(shardId, indexSettings); // by default, we don't need to sync to disk, since we use the gateway this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false); - String fsLock = componentSettings.get("use_fs_lock", "none"); - LockFactory lockFactory = new NoLockFactory(); - if (fsLock.equals("native")) { - lockFactory = new NativeFSLockFactory(); - } else if (fsLock.equals("simple")) { - lockFactory = new SimpleFSLockFactory(); - } + LockFactory lockFactory = buildLockFactory(); File location = ((FsIndexStore) indexStore).shardLocation(shardId); + location.mkdirs(); this.fsDirectory = new CustomNioFSDirectory(location, lockFactory, syncToDisk); SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java index b173d5d769b..b88441d4529 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java @@ -21,9 +21,8 @@ package org.elasticsearch.index.store.fs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; -import org.elasticsearch.index.LocalNodeId; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.store.Store; @@ -32,8 +31,8 @@ import org.elasticsearch.index.store.Store; */ public class SimpleFsIndexStore extends FsIndexStore { - @Inject public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, Environment environment, @LocalNodeId String localNodeId) { - super(index, indexSettings, environment, localNodeId); + @Inject public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv) { + super(index, indexSettings, nodeEnv); } @Override public Class shardStoreClass() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsStore.java index f28028f82d5..6bc5299b02a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/fs/SimpleFsStore.java @@ -19,7 +19,10 @@ package org.elasticsearch.index.store.fs; -import org.apache.lucene.store.*; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.SimpleFSDirectory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.store.SwitchDirectory; import org.elasticsearch.common.settings.Settings; @@ -32,7 +35,7 @@ import java.io.File; import java.io.IOException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class SimpleFsStore extends FsStore { @@ -48,14 +51,9 @@ public class SimpleFsStore extends FsStore { super(shardId, indexSettings); // by default, we don't need to sync to disk, since we use the gateway this.syncToDisk = componentSettings.getAsBoolean("sync_to_disk", false); - String fsLock = componentSettings.get("use_fs_lock", "none"); - LockFactory lockFactory = new NoLockFactory(); - if (fsLock.equals("native")) { - lockFactory = new NativeFSLockFactory(); - } else if (fsLock.equals("simple")) { - lockFactory = new SimpleFSLockFactory(); - } + LockFactory lockFactory = buildLockFactory(); File location = ((FsIndexStore) indexStore).shardLocation(shardId); + location.mkdirs(); this.fsDirectory = new CustomSimpleFSDirectory(location, lockFactory, syncToDisk); SwitchDirectory switchDirectory = buildSwitchDirectoryIfNeeded(fsDirectory); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java index 7dc371b8f55..fdc27ca55a5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java @@ -31,7 +31,7 @@ import org.elasticsearch.index.store.Store; import java.io.IOException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public abstract class AbstractStore extends AbstractIndexShardComponent implements Store { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 555fa54329a..a087187238d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -168,7 +168,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent tmpMap = newHashMap(indices); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/JmxService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/JmxService.java index 8409f78e10c..ee685f00306 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/JmxService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/JmxService.java @@ -126,7 +126,7 @@ public class JmxService { if (!success) { throw new JmxConnectorCreationException("Failed to bind to [" + port + "]", lastException.get()); } - logger.info("bound_address[{}], publish_address[{}]", serviceUrl, publishUrl); + logger.info("bound_address {{}}, publish_address {{}}", serviceUrl, publishUrl); } for (ResourceDMBean resource : constructionMBeans) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java index c87c2e7a887..247367dbbf2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -38,7 +38,6 @@ import org.elasticsearch.common.inject.Guice; import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkModule; @@ -50,6 +49,8 @@ import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.env.Environment; import org.elasticsearch.env.EnvironmentModule; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.NodeEnvironmentModule; import org.elasticsearch.gateway.GatewayModule; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.http.HttpServer; @@ -76,7 +77,6 @@ import org.elasticsearch.timer.TimerService; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; -import java.io.File; import java.util.ArrayList; import java.util.concurrent.TimeUnit; @@ -107,7 +107,7 @@ public final class InternalNode implements Node { Tuple tuple = InternalSettingsPerparer.prepareSettings(pSettings, loadConfigSettings); ESLogger logger = Loggers.getLogger(Node.class, tuple.v1().get("name")); - logger.info("{{}}[{}]: Initializing ...", Version.full(), JvmInfo.jvmInfo().pid()); + logger.info("{{}}[{}]: initializing ...", Version.full(), JvmInfo.jvmInfo().pid()); this.pluginsService = new PluginsService(tuple.v1(), tuple.v2()); this.settings = pluginsService.updatedSettings(); @@ -122,6 +122,7 @@ public final class InternalNode implements Node { modules.add(new ScriptModule()); modules.add(new JmxModule(settings)); modules.add(new EnvironmentModule(environment)); + modules.add(new NodeEnvironmentModule()); modules.add(new ClusterNameModule(settings)); modules.add(new ThreadPoolModule(settings)); modules.add(new TimerModule()); @@ -146,7 +147,7 @@ public final class InternalNode implements Node { client = injector.getInstance(Client.class); - logger.info("{{}}[{}]: Initialized", Version.full(), JvmInfo.jvmInfo().pid()); + logger.info("{{}}[{}]: initialized", Version.full(), JvmInfo.jvmInfo().pid()); } @Override public Settings settings() { @@ -163,7 +164,7 @@ public final class InternalNode implements Node { } ESLogger logger = Loggers.getLogger(Node.class, settings.get("name")); - logger.info("{{}}[{}]: Starting ...", Version.full(), JvmInfo.jvmInfo().pid()); + logger.info("{{}}[{}]: starting ...", Version.full(), JvmInfo.jvmInfo().pid()); for (Class plugin : pluginsService.services()) { injector.getInstance(plugin).start(); @@ -186,7 +187,7 @@ public final class InternalNode implements Node { } injector.getInstance(JmxService.class).connectAndRegister(discoService.nodeDescription(), injector.getInstance(NetworkService.class)); - logger.info("{{}}[{}]: Started", Version.full(), JvmInfo.jvmInfo().pid()); + logger.info("{{}}[{}]: started", Version.full(), JvmInfo.jvmInfo().pid()); return this; } @@ -196,7 +197,7 @@ public final class InternalNode implements Node { return this; } ESLogger logger = Loggers.getLogger(Node.class, settings.get("name")); - logger.info("{{}}[{}]: Stopping ...", Version.full(), JvmInfo.jvmInfo().pid()); + logger.info("{{}}[{}]: stopping ...", Version.full(), JvmInfo.jvmInfo().pid()); if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).stop(); @@ -216,17 +217,11 @@ public final class InternalNode implements Node { injector.getInstance(plugin).stop(); } - // Not pretty, but here we go - try { - FileSystemUtils.deleteRecursively(new File(new File(environment.workWithClusterFile(), "indices"), - injector.getInstance(ClusterService.class).state().nodes().localNodeId())); - } catch (Exception e) { - // ignore - } + injector.getInstance(NodeEnvironment.class).close(); Injectors.close(injector); - logger.info("{{}}[{}]: Stopped", Version.full(), JvmInfo.jvmInfo().pid()); + logger.info("{{}}[{}]: stopped", Version.full(), JvmInfo.jvmInfo().pid()); return this; } @@ -240,7 +235,7 @@ public final class InternalNode implements Node { } ESLogger logger = Loggers.getLogger(Node.class, settings.get("name")); - logger.info("{{}}[{}]: Closing ...", Version.full(), JvmInfo.jvmInfo().pid()); + logger.info("{{}}[{}]: closing ...", Version.full(), JvmInfo.jvmInfo().pid()); if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).close(); @@ -276,7 +271,7 @@ public final class InternalNode implements Node { ThreadLocals.clearReferencesThreadLocals(); - logger.info("{{}}[{}]: Closed", Version.full(), JvmInfo.jvmInfo().pid()); + logger.info("{{}}[{}]: closed", Version.full(), JvmInfo.jvmInfo().pid()); } public Injector injector() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/plugins/PluginsService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/plugins/PluginsService.java index 39a20d73f08..f780355b69c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/plugins/PluginsService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/plugins/PluginsService.java @@ -61,7 +61,7 @@ public class PluginsService extends AbstractComponent { Map plugins = Maps.newHashMap(); plugins.putAll(loadPluginsFromClasspath(settings)); - logger.info("Loaded {}", plugins.keySet()); + logger.info("loaded {}", plugins.keySet()); this.plugins = ImmutableMap.copyOf(plugins); } @@ -149,7 +149,7 @@ public class PluginsService extends AbstractComponent { } } if (addURL == null) { - logger.debug("Failed to find addURL method on classLoader [" + classLoader + "] to add methods"); + logger.debug("failed to find addURL method on classLoader [" + classLoader + "] to add methods"); return; } @@ -159,7 +159,7 @@ public class PluginsService extends AbstractComponent { continue; } if (logger.isTraceEnabled()) { - logger.trace("Processing [{}]", pluginFile); + logger.trace("processing [{}]", pluginFile); } String pluginNameNoExtension = pluginFile.getName().substring(0, pluginFile.getName().lastIndexOf('.')); @@ -180,7 +180,7 @@ public class PluginsService extends AbstractComponent { if (size == pluginFile.length()) { extractPlugin = false; if (logger.isTraceEnabled()) { - logger.trace("--- No need to extract plugin, same size [" + size + "]"); + logger.trace("--- no need to extract plugin, same size [" + size + "]"); } } } catch (Exception e) { @@ -198,7 +198,7 @@ public class PluginsService extends AbstractComponent { if (extractPlugin) { if (logger.isTraceEnabled()) { - logger.trace("--- Extracting plugin to [" + extractedPluginDir + "]"); + logger.trace("--- extracting plugin to [" + extractedPluginDir + "]"); } deleteRecursively(extractedPluginDir, false); @@ -216,7 +216,7 @@ public class PluginsService extends AbstractComponent { Streams.copy(zipFile.getInputStream(zipEntry), new FileOutputStream(target)); } } catch (Exception e) { - logger.warn("Failed to extract plugin [" + pluginFile + "], ignoring...", e); + logger.warn("failed to extract plugin [" + pluginFile + "], ignoring...", e); continue; } finally { if (zipFile != null) { @@ -246,7 +246,7 @@ public class PluginsService extends AbstractComponent { addURL.invoke(classLoader, jarToAdd.toURI().toURL()); } } catch (Exception e) { - logger.warn("Failed to add plugin [" + pluginFile + "]", e); + logger.warn("failed to add plugin [" + pluginFile + "]", e); } } } @@ -257,7 +257,7 @@ public class PluginsService extends AbstractComponent { try { pluginUrls = settings.getClassLoader().getResources("es-plugin.properties"); } catch (IOException e) { - logger.warn("Failed to find plugins from classpath", e); + logger.warn("failed to find plugins from classpath", e); } while (pluginUrls.hasMoreElements()) { URL pluginUrl = pluginUrls.nextElement(); @@ -280,7 +280,7 @@ public class PluginsService extends AbstractComponent { } plugins.put(plugin.name(), plugin); } catch (Exception e) { - logger.warn("Failed to load plugin from [" + pluginUrl + "]", e); + logger.warn("failed to load plugin from [" + pluginUrl + "]", e); } finally { if (is != null) { try { diff --git a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java index f3ab138f00f..245aca62375 100644 --- a/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java +++ b/plugins/cloud/src/main/java/org/elasticsearch/index/gateway/cloud/CloudIndexShardGateway.java @@ -415,7 +415,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e); } - return new RecoveryStatus.Index(version, filesMetaDatas.size(), new ByteSizeValue(totalSize, ByteSizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get())); + return new RecoveryStatus.Index(version, filesMetaDatas.size(), new ByteSizeValue(totalSize), 0, new ByteSizeValue(0), TimeValue.timeValueMillis(throttlingWaitTime.get())); } private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {