SOLR-7134: Replication can still cause index corruption.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1668779 13f79535-47bb-0310-9956-ffa450edef68
Mark Robert Miller 2015-03-24 03:19:37 +00:00
parent 84adc8604d
commit 57b47fed01
16 changed files with 177 additions and 75 deletions

View File

@@ -282,6 +282,8 @@ Bug Fixes
 * SOLR-7286: Using HDFS's FileSystem.newInstance does not guarantee a new instance.
   (Mark Miller)
 
+* SOLR-7134: Replication can still cause index corruption. (Mark Miller, shalin, Mike Drob)
+
 Optimizations
 ----------------------

View File

@@ -305,7 +305,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
           searchHolder.decref();
         }
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+        log.error("Error in solrcloud_debug block", e);
       }
     }
     if (!success) {
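
This is the first of several identical changes in this commit: the solrcloud_debug blocks exist only to log diagnostic information, so a failure inside them should not abort leader election, recovery, or core administration. A generic sketch of the recurring shape (the logger and the try body are stand-ins for the real diagnostic code, not a Solr API):

    // Generic shape of the solrcloud_debug demotion applied throughout this commit;
    // diagnostic failures are logged rather than rethrown as SERVER_ERROR.
    class SolrcloudDebugPattern {
      private static final org.slf4j.Logger log =
          org.slf4j.LoggerFactory.getLogger(SolrcloudDebugPattern.class);

      void solrcloudDebug() {
        if (log.isDebugEnabled()) {
          try {
            // ... open the newest searcher and log document counts ...
          } catch (Exception e) {
            // before this commit: throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
            log.debug("Error in solrcloud_debug block", e);
          }
        }
      }
    }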

View File

@@ -160,8 +160,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
       boolean success = replicationHandler.doFetch(solrParams, false);
 
       if (!success) {
-        throw new SolrException(ErrorCode.SERVER_ERROR,
-            "Replication for recovery failed.");
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
       }
 
       // solrcloud_debug
@@ -179,7 +178,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
               + " from "
               + leaderUrl
               + " gen:"
-              + core.getDeletionPolicy().getLatestCommit().getGeneration()
+              + (core.getDeletionPolicy().getLatestCommit() == null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration())
               + " data:" + core.getDataDir()
               + " index:" + core.getIndexDir()
               + " newIndex:" + core.getNewIndexDir()
@@ -189,7 +188,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
           searchHolder.decref();
         }
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+        log.debug("Error in solrcloud_debug block", e);
       }
     }
@@ -409,7 +408,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
           searchHolder.decref();
         }
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+        log.debug("Error in solrcloud_debug block", e);
       }
     }
@@ -557,7 +556,7 @@ public class RecoveryStrategy extends Thread implements ClosableThread {
           searchHolder.decref();
         }
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+        log.debug("Error in solrcloud_debug block", e);
       }
     }
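
A note on the generation-logging fix above: in Java, string concatenation binds tighter than both != and the conditional operator, so a null check written inline in a + chain applies to the concatenated string rather than to the commit, and the branches must not be swapped. A minimal illustration (the describe method and latestCommit parameter are hypothetical):

    import org.apache.lucene.index.IndexCommit;

    class GenLogExample {
      static String describe(IndexCommit latestCommit) {
        // Buggy: "gen:" + latestCommit != null ? ... parses as ("gen:" + latestCommit) != null,
        // which is always true, so the generation branch can never be reached.
        // Correct: parenthesize so the null check applies to the commit itself.
        return "gen:" + (latestCommit == null ? "null"
            : Long.toString(latestCommit.getGeneration()));
      }
    }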

View File

@@ -245,25 +245,37 @@ public class IndexFetcher {
     }
   }
 
-  private boolean successfulInstall = false;
-
+  boolean fetchLatestIndex(final SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
+    return fetchLatestIndex(core, forceReplication, false);
+  }
+
   /**
    * This command downloads all the necessary files from master to install a index commit point. Only changed files are
    * downloaded. It also downloads the conf files (if they are modified).
    *
    * @param core the SolrCore
    * @param forceReplication force a replication in all cases
+   * @param forceCoreReload force a core reload in all cases
    * @return true on success, false if slave is already in sync
    * @throws IOException if an exception occurs
    */
-  boolean fetchLatestIndex(final SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
-    successfulInstall = false;
+  boolean fetchLatestIndex(final SolrCore core, boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
+
+    boolean cleanupDone = false;
+    boolean successfulInstall = false;
+
     replicationStartTime = System.currentTimeMillis();
     Directory tmpIndexDir = null;
     String tmpIndex = null;
     Directory indexDir = null;
     String indexDirPath = null;
     boolean deleteTmpIdxDir = true;
 
+    if (!core.getSolrCoreState().getLastReplicateIndexSuccess()) {
+      // if the last replication was not a success, we force a full replication
+      // when we are a bit more confident we may want to try a partial replication
+      // if the error is connection related or something, but we have to be careful
+      forceReplication = true;
+    }
+
     try {
       //get the current 'replicateable' index version in the master
       NamedList response = null;
@@ -404,6 +416,7 @@ public class IndexFetcher {
             + " secs");
       Collection<Map<String,Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
       if (!modifiedConfFiles.isEmpty()) {
+        reloadCore = true;
         downloadConfFiles(confFilesToDownload, latestGeneration);
         if (isFullCopyNeeded) {
           successfulInstall = modifyIndexProps(tmpIdxDirName);
@@ -426,7 +439,6 @@ public class IndexFetcher {
           logReplicationTimeAndConfFiles(modifiedConfFiles,
               successfulInstall);// write to a file time of replication and
                                  // conf files.
-          reloadCore = true;
         }
       } else {
         terminateAndWaitFsyncService();
@@ -448,7 +460,8 @@ public class IndexFetcher {
       }
 
       // we must reload the core after we open the IW back up
-      if (reloadCore) {
+      if (successfulInstall && (reloadCore || forceCoreReload)) {
+        LOG.info("Reloading SolrCore {}", core.getName());
         reloadCore();
       }
@@ -469,6 +482,16 @@ public class IndexFetcher {
         openNewSearcherAndUpdateCommitPoint();
       }
 
+      if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
+        cleanup(core, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
+        cleanupDone = true;
+        // we try with a full copy of the index
+        LOG.warn(
+            "Replication attempt was not successful - trying a full index replication reloadCore={}",
+            reloadCore);
+        successfulInstall = fetchLatestIndex(core, true, reloadCore);
+      }
+
       replicationStartTime = 0;
       return successfulInstall;
     } catch (ReplicationHandlerException e) {
@@ -482,41 +505,51 @@ public class IndexFetcher {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
       }
     } finally {
-      try {
-        if (!successfulInstall) {
-          try {
-            logReplicationTimeAndConfFiles(null, successfulInstall);
-          } catch(Exception e) {
-            LOG.error("caught", e);
-          }
-        }
-        filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
-        replicationStartTime = 0;
-        dirFileFetcher = null;
-        localFileFetcher = null;
-        if (fsyncService != null && !fsyncService.isShutdown()) fsyncService
-            .shutdownNow();
-        fsyncService = null;
-        stop = false;
-        fsyncException = null;
-      } finally {
-        if (deleteTmpIdxDir && tmpIndexDir != null) {
-          try {
-            core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
-            core.getDirectoryFactory().remove(tmpIndexDir);
-          } catch (IOException e) {
-            SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e);
-          }
-        }
-        if (tmpIndexDir != null) {
-          core.getDirectoryFactory().release(tmpIndexDir);
-        }
-        if (indexDir != null) {
-          core.getDirectoryFactory().release(indexDir);
-        }
-      }
+      if (!cleanupDone) {
+        cleanup(core, tmpIndexDir, indexDir, deleteTmpIdxDir, successfulInstall);
+      }
     }
   }
 
+  private void cleanup(final SolrCore core, Directory tmpIndexDir,
+      Directory indexDir, boolean deleteTmpIdxDir, boolean successfulInstall) throws IOException {
+    try {
+      if (!successfulInstall) {
+        try {
+          logReplicationTimeAndConfFiles(null, successfulInstall);
+        } catch (Exception e) {
+          LOG.error("caught", e);
+        }
+      }
+
+      core.getUpdateHandler().getSolrCoreState().setLastReplicateIndexSuccess(successfulInstall);
+
+      filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
+      replicationStartTime = 0;
+      dirFileFetcher = null;
+      localFileFetcher = null;
+      if (fsyncService != null && !fsyncService.isShutdown()) fsyncService
+          .shutdownNow();
+      fsyncService = null;
+      stop = false;
+      fsyncException = null;
+    } finally {
+      if (deleteTmpIdxDir && tmpIndexDir != null) {
+        try {
+          core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
+          core.getDirectoryFactory().remove(tmpIndexDir);
+        } catch (IOException e) {
+          SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e);
+        }
+      }
+      if (tmpIndexDir != null) {
+        core.getDirectoryFactory().release(tmpIndexDir);
+      }
+      if (indexDir != null) {
+        core.getDirectoryFactory().release(indexDir);
+      }
+    }
+  }
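
Taken together, the IndexFetcher changes implement a fetch-with-fallback loop: a failed incremental fetch is cleaned up exactly once and retried as a full copy, and the outcome is persisted on SolrCoreState so the next replication attempt also starts from a full copy. A condensed sketch of that control flow (tryFetch and doCleanup are illustrative stand-ins for the download/install and cleanup code above, not real Solr methods):

    final class FetchControlFlowSketch {
      private boolean lastReplicateIndexSuccess = true; // mirrors the new SolrCoreState flag
      private boolean isFullCopyNeeded;                 // set by comparing local and master generations

      boolean fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) {
        // a failed previous replication forces a full copy this time
        if (!lastReplicateIndexSuccess) forceReplication = true;

        boolean cleanupDone = false;
        boolean successfulInstall = false;
        try {
          successfulInstall = tryFetch(forceReplication);
          if (!isFullCopyNeeded && !forceReplication && !successfulInstall) {
            doCleanup(successfulInstall); // release directories, record the failure
            cleanupDone = true;
            successfulInstall = fetchLatestIndex(true, forceCoreReload); // retry as a full copy
          }
          return successfulInstall;
        } finally {
          if (!cleanupDone) doCleanup(successfulInstall); // cleanup runs exactly once
        }
      }

      private boolean tryFetch(boolean forceReplication) { return false; } // placeholder
      private void doCleanup(boolean success) { lastReplicateIndexSuccess = success; }
    }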

View File

@@ -832,7 +832,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
           searchHolder.decref();
         }
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+        log.debug("Error in solrcloud_debug block", e);
       }
     }
     if (!success) {
@@ -1011,7 +1011,7 @@ public class CoreAdminHandler extends RequestHandlerBase {
           searchHolder.decref();
         }
       } catch (Exception e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+        log.debug("Error in solrcloud_debug block", e);
       }
     }
   }

View File

@@ -29,6 +29,7 @@ import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.StorableField;
 import org.apache.lucene.index.StoredDocument;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.solr.client.solrj.SolrResponse;
@@ -98,6 +99,25 @@ public class RealTimeGetComponent extends SearchComponent
     val = params.get("getUpdates");
     if (val != null) {
+      // solrcloud_debug
+      if (log.isDebugEnabled()) {
+        try {
+          RefCounted<SolrIndexSearcher> searchHolder = req.getCore()
+              .getNewestSearcher(false);
+          SolrIndexSearcher searcher = searchHolder.get();
+          try {
+            log.debug(req.getCore().getCoreDescriptor()
+                .getCoreContainer().getZkController().getNodeName()
+                + " min count to sync to (from most recent searcher view) "
+                + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+          } finally {
+            searchHolder.decref();
+          }
+        } catch (Exception e) {
+          log.debug("Error in solrcloud_debug block", e);
+        }
+      }
+
       processGetUpdates(rb);
       return;
     }

View File

@@ -53,6 +53,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
   private volatile boolean recoveryRunning;
   private RecoveryStrategy recoveryStrat;
+  private volatile boolean lastReplicationSuccess = true;
 
   private RefCounted<IndexWriter> refCntWriter;
@@ -377,5 +378,14 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
     return leaderThrottle;
   }
 
+  @Override
+  public boolean getLastReplicateIndexSuccess() {
+    return lastReplicationSuccess;
+  }
+
+  @Override
+  public void setLastReplicateIndexSuccess(boolean success) {
+    this.lastReplicationSuccess = success;
+  }
 }

View File

@@ -495,7 +495,7 @@ public class PeerSync {
             cmd.setVersion(version);
             cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
             if (debug) {
-              log.debug(msg() + "add " + cmd);
+              log.debug(msg() + "add " + cmd + " id " + sdoc.getField("id"));
             }
             proc.processAdd(cmd);
             break;
@@ -508,7 +508,7 @@ public class PeerSync {
             cmd.setVersion(version);
             cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
             if (debug) {
-              log.debug(msg() + "delete " + cmd);
+              log.debug(msg() + "delete " + cmd + " " + new BytesRef(idBytes).utf8ToString());
             }
             proc.processDelete(cmd);
             break;

View File

@@ -146,4 +146,7 @@ public abstract class SolrCoreState {
    */
   public abstract ActionThrottle getLeaderThrottle();
 
+  public abstract boolean getLastReplicateIndexSuccess();
+
+  public abstract void setLastReplicateIndexSuccess(boolean success);
 }

View File

@@ -1081,6 +1081,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
           if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
             // This update is a repeat, or was reordered. We need to drop this update.
+            log.debug("Dropping add update due to version {}", idBytes.utf8ToString());
             return true;
           }
@@ -1568,6 +1569,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           Long lastVersion = vinfo.lookupVersion(cmd.getIndexedId());
           if (lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate) {
             // This update is a repeat, or was reordered. We need to drop this update.
+            log.debug("Dropping delete update due to version {}", idBytes.utf8ToString());
             return true;
           }
         }
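
For context on the two new log lines: the surrounding check is Solr's optimistic-versioning guard, which drops any add or delete whose (absolute) version is not newer than the version already recorded for that document, making reordered or repeated updates idempotent. A toy illustration of the comparison (shouldDrop is a hypothetical helper; negative versions denote deletes, as in Solr's version scheme):

    class VersionGuardExample {
      // Drop when a version is already recorded and it is >= the incoming one.
      static boolean shouldDrop(Long lastVersion, long versionOnUpdate) {
        return lastVersion != null && Math.abs(lastVersion) >= versionOnUpdate;
      }

      public static void main(String[] args) {
        System.out.println(shouldDrop(1712L, 1710L));  // true: repeat/reordered, drop it
        System.out.println(shouldDrop(-1712L, 1710L)); // true: a newer delete already won
        System.out.println(shouldDrop(1710L, 1712L));  // false: genuinely newer, apply it
        System.out.println(shouldDrop(null, 1712L));   // false: nothing recorded yet
      }
    }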

View File

@@ -112,8 +112,21 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
     List<StopableIndexingThread> threads = new ArrayList<>();
     int threadCount = 2;
+    int batchSize = 1;
+    if (random().nextBoolean()) {
+      batchSize = random().nextInt(98) + 2;
+    }
+
+    boolean pauseBetweenUpdates = TEST_NIGHTLY ? random().nextBoolean() : true;
+    int maxUpdates = -1;
+    if (!pauseBetweenUpdates) {
+      maxUpdates = 1000 + random().nextInt(1000);
+    } else {
+      maxUpdates = 15000;
+    }
+
     for (int i = 0; i < threadCount; i++) {
-      StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true);
+      StopableIndexingThread indexThread = new StopableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, maxUpdates, batchSize, pauseBetweenUpdates); // random().nextInt(999) + 1
       threads.add(indexThread);
       indexThread.start();
     }
@@ -158,7 +171,7 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
     waitForThingsToLevelOut(180000);
 
-    checkShardConsistency(true, true);
+    checkShardConsistency(batchSize == 1, true);
 
     if (VERBOSE) System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");

View File

@@ -72,10 +72,10 @@ public class RecoveryZkTest extends AbstractFullDistribZkTestBase {
       maxDoc = maxDocNightlyList[random().nextInt(maxDocList.length - 1)];
     }
 
-    indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc);
+    indexThread = new StopableIndexingThread(controlClient, cloudClient, "1", true, maxDoc, 1, true);
     indexThread.start();
 
-    indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc);
+    indexThread2 = new StopableIndexingThread(controlClient, cloudClient, "2", true, maxDoc, 1, true);
     indexThread2.start();

View File

@@ -18,6 +18,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -37,6 +39,7 @@ import org.apache.solr.common.util.IOUtils;
  */
 public class HdfsTestUtil {
+  private static Logger log = LoggerFactory.getLogger(HdfsTestUtil.class);
 
   private static Locale savedLocale;
@@ -131,7 +134,15 @@ public class HdfsTestUtil {
     if (timer != null) {
       timer.cancel();
     }
-    dfsCluster.shutdown();
+    try {
+      dfsCluster.shutdown();
+    } catch (Error e) {
+      // Added in SOLR-7134
+      // Rarely, shutdown can fail with either a NullPointerException
+      // or a class-not-found exception. The latter may be fixable
+      // by adding test dependencies.
+      log.warn("Exception shutting down dfsCluster", e);
+    }
   }
 
   // TODO: we HACK around HADOOP-9643

View File

@@ -105,7 +105,7 @@ public class HdfsWriteToMultipleCollectionsTest extends BasicDistributedZkTest {
       CloudSolrClient client = new CloudSolrClient(zkServer.getZkAddress());
       client.setDefaultCollection(ACOLLECTION + i);
       cloudClients.add(client);
-      StopableIndexingThread indexThread = new StopableIndexingThread(null, client, "1", true, docCount);
+      StopableIndexingThread indexThread = new StopableIndexingThread(null, client, "1", true, docCount, 1, true);
       threads.add(indexThread);
       indexThread.start();
     }

View File

@@ -301,8 +301,9 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
           details.get("slave"));
 
       // SOLR-2677: assert not false negatives
       Object timesFailed = ((NamedList)details.get("slave")).get(IndexFetcher.TIMES_FAILED);
-      assertEquals("slave has fetch error count",
-          null, timesFailed);
+      // SOLR-7134: a fetch can fail because some mock index files have no checksum; they
+      // will always be downloaded and may not be movable into the existing index
+      assertTrue("slave has fetch error count: " + (String)timesFailed, timesFailed == null || ((String) timesFailed).equals("1"));
 
       if (3 != i) {
         // index & fetch
View File

@@ -42,18 +42,23 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab
   private SolrClient cloudClient;
   private int numDeletes;
   private int numAdds;
+  private List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
+  private int batchSize;
+  private boolean pauseBetweenUpdates;
 
   public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes) {
-    this(controlClient, cloudClient, id, doDeletes, -1);
+    this(controlClient, cloudClient, id, doDeletes, -1, 1, true);
   }
 
-  public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes, int numCycles) {
+  public StopableIndexingThread(SolrClient controlClient, SolrClient cloudClient, String id, boolean doDeletes, int numCycles, int batchSize, boolean pauseBetweenUpdates) {
     super("StopableIndexingThread");
     this.controlClient = controlClient;
     this.cloudClient = cloudClient;
     this.id = id;
     this.doDeletes = doDeletes;
     this.numCycles = numCycles;
+    this.batchSize = batchSize;
+    this.pauseBetweenUpdates = pauseBetweenUpdates;
     setDaemon(true);
   }
@@ -100,8 +105,17 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab
         try {
           numAdds++;
-          indexr("id", id, i1, 50, t1,
+          SolrInputDocument doc = new SolrInputDocument();
+          addFields(doc, "id", id, i1, 50, t1,
               "to come to the aid of their country.");
+          addFields(doc, "rnd_b", true);
+          docs.add(doc);
+
+          if (docs.size() >= batchSize) {
+            indexDocs(docs);
+            docs.clear();
+          }
         } catch (Exception e) {
           addFailed = true;
           System.err.println("REQUEST FAILED for id=" + id);
@@ -117,10 +131,12 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab
         deletes.add(id);
       }
 
-      try {
-        Thread.currentThread().sleep(AbstractFullDistribZkTestBase.random().nextInt(100));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+      if (docs.size() > 0 && pauseBetweenUpdates) {
+        try {
+          Thread.currentThread().sleep(AbstractFullDistribZkTestBase.random().nextInt(500) + 50);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
       }
     }
@@ -151,26 +167,18 @@ public class StopableIndexingThread extends AbstractFullDistribZkTestBase.Stopab
     }
   }
 
-  protected void indexr(Object... fields) throws Exception {
-    SolrInputDocument doc = new SolrInputDocument();
-    addFields(doc, fields);
-    addFields(doc, "rnd_b", true);
-    indexDoc(doc);
-  }
-
-  protected void indexDoc(SolrInputDocument doc) throws IOException,
+  protected void indexDocs(List<SolrInputDocument> docs) throws IOException,
       SolrServerException {
     if (controlClient != null) {
       UpdateRequest req = new UpdateRequest();
-      req.add(doc);
+      req.add(docs);
       req.setParam("CONTROL", "TRUE");
      req.process(controlClient);
     }
 
     UpdateRequest ureq = new UpdateRequest();
-    ureq.add(doc);
+    ureq.add(docs);
     ureq.process(cloudClient);
   }