SOLR-9856 Collect metrics for shard replication and tlog replay on replicas.

Andrzej Bialecki 2017-01-09 21:00:00 +01:00
parent 1d7379b680
commit b8383db06e
7 changed files with 174 additions and 7 deletions
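The metrics added here are plain Dropwizard (Codahale) objects registered in each core's metric registry. As orientation before the diff, here is a short sketch of how the new PeerSync metrics could be read back once this commit is in place. It uses only APIs that appear in the changes below; the registry name "solr.core.collection1" follows the tests further down, and the CoreContainer handle plus the existence of such a core are assumptions for illustration.

import java.util.Map;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.metrics.SolrMetricManager;

public class ReplicationMetricsProbe {
  /** Prints the PeerSync metrics this commit registers for one core. */
  public static void report(CoreContainer coreContainer) {
    SolrMetricManager manager = coreContainer.getMetricManager();
    // registry name as asserted in PeerSyncReplicationTest below
    MetricRegistry registry = manager.registry("solr.core.collection1");
    Map<String, Metric> metrics = registry.getMetrics();

    Timer syncTime = (Timer) metrics.get("REPLICATION.time");           // duration of actual syncs
    Counter syncErrors = (Counter) metrics.get("REPLICATION.errors");   // failed syncs
    Counter syncSkipped = (Counter) metrics.get("REPLICATION.skipped"); // already-in-sync short-circuits

    System.out.printf("syncs=%d errors=%d skipped=%d%n",
        syncTime.getCount(), syncErrors.getCount(), syncSkipped.getCount());
  }
}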

View File: solr/CHANGES.txt

@@ -228,6 +228,8 @@ New Features
* SOLR-9935: Add hl.fragsize support when using the UnifiedHighlighter to avoid snippets/Passages that are too small.
Defaults to 70. (David Smiley)
* SOLR-9856: Collect metrics for shard replication and tlog replay on replicas (ab).

Optimizations
----------------------
* SOLR-9704: Facet Module / JSON Facet API: Optimize blockChildren facets that have

View File: SolrInfoMBean.java

@@ -32,7 +32,7 @@ public interface SolrInfoMBean {
/**
* Category of {@link SolrCore} component.
*/
-enum Category { CORE, QUERYHANDLER, UPDATEHANDLER, CACHE, HIGHLIGHTING, QUERYPARSER, SEARCHER, INDEX, DIRECTORY, HTTP, OTHER }
+enum Category { CORE, QUERYHANDLER, UPDATEHANDLER, CACHE, HIGHLIGHTING, QUERYPARSER, SEARCHER, REPLICATION, TLOG, INDEX, DIRECTORY, HTTP, OTHER }
/**
* Top-level group of beans for a subsystem.

View File: PeerSync.java

@@ -29,6 +29,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ConnectTimeoutException;
@@ -40,12 +42,15 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.metrics.SolrMetricProducer;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -58,7 +63,7 @@ import static org.apache.solr.update.processor.DistributedUpdateProcessor.Distri
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/** @lucene.experimental */
-public class PeerSync {
+public class PeerSync implements SolrMetricProducer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private boolean debug = log.isDebugEnabled();
@@ -87,6 +92,11 @@ public class PeerSync {
private final boolean onlyIfActive;
private SolrCore core;
// metrics
private Timer syncTime;
private Counter syncErrors;
private Counter syncSkipped;
// comparator that sorts by absolute value, putting highest first
public static Comparator<Long> absComparator = (o1, o2) -> {
long l1 = Math.abs(o1);
@@ -112,7 +122,6 @@ public class PeerSync {
return 0;
};
private static class SyncShardRequest extends ShardRequest {
List<Long> reportedVersions;
IndexFingerprint fingerprint;
@@ -147,6 +156,15 @@
// TODO: close
shardHandlerFactory = (HttpShardHandlerFactory) core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();
shardHandler = shardHandlerFactory.getShardHandler(client);
core.getCoreMetricManager().registerMetricProducer(SolrInfoMBean.Category.REPLICATION.toString(), this);
}
@Override
public void initializeMetrics(SolrMetricManager manager, String registry, String scope) {
syncTime = manager.timer(registry, "time", scope);
syncErrors = manager.counter(registry, "errors", scope);
syncSkipped = manager.counter(registry, "skipped", scope);
}
/** optional list of updates we had before possibly receiving new updates */
@@ -208,9 +226,11 @@
*/
public PeerSyncResult sync() {
if (ulog == null) {
syncErrors.inc();
return PeerSyncResult.failure();
}
MDCLoggingContext.setCore(core);
Timer.Context timerContext = null;
try {
log.info(msg() + "START replicas=" + replicas + " nUpdates=" + nUpdates);
@@ -221,10 +241,13 @@
}
// check if we are already in sync to begin with
if(doFingerprint && alreadyInSync()) {
syncSkipped.inc();
return PeerSyncResult.success();
}
// measure only when actual sync is performed
timerContext = syncTime.time();
// Fire off the requests before getting our own recent updates (for better concurrency)
// This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates,
// they would
@@ -242,6 +265,7 @@
if (startingVersions != null) {
if (startingVersions.size() == 0) {
log.warn("no frame of reference to tell if we've missed updates");
syncErrors.inc();
return PeerSyncResult.failure();
}
Collections.sort(startingVersions, absComparator);
@@ -257,6 +281,7 @@
if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
log.warn(msg()
+ "too many updates received since start - startingUpdates no longer overlaps with our currentUpdates");
syncErrors.inc();
return PeerSyncResult.failure();
}
@@ -285,10 +310,12 @@
if (srsp.getException() == null) {
List<Long> otherVersions = (List<Long>)srsp.getSolrResponse().getResponse().get("versions");
if (otherVersions != null && !otherVersions.isEmpty()) {
syncErrors.inc();
return PeerSyncResult.failure(true);
}
}
}
syncErrors.inc();
return PeerSyncResult.failure(false);
}
}
@@ -304,6 +331,7 @@
if (!success) {
log.info(msg() + "DONE. sync failed");
shardHandler.cancelAll();
syncErrors.inc();
return PeerSyncResult.failure();
}
}
@@ -318,8 +346,14 @@
}
log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed"));
if (!success) {
syncErrors.inc();
}
return success ? PeerSyncResult.success() : PeerSyncResult.failure();
} finally {
if (timerContext != null) {
timerContext.close();
}
MDCLoggingContext.clear();
}
}
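Note the pattern in sync() above: the Timer.Context is opened only after the cheap already-in-sync check, it is closed in the finally block, and syncErrors is incremented on every failure return. A self-contained sketch of the same Dropwizard pattern, with a hypothetical class and flags standing in for PeerSync's real logic:

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class TimedSyncSketch {
  private final MetricRegistry registry = new MetricRegistry();
  private final Timer syncTime = registry.timer("REPLICATION.time");
  private final Counter syncErrors = registry.counter("REPLICATION.errors");
  private final Counter syncSkipped = registry.counter("REPLICATION.skipped");

  boolean sync(boolean alreadyInSync, boolean willSucceed) {
    if (alreadyInSync) {
      syncSkipped.inc();              // skipped syncs are counted but not timed
      return true;
    }
    Timer.Context timerContext = null;
    try {
      timerContext = syncTime.time(); // start timing only when real work begins
      if (!willSucceed) {
        syncErrors.inc();             // each failure path increments exactly once
        return false;
      }
      return true;
    } finally {
      if (timerContext != null) {
        timerContext.close();         // close() is equivalent to stop(): records the elapsed time
      }
    }
  }
}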

View File: UpdateLog.java

@@ -40,6 +40,8 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
@@ -50,6 +52,9 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.metrics.SolrMetricProducer;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
@@ -71,7 +76,7 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
/** @lucene.experimental */
-public class UpdateLog implements PluginInfoInitialized {
+public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
private static final long STATUS_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
public static String LOG_FILENAME_PATTERN = "%s.%019d";
public static String TLOG_NAME="tlog";
@@ -186,6 +191,14 @@ public class UpdateLog implements PluginInfoInitialized {
List<Long> startingVersions;
int startingOperation; // last operation in the logs on startup
// metrics
protected Gauge<Integer> bufferedOpsGauge;
protected Gauge<Integer> replayLogsCountGauge;
protected Gauge<Long> replayBytesGauge;
protected Gauge<Integer> stateGauge;
protected Meter applyingBufferedOpsMeter;
protected Meter replayOpsMeter;
public static class LogPtr {
final long pointer;
final long version;
@@ -333,7 +346,39 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
core.getCoreMetricManager().registerMetricProducer(SolrInfoMBean.Category.TLOG.toString(), this);
}
@Override
public void initializeMetrics(SolrMetricManager manager, String registry, String scope) {
bufferedOpsGauge = () -> {
if (tlog == null) {
return 0;
} else if (state == State.APPLYING_BUFFERED) {
// numRecords counts header as a record
return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors;
} else if (state == State.BUFFERING) {
// numRecords counts header as a record
return tlog.numRecords() - 1;
} else {
return 0;
}
};
replayLogsCountGauge = () -> logs.size();
replayBytesGauge = () -> {
if (state == State.REPLAYING) {
return getTotalLogsSize();
} else {
return 0L;
}
};
manager.register(registry, bufferedOpsGauge, true, "ops", scope, "buffered");
manager.register(registry, replayLogsCountGauge, true, "logs", scope, "replay", "remaining");
manager.register(registry, replayBytesGauge, true, "bytes", scope, "replay", "remaining");
applyingBufferedOpsMeter = manager.meter(registry, "ops", scope, "applying_buffered");
replayOpsMeter = manager.meter(registry, "ops", scope, "replay");
stateGauge = () -> state.ordinal();
manager.register(registry, stateGauge, true, "state", scope);
}
/**
@@ -1427,6 +1472,13 @@ public class UpdateLog implements PluginInfoInitialized {
loglog.error("REPLAY_ERR: Exception replaying log", rsp.getException());
throw rsp.getException();
}
if (state == State.REPLAYING) {
replayOpsMeter.mark();
} else if (state == State.APPLYING_BUFFERED) {
applyingBufferedOpsMeter.mark();
} else {
// XXX should not happen?
}
} catch (IOException ex) {
recoveryInfo.errors++;
loglog.warn("REPLAY_ERR: IOException reading log", ex);
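The register(...) calls above combine the scope ("TLOG"), optional path segments, and a metric name into dotted names; the tests below assert the resulting keys such as "TLOG.buffered.ops" and "TLOG.replay.remaining.logs". A sketch of that naming convention using plain Dropwizard's MetricRegistry.name(), inferred from the names the tests assert rather than from SolrMetricManager's internals:

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

public class TlogNamingSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    String scope = "TLOG";

    // MetricRegistry.name(...) joins non-empty segments with dots.
    Gauge<Integer> bufferedOps = () -> 0; // stand-in for the real gauge
    registry.register(MetricRegistry.name(scope, "buffered", "ops"), bufferedOps);
    registry.register(MetricRegistry.name(scope, "replay", "remaining", "logs"), (Gauge<Integer>) () -> 0);
    registry.register(MetricRegistry.name(scope, "state"), (Gauge<Integer>) () -> 0);

    // prints TLOG.buffered.ops, TLOG.replay.remaining.logs, TLOG.state
    registry.getNames().forEach(System.out::println);
  }
}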

View File: PeerSyncReplicationTest.java

@@ -27,9 +27,14 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
@@ -172,6 +177,16 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
// make sure leader has not changed after bringing initial leader back
assertEquals(nodePeerSynced, shardToLeaderJetty.get("shard1"));
// assert metrics
MetricRegistry registry = nodePeerSynced.jetty.getCoreContainer().getMetricManager().registry("solr.core.collection1");
Map<String, Metric> metrics = registry.getMetrics();
assertTrue("REPLICATION.time present", metrics.containsKey("REPLICATION.time"));
assertTrue("REPLICATION.errors present", metrics.containsKey("REPLICATION.errors"));
Timer timer = (Timer)metrics.get("REPLICATION.time");
assertEquals(1L, timer.getCount());
Counter counter = (Counter)metrics.get("REPLICATION.errors");
assertEquals(0L, counter.getCount());
success = true;
} finally {
System.clearProperty("solr.disableFingerprint");

View File: TestCloudRecovery.java

@@ -23,9 +23,14 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.Timer;
import org.apache.commons.io.IOUtils;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -35,6 +40,7 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.TestInjection;
@@ -102,6 +108,26 @@ public class TestCloudRecovery extends SolrCloudTestCase {
assertEquals(4, resp.getResults().getNumFound());
// Make sure all nodes recover from the tlog
assertEquals(4, countReplayLog.get());
// check metrics
int replicationCount = 0;
int errorsCount = 0;
int skippedCount = 0;
for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
SolrMetricManager manager = jetty.getCoreContainer().getMetricManager();
List<String> registryNames = manager.registryNames().stream()
.filter(s -> s.startsWith("solr.core.")).collect(Collectors.toList());
for (String registry : registryNames) {
Map<String, Metric> metrics = manager.registry(registry).getMetrics();
Timer timer = (Timer)metrics.get("REPLICATION.time");
Counter counter = (Counter)metrics.get("REPLICATION.errors");
Counter skipped = (Counter)metrics.get("REPLICATION.skipped");
replicationCount += timer.getCount();
errorsCount += counter.getCount();
skippedCount += skipped.getCount();
}
}
assertEquals(2, replicationCount);
}
@Test

View File: TestRecovery.java

@@ -19,6 +19,11 @@ package org.apache.solr.search;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import org.apache.solr.metrics.SolrMetricManager;
import org.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.request.SolrQueryRequest;
@@ -55,7 +60,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
// TODO: fix this test to not require FSDirectory
static String savedFactory;
@BeforeClass
public static void beforeClass() throws Exception {
savedFactory = System.getProperty("solr.DirectoryFactory");
@@ -72,6 +77,12 @@
}
}
private Map<String, Metric> getMetrics() {
SolrMetricManager manager = h.getCoreContainer().getMetricManager();
MetricRegistry registry = manager.registry(h.getCore().getCoreMetricManager().getRegistryName());
return registry.getMetrics();
}
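The map returned by this helper is treated as a live view in the tests below (see the "live map view" comment). That works because Dropwizard's MetricRegistry.getMetrics() returns an unmodifiable map backed by the registry's internal map, so metrics registered after the call still show up. A minimal, Solr-independent sketch:

import java.util.Map;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;

public class LiveViewSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    Map<String, Metric> metrics = registry.getMetrics();      // unmodifiable, but backed by the registry

    registry.counter("TLOG.example");                         // registered after the view was taken
    System.out.println(metrics.containsKey("TLOG.example"));  // true: the view is live
  }
}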
@Test
public void testLogReplay() throws Exception {
try {
@@ -107,6 +118,9 @@
h.close();
createCore();
Map<String, Metric> metrics = getMetrics(); // live map view
// Solr should kick this off now
// h.getCore().getUpdateHandler().getUpdateLog().recoverFromLog();
@@ -117,6 +131,15 @@
// make sure we can still access versions after a restart
assertJQ(req("qt","/get", "getVersions",""+versions.size()),"/versions==" + versions);
assertEquals(UpdateLog.State.REPLAYING, h.getCore().getUpdateHandler().getUpdateLog().getState());
// check metrics
Gauge<Integer> state = (Gauge<Integer>)metrics.get("TLOG.state");
assertEquals(UpdateLog.State.REPLAYING.ordinal(), state.getValue().intValue());
Gauge<Integer> replayingLogs = (Gauge<Integer>)metrics.get("TLOG.replay.remaining.logs");
assertTrue(replayingLogs.getValue().intValue() > 0);
Gauge<Long> replayingDocs = (Gauge<Long>)metrics.get("TLOG.replay.remaining.bytes");
assertTrue(replayingDocs.getValue().longValue() > 0);
// unblock recovery
logReplay.release(1000);
@@ -128,6 +151,10 @@
assertJQ(req("q","*:*") ,"/response/numFound==3");
Meter replayDocs = (Meter)metrics.get("TLOG.replay.ops");
assertEquals(5L, replayDocs.getCount());
assertEquals(UpdateLog.State.ACTIVE.ordinal(), state.getValue().intValue());
// make sure we can still access versions after recovery
assertJQ(req("qt","/get", "getVersions",""+versions.size()) ,"/versions==" + versions);
@@ -195,15 +222,20 @@
clearIndex();
assertU(commit());
Map<String, Metric> metrics = getMetrics();
assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
ulog.bufferUpdates();
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
Future<UpdateLog.RecoveryInfo> rinfoFuture = ulog.applyBufferedUpdates();
assertTrue(rinfoFuture == null);
assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
ulog.bufferUpdates();
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
Gauge<Integer> state = (Gauge<Integer>)metrics.get("TLOG.state");
assertEquals(UpdateLog.State.BUFFERING.ordinal(), state.getValue().intValue());
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
@@ -235,6 +267,8 @@
,"=={'doc':null}"
);
Gauge<Integer> bufferedOps = (Gauge<Integer>)metrics.get("TLOG.buffered.ops");
assertEquals(6, bufferedOps.getValue().intValue());
rinfoFuture = ulog.applyBufferedUpdates();
assertTrue(rinfoFuture != null);
@@ -246,6 +280,8 @@
UpdateLog.RecoveryInfo rinfo = rinfoFuture.get();
assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
Meter applyingBuffered = (Meter)metrics.get("TLOG.applying_buffered.ops");
assertEquals(6L, applyingBuffered.getCount());
assertJQ(req("qt","/get", "getVersions","6")
,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
@@ -312,6 +348,8 @@
assertEquals(1, recInfo.deleteByQuery);
assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
assertEquals(0, bufferedOps.getValue().intValue());
} finally {
DirectUpdateHandler2.commitOnClose = true;
UpdateLog.testing_logReplayHook = null;