LUCENE-1073: create SnapshotDeletionPolicy to facilitate taking a live backup of an index

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@601104 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2007-12-04 22:03:24 +00:00
parent 72bb91f4ee
commit 5f73506dd3
6 changed files with 329 additions and 21 deletions

View File

@ -236,6 +236,10 @@ New features
10. LUCENE-1040: CharArraySet useful for efficiently checking
set membership of text specified by char[]. (yonik)
11. LUCENE-1073: Created SnapshotDeletionPolicy to facilitate taking a
live backup of an index without pausing indexing. (Mike
McCandless)
Optimizations

View File

@ -267,7 +267,7 @@ final class DocumentsWriter {
/* Returns list of files in use by this instance,
* including any flushed segments. */
List files() {
synchronized List files() {
if (files != null)
return files;

View File

@ -32,6 +32,9 @@ package org.apache.lucene.index;
* index commit point would have a larger N.
*/
import java.util.Collection;
import java.io.IOException;
public interface IndexCommitPoint {
/**
@ -40,6 +43,11 @@ public interface IndexCommitPoint {
*/
public String getSegmentsFileName();
/**
* Returns all index files referenced by this commit point.
*/
public Collection getFileNames() throws IOException;
/**
* Delete this commit point.
* <p>

View File

@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
/*
* This class keeps track of each SegmentInfos instance that
@ -263,9 +264,8 @@ final class IndexFileDeleter {
}
int size2 = commit.files.size();
for(int j=0;j<size2;j++) {
decRef((List) commit.files.get(j));
decRef((String) commit.files.get(j));
}
decRef(commit.getSegmentsFileName());
}
commitsToDelete.clear();
@ -535,23 +535,6 @@ final class IndexFileDeleter {
}
}
/**
* Blindly delete the files used by the specific segments,
* with no reference counting and no retry. This is only
* currently used by writer to delete its RAM segments
* from a RAMDirectory.
*/
public void deleteDirect(Directory otherDir, List segments) throws IOException {
int size = segments.size();
for(int i=0;i<size;i++) {
List filestoDelete = ((SegmentInfo) segments.get(i)).files();
int size2 = filestoDelete.size();
for(int j=0;j<size2;j++) {
otherDir.deleteFile((String) filestoDelete.get(j));
}
}
}
/**
* Tracks the reference count for a single index file:
*/
@ -587,11 +570,12 @@ final class IndexFileDeleter {
segmentsFileName = segmentInfos.getCurrentSegmentFileName();
int size = segmentInfos.size();
files = new ArrayList(size);
files.add(segmentsFileName);
gen = segmentInfos.getGeneration();
for(int i=0;i<size;i++) {
SegmentInfo segmentInfo = segmentInfos.info(i);
if (segmentInfo.dir == directory) {
files.add(segmentInfo.files());
files.addAll(segmentInfo.files());
}
}
}
@ -603,6 +587,10 @@ final class IndexFileDeleter {
return segmentsFileName;
}
public Collection getFileNames() throws IOException {
return Collections.unmodifiableCollection(files);
}
/**
* Called only be the deletion policy, to remove this
* commit point from the index.

View File

@ -0,0 +1,109 @@
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Collection;
import java.util.List;
import java.util.ArrayList;
import java.io.IOException;
/** A {@link DeletionPolicy} that wraps around any other
* {@link DeletionPolicy} and adds the ability to hold and
* later release a single "snapshot" of an index. While
* the snapshot is held, the {@link IndexWriter} will not
* remove any files associated with it even if the index is
* otherwise being actively, arbitrarily changed. Because
* we wrap another arbitrary {@link DeletionPolicy}, this
* gives you the freedom to continue using whatever {@link
* DeletionPolicy} you would normally want to use with your
* index. */
public class SnapshotDeletionPolicy implements IndexDeletionPolicy {
private IndexCommitPoint lastCommit;
private IndexDeletionPolicy primary;
private IndexCommitPoint snapshot;
public SnapshotDeletionPolicy(IndexDeletionPolicy primary) {
this.primary = primary;
}
public synchronized void onInit(List commits) throws IOException {
primary.onInit(wrapCommits(commits));
lastCommit = (IndexCommitPoint) commits.get(commits.size()-1);
}
public synchronized void onCommit(List commits) throws IOException {
primary.onCommit(wrapCommits(commits));
lastCommit = (IndexCommitPoint) commits.get(commits.size()-1);
}
/** Take a snapshot of the most recent commit to the
* index. You must call release() to free this snapshot.
* Note that while the snapshot is held, the files it
* references will not be deleted, which will consume
* additional disk space in your index. If you take a
* snapshot at a particularly bad time (say just before
* you call optimize()) then in the worst case this could
* consume an extra 1X of your total index size, until
* you release the snapshot. */
public synchronized IndexCommitPoint snapshot() {
if (snapshot == null)
snapshot = lastCommit;
else
throw new IllegalStateException("snapshot is already set; please call release() first");
return snapshot;
}
/** Release the currently held snapshot. */
public synchronized void release() {
if (snapshot != null)
snapshot = null;
else
throw new IllegalStateException("snapshot was not set; please call snapshot() first");
}
private class MyCommitPoint implements IndexCommitPoint {
IndexCommitPoint cp;
MyCommitPoint(IndexCommitPoint cp) {
this.cp = cp;
}
public String getSegmentsFileName() {
return cp.getSegmentsFileName();
}
public Collection getFileNames() throws IOException {
return cp.getFileNames();
}
public void delete() {
synchronized(SnapshotDeletionPolicy.this) {
// Suppress the delete request if this commit point is
// our current snapshot.
if (snapshot != cp)
cp.delete();
}
}
}
private List wrapCommits(List commits) {
final int count = commits.size();
List myCommits = new ArrayList(count);
for(int i=0;i<count;i++)
myCommits.add(new MyCommitPoint((IndexCommitPoint) commits.get(i)));
return myCommits;
}
}

View File

@ -0,0 +1,199 @@
package org.apache.lucene;
// Intentionally not in org.apache.lucene.index, to assert
// that we do not require any package private access.
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Iterator;
import java.util.Collection;
import java.io.File;
import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.MockRAMDirectory;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexCommitPoint;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.TestIndexWriter;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util._TestUtil;
//
// This was developed for Lucene In Action,
// http://lucenebook.com
//
public class TestSnapshotDeletionPolicy extends LuceneTestCase
{
public static final String INDEX_PATH = "test.snapshots";
public void testSnapshotDeletionPolicy() throws IOException {
File dir = new File(System.getProperty("tempDir"), INDEX_PATH);
try {
Directory fsDir = FSDirectory.getDirectory(dir);
runTest(fsDir);
fsDir.close();
} finally {
_TestUtil.rmDir(dir);
}
MockRAMDirectory dir2 = new MockRAMDirectory();
runTest(dir2);
}
private void runTest(Directory dir) throws IOException {
// Run for ~7 seconds
final long stopTime = System.currentTimeMillis() + 7000;
SnapshotDeletionPolicy dp = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
final IndexWriter writer = new IndexWriter(dir, true, new StandardAnalyzer(), dp);
// Force frequent commits
writer.setMaxBufferedDocs(2);
final Thread t = new Thread() {
public void run() {
Document doc = new Document();
doc.add(new Field("content", "aaa", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
while(System.currentTimeMillis() < stopTime) {
for(int i=0;i<27;i++) {
try {
writer.addDocument(doc);
} catch (IOException cie) {
RuntimeException re = new RuntimeException("addDocument failed");
re.initCause(cie);
throw re;
}
}
try {
Thread.sleep(1);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
};
t.start();
// While the above indexing thread is running, take many
// backups:
while(System.currentTimeMillis() < stopTime) {
backupIndex(dir, dp);
try {
Thread.sleep(20);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
if (!t.isAlive())
break;
}
try {
t.join();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
// Add one more document to force writer to commit a
// final segment, so deletion policy has a chance to
// delete again:
Document doc = new Document();
doc.add(new Field("content", "aaa", Field.Store.YES, Field.Index.TOKENIZED, Field.TermVector.WITH_POSITIONS_OFFSETS));
writer.addDocument(doc);
// Make sure we don't have any leftover files in the
// directory:
writer.close();
TestIndexWriter.assertNoUnreferencedFiles(dir, "some files were not deleted but should have been");
}
/** Example showing how to use the SnapshotDeletionPolicy
* to take a backup. This method does not really do a
* backup; instead, it reads every byte of every file
* just to test that the files indeed exist and are
* readable even while the index is changing. */
public void backupIndex(Directory dir, SnapshotDeletionPolicy dp) throws IOException {
// To backup an index we first take a snapshot:
IndexCommitPoint cp = dp.snapshot();
try {
// While we hold the snapshot, and nomatter how long
// we take to do the backup, the IndexWriter will
// never delete the files in the snapshot:
Collection files = cp.getFileNames();
Iterator it = files.iterator();
while(it.hasNext()) {
final String fileName = (String) it.next();
// NOTE: in a real backup you would not use
// readFile; you would need to use something else
// that copies the file to a backup location. This
// could even be a spawned shell process (eg "tar",
// "zip") that takes the list of files and builds a
// backup.
readFile(dir, fileName);
}
} finally {
// Make sure to release the snapshot, otherwise these
// files will never be deleted during this IndexWriter
// session:
dp.release();
}
}
byte[] buffer = new byte[4096];
private void readFile(Directory dir, String name) throws IOException {
IndexInput input = dir.openInput(name);
try {
long size = dir.fileLength(name);
long bytesLeft = size;
while (bytesLeft > 0) {
final int numToRead;
if (bytesLeft < buffer.length)
numToRead = (int) bytesLeft;
else
numToRead = buffer.length;
input.readBytes(buffer, 0, numToRead, false);
bytesLeft -= numToRead;
}
// Don't do this in your real backups! This is just
// to force a backup to take a somewhat long time, to
// make sure we are exercising the fact that the
// IndexWriter should not delete this file even when I
// take my time reading it.
try {
Thread.sleep(1);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
} finally {
input.close();
}
}
}