SOLR-1475: Java-based replication doesn't properly reserve its commit point during backups

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@823711 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2009-10-09 21:52:42 +00:00
parent 23ea65d32e
commit a247352966
4 changed files with 229 additions and 27 deletions

View File

@ -110,7 +110,7 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
}
/** Release a previously saved commit point */
public synchronized void releaseCommmitPoint(Long indexCommitVersion) {
public synchronized void releaseCommitPoint(Long indexCommitVersion) {
AtomicInteger reserveCount = savedCommits.get(indexCommitVersion);
if (reserveCount == null) return;// this should not happen
if (reserveCount.decrementAndGet() <= 0) {

View File

@ -65,7 +65,7 @@ import java.util.zip.DeflaterOutputStream;
*/
public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAware {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class.getName());
private SolrCore core;
SolrCore core;
private SnapPuller snapPuller;
@ -276,12 +276,15 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private void doSnapShoot(SolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) {
try {
IndexCommit indexCommit = core.getDeletionPolicy().getLatestCommit();
IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
IndexCommit indexCommit = delPolicy.getLatestCommit();
// race?
delPolicy.setReserveDuration(indexCommit.getVersion(), reserveCommitDuration);
if(indexCommit == null) {
indexCommit = req.getSearcher().getReader().getIndexCommit();
}
if (indexCommit != null) {
new SnapShooter(core, params.get("location")).createSnapAsync(indexCommit.getFileNames(), this);
new SnapShooter(core, params.get("location")).createSnapAsync(indexCommit, this);
}
} catch (Exception e) {
LOG.warn("Exception during creating a snapshot", e);
@ -687,10 +690,13 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
details.add("master", master);
if (isSlave && showSlaveDetails)
details.add("slave", slave);
NamedList snapshotStats = snapShootDetails;
if (snapshotStats != null)
details.add(CMD_BACKUP, snapshotStats);
}
NamedList snapshotStats = snapShootDetails;
if (snapshotStats != null)
details.add(CMD_BACKUP, snapshotStats);
return details;
}
@ -915,13 +921,13 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
indexCommitPoint = core.getDeletionPolicy().getLatestCommit();
core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getVersion());
if(oldCommitPoint != null){
core.getDeletionPolicy().releaseCommmitPoint(oldCommitPoint.getVersion());
core.getDeletionPolicy().releaseCommitPoint(oldCommitPoint.getVersion());
}
}
if (snapshoot) {
try {
SnapShooter snapShooter = new SnapShooter(core, null);
snapShooter.createSnapAsync(core.getDeletionPolicy().getLatestCommit().getFileNames(), ReplicationHandler.this);
snapShooter.createSnapAsync(core.getDeletionPolicy().getLatestCommit(), ReplicationHandler.this);
} catch (Exception e) {
LOG.error("Exception while snapshooting", e);
}

View File

@ -16,20 +16,24 @@
*/
package org.apache.solr.handler;
import org.apache.commons.io.FileUtils;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.solr.core.SolrCore;
import org.apache.solr.common.util.NamedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.ArrayList;
import org.apache.commons.io.IOUtils;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p/> Provides functionality equivalent to the snapshooter script </p>
@ -42,7 +46,7 @@ public class SnapShooter {
private String snapDir = null;
private SolrCore solrCore;
private SimpleFSLockFactory lockFactory;
public SnapShooter(SolrCore core, String location) throws IOException {
solrCore = core;
if (location == null) snapDir = core.getDataDir();
@ -55,15 +59,17 @@ public class SnapShooter {
lockFactory = new SimpleFSLockFactory(snapDir);
}
void createSnapAsync(final Collection<String> files, final ReplicationHandler replicationHandler) {
void createSnapAsync(final IndexCommit indexCommit, final ReplicationHandler replicationHandler) {
replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getVersion());
new Thread() {
public void run() {
createSnapshot(files, replicationHandler);
createSnapshot(indexCommit, replicationHandler);
}
}.start();
}
void createSnapshot(Collection<String> files, ReplicationHandler replicationHandler) {
void createSnapshot(final IndexCommit indexCommit, ReplicationHandler replicationHandler) {
NamedList details = new NamedList();
details.add("startTime", new Date().toString());
File snapShotDir = null;
@ -79,9 +85,10 @@ public class SnapShooter {
LOG.warn("Unable to create snapshot directory: " + snapShotDir.getAbsolutePath());
return;
}
for (String indexFile : files) {
FileUtils.copyFileToDirectory(new File(solrCore.getIndexDir(), indexFile), snapShotDir, true);
}
Collection<String> files = indexCommit.getFileNames();
FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(), indexCommit);
fileCopier.copyFiles(files, snapShotDir);
details.add("fileCount", files.size());
details.add("status", "success");
details.add("snapshotCompletedAt", new Date().toString());
@ -90,7 +97,8 @@ public class SnapShooter {
LOG.error("Exception while creating snapshot", e);
details.add("snapShootException", e.getMessage());
} finally {
replicationHandler.snapShootDetails = details;
replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getVersion());
replicationHandler.snapShootDetails = details;
if (lock != null) {
try {
lock.release();
@ -103,4 +111,89 @@ public class SnapShooter {
public static final String SNAP_DIR = "snapDir";
public static final String DATE_FMT = "yyyyMMddhhmmss";
private class FileCopier {
private static final int DEFAULT_BUFFER_SIZE = 32768;
private byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
private IndexCommit indexCommit;
private IndexDeletionPolicyWrapper delPolicy;
private int reserveTime;
public FileCopier(IndexDeletionPolicyWrapper delPolicy, IndexCommit commit) {
this.delPolicy = delPolicy;
this.indexCommit = commit;
this.reserveTime = reserveTime;
}
public void copyFiles(Collection<String> files, File destDir) throws IOException {
for (String indexFile : files) {
File source = new File(solrCore.getIndexDir(), indexFile);
copyFile(source, new File(destDir, source.getName()), true);
}
}
public void copyFile(File source, File destination, boolean preserveFileDate)
throws IOException {
// check source exists
if (!source.exists()) {
String message = "File " + source + " does not exist";
throw new FileNotFoundException(message);
}
// does destinations directory exist ?
if (destination.getParentFile() != null
&& !destination.getParentFile().exists()) {
destination.getParentFile().mkdirs();
}
// make sure we can write to destination
if (destination.exists() && !destination.canWrite()) {
String message = "Unable to open file " + destination + " for writing.";
throw new IOException(message);
}
FileInputStream input = null;
FileOutputStream output = null;
try {
input = new FileInputStream(source);
output = new FileOutputStream(destination);
int count = 0;
int n = 0;
int rcnt = 0;
while (-1 != (n = input.read(buffer))) {
output.write(buffer, 0, n);
count += n;
rcnt++;
/***
// reserve every 4.6875 MB
if (rcnt == 150) {
rcnt = 0;
delPolicy.setReserveDuration(indexCommit.getVersion(), reserveTime);
}
***/
}
} finally {
try {
IOUtils.closeQuietly(input);
} finally {
IOUtils.closeQuietly(output);
}
}
if (source.length() != destination.length()) {
String message = "Failed to copy full contents from " + source + " to "
+ destination;
throw new IOException(message);
}
if (preserveFileDate) {
// file copy should preserve file date
destination.setLastModified(source.lastModified());
}
}
}
}

View File

@ -16,7 +16,11 @@
*/
package org.apache.solr.handler;
import junit.framework.TestCase;
import org.apache.commons.io.IOUtils;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.solr.TestDistributedSearch;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
@ -451,7 +455,106 @@ public class TestReplicationHandler extends AbstractSolrTestCase {
slaveQueryResult = (SolrDocumentList) slaveQueryRsp.get("response");
assertEquals(0, slaveQueryResult.getNumFound());
}
public void testBackup() throws Exception {
masterJetty.stop();
copyFile(new File(CONF_DIR + "solrconfig-master1.xml"), new File(master.getConfDir(), "solrconfig.xml"));
masterJetty = createJetty(master);
masterClient = createNewSolrServer(masterJetty.getLocalPort());
//add 500 docs to master
for (int i = 0; i < 500; i++)
index(masterClient, "id", i, "name", "name = " + i);
masterClient.commit();
class BackupThread extends Thread {
volatile String fail = null;
public void run() {
String masterUrl = "http://localhost:" + masterJetty.getLocalPort() + "/solr/replication?command=" + ReplicationHandler.CMD_BACKUP;
URL url;
InputStream stream = null;
try {
url = new URL(masterUrl);
stream = url.openStream();
stream.close();
} catch (Exception e) {
fail = e.getMessage();
} finally {
IOUtils.closeQuietly(stream);
}
};
};
BackupThread backupThread = new BackupThread();
backupThread.start();
File dataDir = new File(master.getDataDir());
class CheckStatus extends Thread {
volatile String fail = null;
volatile String response = null;
volatile boolean success = false;
public void run() {
String masterUrl = "http://localhost:" + masterJetty.getLocalPort() + "/solr/replication?command=" + ReplicationHandler.CMD_DETAILS;
URL url;
InputStream stream = null;
try {
url = new URL(masterUrl);
stream = url.openStream();
response = IOUtils.toString(stream);
if(response.contains("<str name=\"status\">success</str>")) {
success = true;
}
stream.close();
} catch (Exception e) {
fail = e.getMessage();
} finally {
IOUtils.closeQuietly(stream);
}
};
};
int waitCnt = 0;
CheckStatus checkStatus = new CheckStatus();
while(true) {
checkStatus.run();
if(checkStatus.fail != null) {
fail(checkStatus.fail);
}
if(checkStatus.success) {
break;
}
Thread.sleep(200);
if(waitCnt == 10) {
fail("Backup success not detected:" + checkStatus.response);
}
waitCnt++;
}
if(backupThread.fail != null) {
fail(backupThread.fail);
}
File[] files = dataDir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
if(name.startsWith("snapshot")) {
return true;
}
return false;
}
});
assertEquals(1, files.length);
File snapDir = files[0];
IndexSearcher searcher = new IndexSearcher(new SimpleFSDirectory(snapDir.getAbsoluteFile(), null), true);
TopDocs hits = searcher.search(new MatchAllDocsQuery(), 1);
assertEquals(500, hits.totalHits);
}
/* character copy of file using UTF-8 */
void copyFile(File src, File dst) throws IOException {