SOLR-978 -- Old files are not removed from slaves after replication

git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@737550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2009-01-25 17:52:48 +00:00
parent 645cb098f5
commit b822a6db8b
3 changed files with 43 additions and 25 deletions

View File

@ -219,6 +219,8 @@ Bug Fixes
24. SOLR-902: FastInputStream#read(byte b[], int off, int len) gives incorrect results when amount left to read is less 24. SOLR-902: FastInputStream#read(byte b[], int off, int len) gives incorrect results when amount left to read is less
than buffer size (Noble Paul via shalin) than buffer size (Noble Paul via shalin)
25. SOLR-978: Old files are not removed from slaves after replication (Jaco, Noble Paul, shalin)
Other Changes Other Changes
---------------------- ----------------------

View File

@ -27,6 +27,7 @@ import org.apache.solr.core.SolrCore;
import static org.apache.solr.handler.ReplicationHandler.*; import static org.apache.solr.handler.ReplicationHandler.*;
import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.RefCounted; import org.apache.solr.util.RefCounted;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -281,14 +282,14 @@ public class SnapPuller {
replicationStartTime = 0; replicationStartTime = 0;
return successfulInstall; return successfulInstall;
} catch (ReplicationHandlerException e) { } catch (ReplicationHandlerException e) {
delTree(tmpIndexDir);
LOG.error("User aborted Replication"); LOG.error("User aborted Replication");
} catch (SolrException e) { } catch (SolrException e) {
delTree(tmpIndexDir);
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {
delTree(tmpIndexDir); delTree(tmpIndexDir);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Snappull failed : ", e); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Snappull failed : ", e);
} finally {
delTree(tmpIndexDir);
} }
return successfulInstall; return successfulInstall;
} finally { } finally {
@ -349,6 +350,14 @@ public class SnapPuller {
cmd.waitFlush = true; cmd.waitFlush = true;
cmd.waitSearcher = true; cmd.waitSearcher = true;
solrCore.getUpdateHandler().commit(cmd); solrCore.getUpdateHandler().commit(cmd);
if (solrCore.getUpdateHandler() instanceof DirectUpdateHandler2) {
LOG.info("Force open index writer to make sure older index files get deleted");
DirectUpdateHandler2 handler = (DirectUpdateHandler2) solrCore.getUpdateHandler();
handler.forceOpenWriter();
} else {
LOG.warn("The update handler is not an instance or sub-class of DirectUpdateHandler2. " +
"ReplicationHandler may not be able to cleanup un-used index files.");
}
} }

View File

@ -187,7 +187,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
addCommands.incrementAndGet(); addCommands.incrementAndGet();
addCommandsCumulative.incrementAndGet(); addCommandsCumulative.incrementAndGet();
int rc=-1; int rc=-1;
// if there is no ID field, use allowDups // if there is no ID field, use allowDups
if( idField == null ) { if( idField == null ) {
cmd.allowDups = true; cmd.allowDups = true;
@ -259,7 +259,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
} finally { } finally {
iwCommit.unlock(); iwCommit.unlock();
} }
if( tracker.timeUpperBound > 0 ) { if( tracker.timeUpperBound > 0 ) {
tracker.scheduleCommitWithin( tracker.timeUpperBound ); tracker.scheduleCommitWithin( tracker.timeUpperBound );
} }
@ -294,7 +294,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
deleteAll(); deleteAll();
} else { } else {
openWriter(); openWriter();
writer.deleteDocuments(q); writer.deleteDocuments(q);
} }
} finally { } finally {
iwCommit.unlock(); iwCommit.unlock();
@ -313,7 +313,14 @@ public class DirectUpdateHandler2 extends UpdateHandler {
} }
} }
public void forceOpenWriter() throws IOException {
iwCommit.lock();
try {
openWriter();
} finally {
iwCommit.unlock();
}
}
public void commit(CommitUpdateCommand cmd) throws IOException { public void commit(CommitUpdateCommand cmd) throws IOException {
@ -419,14 +426,14 @@ public class DirectUpdateHandler2 extends UpdateHandler {
tracker.pending.cancel( true ); tracker.pending.cancel( true );
tracker.pending = null; tracker.pending = null;
} }
tracker.scheduler.shutdown(); tracker.scheduler.shutdown();
closeWriter(); closeWriter();
} finally { } finally {
iwCommit.unlock(); iwCommit.unlock();
} }
log.info("closed " + this); log.info("closed " + this);
} }
/** Helper class for tracking autoCommit state. /** Helper class for tracking autoCommit state.
* *
* Note: This is purely an implementation detail of autoCommit and will * Note: This is purely an implementation detail of autoCommit and will
@ -435,8 +442,8 @@ public class DirectUpdateHandler2 extends UpdateHandler {
* *
* Note: all access must be synchronized. * Note: all access must be synchronized.
*/ */
class CommitTracker implements Runnable class CommitTracker implements Runnable
{ {
// scheduler delay for maxDoc-triggered autocommits // scheduler delay for maxDoc-triggered autocommits
public final int DOC_COMMIT_DELAY_MS = 250; public final int DOC_COMMIT_DELAY_MS = 250;
@ -447,12 +454,12 @@ public class DirectUpdateHandler2 extends UpdateHandler {
private final ScheduledExecutorService scheduler = private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1); Executors.newScheduledThreadPool(1);
private ScheduledFuture pending; private ScheduledFuture pending;
// state // state
long docsSinceCommit; long docsSinceCommit;
int autoCommitCount = 0; int autoCommitCount = 0;
long lastAddedTime = -1; long lastAddedTime = -1;
public CommitTracker() { public CommitTracker() {
docsSinceCommit = 0; docsSinceCommit = 0;
pending = null; pending = null;
@ -464,27 +471,27 @@ public class DirectUpdateHandler2 extends UpdateHandler {
} }
/** schedule individual commits */ /** schedule individual commits */
public synchronized void scheduleCommitWithin(long commitMaxTime) public synchronized void scheduleCommitWithin(long commitMaxTime)
{ {
_scheduleCommitWithin( commitMaxTime ); _scheduleCommitWithin( commitMaxTime );
} }
private void _scheduleCommitWithin(long commitMaxTime) private void _scheduleCommitWithin(long commitMaxTime)
{ {
// Check if there is a commit already scheduled for longer then this time // Check if there is a commit already scheduled for longer then this time
if( pending != null && if( pending != null &&
pending.getDelay(TimeUnit.MILLISECONDS) >= commitMaxTime ) pending.getDelay(TimeUnit.MILLISECONDS) >= commitMaxTime )
{ {
pending.cancel(false); pending.cancel(false);
pending = null; pending = null;
} }
// schedule a new commit // schedule a new commit
if( pending == null ) { if( pending == null ) {
pending = scheduler.schedule( this, commitMaxTime, TimeUnit.MILLISECONDS ); pending = scheduler.schedule( this, commitMaxTime, TimeUnit.MILLISECONDS );
} }
} }
/** Indicate that documents have been added /** Indicate that documents have been added
*/ */
public void addedDocument( int commitWithin ) { public void addedDocument( int commitWithin ) {
@ -494,7 +501,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
if( docsUpperBound > 0 && (docsSinceCommit > docsUpperBound) ) { if( docsUpperBound > 0 && (docsSinceCommit > docsUpperBound) ) {
_scheduleCommitWithin( DOC_COMMIT_DELAY_MS ); _scheduleCommitWithin( DOC_COMMIT_DELAY_MS );
} }
// maxTime-triggered autoCommit // maxTime-triggered autoCommit
long ctime = (commitWithin>0) ? commitWithin : timeUpperBound; long ctime = (commitWithin>0) ? commitWithin : timeUpperBound;
if( ctime > 0 ) { if( ctime > 0 ) {
@ -530,7 +537,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
//no need for command.maxOptimizeSegments = 1; since it is not optimizing //no need for command.maxOptimizeSegments = 1; since it is not optimizing
commit( command ); commit( command );
autoCommitCount++; autoCommitCount++;
} }
catch (Exception e) { catch (Exception e) {
log.error( "auto commit error..." ); log.error( "auto commit error..." );
e.printStackTrace(); e.printStackTrace();
@ -555,7 +562,7 @@ public class DirectUpdateHandler2 extends UpdateHandler {
public String toString() { public String toString() {
if(timeUpperBound > 0 || docsUpperBound > 0) { if(timeUpperBound > 0 || docsUpperBound > 0) {
return return
(timeUpperBound > 0 ? ("if uncommited for " + timeUpperBound + "ms; ") : "") + (timeUpperBound > 0 ? ("if uncommited for " + timeUpperBound + "ms; ") : "") +
(docsUpperBound > 0 ? ("if " + docsUpperBound + " uncommited docs ") : ""); (docsUpperBound > 0 ? ("if " + docsUpperBound + " uncommited docs ") : "");
@ -564,8 +571,8 @@ public class DirectUpdateHandler2 extends UpdateHandler {
} }
} }
} }
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
// SolrInfoMBean stuff: Statistics and Module Info // SolrInfoMBean stuff: Statistics and Module Info
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////