SOLR-11293: Fix downgrade in performance from precommit

This commit is contained in:
Cao Manh Dat 2017-09-27 22:46:55 +07:00
parent 61d1e96b80
commit cc9c867083
9 changed files with 66 additions and 67 deletions

View File

@ -134,7 +134,7 @@ Bug Fixes
* SOLR-11278: Stopping CDCR should cancel a running bootstrap operation. (Amrit Sarkar, shalin)
* SOLR-11293: Potential data loss in TLOG replicas after replication failures (noble)
* SOLR-11293: Potential data loss in TLOG replicas when masterVersion equals zero (noble, Cao Manh Dat)
* SOLR-10101: TestLazyCores hangs (Erick Erickson)

View File

@ -214,6 +214,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
solrParams.set(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, replicaType == Replica.Type.TLOG);
if (isClosed()) return; // we check closed on return
boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful();

View File

@ -26,6 +26,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.IndexFetcher;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
@ -74,6 +75,7 @@ public class ReplicateFromLeader {
NamedList<Object> slaveConfig = new NamedList<>();
slaveConfig.add("fetchFromLeader", Boolean.TRUE);
slaveConfig.add(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, switchTransactionLog);
slaveConfig.add("pollInterval", pollIntervalStr);
NamedList<Object> replicationConfig = new NamedList<>();
replicationConfig.add("slave", slaveConfig);
@ -85,10 +87,11 @@ public class ReplicateFromLeader {
replicationProcess = new ReplicationHandler();
if (switchTransactionLog) {
replicationProcess.setPollListener((solrCore, pollSuccess) -> {
if (pollSuccess) {
replicationProcess.setPollListener((solrCore, fetchResult) -> {
if (fetchResult == IndexFetcher.IndexFetchResult.INDEX_FETCH_SUCCESS) {
String commitVersion = getCommitVersion(core);
if (commitVersion == null) return;
if (Long.parseLong(commitVersion) == lastVersion) return;
UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());

View File

@ -989,7 +989,7 @@ public class ZkController {
if (isTlogReplicaAndNotLeader) {
String commitVersion = ReplicateFromLeader.getCommitVersion(core);
if (commitVersion != null) {
ulog.copyOverOldUpdates(Long.parseLong(commitVersion), true);
ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
}
}
// we will call register again after zk expiration and on reload

View File

@ -163,6 +163,8 @@ public class IndexFetcher {
private Integer soTimeout;
private boolean skipCommitOnMasterVersionZero;
private static final String INTERRUPT_RESPONSE_MESSAGE = "Interrupted while waiting for modify lock";
public static class IndexFetchResult {
@ -226,6 +228,10 @@ public class IndexFetcher {
if (fetchFromLeader != null && fetchFromLeader instanceof Boolean) {
this.fetchFromLeader = (boolean) fetchFromLeader;
}
Object skipCommitOnMasterVersionZero = initArgs.get(SKIP_COMMIT_ON_MASTER_VERSION_ZERO);
if (skipCommitOnMasterVersionZero != null && skipCommitOnMasterVersionZero instanceof Boolean) {
this.skipCommitOnMasterVersionZero = (boolean) skipCommitOnMasterVersionZero;
}
String masterUrl = (String) initArgs.get(MASTER_URL);
if (masterUrl == null && !this.fetchFromLeader)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@ -428,7 +434,7 @@ public class IndexFetcher {
LOG.info("Slave's version: " + IndexDeletionPolicyWrapper.getCommitTimestamp(commit));
if (latestVersion == 0L) {
if (forceReplication && commit.getGeneration() != 0) {
if (commit.getGeneration() != 0) {
// since we won't get the files for an empty index,
// we just clear ours and commit
LOG.info("New index in Master. Deleting mine...");
@ -438,8 +444,12 @@ public class IndexFetcher {
} finally {
iw.decref();
}
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
if (skipCommitOnMasterVersionZero) {
openNewSearcherAndUpdateCommitPoint();
} else {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
}
}
//there is nothing to be replicated

View File

@ -220,7 +220,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private PollListener pollListener;
public interface PollListener {
void onComplete(SolrCore solrCore, boolean pollSuccess) throws IOException;
void onComplete(SolrCore solrCore, IndexFetchResult fetchResult) throws IOException;
}
/**
@ -1182,8 +1182,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
try {
LOG.debug("Polling for index modifications");
markScheduledExecutionStart();
boolean pollSuccess = doFetch(null, false).getSuccessful();
if (pollListener != null) pollListener.onComplete(core, pollSuccess);
IndexFetchResult fetchResult = doFetch(null, false);
if (pollListener != null) pollListener.onComplete(core, fetchResult);
} catch (Exception e) {
LOG.error("Exception in fetching index", e);
}
@ -1754,6 +1754,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
public static final String FETCH_FROM_LEADER = "fetchFromLeader";
// in case of TLOG replica, if masterVersion = zero, don't do commit
// otherwise updates from current tlog won't copied over properly to the new tlog, leading to data loss
public static final String SKIP_COMMIT_ON_MASTER_VERSION_ZERO = "skipCommitOnMasterVersionZero";
public static final String STATUS = "status";
public static final String COMMAND = "command";

View File

@ -20,7 +20,6 @@ package org.apache.solr.handler.admin;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@ -29,15 +28,10 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.CoreAdminHandler.CallInfo;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -177,33 +171,6 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Solr is shutting down");
}
// solrcloud_debug
if (log.isDebugEnabled() && core != null) {
try {
LocalSolrQueryRequest r = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
CommitUpdateCommand commitCmd = new CommitUpdateCommand(r, false);
commitCmd.softCommit = true;
core.getUpdateHandler().commit(commitCmd);
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
log.debug(core.getCoreContainer()
.getZkController().getNodeName()
+ " to replicate "
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits
+ " gen:"
+ core.getDeletionPolicy().getLatestCommit().getGeneration()
+ " data:" + core.getDataDir());
} finally {
searchHolder.decref();
}
} catch (Exception e) {
log.debug("Error in solrcloud_debug block", e);
}
}
}
Thread.sleep(1000);
}

View File

@ -244,6 +244,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
protected Gauge<Integer> bufferedOpsGauge;
protected Meter applyingBufferedOpsMeter;
protected Meter replayOpsMeter;
protected Meter copyOverOldUpdatesMeter;
public static class LogPtr {
final long pointer;
@ -435,6 +436,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
manager.registerGauge(null, registry, () -> getTotalLogsSize(), true, "bytes", scope, "replay", "remaining");
applyingBufferedOpsMeter = manager.meter(null, registry, "ops", scope, "applyingBuffered");
replayOpsMeter = manager.meter(null, registry, "ops", scope, "replay");
copyOverOldUpdatesMeter = manager.meter(null, registry, "ops", scope, "copyOverOldUpdates");
manager.registerGauge(null, registry, () -> state.getValue(), true, "state", scope);
}
@ -1158,12 +1160,12 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
synchronized (this) {
if (tlog == null && prevTlog == null && prevMapLog2 == null && logs.isEmpty()) {
if (tlog == null) {
return;
}
preCommit(cuc);
try {
copyOverOldUpdates(cuc.getVersion(), false);
copyOverOldUpdates(cuc.getVersion());
} finally {
postCommit(cuc);
}
@ -1173,9 +1175,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
/**
* Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
* @param commitVersion any updates that have version larger than the commitVersion will be copied over
* @param omitCommitted if a tlog is already committed then don't read it
*/
public void copyOverOldUpdates(long commitVersion, boolean omitCommitted) {
public void copyOverOldUpdates(long commitVersion) {
TransactionLog oldTlog = prevTlog;
if (oldTlog == null && !logs.isEmpty()) {
oldTlog = logs.getFirst();
@ -1185,11 +1186,12 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
try {
if (omitCommitted && oldTlog.endsWithCommit()) return;
if (oldTlog.endsWithCommit()) return;
} catch (IOException e) {
log.warn("Exception reading log", e);
return;
}
copyOverOldUpdatesMeter.mark();
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
new ModifiableSolrParams());

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -29,6 +30,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.codahale.metrics.Meter;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
@ -60,8 +63,6 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
@ -71,8 +72,6 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
@Slow
public class TestTlogReplica extends SolrCloudTestCase {
@ -464,16 +463,13 @@ public class TestTlogReplica extends SolrCloudTestCase {
.process(cloudClient, collectionName);
{
UpdateHandler updateHandler = getSolrCore(true).get(0).getUpdateHandler();
RefCounted<IndexWriter> iwRef = updateHandler.getSolrCoreState().getIndexWriter(null);
assertTrue("IndexWriter at leader must see updates ", iwRef.get().hasUncommittedChanges());
iwRef.decref();
long docsPending = (long) getSolrCore(true).get(0).getMetricRegistry().getGauges().get("UPDATE.updateHandler.docsPending").getValue();
assertEquals(4, docsPending);
}
for (SolrCore solrCore : getSolrCore(false)) {
RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
iwRef.decref();
long docsPending = (long) solrCore.getMetricRegistry().getGauges().get("UPDATE.updateHandler.docsPending").getValue();
assertEquals(0, docsPending);
}
checkRTG(1, 4, cluster.getJettySolrRunners());
@ -486,16 +482,12 @@ public class TestTlogReplica extends SolrCloudTestCase {
// The DBQ is not processed at replicas, so we still can get doc2 and other docs by RTG
checkRTG(2,4, getSolrRunner(false));
Map<SolrCore, Long> timeCopyOverPerCores = getTimesCopyOverOldUpdates(getSolrCore(false));
new UpdateRequest()
.commit(cloudClient, collectionName);
waitForNumDocsInAllActiveReplicas(2);
// Update log roll over
for (SolrCore solrCore : getSolrCore(false)) {
UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
assertFalse(updateLog.hasUncommittedChanges());
}
assertCopyOverOldUpdates(1, timeCopyOverPerCores);
// UpdateLog copy over old updates
for (int i = 15; i <= 150; i++) {
@ -506,6 +498,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
}
checkRTG(120,150, cluster.getJettySolrRunners());
waitForReplicasCatchUp(20);
assertCopyOverOldUpdates(2, timeCopyOverPerCores);
}
@SuppressWarnings("unchecked")
@ -535,7 +528,7 @@ public class TestTlogReplica extends SolrCloudTestCase {
waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0));
// We skip peerSync, so replica will always trigger commit on leader
// We query only the non-leader replicas, since we haven't opened a new searcher on the leader yet
waitForNumDocsInAllReplicas(4, getNonLeaderReplias(collectionName), 0);// Should be immediate
waitForNumDocsInAllReplicas(4, getNonLeaderReplias(collectionName), 10); //timeout for stale collection state
// If I add the doc immediately, the leader fails to communicate with the follower with broken pipe.
// Options are, wait or retry...
@ -556,8 +549,8 @@ public class TestTlogReplica extends SolrCloudTestCase {
DirectUpdateHandler2.commitOnClose = true;
ChaosMonkey.start(solrRunner);
waitForState("Replica didn't recover", collectionName, activeReplicaCount(0,2,0));
waitForNumDocsInAllReplicas(5, getNonLeaderReplias(collectionName), 10); //timeout for stale collection state
checkRTG(3,7, cluster.getJettySolrRunners());
waitForNumDocsInAllReplicas(5, getNonLeaderReplias(collectionName), 0);// Should be immediate
cluster.getSolrClient().commit(collectionName);
// Test replica recovery apply buffer updates
@ -910,4 +903,23 @@ public class TestTlogReplica extends SolrCloudTestCase {
fail("Some replicas are not in sync with leader");
}
private void assertCopyOverOldUpdates(long delta, Map<SolrCore, Long> timesPerCore) {
for (SolrCore core : timesPerCore.keySet()) {
assertEquals(timesPerCore.get(core) + delta, getTimesCopyOverOldUpdates(core));
}
}
private Map<SolrCore, Long> getTimesCopyOverOldUpdates(List<SolrCore> cores) {
Map<SolrCore, Long> timesPerCore = new HashMap<>();
for (SolrCore core : cores) {
long times = getTimesCopyOverOldUpdates(core);
timesPerCore.put(core, times);
}
return timesPerCore;
}
private long getTimesCopyOverOldUpdates(SolrCore core) {
return ((Meter)core.getMetricRegistry().getMetrics().get("TLOG.copyOverOldUpdates.ops")).getCount();
}
}