Remove memory monitor and move translog operations threshold to shard level setting, closes #312.

This commit is contained in:
kimchy 2010-08-11 12:54:00 +03:00
parent df4ece8cef
commit ee26d55296
11 changed files with 105 additions and 314 deletions

View File

@ -42,7 +42,7 @@ import javax.annotation.Nullable;
@ThreadSafe
public interface Engine extends IndexShardComponent, CloseableComponent {
void indexingBuffer(ByteSizeValue indexingBufferSize);
void updateIndexingBufferSize(ByteSizeValue indexingBufferSize);
/**
* Starts the Engine.

View File

@ -120,10 +120,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
this.similarityService = similarityService;
}
@Override public void indexingBuffer(ByteSizeValue indexingBufferSize) {
// LUCENE MONITOR - If this restriction is removed from Lucene, remove it from here
@Override public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
rwl.readLock().lock();
try {
// LUCENE MONITOR - If this restriction is removed from Lucene, remove it from here
if (indexingBufferSize.mbFrac() > 2048.0) {
this.indexingBufferSize = new ByteSizeValue(2048, ByteSizeUnit.MB);
} else {

View File

@ -60,6 +60,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreModule;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogModule;
import org.elasticsearch.index.translog.TranslogService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.plugins.PluginsService;
@ -312,6 +313,13 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
}
}
try {
// now we can close the translog service, we need to close it before the we close the shard
shardInjector.getInstance(TranslogService.class).close();
} catch (Exception e) {
// ignore
}
// close shard actions
if (indexShard != null) {
shardInjector.getInstance(IndexShardManagement.class).close();

View File

@ -43,5 +43,6 @@ public class TranslogModule extends AbstractModule {
bind(Translog.class)
.to(settings.getAsClass(TranslogSettings.TYPE, FsTranslog.class))
.in(Scopes.SINGLETON);
bind(TranslogService.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.translog;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
public class TranslogService extends AbstractIndexShardComponent {
private final ThreadPool threadPool;
private final IndexShard indexShard;
private final Translog translog;
private final int flushThreshold;
private final TimeValue interval;
private ScheduledFuture future;
@Inject public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexShard indexShard, Translog translog) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexShard = indexShard;
this.translog = translog;
this.flushThreshold = componentSettings.getAsInt("flush_threshold", 5000);
this.interval = componentSettings.getAsTime("interval", timeValueMillis(1000));
this.future = threadPool.scheduleWithFixedDelay(new TranslogBasedFlush(), interval);
}
public void close() {
this.future.cancel(true);
}
private class TranslogBasedFlush implements Runnable {
@Override public void run() {
if (indexShard.state() != IndexShardState.STARTED) {
return;
}
int currentSize = translog.size();
if (currentSize > flushThreshold) {
logger.trace("flushing translog, operations [{}], breached [{}]", currentSize, flushThreshold);
try {
indexShard.flush(new Engine.Flush());
} catch (FlushNotAllowedEngineException e) {
// ignore this exception, we are not allowed to perform flush
} catch (Exception e) {
logger.warn("failed to flush shard on translog threshold", e);
}
}
}
}
}

View File

@ -88,7 +88,7 @@ public class IndexingMemoryBufferController extends AbstractComponent {
logger.debug("recalculating shard indexing buffer (reason={}), total is [{}] with [{}] shards, each shard set to [{}]", reason, indexingBuffer, shardsCount, shardIndexingBufferSize);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
((InternalIndexShard) indexShard).engine().indexingBuffer(shardIndexingBufferSize);
((InternalIndexShard) indexShard).engine().updateIndexingBufferSize(shardIndexingBufferSize);
}
}
}

View File

@ -32,9 +32,6 @@ import org.elasticsearch.monitor.dump.summary.SummaryDumpContributor;
import org.elasticsearch.monitor.dump.thread.ThreadDumpContributor;
import org.elasticsearch.monitor.jvm.JvmMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
import org.elasticsearch.monitor.memory.MemoryMonitor;
import org.elasticsearch.monitor.memory.MemoryMonitorService;
import org.elasticsearch.monitor.memory.alpha.AlphaMemoryMonitor;
import org.elasticsearch.monitor.network.JmxNetworkProbe;
import org.elasticsearch.monitor.network.NetworkProbe;
import org.elasticsearch.monitor.network.NetworkService;
@ -72,11 +69,6 @@ public class MonitorModule extends AbstractModule {
}
@Override protected void configure() {
bind(MemoryMonitor.class)
.to(settings.getAsClass(MonitorSettings.MEMORY_MANAGER_TYPE, AlphaMemoryMonitor.class, "org.elasticsearch.monitor.memory.", "MemoryMonitor"))
.asEagerSingleton();
bind(MemoryMonitorService.class).asEagerSingleton();
boolean sigarLoaded = false;
try {
settings.getClassLoader().loadClass("org.hyperic.sigar.Sigar");

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.jvm.JvmMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
import org.elasticsearch.monitor.memory.MemoryMonitorService;
import org.elasticsearch.monitor.network.NetworkService;
import org.elasticsearch.monitor.os.OsService;
import org.elasticsearch.monitor.process.ProcessService;
@ -35,8 +34,6 @@ import org.elasticsearch.monitor.process.ProcessService;
*/
public class MonitorService extends AbstractLifecycleComponent<MonitorService> {
private final MemoryMonitorService memoryMonitorService;
private final JvmMonitorService jvmMonitorService;
private final OsService osService;
@ -47,10 +44,9 @@ public class MonitorService extends AbstractLifecycleComponent<MonitorService> {
private final NetworkService networkService;
@Inject public MonitorService(Settings settings, MemoryMonitorService memoryMonitorService, JvmMonitorService jvmMonitorService,
@Inject public MonitorService(Settings settings, JvmMonitorService jvmMonitorService,
OsService osService, ProcessService processService, JvmService jvmService, NetworkService networkService) {
super(settings);
this.memoryMonitorService = memoryMonitorService;
this.jvmMonitorService = jvmMonitorService;
this.osService = osService;
this.processService = processService;
@ -75,17 +71,14 @@ public class MonitorService extends AbstractLifecycleComponent<MonitorService> {
}
@Override protected void doStart() throws ElasticSearchException {
memoryMonitorService.start();
jvmMonitorService.start();
}
@Override protected void doStop() throws ElasticSearchException {
memoryMonitorService.stop();
jvmMonitorService.stop();
}
@Override protected void doClose() throws ElasticSearchException {
memoryMonitorService.close();
jvmMonitorService.close();
}
}

View File

@ -1,29 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor.memory;
import org.elasticsearch.common.component.LifecycleComponent;
/**
* @author kimchy (shay.banon)
*/
public interface MemoryMonitor extends LifecycleComponent<MemoryMonitor> {
}

View File

@ -1,50 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor.memory;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
/**
* @author kimchy (shay.banon)
*/
public class MemoryMonitorService extends AbstractLifecycleComponent<MemoryMonitorService> {
private final MemoryMonitor memoryMonitor;
@Inject public MemoryMonitorService(Settings settings, MemoryMonitor memoryMonitor) {
super(settings);
this.memoryMonitor = memoryMonitor;
}
@Override protected void doStart() throws ElasticSearchException {
memoryMonitor.start();
}
@Override protected void doStop() throws ElasticSearchException {
memoryMonitor.stop();
}
@Override protected void doClose() throws ElasticSearchException {
memoryMonitor.close();
}
}

View File

@ -1,215 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.monitor.memory.alpha;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.memory.IndicesMemoryCleaner;
import org.elasticsearch.monitor.memory.MemoryMonitor;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
public class AlphaMemoryMonitor extends AbstractLifecycleComponent<MemoryMonitor> implements MemoryMonitor {
private final double upperMemoryThreshold;
private final double lowerMemoryThreshold;
private final TimeValue interval;
private final int fullThreshold;
private final int cleanThreshold;
private final ByteSizeValue minimumFlushableSizeToClean;
private final int translogNumberOfOperationsThreshold;
private final ThreadPool threadPool;
private final IndicesMemoryCleaner indicesMemoryCleaner;
private final Runtime runtime;
private final ByteSizeValue maxMemory;
private final ByteSizeValue totalMemory;
private volatile ScheduledFuture scheduledFuture;
private AtomicLong totalCleans = new AtomicLong();
private AtomicLong totalFull = new AtomicLong();
@Inject public AlphaMemoryMonitor(Settings settings, ThreadPool threadPool, IndicesMemoryCleaner indicesMemoryCleaner) {
super(settings);
this.threadPool = threadPool;
this.indicesMemoryCleaner = indicesMemoryCleaner;
this.upperMemoryThreshold = componentSettings.getAsDouble("upper_memory_threshold", 0.95);
this.lowerMemoryThreshold = componentSettings.getAsDouble("lower_memory_threshold", 0.8);
this.interval = componentSettings.getAsTime("interval", timeValueMillis(500));
this.fullThreshold = componentSettings.getAsInt("full_threshold", 2);
this.cleanThreshold = componentSettings.getAsInt("clean_threshold", 10);
this.minimumFlushableSizeToClean = componentSettings.getAsBytesSize("minimum_flushable_size_to_clean", new ByteSizeValue(5, ByteSizeUnit.MB));
this.translogNumberOfOperationsThreshold = componentSettings.getAsInt("translog_number_of_operations_threshold", 5000);
logger.debug("interval [" + interval + "], upper_memory_threshold [" + upperMemoryThreshold + "], lower_memory_threshold [" + lowerMemoryThreshold + "], translog_number_of_operations_threshold [" + translogNumberOfOperationsThreshold + "]");
this.runtime = Runtime.getRuntime();
this.maxMemory = new ByteSizeValue(runtime.maxMemory());
this.totalMemory = maxMemory.bytes() == runtime.totalMemory() ? new ByteSizeValue(runtime.totalMemory()) : null; // Xmx==Xms when the JVM was started.
}
@Override protected void doStart() throws ElasticSearchException {
scheduledFuture = threadPool.scheduleWithFixedDelay(new MemoryCleaner(), interval);
}
@Override protected void doStop() throws ElasticSearchException {
scheduledFuture.cancel(true);
}
@Override protected void doClose() throws ElasticSearchException {
}
private long freeMemory() {
return runtime.freeMemory();
}
private long totalMemory() {
return totalMemory == null ? runtime.totalMemory() : totalMemory.bytes();
}
private class MemoryCleaner implements Runnable {
private int fullCounter;
private boolean performedClean;
private int cleanCounter;
@Override public void run() {
try {
// clear unreferenced in the cache
indicesMemoryCleaner.cacheClearUnreferenced();
// try and clean translog based on a threshold, since we don't want to get a very large transaction log
// which means recovery it will take a long time (since the target re-index all this data)
IndicesMemoryCleaner.TranslogCleanResult translogCleanResult = indicesMemoryCleaner.cleanTranslog(translogNumberOfOperationsThreshold);
if (translogCleanResult.cleanedShards() > 0) {
long totalClean = totalCleans.incrementAndGet();
logger.debug("[" + totalClean + "] [Translog] " + translogCleanResult);
}
// the logic is simple, if the used memory is above the upper threshold, we need to clean
// we clean down as much as we can to down to the lower threshold
// in order not to get trashing, we only perform a clean after another clean if a the clean counter
// has expired.
// we also do the same for GC invocations
long upperMemory = maxMemory.bytes();
long totalMemory = totalMemory();
long usedMemory = totalMemory - freeMemory();
long upperThresholdMemory = (long) (upperMemory * upperMemoryThreshold);
if (usedMemory - upperThresholdMemory <= 0) {
fullCounter = 0;
performedClean = false;
cleanCounter = 0;
return;
}
if (performedClean) {
if (++cleanCounter < cleanThreshold) {
return;
}
}
long lowerThresholdMemory = (long) (upperMemory * lowerMemoryThreshold);
long memoryToClean = usedMemory - lowerThresholdMemory;
if (fullCounter++ >= fullThreshold) {
long total = totalFull.incrementAndGet();
if (logger.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(total).append("] ");
sb.append("[Full ] Ran after [").append(fullThreshold).append("] consecutive clean swipes");
sb.append(", memory_to_clean [").append(new ByteSizeValue(memoryToClean)).append(']');
sb.append(", lower_memory_threshold [").append(new ByteSizeValue(lowerThresholdMemory)).append(']');
sb.append(", upper_memory_threshold [").append(new ByteSizeValue(upperThresholdMemory)).append(']');
sb.append(", used_memory [").append(new ByteSizeValue(usedMemory)).append(']');
sb.append(", total_memory[").append(new ByteSizeValue(totalMemory)).append(']');
sb.append(", max_memory[").append(maxMemory).append(']');
logger.info(sb.toString());
}
indicesMemoryCleaner.cacheClear();
// TODO this ends up doing a flush with "true", basically, at the end, replacing the IndexWriter, might not be needed with Lucene 3.0.2.
indicesMemoryCleaner.fullMemoryClean();
// don't clean thread locals, let GC clean them (so we won't run into visibility issues)
// ThreadLocals.clearReferencesThreadLocals();
fullCounter = 0;
} else {
long totalClean = totalCleans.incrementAndGet();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(totalClean).append("] ");
sb.append("[Cleaning] memory_to_clean [").append(new ByteSizeValue(memoryToClean)).append(']');
sb.append(", lower_memory_threshold [").append(new ByteSizeValue(lowerThresholdMemory)).append(']');
sb.append(", upper_memory_threshold [").append(new ByteSizeValue(upperThresholdMemory)).append(']');
sb.append(", used_memory [").append(new ByteSizeValue(usedMemory)).append(']');
sb.append(", total_memory[").append(new ByteSizeValue(totalMemory)).append(']');
sb.append(", max_memory[").append(maxMemory).append(']');
logger.debug(sb.toString());
}
IndicesMemoryCleaner.MemoryCleanResult memoryCleanResult = indicesMemoryCleaner.cleanMemory(memoryToClean, minimumFlushableSizeToClean);
boolean forceClean = false;
if (memoryCleanResult.cleaned().bytes() < memoryToClean && (fullCounter > (fullThreshold / 2))) {
forceClean = true;
indicesMemoryCleaner.forceCleanMemory(memoryCleanResult.shardsCleaned());
}
if (logger.isDebugEnabled()) {
logger.debug("[" + totalClean + "] [Cleaned ] force_clean [" + forceClean + "], " + memoryCleanResult);
}
}
performedClean = true;
cleanCounter = 0;
} catch (Exception e) {
logger.info("Failed to run memory monitor", e);
}
}
}
}