mirror of https://github.com/apache/lucene.git
SOLR-10249: Refactor IndexFetcher to return detailed result
This commit is contained in:
parent
fec87fceb5
commit
8664f1f38a
|
@ -138,6 +138,8 @@ Other Changes
|
|||
|
||||
* SOLR-9221: Remove Solr contribs: map-reduce, morphlines-core and morphlines-cell. (Steve Rowe)
|
||||
|
||||
* SOLR-10249: Refactor IndexFetcher.doFetch() to return a more detailed result. (Jeff Miller via David Smiley)
|
||||
|
||||
================== 6.5.0 ==================
|
||||
|
||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
|
||||
|
|
|
@ -219,7 +219,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
|
|||
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
|
||||
|
||||
if (isClosed()) return; // we check closed on return
|
||||
boolean success = replicationHandler.doFetch(solrParams, false);
|
||||
boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful();
|
||||
|
||||
if (!success) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
|
||||
|
|
|
@ -754,7 +754,7 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
// we do not want the raw tlog files from the source
|
||||
solrParams.set(ReplicationHandler.TLOG_FILES, false);
|
||||
|
||||
success = replicationHandler.doFetch(solrParams, false);
|
||||
success = replicationHandler.doFetch(solrParams, false).getSuccessful();
|
||||
|
||||
// this is required because this callable can race with HttpSolrCall#destroy
|
||||
// which clears the request info.
|
||||
|
|
|
@ -104,6 +104,8 @@ import static org.apache.solr.common.params.CommonParams.JAVABIN;
|
|||
import static org.apache.solr.common.params.CommonParams.NAME;
|
||||
import static org.apache.solr.handler.ReplicationHandler.*;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
/**
|
||||
* <p> Provides functionality of downloading changed index files as well as config files and a timer for scheduling fetches from the
|
||||
* master. </p>
|
||||
|
@ -161,6 +163,52 @@ public class IndexFetcher {
|
|||
|
||||
private Integer soTimeout;
|
||||
|
||||
private static final String INTERRUPT_RESPONSE_MESSAGE = "Interrupted while waiting for modify lock";
|
||||
|
||||
public static class IndexFetchResult {
|
||||
private final String message;
|
||||
private final boolean successful;
|
||||
private final Throwable exception;
|
||||
|
||||
public static final String FAILED_BY_INTERRUPT_MESSAGE = "Fetching index failed by interrupt";
|
||||
public static final String FAILED_BY_EXCEPTION_MESSAGE = "Fetching index failed by exception";
|
||||
|
||||
/** pre-defined results */
|
||||
public static final IndexFetchResult ALREADY_IN_SYNC = new IndexFetchResult("Local index commit is already in sync with peer", true, null);
|
||||
public static final IndexFetchResult INDEX_FETCH_FAILURE = new IndexFetchResult("Fetching lastest index is failed", false, null);
|
||||
public static final IndexFetchResult INDEX_FETCH_SUCCESS = new IndexFetchResult("Fetching latest index is successful", true, null);
|
||||
public static final IndexFetchResult LOCK_OBTAIN_FAILED = new IndexFetchResult("Obtaining SnapPuller lock failed", false, null);
|
||||
public static final IndexFetchResult MASTER_VERSION_ZERO = new IndexFetchResult("Index in peer is empty and never committed yet", true, null);
|
||||
public static final IndexFetchResult NO_INDEX_COMMIT_EXIST = new IndexFetchResult("No IndexCommit in local index", false, null);
|
||||
public static final IndexFetchResult PEER_INDEX_COMMIT_DELETED = new IndexFetchResult("No files to download because IndexCommit in peer was deleted", false, null);
|
||||
public static final IndexFetchResult LOCAL_ACTIVITY_DURING_REPLICATION = new IndexFetchResult("Local index modification during replication", false, null);
|
||||
public static final IndexFetchResult EXPECTING_NON_LEADER = new IndexFetchResult("Replicating from leader but I'm the shard leader", false, null);
|
||||
|
||||
IndexFetchResult(String message, boolean successful, Throwable exception) {
|
||||
this.message = message;
|
||||
this.successful = successful;
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
/*
|
||||
* @return exception thrown if failed by exception or interrupt, otherwise null
|
||||
*/
|
||||
public Throwable getException() {
|
||||
return this.exception;
|
||||
}
|
||||
|
||||
/*
|
||||
* @return true if index fetch was successful, false otherwise
|
||||
*/
|
||||
public boolean getSuccessful() {
|
||||
return this.successful;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return this.message;
|
||||
}
|
||||
}
|
||||
|
||||
private static HttpClient createHttpClient(SolrCore core, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
|
||||
final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
|
||||
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
|
||||
|
@ -274,7 +322,7 @@ public class IndexFetcher {
|
|||
}
|
||||
}
|
||||
|
||||
boolean fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException {
|
||||
IndexFetchResult fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException {
|
||||
return fetchLatestIndex(forceReplication, false);
|
||||
}
|
||||
|
||||
|
@ -287,7 +335,7 @@ public class IndexFetcher {
|
|||
* @return true on success, false if slave is already in sync
|
||||
* @throws IOException if an exception occurs
|
||||
*/
|
||||
boolean fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
|
||||
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
|
||||
|
||||
boolean cleanupDone = false;
|
||||
boolean successfulInstall = false;
|
||||
|
@ -311,7 +359,7 @@ public class IndexFetcher {
|
|||
Replica replica = getLeaderReplica();
|
||||
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
|
||||
if (cd.getCoreNodeName().equals(replica.getName())) {
|
||||
return false;
|
||||
return IndexFetchResult.EXPECTING_NON_LEADER;
|
||||
}
|
||||
masterUrl = replica.getCoreUrl();
|
||||
LOG.info("Updated masterUrl to " + masterUrl);
|
||||
|
@ -321,9 +369,16 @@ public class IndexFetcher {
|
|||
try {
|
||||
response = getLatestVersion();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
|
||||
return false;
|
||||
}
|
||||
final String errorMsg = e.toString();
|
||||
if (!Strings.isNullOrEmpty(errorMsg) && errorMsg.contains(INTERRUPT_RESPONSE_MESSAGE)) {
|
||||
LOG.warn("Master at: " + masterUrl + " is not available. Index fetch failed by interrupt. Exception: " + errorMsg);
|
||||
return new IndexFetchResult(IndexFetchResult.FAILED_BY_INTERRUPT_MESSAGE, false, e);
|
||||
} else {
|
||||
LOG.warn("Master at: " + masterUrl + " is not available. Index fetch failed by exception: " + errorMsg);
|
||||
return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
|
||||
}
|
||||
}
|
||||
|
||||
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
|
||||
long latestGeneration = (Long) response.get(GENERATION);
|
||||
|
||||
|
@ -339,7 +394,7 @@ public class IndexFetcher {
|
|||
searcherRefCounted = solrCore.getNewestSearcher(false);
|
||||
if (searcherRefCounted == null) {
|
||||
LOG.warn("No open searcher found - fetch aborted");
|
||||
return false;
|
||||
return IndexFetchResult.NO_INDEX_COMMIT_EXIST;
|
||||
}
|
||||
commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
|
||||
} finally {
|
||||
|
@ -367,7 +422,7 @@ public class IndexFetcher {
|
|||
|
||||
//there is nothing to be replicated
|
||||
successfulInstall = true;
|
||||
return true;
|
||||
return IndexFetchResult.MASTER_VERSION_ZERO;
|
||||
}
|
||||
|
||||
// TODO: Should we be comparing timestamps (across machines) here?
|
||||
|
@ -375,14 +430,14 @@ public class IndexFetcher {
|
|||
//master and slave are already in sync just return
|
||||
LOG.info("Slave in sync with master.");
|
||||
successfulInstall = true;
|
||||
return true;
|
||||
return IndexFetchResult.ALREADY_IN_SYNC;
|
||||
}
|
||||
LOG.info("Starting replication process");
|
||||
// get the list of files first
|
||||
fetchFileList(latestGeneration);
|
||||
// this can happen if the commit point is deleted before we fetch the file list.
|
||||
if (filesToDownload.isEmpty()) {
|
||||
return false;
|
||||
return IndexFetchResult.PEER_INDEX_COMMIT_DELETED;
|
||||
}
|
||||
LOG.info("Number of files in latest index in master: " + filesToDownload.size());
|
||||
if (tlogFilesToDownload != null) {
|
||||
|
@ -561,14 +616,14 @@ public class IndexFetcher {
|
|||
LOG.warn(
|
||||
"Replication attempt was not successful - trying a full index replication reloadCore={}",
|
||||
reloadCore);
|
||||
successfulInstall = fetchLatestIndex(true, reloadCore);
|
||||
successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful();
|
||||
}
|
||||
|
||||
markReplicationStop();
|
||||
return successfulInstall;
|
||||
return successfulInstall ? IndexFetchResult.INDEX_FETCH_SUCCESS : IndexFetchResult.INDEX_FETCH_FAILURE;
|
||||
} catch (ReplicationHandlerException e) {
|
||||
LOG.error("User aborted Replication");
|
||||
return false;
|
||||
return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
|
||||
} catch (SolrException e) {
|
||||
throw e;
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -90,6 +90,7 @@ import org.apache.solr.core.SolrEventListener;
|
|||
import org.apache.solr.core.backup.repository.BackupRepository;
|
||||
import org.apache.solr.core.backup.repository.LocalFileSystemRepository;
|
||||
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
|
||||
import org.apache.solr.handler.IndexFetcher.IndexFetchResult;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.response.SolrQueryResponse;
|
||||
import org.apache.solr.search.SolrIndexSearcher;
|
||||
|
@ -392,10 +393,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
|
||||
private volatile IndexFetcher currentIndexFetcher;
|
||||
|
||||
public boolean doFetch(SolrParams solrParams, boolean forceReplication) {
|
||||
public IndexFetchResult doFetch(SolrParams solrParams, boolean forceReplication) {
|
||||
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
|
||||
if (!indexFetchLock.tryLock())
|
||||
return false;
|
||||
return IndexFetchResult.LOCK_OBTAIN_FAILED;
|
||||
try {
|
||||
if (masterUrl != null) {
|
||||
if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
|
||||
|
@ -411,17 +412,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
if (currentIndexFetcher != pollingIndexFetcher) {
|
||||
currentIndexFetcher.destroy();
|
||||
}
|
||||
return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
|
||||
} finally {
|
||||
if (pollingIndexFetcher != null) {
|
||||
if( currentIndexFetcher != pollingIndexFetcher) {
|
||||
currentIndexFetcher.destroy();
|
||||
}
|
||||
|
||||
currentIndexFetcher = pollingIndexFetcher;
|
||||
}
|
||||
indexFetchLock.unlock();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean isReplicating() {
|
||||
|
@ -1151,7 +1151,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
try {
|
||||
LOG.debug("Polling for index modifications");
|
||||
markScheduledExecutionStart();
|
||||
boolean pollSuccess = doFetch(null, false);
|
||||
boolean pollSuccess = doFetch(null, false).getSuccessful();
|
||||
if (pollListener != null) pollListener.onComplete(core, pollSuccess);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception in fetching index", e);
|
||||
|
|
Loading…
Reference in New Issue