From 8664f1f38a3741a7aff5988221cb7a1a7dda9e5b Mon Sep 17 00:00:00 2001 From: David Smiley Date: Fri, 24 Mar 2017 22:46:24 -0400 Subject: [PATCH] SOLR-10249: Refactor IndexFetcher to return detailed result --- solr/CHANGES.txt | 2 + .../apache/solr/cloud/RecoveryStrategy.java | 2 +- .../solr/handler/CdcrRequestHandler.java | 2 +- .../org/apache/solr/handler/IndexFetcher.java | 81 ++++++++++++++++--- .../solr/handler/ReplicationHandler.java | 10 +-- 5 files changed, 77 insertions(+), 20 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 428bde46afa..f2937bfcced 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -138,6 +138,8 @@ Other Changes * SOLR-9221: Remove Solr contribs: map-reduce, morphlines-core and morphlines-cell. (Steve Rowe) +* SOLR-10249: Refactor IndexFetcher.doFetch() to return a more detailed result. (Jeff Miller via David Smiley) + ================== 6.5.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index cb6c69c8aa9..2cbc3945607 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -219,7 +219,7 @@ public class RecoveryStrategy extends Thread implements Closeable { solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl); if (isClosed()) return; // we check closed on return - boolean success = replicationHandler.doFetch(solrParams, false); + boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful(); if (!success) { throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed."); diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java index fcc4bbef4e0..ba174f95256 100644 --- a/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/CdcrRequestHandler.java @@ -754,7 +754,7 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw // we do not want the raw tlog files from the source solrParams.set(ReplicationHandler.TLOG_FILES, false); - success = replicationHandler.doFetch(solrParams, false); + success = replicationHandler.doFetch(solrParams, false).getSuccessful(); // this is required because this callable can race with HttpSolrCall#destroy // which clears the request info. diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index 33e80913e68..5efb6c5fe58 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -104,6 +104,8 @@ import static org.apache.solr.common.params.CommonParams.JAVABIN; import static org.apache.solr.common.params.CommonParams.NAME; import static org.apache.solr.handler.ReplicationHandler.*; +import com.google.common.base.Strings; + /** *

Provides functionality of downloading changed index files as well as config files and a timer for scheduling fetches from the * master.

@@ -161,6 +163,52 @@ public class IndexFetcher { private Integer soTimeout; + private static final String INTERRUPT_RESPONSE_MESSAGE = "Interrupted while waiting for modify lock"; + + public static class IndexFetchResult { + private final String message; + private final boolean successful; + private final Throwable exception; + + public static final String FAILED_BY_INTERRUPT_MESSAGE = "Fetching index failed by interrupt"; + public static final String FAILED_BY_EXCEPTION_MESSAGE = "Fetching index failed by exception"; + + /** pre-defined results */ + public static final IndexFetchResult ALREADY_IN_SYNC = new IndexFetchResult("Local index commit is already in sync with peer", true, null); + public static final IndexFetchResult INDEX_FETCH_FAILURE = new IndexFetchResult("Fetching lastest index is failed", false, null); + public static final IndexFetchResult INDEX_FETCH_SUCCESS = new IndexFetchResult("Fetching latest index is successful", true, null); + public static final IndexFetchResult LOCK_OBTAIN_FAILED = new IndexFetchResult("Obtaining SnapPuller lock failed", false, null); + public static final IndexFetchResult MASTER_VERSION_ZERO = new IndexFetchResult("Index in peer is empty and never committed yet", true, null); + public static final IndexFetchResult NO_INDEX_COMMIT_EXIST = new IndexFetchResult("No IndexCommit in local index", false, null); + public static final IndexFetchResult PEER_INDEX_COMMIT_DELETED = new IndexFetchResult("No files to download because IndexCommit in peer was deleted", false, null); + public static final IndexFetchResult LOCAL_ACTIVITY_DURING_REPLICATION = new IndexFetchResult("Local index modification during replication", false, null); + public static final IndexFetchResult EXPECTING_NON_LEADER = new IndexFetchResult("Replicating from leader but I'm the shard leader", false, null); + + IndexFetchResult(String message, boolean successful, Throwable exception) { + this.message = message; + this.successful = successful; + this.exception = exception; + } + + /* + * @return exception thrown if failed by exception or interrupt, otherwise null + */ + public Throwable getException() { + return this.exception; + } + + /* + * @return true if index fetch was successful, false otherwise + */ + public boolean getSuccessful() { + return this.successful; + } + + public String getMessage() { + return this.message; + } + } + private static HttpClient createHttpClient(SolrCore core, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) { final ModifiableSolrParams httpClientParams = new ModifiableSolrParams(); httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser); @@ -274,7 +322,7 @@ public class IndexFetcher { } } - boolean fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException { + IndexFetchResult fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException { return fetchLatestIndex(forceReplication, false); } @@ -287,7 +335,7 @@ public class IndexFetcher { * @return true on success, false if slave is already in sync * @throws IOException if an exception occurs */ - boolean fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException { + IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException { boolean cleanupDone = false; boolean successfulInstall = false; @@ -311,7 +359,7 @@ public class IndexFetcher { Replica replica = getLeaderReplica(); CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor(); if (cd.getCoreNodeName().equals(replica.getName())) { - return false; + return IndexFetchResult.EXPECTING_NON_LEADER; } masterUrl = replica.getCoreUrl(); LOG.info("Updated masterUrl to " + masterUrl); @@ -321,9 +369,16 @@ public class IndexFetcher { try { response = getLatestVersion(); } catch (Exception e) { - LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage()); - return false; - } + final String errorMsg = e.toString(); + if (!Strings.isNullOrEmpty(errorMsg) && errorMsg.contains(INTERRUPT_RESPONSE_MESSAGE)) { + LOG.warn("Master at: " + masterUrl + " is not available. Index fetch failed by interrupt. Exception: " + errorMsg); + return new IndexFetchResult(IndexFetchResult.FAILED_BY_INTERRUPT_MESSAGE, false, e); + } else { + LOG.warn("Master at: " + masterUrl + " is not available. Index fetch failed by exception: " + errorMsg); + return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e); + } + } + long latestVersion = (Long) response.get(CMD_INDEX_VERSION); long latestGeneration = (Long) response.get(GENERATION); @@ -339,7 +394,7 @@ public class IndexFetcher { searcherRefCounted = solrCore.getNewestSearcher(false); if (searcherRefCounted == null) { LOG.warn("No open searcher found - fetch aborted"); - return false; + return IndexFetchResult.NO_INDEX_COMMIT_EXIST; } commit = searcherRefCounted.get().getIndexReader().getIndexCommit(); } finally { @@ -367,7 +422,7 @@ public class IndexFetcher { //there is nothing to be replicated successfulInstall = true; - return true; + return IndexFetchResult.MASTER_VERSION_ZERO; } // TODO: Should we be comparing timestamps (across machines) here? @@ -375,14 +430,14 @@ public class IndexFetcher { //master and slave are already in sync just return LOG.info("Slave in sync with master."); successfulInstall = true; - return true; + return IndexFetchResult.ALREADY_IN_SYNC; } LOG.info("Starting replication process"); // get the list of files first fetchFileList(latestGeneration); // this can happen if the commit point is deleted before we fetch the file list. if (filesToDownload.isEmpty()) { - return false; + return IndexFetchResult.PEER_INDEX_COMMIT_DELETED; } LOG.info("Number of files in latest index in master: " + filesToDownload.size()); if (tlogFilesToDownload != null) { @@ -561,14 +616,14 @@ public class IndexFetcher { LOG.warn( "Replication attempt was not successful - trying a full index replication reloadCore={}", reloadCore); - successfulInstall = fetchLatestIndex(true, reloadCore); + successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful(); } markReplicationStop(); - return successfulInstall; + return successfulInstall ? IndexFetchResult.INDEX_FETCH_SUCCESS : IndexFetchResult.INDEX_FETCH_FAILURE; } catch (ReplicationHandlerException e) { LOG.error("User aborted Replication"); - return false; + return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e); } catch (SolrException e) { throw e; } catch (InterruptedException e) { diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index 4f6a408d533..98bf11ab376 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -90,6 +90,7 @@ import org.apache.solr.core.SolrEventListener; import org.apache.solr.core.backup.repository.BackupRepository; import org.apache.solr.core.backup.repository.LocalFileSystemRepository; import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager; +import org.apache.solr.handler.IndexFetcher.IndexFetchResult; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.search.SolrIndexSearcher; @@ -392,10 +393,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private volatile IndexFetcher currentIndexFetcher; - public boolean doFetch(SolrParams solrParams, boolean forceReplication) { + public IndexFetchResult doFetch(SolrParams solrParams, boolean forceReplication) { String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL); if (!indexFetchLock.tryLock()) - return false; + return IndexFetchResult.LOCK_OBTAIN_FAILED; try { if (masterUrl != null) { if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) { @@ -411,17 +412,16 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw if (currentIndexFetcher != pollingIndexFetcher) { currentIndexFetcher.destroy(); } + return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e); } finally { if (pollingIndexFetcher != null) { if( currentIndexFetcher != pollingIndexFetcher) { currentIndexFetcher.destroy(); } - currentIndexFetcher = pollingIndexFetcher; } indexFetchLock.unlock(); } - return false; } boolean isReplicating() { @@ -1151,7 +1151,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw try { LOG.debug("Polling for index modifications"); markScheduledExecutionStart(); - boolean pollSuccess = doFetch(null, false); + boolean pollSuccess = doFetch(null, false).getSuccessful(); if (pollListener != null) pollListener.onComplete(core, pollSuccess); } catch (Exception e) { LOG.error("Exception in fetching index", e);