parent
b10cec1908
commit
a2a8553faf
|
@ -338,7 +338,6 @@
|
|||
|
||||
# Shard level query and fetch threshold logging.
|
||||
|
||||
#index.search.slowlog.level: TRACE
|
||||
#index.search.slowlog.threshold.query.warn: 10s
|
||||
#index.search.slowlog.threshold.query.info: 5s
|
||||
#index.search.slowlog.threshold.query.debug: 2s
|
||||
|
@ -349,6 +348,11 @@
|
|||
#index.search.slowlog.threshold.fetch.debug: 500ms
|
||||
#index.search.slowlog.threshold.fetch.trace: 200ms
|
||||
|
||||
#index.indexing.slowlog.threshold.index.warn: 10s
|
||||
#index.indexing.slowlog.threshold.index.info: 5s
|
||||
#index.indexing.slowlog.threshold.index.debug: 2s
|
||||
#index.indexing.slowlog.threshold.index.trace: 500ms
|
||||
|
||||
################################## GC Logging ################################
|
||||
|
||||
#monitor.jvm.gc.ParNew.warn: 1000ms
|
||||
|
|
|
@ -16,9 +16,11 @@ logger:
|
|||
#discovery: TRACE
|
||||
|
||||
index.search.slowlog: TRACE, index_search_slow_log_file
|
||||
index.indexing.slowlog: TRACE, index_indexing_slow_log_file
|
||||
|
||||
additivity:
|
||||
index.search.slowlog: false
|
||||
index.indexing.slowlog: false
|
||||
|
||||
appender:
|
||||
console:
|
||||
|
@ -42,3 +44,11 @@ appender:
|
|||
layout:
|
||||
type: pattern
|
||||
conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n"
|
||||
|
||||
index_indexing_slow_log_file:
|
||||
type: dailyRollingFile
|
||||
file: ${path.logs}/${cluster.name}_index_indexing_slowlog.log
|
||||
datePattern: "'.'yyyy-MM-dd"
|
||||
layout:
|
||||
type: pattern
|
||||
conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n"
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.index.indexing;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -28,5 +29,6 @@ public class ShardIndexingModule extends AbstractModule {
|
|||
@Override
|
||||
protected void configure() {
|
||||
bind(ShardIndexingService.class).asEagerSingleton();
|
||||
bind(ShardSlowLogIndexingService.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.metrics.CounterMetric;
|
|||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -39,6 +40,8 @@ import java.util.concurrent.TimeUnit;
|
|||
*/
|
||||
public class ShardIndexingService extends AbstractIndexShardComponent {
|
||||
|
||||
private final ShardSlowLogIndexingService slowLog;
|
||||
|
||||
private final StatsHolder totalStats = new StatsHolder();
|
||||
|
||||
private volatile Map<String, StatsHolder> typesStats = ImmutableMap.of();
|
||||
|
@ -46,8 +49,9 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
|
|||
private CopyOnWriteArrayList<IndexingOperationListener> listeners = null;
|
||||
|
||||
@Inject
|
||||
public ShardIndexingService(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
public ShardIndexingService(ShardId shardId, @IndexSettings Settings indexSettings, ShardSlowLogIndexingService slowLog) {
|
||||
super(shardId, indexSettings);
|
||||
this.slowLog = slowLog;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,6 +99,8 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
|
|||
}
|
||||
|
||||
public Engine.Create preCreate(Engine.Create create) {
|
||||
totalStats.indexCurrent.inc();
|
||||
typeStats(create.type()).indexCurrent.inc();
|
||||
if (listeners != null) {
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
create = listener.preCreate(create);
|
||||
|
@ -118,7 +124,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
|
|||
public void postCreate(Engine.Create create) {
|
||||
long took = create.endTime() - create.startTime();
|
||||
totalStats.indexMetric.inc(took);
|
||||
typeStats(create.type()).indexMetric.inc(took);
|
||||
totalStats.indexCurrent.dec();
|
||||
StatsHolder typeStats = typeStats(create.type());
|
||||
typeStats.indexMetric.inc(took);
|
||||
typeStats.indexCurrent.dec();
|
||||
slowLog.postCreate(create, took);
|
||||
if (listeners != null) {
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
|
@ -160,6 +170,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
|
|||
StatsHolder typeStats = typeStats(index.type());
|
||||
typeStats.indexMetric.inc(took);
|
||||
typeStats.indexCurrent.dec();
|
||||
slowLog.postIndex(index, took);
|
||||
if (listeners != null) {
|
||||
for (IndexingOperationListener listener : listeners) {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,176 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.indexing.slowlog;
|
||||
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ShardSlowLogIndexingService extends AbstractIndexShardComponent {
|
||||
|
||||
private boolean reformat;
|
||||
|
||||
private long indexWarnThreshold;
|
||||
private long indexInfoThreshold;
|
||||
private long indexDebugThreshold;
|
||||
private long indexTraceThreshold;
|
||||
|
||||
private String level;
|
||||
|
||||
private final ESLogger indexLogger;
|
||||
private final ESLogger deleteLogger;
|
||||
|
||||
static {
|
||||
IndexMetaData.addDynamicSettings(
|
||||
"index.indexing.slowlog.threshold.index.warn",
|
||||
"index.indexing.slowlog.threshold.index.info",
|
||||
"index.indexing.slowlog.threshold.index.debug",
|
||||
"index.indexing.slowlog.threshold.index.trace",
|
||||
"index.indexing.slowlog.reformat",
|
||||
"index.indexing.slowlog.level"
|
||||
);
|
||||
}
|
||||
|
||||
class ApplySettings implements IndexSettingsService.Listener {
|
||||
@Override
|
||||
public synchronized void onRefreshSettings(Settings settings) {
|
||||
long indexWarnThreshold = settings.getAsTime("index.indexing.slowlog.threshold.index.warn", TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexWarnThreshold)).nanos();
|
||||
if (indexWarnThreshold != ShardSlowLogIndexingService.this.indexWarnThreshold) {
|
||||
ShardSlowLogIndexingService.this.indexWarnThreshold = indexWarnThreshold;
|
||||
}
|
||||
long indexInfoThreshold = settings.getAsTime("index.indexing.slowlog.threshold.index.info", TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexInfoThreshold)).nanos();
|
||||
if (indexInfoThreshold != ShardSlowLogIndexingService.this.indexInfoThreshold) {
|
||||
ShardSlowLogIndexingService.this.indexInfoThreshold = indexInfoThreshold;
|
||||
}
|
||||
long indexDebugThreshold = settings.getAsTime("index.indexing.slowlog.threshold.index.debug", TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexDebugThreshold)).nanos();
|
||||
if (indexDebugThreshold != ShardSlowLogIndexingService.this.indexDebugThreshold) {
|
||||
ShardSlowLogIndexingService.this.indexDebugThreshold = indexDebugThreshold;
|
||||
}
|
||||
long indexTraceThreshold = settings.getAsTime("index.indexing.slowlog.threshold.index.trace", TimeValue.timeValueNanos(ShardSlowLogIndexingService.this.indexTraceThreshold)).nanos();
|
||||
if (indexTraceThreshold != ShardSlowLogIndexingService.this.indexTraceThreshold) {
|
||||
ShardSlowLogIndexingService.this.indexTraceThreshold = indexTraceThreshold;
|
||||
}
|
||||
|
||||
String level = settings.get("index.indexing.slowlog.level", ShardSlowLogIndexingService.this.level);
|
||||
if (!level.equals(ShardSlowLogIndexingService.this.level)) {
|
||||
ShardSlowLogIndexingService.this.indexLogger.setLevel(level.toUpperCase());
|
||||
ShardSlowLogIndexingService.this.deleteLogger.setLevel(level.toUpperCase());
|
||||
ShardSlowLogIndexingService.this.level = level;
|
||||
}
|
||||
|
||||
boolean reformat = settings.getAsBoolean("index.indexing.slowlog.reformat", ShardSlowLogIndexingService.this.reformat);
|
||||
if (reformat != ShardSlowLogIndexingService.this.reformat) {
|
||||
ShardSlowLogIndexingService.this.reformat = reformat;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Inject
|
||||
public ShardSlowLogIndexingService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService) {
|
||||
super(shardId, indexSettings);
|
||||
|
||||
this.reformat = componentSettings.getAsBoolean("reformat", true);
|
||||
|
||||
this.indexWarnThreshold = componentSettings.getAsTime("threshold.index.warn", TimeValue.timeValueNanos(-1)).nanos();
|
||||
this.indexInfoThreshold = componentSettings.getAsTime("threshold.index.info", TimeValue.timeValueNanos(-1)).nanos();
|
||||
this.indexDebugThreshold = componentSettings.getAsTime("threshold.index.debug", TimeValue.timeValueNanos(-1)).nanos();
|
||||
this.indexTraceThreshold = componentSettings.getAsTime("threshold.index.trace", TimeValue.timeValueNanos(-1)).nanos();
|
||||
|
||||
this.level = componentSettings.get("level", "TRACE").toUpperCase();
|
||||
|
||||
this.indexLogger = Loggers.getLogger(logger, ".index");
|
||||
this.deleteLogger = Loggers.getLogger(logger, ".delete");
|
||||
|
||||
indexLogger.setLevel(level);
|
||||
deleteLogger.setLevel(level);
|
||||
|
||||
indexSettingsService.addListener(new ApplySettings());
|
||||
}
|
||||
|
||||
public void postIndex(Engine.Index index, long tookInNanos) {
|
||||
postIndexing(index.parsedDoc(), tookInNanos);
|
||||
}
|
||||
|
||||
public void postCreate(Engine.Create create, long tookInNanos) {
|
||||
postIndexing(create.parsedDoc(), tookInNanos);
|
||||
}
|
||||
|
||||
private void postIndexing(ParsedDocument doc, long tookInNanos) {
|
||||
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
|
||||
indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(doc, tookInNanos, reformat));
|
||||
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {
|
||||
indexLogger.info("{}", new SlowLogParsedDocumentPrinter(doc, tookInNanos, reformat));
|
||||
} else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) {
|
||||
indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(doc, tookInNanos, reformat));
|
||||
} else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) {
|
||||
indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(doc, tookInNanos, reformat));
|
||||
}
|
||||
}
|
||||
|
||||
public static class SlowLogParsedDocumentPrinter {
|
||||
private final ParsedDocument doc;
|
||||
private final long tookInNanos;
|
||||
private final boolean reformat;
|
||||
|
||||
public SlowLogParsedDocumentPrinter(ParsedDocument doc, long tookInNanos, boolean reformat) {
|
||||
this.doc = doc;
|
||||
this.tookInNanos = tookInNanos;
|
||||
this.reformat = reformat;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], ");
|
||||
sb.append("type[").append(doc.type()).append("], ");
|
||||
sb.append("id[").append(doc.id()).append("], ");
|
||||
if (doc.routing() == null) {
|
||||
sb.append("routing[], ");
|
||||
} else {
|
||||
sb.append("routing[").append(doc.routing()).append("], ");
|
||||
}
|
||||
if (doc.source() != null && doc.source().length() > 0) {
|
||||
try {
|
||||
sb.append("source[").append(XContentHelper.convertToJson(doc.source(), reformat)).append("]");
|
||||
} catch (IOException e) {
|
||||
sb.append("source[_failed_to_convert_]");
|
||||
}
|
||||
} else {
|
||||
sb.append("source[]");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,12 +19,12 @@
|
|||
|
||||
package org.elasticsearch.test.unit.index.engine.robin;
|
||||
|
||||
import org.apache.lucene.search.similarities.DefaultSimilarity;
|
||||
import org.elasticsearch.index.analysis.AnalysisService;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.robin.RobinEngine;
|
||||
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
|
@ -39,7 +39,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_
|
|||
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
|
||||
|
||||
protected Engine createEngine(Store store, Translog translog) {
|
||||
return new RobinEngine(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new ShardIndexingService(shardId, EMPTY_SETTINGS), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
|
||||
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), EMPTY_SETTINGS);
|
||||
return new RobinEngine(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
|
||||
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new CodecService(shardId.index()));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue