SOLR-6640: Close searchers before rollback and recovery to avoid index corruption

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1653281 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2015-01-20 16:19:38 +00:00
parent e65eb74239
commit 8a2ef93467
2 changed files with 160 additions and 6 deletions

View File

@ -56,10 +56,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -78,6 +80,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpClient;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@ -405,13 +409,41 @@ public class SnapPuller {
}
if (!isFullCopyNeeded) {
// rollback - and do it before we download any files
// so we don't remove files we thought we didn't need
// to download later
solrCore.getUpdateHandler().getSolrCoreState()
.closeIndexWriter(core, true);
// a searcher might be using some flushed but committed segments
// because of soft commits (which open a searcher on IW's data)
// so we need to close the existing searcher on the last commit
// and wait until we are able to clean up all unused lucene files
if (solrCore.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
solrCore.closeSearcher();
}
// rollback and reopen index writer and wait until all unused files
// are successfully deleted
solrCore.getUpdateHandler().newIndexWriter(true);
RefCounted<IndexWriter> writer = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
try {
IndexWriter indexWriter = writer.get();
int c = 0;
indexWriter.deleteUnusedFiles();
while (hasUnusedFiles(indexDir, commit)) {
indexWriter.deleteUnusedFiles();
LOG.info("Sleeping for 1000ms to wait for unused lucene index files to be delete-able");
Thread.sleep(1000);
c++;
if (c >= 30) {
LOG.warn("SnapPuller unable to cleanup unused lucene index files so we must do a full copy instead");
isFullCopyNeeded = true;
break;
}
}
if (c > 0) {
LOG.info("SnapPuller slept for " + (c * 1000) + "ms for unused lucene index files to be delete-able");
}
} finally {
writer.decref();
}
solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(core, true);
}
boolean reloadCore = false;
try {
@ -542,6 +574,24 @@ public class SnapPuller {
}
}
private boolean hasUnusedFiles(Directory indexDir, IndexCommit commit) throws IOException {
Set<String> currentFiles = new HashSet<>();
String segmentsFileName = commit.getSegmentsFileName();
SegmentInfos infos = SegmentInfos.readCommit(indexDir, segmentsFileName);
for (SegmentCommitInfo info : infos.asList()) {
Set<String> files = info.info.files(); // All files that belong to this segment
currentFiles.addAll(files);
}
String[] allFiles = indexDir.listAll();
for (String file : allFiles) {
if (!file.equals(segmentsFileName) && !currentFiles.contains(file)) {
LOG.info("Found unused file: " + file);
return true;
}
}
return false;
}
private volatile Exception fsyncException;
/**

View File

@ -0,0 +1,104 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.File;
import java.util.List;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.junit.AfterClass;
import org.junit.BeforeClass;
// See SOLR-6640
public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase {
public RecoveryAfterSoftCommitTest() {
fixShardCount = true;
sliceCount = 1;
shardCount = 2;
}
@BeforeClass
public static void beforeTests() {
System.setProperty("solr.tests.maxBufferedDocs", "2");
}
@AfterClass
public static void afterTest() {
System.clearProperty("solr.tests.maxBufferedDocs");
}
/**
* Overrides the parent implementation to install a SocketProxy in-front of the Jetty server.
*/
@Override
public JettySolrRunner createJetty(File solrHome, String dataDir,
String shardList, String solrConfigOverride, String schemaOverride)
throws Exception
{
return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
}
@Override
public void doTest() throws Exception {
// flush twice
for (int i=0; i<4; i++) {
SolrInputDocument document = new SolrInputDocument();
document.addField("id", String.valueOf(i));
document.addField("a_t", "text_" + i);
cloudClient.add(document);
}
// soft-commit so searchers are open on un-committed but flushed segment files
AbstractUpdateRequest request = new UpdateRequest().setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true, true);
cloudClient.request(request);
Replica notLeader = ensureAllReplicasAreActive(DEFAULT_COLLECTION, "shard1", 1, 2, 30).get(0);
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy = getProxyForReplica(notLeader);
proxy.close();
// add more than 100 docs so that peer sync cannot be used for recovery
for (int i=5; i<115; i++) {
SolrInputDocument document = new SolrInputDocument();
document.addField("id", String.valueOf(i));
document.addField("a_t", "text_" + i);
cloudClient.add(document);
}
// Have the partition last at least 1 sec
// While this gives the impression that recovery is timing related, this is
// really only
// to give time for the state to be written to ZK before the test completes.
// In other words,
// without a brief pause, the test finishes so quickly that it doesn't give
// time for the recovery process to kick-in
Thread.sleep(2000L);
proxy.reopen();
List<Replica> notLeaders =
ensureAllReplicasAreActive(DEFAULT_COLLECTION, "shard1", 1, 2, 30);
}
}