From cc9c867083831cbacfdc76466800522fda05e1f7 Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Wed, 27 Sep 2017 22:46:55 +0700 Subject: [PATCH] SOLR-11293: Fix downgrade in performance from precommit --- solr/CHANGES.txt | 2 +- .../apache/solr/cloud/RecoveryStrategy.java | 1 + .../solr/cloud/ReplicateFromLeader.java | 7 ++- .../org/apache/solr/cloud/ZkController.java | 2 +- .../org/apache/solr/handler/IndexFetcher.java | 16 ++++-- .../solr/handler/ReplicationHandler.java | 10 ++-- .../solr/handler/admin/PrepRecoveryOp.java | 33 ------------ .../org/apache/solr/update/UpdateLog.java | 12 +++-- .../apache/solr/cloud/TestTlogReplica.java | 50 ++++++++++++------- 9 files changed, 66 insertions(+), 67 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 7cd3717b29e..2b8a9c591c8 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -134,7 +134,7 @@ Bug Fixes * SOLR-11278: Stopping CDCR should cancel a running bootstrap operation. (Amrit Sarkar, shalin) -* SOLR-11293: Potential data loss in TLOG replicas after replication failures (noble) +* SOLR-11293: Potential data loss in TLOG replicas when masterVersion equals zero (noble, Cao Manh Dat) * SOLR-10101: TestLazyCores hangs (Erick Erickson) diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 8a6b99b2c95..73e61426ad8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -214,6 +214,7 @@ public class RecoveryStrategy implements Runnable, Closeable { ModifiableSolrParams solrParams = new ModifiableSolrParams(); solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl); + solrParams.set(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, replicaType == Replica.Type.TLOG); if (isClosed()) return; // we check closed on return boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful(); diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java index 549ef70080f..0a742e3a5ae 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java +++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java @@ -26,6 +26,7 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrConfig; import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.IndexFetcher; import org.apache.solr.handler.ReplicationHandler; import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.SolrQueryRequest; @@ -74,6 +75,7 @@ public class ReplicateFromLeader { NamedList slaveConfig = new NamedList<>(); slaveConfig.add("fetchFromLeader", Boolean.TRUE); + slaveConfig.add(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, switchTransactionLog); slaveConfig.add("pollInterval", pollIntervalStr); NamedList replicationConfig = new NamedList<>(); replicationConfig.add("slave", slaveConfig); @@ -85,10 +87,11 @@ public class ReplicateFromLeader { replicationProcess = new ReplicationHandler(); if (switchTransactionLog) { - replicationProcess.setPollListener((solrCore, pollSuccess) -> { - if (pollSuccess) { + replicationProcess.setPollListener((solrCore, fetchResult) -> { + if (fetchResult == IndexFetcher.IndexFetchResult.INDEX_FETCH_SUCCESS) { String commitVersion = getCommitVersion(core); if (commitVersion == null) return; + if (Long.parseLong(commitVersion) == lastVersion) return; UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog(); SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams()); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index b3793af6e17..dcce2c82422 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -989,7 +989,7 @@ public class ZkController { if (isTlogReplicaAndNotLeader) { String commitVersion = ReplicateFromLeader.getCommitVersion(core); if (commitVersion != null) { - ulog.copyOverOldUpdates(Long.parseLong(commitVersion), true); + ulog.copyOverOldUpdates(Long.parseLong(commitVersion)); } } // we will call register again after zk expiration and on reload diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index 1bf452fce4f..52d3b25d48d 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -163,6 +163,8 @@ public class IndexFetcher { private Integer soTimeout; + private boolean skipCommitOnMasterVersionZero; + private static final String INTERRUPT_RESPONSE_MESSAGE = "Interrupted while waiting for modify lock"; public static class IndexFetchResult { @@ -226,6 +228,10 @@ public class IndexFetcher { if (fetchFromLeader != null && fetchFromLeader instanceof Boolean) { this.fetchFromLeader = (boolean) fetchFromLeader; } + Object skipCommitOnMasterVersionZero = initArgs.get(SKIP_COMMIT_ON_MASTER_VERSION_ZERO); + if (skipCommitOnMasterVersionZero != null && skipCommitOnMasterVersionZero instanceof Boolean) { + this.skipCommitOnMasterVersionZero = (boolean) skipCommitOnMasterVersionZero; + } String masterUrl = (String) initArgs.get(MASTER_URL); if (masterUrl == null && !this.fetchFromLeader) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, @@ -428,7 +434,7 @@ public class IndexFetcher { LOG.info("Slave's version: " + IndexDeletionPolicyWrapper.getCommitTimestamp(commit)); if (latestVersion == 0L) { - if (forceReplication && commit.getGeneration() != 0) { + if (commit.getGeneration() != 0) { // since we won't get the files for an empty index, // we just clear ours and commit LOG.info("New index in Master. Deleting mine..."); @@ -438,8 +444,12 @@ public class IndexFetcher { } finally { iw.decref(); } - SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams()); - solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false)); + if (skipCommitOnMasterVersionZero) { + openNewSearcherAndUpdateCommitPoint(); + } else { + SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams()); + solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false)); + } } //there is nothing to be replicated diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index 2fef4341a0a..a9e14f45fcc 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -220,7 +220,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private PollListener pollListener; public interface PollListener { - void onComplete(SolrCore solrCore, boolean pollSuccess) throws IOException; + void onComplete(SolrCore solrCore, IndexFetchResult fetchResult) throws IOException; } /** @@ -1182,8 +1182,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw try { LOG.debug("Polling for index modifications"); markScheduledExecutionStart(); - boolean pollSuccess = doFetch(null, false).getSuccessful(); - if (pollListener != null) pollListener.onComplete(core, pollSuccess); + IndexFetchResult fetchResult = doFetch(null, false); + if (pollListener != null) pollListener.onComplete(core, fetchResult); } catch (Exception e) { LOG.error("Exception in fetching index", e); } @@ -1754,6 +1754,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw public static final String FETCH_FROM_LEADER = "fetchFromLeader"; + // in case of TLOG replica, if masterVersion = zero, don't do commit + // otherwise updates from current tlog won't copied over properly to the new tlog, leading to data loss + public static final String SKIP_COMMIT_ON_MASTER_VERSION_ZERO = "skipCommitOnMasterVersionZero"; + public static final String STATUS = "status"; public static final String COMMAND = "command"; diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java index 748982d79f9..0a6d5ce2c51 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java @@ -20,7 +20,6 @@ package org.apache.solr.handler.admin; import java.lang.invoke.MethodHandles; import java.util.Objects; -import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; @@ -29,15 +28,10 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CoreAdminParams; -import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; import org.apache.solr.handler.admin.CoreAdminHandler.CallInfo; -import org.apache.solr.request.LocalSolrQueryRequest; -import org.apache.solr.search.SolrIndexSearcher; -import org.apache.solr.update.CommitUpdateCommand; -import org.apache.solr.util.RefCounted; import org.apache.solr.util.TestInjection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,33 +171,6 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Solr is shutting down"); } - - // solrcloud_debug - if (log.isDebugEnabled() && core != null) { - try { - LocalSolrQueryRequest r = new LocalSolrQueryRequest(core, - new ModifiableSolrParams()); - CommitUpdateCommand commitCmd = new CommitUpdateCommand(r, false); - commitCmd.softCommit = true; - core.getUpdateHandler().commit(commitCmd); - RefCounted searchHolder = core - .getNewestSearcher(false); - SolrIndexSearcher searcher = searchHolder.get(); - try { - log.debug(core.getCoreContainer() - .getZkController().getNodeName() - + " to replicate " - + searcher.search(new MatchAllDocsQuery(), 1).totalHits - + " gen:" - + core.getDeletionPolicy().getLatestCommit().getGeneration() - + " data:" + core.getDataDir()); - } finally { - searchHolder.decref(); - } - } catch (Exception e) { - log.debug("Error in solrcloud_debug block", e); - } - } } Thread.sleep(1000); } diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index 5a16cd45294..fc029172b7e 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -244,6 +244,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { protected Gauge bufferedOpsGauge; protected Meter applyingBufferedOpsMeter; protected Meter replayOpsMeter; + protected Meter copyOverOldUpdatesMeter; public static class LogPtr { final long pointer; @@ -435,6 +436,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { manager.registerGauge(null, registry, () -> getTotalLogsSize(), true, "bytes", scope, "replay", "remaining"); applyingBufferedOpsMeter = manager.meter(null, registry, "ops", scope, "applyingBuffered"); replayOpsMeter = manager.meter(null, registry, "ops", scope, "replay"); + copyOverOldUpdatesMeter = manager.meter(null, registry, "ops", scope, "copyOverOldUpdates"); manager.registerGauge(null, registry, () -> state.getValue(), true, "state", scope); } @@ -1158,12 +1160,12 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) { synchronized (this) { - if (tlog == null && prevTlog == null && prevMapLog2 == null && logs.isEmpty()) { + if (tlog == null) { return; } preCommit(cuc); try { - copyOverOldUpdates(cuc.getVersion(), false); + copyOverOldUpdates(cuc.getVersion()); } finally { postCommit(cuc); } @@ -1173,9 +1175,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { /** * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog * @param commitVersion any updates that have version larger than the commitVersion will be copied over - * @param omitCommitted if a tlog is already committed then don't read it */ - public void copyOverOldUpdates(long commitVersion, boolean omitCommitted) { + public void copyOverOldUpdates(long commitVersion) { TransactionLog oldTlog = prevTlog; if (oldTlog == null && !logs.isEmpty()) { oldTlog = logs.getFirst(); @@ -1185,11 +1186,12 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { } try { - if (omitCommitted && oldTlog.endsWithCommit()) return; + if (oldTlog.endsWithCommit()) return; } catch (IOException e) { log.warn("Exception reading log", e); return; } + copyOverOldUpdatesMeter.mark(); SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams()); diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java index cc470d6f2fa..79dce4d636a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -29,6 +30,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import com.codahale.metrics.Meter; import org.apache.http.HttpResponse; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; @@ -60,8 +63,6 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.core.SolrCore; import org.apache.solr.update.DirectUpdateHandler2; import org.apache.solr.update.SolrIndexWriter; -import org.apache.solr.update.UpdateHandler; -import org.apache.solr.update.UpdateLog; import org.apache.solr.util.RefCounted; import org.apache.solr.util.TestInjection; import org.apache.solr.util.TimeOut; @@ -71,8 +72,6 @@ import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.carrotsearch.randomizedtesting.annotations.Repeat; - @Slow public class TestTlogReplica extends SolrCloudTestCase { @@ -464,16 +463,13 @@ public class TestTlogReplica extends SolrCloudTestCase { .process(cloudClient, collectionName); { - UpdateHandler updateHandler = getSolrCore(true).get(0).getUpdateHandler(); - RefCounted iwRef = updateHandler.getSolrCoreState().getIndexWriter(null); - assertTrue("IndexWriter at leader must see updates ", iwRef.get().hasUncommittedChanges()); - iwRef.decref(); + long docsPending = (long) getSolrCore(true).get(0).getMetricRegistry().getGauges().get("UPDATE.updateHandler.docsPending").getValue(); + assertEquals(4, docsPending); } for (SolrCore solrCore : getSolrCore(false)) { - RefCounted iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null); - assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges()); - iwRef.decref(); + long docsPending = (long) solrCore.getMetricRegistry().getGauges().get("UPDATE.updateHandler.docsPending").getValue(); + assertEquals(0, docsPending); } checkRTG(1, 4, cluster.getJettySolrRunners()); @@ -486,16 +482,12 @@ public class TestTlogReplica extends SolrCloudTestCase { // The DBQ is not processed at replicas, so we still can get doc2 and other docs by RTG checkRTG(2,4, getSolrRunner(false)); + Map timeCopyOverPerCores = getTimesCopyOverOldUpdates(getSolrCore(false)); new UpdateRequest() .commit(cloudClient, collectionName); waitForNumDocsInAllActiveReplicas(2); - - // Update log roll over - for (SolrCore solrCore : getSolrCore(false)) { - UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog(); - assertFalse(updateLog.hasUncommittedChanges()); - } + assertCopyOverOldUpdates(1, timeCopyOverPerCores); // UpdateLog copy over old updates for (int i = 15; i <= 150; i++) { @@ -506,6 +498,7 @@ public class TestTlogReplica extends SolrCloudTestCase { } checkRTG(120,150, cluster.getJettySolrRunners()); waitForReplicasCatchUp(20); + assertCopyOverOldUpdates(2, timeCopyOverPerCores); } @SuppressWarnings("unchecked") @@ -535,7 +528,7 @@ public class TestTlogReplica extends SolrCloudTestCase { waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0)); // We skip peerSync, so replica will always trigger commit on leader // We query only the non-leader replicas, since we haven't opened a new searcher on the leader yet - waitForNumDocsInAllReplicas(4, getNonLeaderReplias(collectionName), 0);// Should be immediate + waitForNumDocsInAllReplicas(4, getNonLeaderReplias(collectionName), 10); //timeout for stale collection state // If I add the doc immediately, the leader fails to communicate with the follower with broken pipe. // Options are, wait or retry... @@ -556,8 +549,8 @@ public class TestTlogReplica extends SolrCloudTestCase { DirectUpdateHandler2.commitOnClose = true; ChaosMonkey.start(solrRunner); waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0)); + waitForNumDocsInAllReplicas(5, getNonLeaderReplias(collectionName), 10); //timeout for stale collection state checkRTG(3,7, cluster.getJettySolrRunners()); - waitForNumDocsInAllReplicas(5, getNonLeaderReplias(collectionName), 0);// Should be immediate cluster.getSolrClient().commit(collectionName); // Test replica recovery apply buffer updates @@ -910,4 +903,23 @@ public class TestTlogReplica extends SolrCloudTestCase { fail("Some replicas are not in sync with leader"); } + + private void assertCopyOverOldUpdates(long delta, Map timesPerCore) { + for (SolrCore core : timesPerCore.keySet()) { + assertEquals(timesPerCore.get(core) + delta, getTimesCopyOverOldUpdates(core)); + } + } + + private Map getTimesCopyOverOldUpdates(List cores) { + Map timesPerCore = new HashMap<>(); + for (SolrCore core : cores) { + long times = getTimesCopyOverOldUpdates(core); + timesPerCore.put(core, times); + } + return timesPerCore; + } + + private long getTimesCopyOverOldUpdates(SolrCore core) { + return ((Meter)core.getMetricRegistry().getMetrics().get("TLOG.copyOverOldUpdates.ops")).getCount(); + } }