diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index 1169ed7f3a0..1ffbcc2ad31 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -83,9 +83,9 @@ import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.core.DirectoryFactory.DirContext; +import org.apache.solr.handler.IndexFetcher; import org.apache.solr.handler.ReplicationHandler; import org.apache.solr.handler.RequestHandlerBase; -import org.apache.solr.handler.SnapPuller; import org.apache.solr.handler.admin.ShowFileRequestHandler; import org.apache.solr.handler.component.DebugComponent; import org.apache.solr.handler.component.ExpandComponent; @@ -291,7 +291,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { dir = getDirectoryFactory().get(getDataDir(), DirContext.META_DATA, getSolrConfig().indexConfig.lockType); IndexInput input; try { - input = dir.openInput(SnapPuller.INDEX_PROPERTIES, IOContext.DEFAULT); + input = dir.openInput(IndexFetcher.INDEX_PROPERTIES, IOContext.DEFAULT); } catch (FileNotFoundException | NoSuchFileException e) { input = null; } @@ -307,7 +307,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { } } catch (Exception e) { - log.error("Unable to load " + SnapPuller.INDEX_PROPERTIES, e); + log.error("Unable to load " + IndexFetcher.INDEX_PROPERTIES, e); } finally { IOUtils.closeQuietly(is); } diff --git a/solr/core/src/java/org/apache/solr/handler/SnapPuller.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java similarity index 91% rename from solr/core/src/java/org/apache/solr/handler/SnapPuller.java rename to solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index aba696badb7..3cda5ef61f3 100644 --- a/solr/core/src/java/org/apache/solr/handler/SnapPuller.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -67,11 +67,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.zip.Adler32; import java.util.zip.Checksum; import java.util.zip.InflaterInputStream; @@ -94,7 +90,6 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.FastInputStream; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.DirectoryFactory; @@ -121,24 +116,16 @@ import org.slf4j.LoggerFactory; * * @since solr 1.4 */ -public class SnapPuller { +public class IndexFetcher { private static final int _100K = 100000; public static final String INDEX_PROPERTIES = "index.properties"; - private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(IndexFetcher.class.getName()); private final String masterUrl; - private final ReplicationHandler replicationHandler; - - private final Integer pollInterval; - - private String pollIntervalStr; - - private ScheduledExecutorService executorService; - - private volatile long executorStartTime; + final ReplicationHandler replicationHandler; private volatile long replicationStartTime; @@ -166,11 +153,6 @@ public class SnapPuller { private boolean useExternal = false; - /** - * Disable the timer task for polling - */ - private AtomicBoolean pollDisabled = new AtomicBoolean(false); - private final HttpClient myHttpClient; private static HttpClient createHttpClient(SolrCore core, String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) { @@ -184,7 +166,7 @@ public class SnapPuller { return HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager()); } - public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) { + public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) { solrCore = sc; String masterUrl = (String) initArgs.get(MASTER_URL); if (masterUrl == null) @@ -197,8 +179,6 @@ public class SnapPuller { this.masterUrl = masterUrl; this.replicationHandler = handler; - pollIntervalStr = (String) initArgs.get(POLL_INTERVAL); - pollInterval = readInterval(pollIntervalStr); String compress = (String) initArgs.get(COMPRESSION); useInternal = INTERNAL.equals(compress); useExternal = EXTERNAL.equals(compress); @@ -207,35 +187,6 @@ public class SnapPuller { String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER); String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS); myHttpClient = createHttpClient(solrCore, connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal); - if (pollInterval != null && pollInterval > 0) { - startExecutorService(); - } else { - LOG.info(" No value set for 'pollInterval'. Timer Task not started."); - } - } - - private void startExecutorService() { - Runnable task = new Runnable() { - @Override - public void run() { - if (pollDisabled.get()) { - LOG.info("Poll disabled"); - return; - } - try { - LOG.debug("Polling for index modifications"); - executorStartTime = System.currentTimeMillis(); - replicationHandler.doFetch(null, false); - } catch (Exception e) { - LOG.error("Exception in fetching index", e); - } - } - }; - executorService = Executors.newSingleThreadScheduledExecutor( - new DefaultSolrThreadFactory("snapPuller")); - long initialDelay = pollInterval - (System.currentTimeMillis() % pollInterval); - executorService.scheduleAtFixedRate(task, initialDelay, pollInterval, TimeUnit.MILLISECONDS); - LOG.info("Poll Scheduled at an interval of " + pollInterval + "ms"); } /** @@ -427,13 +378,13 @@ public class SnapPuller { Thread.sleep(1000); c++; if (c >= 30) { - LOG.warn("SnapPuller unable to cleanup unused lucene index files so we must do a full copy instead"); + LOG.warn("IndexFetcher unable to cleanup unused lucene index files so we must do a full copy instead"); isFullCopyNeeded = true; break; } } if (c > 0) { - LOG.info("SnapPuller slept for " + (c * 1000) + "ms for unused lucene index files to be delete-able"); + LOG.info("IndexFetcher slept for " + (c * 1000) + "ms for unused lucene index files to be delete-able"); } } finally { writer.decref(); @@ -634,7 +585,7 @@ public class SnapPuller { props.setProperty(TIMES_CONFIG_REPLICATED, String.valueOf(confFilesCount)); } - props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED, String.valueOf(getTotalBytesDownloaded(this))); + props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED, String.valueOf(getTotalBytesDownloaded())); if (!successfulInstall) { int numFailures = 1; if (props.containsKey(TIMES_FAILED)) { @@ -663,20 +614,20 @@ public class SnapPuller { } } - static long getTotalBytesDownloaded(SnapPuller snappuller) { + long getTotalBytesDownloaded() { long bytesDownloaded = 0; //get size from list of files to download - for (Map file : snappuller.getFilesDownloaded()) { + for (Map file : getFilesDownloaded()) { bytesDownloaded += (Long) file.get(SIZE); } //get size from list of conf files to download - for (Map file : snappuller.getConfFilesDownloaded()) { + for (Map file : getConfFilesDownloaded()) { bytesDownloaded += (Long) file.get(SIZE); } //get size from current file being downloaded - Map currentFile = snappuller.getCurrentFile(); + Map currentFile = getCurrentFile(); if (currentFile != null) { if (currentFile.containsKey("bytesDownloaded")) { bytesDownloaded += (Long) currentFile.get("bytesDownloaded"); @@ -1053,33 +1004,33 @@ public class SnapPuller { Directory dir = null; try { dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), DirContext.META_DATA, solrCore.getSolrConfig().indexConfig.lockType); - if (slowFileExists(dir, SnapPuller.INDEX_PROPERTIES)){ - final IndexInput input = dir.openInput(SnapPuller.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE); + if (slowFileExists(dir, IndexFetcher.INDEX_PROPERTIES)){ + final IndexInput input = dir.openInput(IndexFetcher.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE); final InputStream is = new PropertiesInputStream(input); try { p.load(new InputStreamReader(is, StandardCharsets.UTF_8)); } catch (Exception e) { - LOG.error("Unable to load " + SnapPuller.INDEX_PROPERTIES, e); + LOG.error("Unable to load " + IndexFetcher.INDEX_PROPERTIES, e); } finally { IOUtils.closeQuietly(is); } } try { - dir.deleteFile(SnapPuller.INDEX_PROPERTIES); + dir.deleteFile(IndexFetcher.INDEX_PROPERTIES); } catch (IOException e) { // no problem } - final IndexOutput out = dir.createOutput(SnapPuller.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE); + final IndexOutput out = dir.createOutput(IndexFetcher.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE); p.put("index", tmpIdxDirName); Writer os = null; try { os = new OutputStreamWriter(new PropertiesOutputStream(out), StandardCharsets.UTF_8); - p.store(os, SnapPuller.INDEX_PROPERTIES); + p.store(os, IndexFetcher.INDEX_PROPERTIES); dir.sync(Collections.singleton(INDEX_PROPERTIES)); } catch (Exception e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Unable to write " + SnapPuller.INDEX_PROPERTIES, e); + "Unable to write " + IndexFetcher.INDEX_PROPERTIES, e); } finally { IOUtils.closeQuietly(os); } @@ -1161,25 +1112,9 @@ public class SnapPuller { } /** - * Disable periodic polling + * Stops the ongoing fetch */ - void disablePoll() { - pollDisabled.set(true); - LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled); - } - - /** - * Enable periodic polling - */ - void enablePoll() { - pollDisabled.set(false); - LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled); - } - - /** - * Stops the ongoing pull - */ - void abortPull() { + void abortFetch() { stop = true; } @@ -1187,6 +1122,13 @@ public class SnapPuller { return replicationStartTime; } + long getReplicationTimeElapsed() { + long timeElapsed = 0; + if (getReplicationStartTime() > 0) + timeElapsed = TimeUnit.SECONDS.convert(System.currentTimeMillis() - getReplicationStartTime(), TimeUnit.MILLISECONDS); + return timeElapsed; + } + List> getConfFilesToDownload() { //make a copy first because it can be null later List> tmp = confFilesToDownload; @@ -1224,17 +1166,6 @@ public class SnapPuller { return tmp; } - boolean isPollingDisabled() { - return pollDisabled.get(); - } - - Long getNextScheduledExecTime() { - Long nextTime = null; - if (executorStartTime > 0) - nextTime = executorStartTime + pollInterval; - return nextTime; - } - private static class ReplicationHandlerException extends InterruptedException { public ReplicationHandlerException(String message) { super(message); @@ -1586,55 +1517,14 @@ public class SnapPuller { } } - static Integer readInterval(String interval) { - if (interval == null) - return null; - int result = 0; - Matcher m = INTERVAL_PATTERN.matcher(interval.trim()); - if (m.find()) { - String hr = m.group(1); - String min = m.group(2); - String sec = m.group(3); - result = 0; - try { - if (sec != null && sec.length() > 0) - result += Integer.parseInt(sec); - if (min != null && min.length() > 0) - result += (60 * Integer.parseInt(min)); - if (hr != null && hr.length() > 0) - result += (60 * 60 * Integer.parseInt(hr)); - result *= 1000; - } catch (NumberFormatException e) { - throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG); - } - } else { - throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG); - } - - return result; - } - public void destroy() { - try { - if (executorService != null) executorService.shutdown(); - } finally { - try { - abortPull(); - } finally { - if (executorService != null) ExecutorUtil - .shutdownNowAndAwaitTermination(executorService); - } - } + abortFetch(); } String getMasterUrl() { return masterUrl; } - String getPollInterval() { - return pollIntervalStr; - } - private static final int MAX_RETRIES = 5; private static final int NO_CONTENT = 1; @@ -1643,12 +1533,6 @@ public class SnapPuller { public static final String REPLICATION_PROPERTIES = "replication.properties"; - public static final String POLL_INTERVAL = "pollInterval"; - - public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'"; - - private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d*?):(\\d*?):(\\d*)"); - static final String INDEX_REPLICATED_AT = "indexReplicatedAt"; static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated"; diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index f4ab68d9286..54630c92356 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -36,9 +36,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.zip.Adler32; import java.util.zip.Checksum; import java.util.zip.DeflaterOutputStream; @@ -60,6 +64,7 @@ import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.FastOutputStream; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; @@ -75,6 +80,7 @@ import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.search.SolrIndexSearcher; import org.apache.solr.update.SolrIndexWriter; +import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.NumberUtils; import org.apache.solr.util.PropertiesInputStream; import org.apache.solr.util.RefCounted; @@ -90,8 +96,8 @@ import org.slf4j.LoggerFactory; * file (command=filecontent&file=<FILE_NAME>) You can optionally specify an offset and length to get that * chunk of the file. You can request a configuration file by using "cf" parameter instead of the "file" parameter. *
  • Get status/statistics (command=details)
  • When running on the slave, it provides the following - * commands

    1. Perform a snap pull now (command=snappull)
    2. Get status/statistics (command=details)
    3. - *
    4. Abort a snap pull (command=abort)
    5. Enable/Disable polling the master for new versions (command=enablepoll + * commands
      1. Perform an index fetch now (command=snappull)
      2. Get status/statistics (command=details)
      3. + *
      4. Abort an index fetch (command=abort)
      5. Enable/Disable polling the master for new versions (command=enablepoll * or command=disablepoll)
      * * @@ -134,9 +140,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } } - private SnapPuller snapPuller; + private IndexFetcher pollingIndexFetcher; - private ReentrantLock snapPullLock = new ReentrantLock(); + private ReentrantLock indexFetchLock = new ReentrantLock(); private String includeConfFiles; @@ -151,14 +157,18 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private boolean replicateOnCommit = false; private boolean replicateOnStart = false; - + + private ScheduledExecutorService executorService; + + private volatile long executorStartTime; + private int numberBackupsToKeep = 0; //zero: do not delete old backups private int numTimesReplicated = 0; private final Map confFileInfoCache = new HashMap<>(); - private Integer reserveCommitDuration = SnapPuller.readInterval("00:00:10"); + private Integer reserveCommitDuration = readInterval("00:00:10"); volatile IndexCommit indexCommitPoint; @@ -166,6 +176,19 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private AtomicBoolean replicationEnabled = new AtomicBoolean(true); + private Integer pollInterval; + + private String pollIntervalStr; + + /** + * Disable the timer task for polling + */ + private AtomicBoolean pollDisabled = new AtomicBoolean(false); + + String getPollInterval() { + return pollIntervalStr; + } + @Override public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { rsp.setHttpCaching(false); @@ -221,38 +244,38 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw return; } final SolrParams paramsCopy = new ModifiableSolrParams(solrParams); - Thread puller = new Thread("explicit-fetchindex-cmd") { + Thread fetchThread = new Thread("explicit-fetchindex-cmd") { @Override public void run() { doFetch(paramsCopy, false); } }; - puller.setDaemon(false); - puller.start(); + fetchThread.setDaemon(false); + fetchThread.start(); if (solrParams.getBool(WAIT, false)) { - puller.join(); + fetchThread.join(); } rsp.add(STATUS, OK_STATUS); } else if (command.equalsIgnoreCase(CMD_DISABLE_POLL)) { - if (snapPuller != null){ - snapPuller.disablePoll(); + if (pollingIndexFetcher != null){ + disablePoll(); rsp.add(STATUS, OK_STATUS); } else { rsp.add(STATUS, ERR_STATUS); rsp.add("message","No slave configured"); } } else if (command.equalsIgnoreCase(CMD_ENABLE_POLL)) { - if (snapPuller != null){ - snapPuller.enablePoll(); + if (pollingIndexFetcher != null){ + enablePoll(); rsp.add(STATUS, OK_STATUS); }else { rsp.add(STATUS,ERR_STATUS); rsp.add("message","No slave configured"); } } else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) { - SnapPuller temp = tempSnapPuller; - if (temp != null){ - temp.abortPull(); + IndexFetcher fetcher = currentIndexFetcher; + if (fetcher != null){ + fetcher.abortFetch(); rsp.add(STATUS, OK_STATUS); } else { rsp.add(STATUS,ERR_STATUS); @@ -321,38 +344,35 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw return null; } - private volatile SnapPuller tempSnapPuller; + private volatile IndexFetcher currentIndexFetcher; public boolean doFetch(SolrParams solrParams, boolean forceReplication) { String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL); - if (!snapPullLock.tryLock()) + if (!indexFetchLock.tryLock()) return false; try { if (masterUrl != null) { - if (tempSnapPuller != null && tempSnapPuller != snapPuller) { - tempSnapPuller.destroy(); + if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) { + currentIndexFetcher.destroy(); } - - NamedList nl = solrParams.toNamedList(); - nl.remove(SnapPuller.POLL_INTERVAL); - tempSnapPuller = new SnapPuller(nl, this, core); + currentIndexFetcher = new IndexFetcher(solrParams.toNamedList(), this, core); } else { - tempSnapPuller = snapPuller; + currentIndexFetcher = pollingIndexFetcher; } - return tempSnapPuller.fetchLatestIndex(core, forceReplication); + return currentIndexFetcher.fetchLatestIndex(core, forceReplication); } catch (Exception e) { - SolrException.log(LOG, "SnapPull failed ", e); + SolrException.log(LOG, "Index fetch failed ", e); } finally { - if (snapPuller != null) { - tempSnapPuller = snapPuller; + if (pollingIndexFetcher != null) { + currentIndexFetcher = pollingIndexFetcher; } - snapPullLock.unlock(); + indexFetchLock.unlock(); } return false; } boolean isReplicating() { - return snapPullLock.isLocked(); + return indexFetchLock.isLocked(); } private void doSnapShoot(SolrParams params, SolrQueryResponse rsp, @@ -390,10 +410,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw /** * This method adds an Object of FileStream to the response . The FileStream implements a custom protocol which is - * understood by SnapPuller.FileFetcher + * understood by IndexFetcher.FileFetcher * - * @see org.apache.solr.handler.SnapPuller.LocalFsFileFetcher - * @see org.apache.solr.handler.SnapPuller.DirectoryFileFetcher + * @see IndexFetcher.LocalFsFileFetcher + * @see IndexFetcher.DirectoryFileFetcher */ private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) { ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams); @@ -538,18 +558,28 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } void disablePoll() { - if (isSlave) - snapPuller.disablePoll(); + if (isSlave) { + pollDisabled.set(true); + LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled); + } } void enablePoll() { - if (isSlave) - snapPuller.enablePoll(); + if (isSlave) { + pollDisabled.set(false); + LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled); + } } boolean isPollingDisabled() { - if (snapPuller == null) return true; - return snapPuller.isPollingDisabled(); + return pollDisabled.get(); + } + + Long getNextScheduledExecTime() { + Long nextTime = null; + if (executorStartTime > 0) + nextTime = executorStartTime + pollInterval; + return nextTime; } int getTimesReplicatedSinceStartup() { @@ -611,31 +641,31 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw list.add("isMaster", String.valueOf(isMaster)); list.add("isSlave", String.valueOf(isSlave)); - SnapPuller snapPuller = tempSnapPuller; - if (snapPuller != null) { - list.add(MASTER_URL, snapPuller.getMasterUrl()); - if (snapPuller.getPollInterval() != null) { - list.add(SnapPuller.POLL_INTERVAL, snapPuller.getPollInterval()); + IndexFetcher fetcher = currentIndexFetcher; + if (fetcher != null) { + list.add(MASTER_URL, fetcher.getMasterUrl()); + if (getPollInterval() != null) { + list.add(POLL_INTERVAL, getPollInterval()); } list.add("isPollingDisabled", String.valueOf(isPollingDisabled())); list.add("isReplicating", String.valueOf(isReplicating())); - long elapsed = getTimeElapsed(snapPuller); - long val = SnapPuller.getTotalBytesDownloaded(snapPuller); + long elapsed = fetcher.getReplicationTimeElapsed(); + long val = fetcher.getTotalBytesDownloaded(); if (elapsed > 0) { list.add("timeElapsed", elapsed); list.add("bytesDownloaded", val); list.add("downloadSpeed", val / elapsed); } Properties props = loadReplicationProperties(); - addVal(list, SnapPuller.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class); - addVal(list, SnapPuller.INDEX_REPLICATED_AT, props, Date.class); - addVal(list, SnapPuller.CONF_FILES_REPLICATED_AT, props, Date.class); - addVal(list, SnapPuller.REPLICATION_FAILED_AT, props, Date.class); - addVal(list, SnapPuller.TIMES_FAILED, props, Integer.class); - addVal(list, SnapPuller.TIMES_INDEX_REPLICATED, props, Integer.class); - addVal(list, SnapPuller.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class); - addVal(list, SnapPuller.TIMES_CONFIG_REPLICATED, props, Integer.class); - addVal(list, SnapPuller.CONF_FILES_REPLICATED, props, String.class); + addVal(list, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class); + addVal(list, IndexFetcher.INDEX_REPLICATED_AT, props, Date.class); + addVal(list, IndexFetcher.CONF_FILES_REPLICATED_AT, props, Date.class); + addVal(list, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class); + addVal(list, IndexFetcher.TIMES_FAILED, props, Integer.class); + addVal(list, IndexFetcher.TIMES_INDEX_REPLICATED, props, Integer.class); + addVal(list, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class); + addVal(list, IndexFetcher.TIMES_CONFIG_REPLICATED, props, Integer.class); + addVal(list, IndexFetcher.CONF_FILES_REPLICATED, props, String.class); } if (isMaster) { if (includeConfFiles != null) list.add("confFilesToReplicate", includeConfFiles); @@ -677,12 +707,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw master.add("replicableGeneration", repCommitInfo.generation); } - SnapPuller snapPuller = tempSnapPuller; - if (snapPuller != null) { + IndexFetcher fetcher = currentIndexFetcher; + if (fetcher != null) { Properties props = loadReplicationProperties(); if (showSlaveDetails) { try { - NamedList nl = snapPuller.getDetails(); + NamedList nl = fetcher.getDetails(); slave.add("masterDetails", nl.get(CMD_DETAILS)); } catch (Exception e) { LOG.warn( @@ -691,26 +721,26 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw slave.add(ERR_STATUS, "invalid_master"); } } - slave.add(MASTER_URL, snapPuller.getMasterUrl()); - if (snapPuller.getPollInterval() != null) { - slave.add(SnapPuller.POLL_INTERVAL, snapPuller.getPollInterval()); + slave.add(MASTER_URL, fetcher.getMasterUrl()); + if (getPollInterval() != null) { + slave.add(POLL_INTERVAL, getPollInterval()); } - if (snapPuller.getNextScheduledExecTime() != null && !isPollingDisabled()) { - slave.add(NEXT_EXECUTION_AT, new Date(snapPuller.getNextScheduledExecTime()).toString()); + if (getNextScheduledExecTime() != null && !isPollingDisabled()) { + slave.add(NEXT_EXECUTION_AT, new Date(getNextScheduledExecTime()).toString()); } else if (isPollingDisabled()) { slave.add(NEXT_EXECUTION_AT, "Polling disabled"); } - addVal(slave, SnapPuller.INDEX_REPLICATED_AT, props, Date.class); - addVal(slave, SnapPuller.INDEX_REPLICATED_AT_LIST, props, List.class); - addVal(slave, SnapPuller.REPLICATION_FAILED_AT_LIST, props, List.class); - addVal(slave, SnapPuller.TIMES_INDEX_REPLICATED, props, Integer.class); - addVal(slave, SnapPuller.CONF_FILES_REPLICATED, props, Integer.class); - addVal(slave, SnapPuller.TIMES_CONFIG_REPLICATED, props, Integer.class); - addVal(slave, SnapPuller.CONF_FILES_REPLICATED_AT, props, Integer.class); - addVal(slave, SnapPuller.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class); - addVal(slave, SnapPuller.TIMES_FAILED, props, Integer.class); - addVal(slave, SnapPuller.REPLICATION_FAILED_AT, props, Date.class); - addVal(slave, SnapPuller.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class); + addVal(slave, IndexFetcher.INDEX_REPLICATED_AT, props, Date.class); + addVal(slave, IndexFetcher.INDEX_REPLICATED_AT_LIST, props, List.class); + addVal(slave, IndexFetcher.REPLICATION_FAILED_AT_LIST, props, List.class); + addVal(slave, IndexFetcher.TIMES_INDEX_REPLICATED, props, Integer.class); + addVal(slave, IndexFetcher.CONF_FILES_REPLICATED, props, Integer.class); + addVal(slave, IndexFetcher.TIMES_CONFIG_REPLICATED, props, Integer.class); + addVal(slave, IndexFetcher.CONF_FILES_REPLICATED_AT, props, Integer.class); + addVal(slave, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class); + addVal(slave, IndexFetcher.TIMES_FAILED, props, Integer.class); + addVal(slave, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class); + addVal(slave, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class); slave.add("currentDate", new Date().toString()); slave.add("isPollingDisabled", String.valueOf(isPollingDisabled())); @@ -720,13 +750,13 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw try { long bytesToDownload = 0; List filesToDownload = new ArrayList<>(); - for (Map file : snapPuller.getFilesToDownload()) { + for (Map file : fetcher.getFilesToDownload()) { filesToDownload.add((String) file.get(NAME)); bytesToDownload += (Long) file.get(SIZE); } //get list of conf files to download - for (Map file : snapPuller.getConfFilesToDownload()) { + for (Map file : fetcher.getConfFilesToDownload()) { filesToDownload.add((String) file.get(NAME)); bytesToDownload += (Long) file.get(SIZE); } @@ -737,18 +767,18 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw long bytesDownloaded = 0; List filesDownloaded = new ArrayList<>(); - for (Map file : snapPuller.getFilesDownloaded()) { + for (Map file : fetcher.getFilesDownloaded()) { filesDownloaded.add((String) file.get(NAME)); bytesDownloaded += (Long) file.get(SIZE); } //get list of conf files downloaded - for (Map file : snapPuller.getConfFilesDownloaded()) { + for (Map file : fetcher.getConfFilesDownloaded()) { filesDownloaded.add((String) file.get(NAME)); bytesDownloaded += (Long) file.get(SIZE); } - Map currentFile = snapPuller.getCurrentFile(); + Map currentFile = fetcher.getCurrentFile(); String currFile = null; long currFileSize = 0, currFileSizeDownloaded = 0; float percentDownloaded = 0; @@ -767,10 +797,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw long estimatedTimeRemaining = 0; - if (snapPuller.getReplicationStartTime() > 0) { - slave.add("replicationStartTime", new Date(snapPuller.getReplicationStartTime()).toString()); + if (fetcher.getReplicationStartTime() > 0) { + slave.add("replicationStartTime", new Date(fetcher.getReplicationStartTime()).toString()); } - long elapsed = getTimeElapsed(snapPuller); + long elapsed = fetcher.getReplicationTimeElapsed(); slave.add("timeElapsed", String.valueOf(elapsed) + "s"); if (bytesDownloaded > 0) @@ -840,13 +870,6 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw return replicateAfter; } - private long getTimeElapsed(SnapPuller snapPuller) { - long timeElapsed = 0; - if (snapPuller.getReplicationStartTime() > 0) - timeElapsed = TimeUnit.SECONDS.convert(System.currentTimeMillis() - snapPuller.getReplicationStartTime(), TimeUnit.MILLISECONDS); - return timeElapsed; - } - Properties loadReplicationProperties() { Directory dir = null; try { @@ -856,7 +879,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw IndexInput input; try { input = dir.openInput( - SnapPuller.REPLICATION_PROPERTIES, IOContext.DEFAULT); + IndexFetcher.REPLICATION_PROPERTIES, IOContext.DEFAULT); } catch (FileNotFoundException | NoSuchFileException e) { return new Properties(); } @@ -887,6 +910,37 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw // } // } + private void setupPolling(String intervalStr) { + pollIntervalStr = intervalStr; + pollInterval = readInterval(pollIntervalStr); + if (pollInterval == null || pollInterval <= 0) { + LOG.info(" No value set for 'pollInterval'. Timer Task not started."); + return; + } + + Runnable task = new Runnable() { + @Override + public void run() { + if (pollDisabled.get()) { + LOG.info("Poll disabled"); + return; + } + try { + LOG.debug("Polling for index modifications"); + executorStartTime = System.currentTimeMillis(); + doFetch(null, false); + } catch (Exception e) { + LOG.error("Exception in fetching index", e); + } + } + }; + executorService = Executors.newSingleThreadScheduledExecutor( + new DefaultSolrThreadFactory("indexFetcher")); + long initialDelay = pollInterval - (System.currentTimeMillis() % pollInterval); + executorService.scheduleAtFixedRate(task, initialDelay, pollInterval, TimeUnit.MILLISECONDS); + LOG.info("Poll Scheduled at an interval of " + pollInterval + "ms"); + } + @Override @SuppressWarnings("unchecked") public void inform(SolrCore core) { @@ -901,7 +955,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw NamedList slave = (NamedList) initArgs.get("slave"); boolean enableSlave = isEnabled( slave ); if (enableSlave) { - tempSnapPuller = snapPuller = new SnapPuller(slave, this, core); + currentIndexFetcher = pollingIndexFetcher = new IndexFetcher(slave, this, core); + setupPolling((String) slave.get(POLL_INTERVAL)); isSlave = true; } NamedList master = (NamedList) initArgs.get("master"); @@ -1005,7 +1060,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw } String reserve = (String) master.get(RESERVE); if (reserve != null && !reserve.trim().equals("")) { - reserveCommitDuration = SnapPuller.readInterval(reserve); + reserveCommitDuration = readInterval(reserve); } LOG.info("Commits will be reserved for " + reserveCommitDuration); isMaster = true; @@ -1029,11 +1084,20 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw core.addCloseHook(new CloseHook() { @Override public void preClose(SolrCore core) { - if (snapPuller != null) { - snapPuller.destroy(); + try { + if (executorService != null) executorService.shutdown(); + } finally { + try { + if (pollingIndexFetcher != null) { + pollingIndexFetcher.destroy(); + } + } finally { + if (executorService != null) ExecutorUtil + .shutdownNowAndAwaitTermination(executorService); + } } - if (tempSnapPuller != null && tempSnapPuller != snapPuller) { - tempSnapPuller.destroy(); + if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) { + currentIndexFetcher.destroy(); } } @@ -1307,8 +1371,40 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw releaseCommitPointAndExtendReserve(); } } - } - + } + + static Integer readInterval(String interval) { + if (interval == null) + return null; + int result = 0; + if (interval != null) { + Matcher m = INTERVAL_PATTERN.matcher(interval.trim()); + if (m.find()) { + String hr = m.group(1); + String min = m.group(2); + String sec = m.group(3); + result = 0; + try { + if (sec != null && sec.length() > 0) + result += Integer.parseInt(sec); + if (min != null && min.length() > 0) + result += (60 * Integer.parseInt(min)); + if (hr != null && hr.length() > 0) + result += (60 * 60 * Integer.parseInt(hr)); + result *= 1000; + } catch (NumberFormatException e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + INTERVAL_ERR_MSG); + } + } else { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + INTERVAL_ERR_MSG); + } + + } + return result; + } + public static final String MASTER_URL = "masterUrl"; public static final String STATUS = "status"; @@ -1369,6 +1465,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw public static final String FILE_STREAM = "filestream"; + public static final String POLL_INTERVAL = "pollInterval"; + + public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'"; + + private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d*?):(\\d*?):(\\d*)"); + public static final int PACKET_SZ = 1024 * 1024; // 1MB public static final String RESERVE = "commitReserveDuration"; diff --git a/solr/core/src/java/org/apache/solr/handler/SnapShooter.java b/solr/core/src/java/org/apache/solr/handler/SnapShooter.java index 391801928de..60bf3c80654 100644 --- a/solr/core/src/java/org/apache/solr/handler/SnapShooter.java +++ b/solr/core/src/java/org/apache/solr/handler/SnapShooter.java @@ -144,7 +144,7 @@ public class SnapShooter { details.add("snapshotName", snapshotName); LOG.info("Done creating backup snapshot: " + (snapshotName == null ? "" : snapshotName)); } catch (Exception e) { - SnapPuller.delTree(snapShotDir); + IndexFetcher.delTree(snapShotDir); LOG.error("Exception while creating snapshot", e); details.add("snapShootException", e.getMessage()); } finally { @@ -170,7 +170,7 @@ public class SnapShooter { int i=1; for (OldBackupDirectory dir : dirs) { if (i++ > numberToKeep) { - SnapPuller.delTree(dir.dir); + IndexFetcher.delTree(dir.dir); } } } @@ -181,7 +181,7 @@ public class SnapShooter { NamedList details = new NamedList<>(); boolean isSuccess; File f = new File(snapDir, "snapshot." + snapshotName); - isSuccess = SnapPuller.delTree(f); + isSuccess = IndexFetcher.delTree(f); if(isSuccess) { details.add("status", "success"); diff --git a/solr/core/src/test-files/log4j.properties b/solr/core/src/test-files/log4j.properties index 4a3a20ab863..659b430231b 100644 --- a/solr/core/src/test-files/log4j.properties +++ b/solr/core/src/test-files/log4j.properties @@ -25,7 +25,7 @@ log4j.logger.org.apache.solr.hadoop=INFO #log4j.logger.org.apache.solr.cloud.ChaosMonkey=DEBUG #log4j.logger.org.apache.solr.update.TransactionLog=DEBUG #log4j.logger.org.apache.solr.handler.ReplicationHandler=DEBUG -#log4j.logger.org.apache.solr.handler.SnapPuller=DEBUG +#log4j.logger.org.apache.solr.handler.IndexFetcher=DEBUG #log4j.logger.org.apache.solr.common.cloud.ClusterStateUtil=DEBUG #log4j.logger.org.apache.solr.cloud.OverseerAutoReplicaFailoverThread=DEBUG \ No newline at end of file diff --git a/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java b/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java index 2ffb235f0f1..8b005b685c2 100644 --- a/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java +++ b/solr/core/src/test/org/apache/solr/core/TestArbitraryIndexDir.java @@ -36,7 +36,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.IOUtils; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonParams; -import org.apache.solr.handler.SnapPuller; +import org.apache.solr.handler.IndexFetcher; import org.apache.solr.util.AbstractSolrTestCase; import org.apache.solr.util.TestHarness; import org.junit.AfterClass; @@ -93,7 +93,7 @@ public class TestArbitraryIndexDir extends AbstractSolrTestCase{ assertU(adoc("id", String.valueOf(1), "name", "name"+String.valueOf(1))); //create a new index dir and index.properties file - File idxprops = new File(h.getCore().getDataDir() + SnapPuller.INDEX_PROPERTIES); + File idxprops = new File(h.getCore().getDataDir() + IndexFetcher.INDEX_PROPERTIES); Properties p = new Properties(); File newDir = new File(h.getCore().getDataDir() + "index_temp"); newDir.mkdirs(); @@ -104,7 +104,7 @@ public class TestArbitraryIndexDir extends AbstractSolrTestCase{ p.store(os, "index properties"); } catch (Exception e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, - "Unable to write " + SnapPuller.INDEX_PROPERTIES, e); + "Unable to write " + IndexFetcher.INDEX_PROPERTIES, e); } finally { IOUtils.closeWhileHandlingException(os); } diff --git a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java index 228ddcb557e..55503977b00 100644 --- a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java +++ b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java @@ -172,17 +172,13 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { } NamedList query(String query, SolrClient s) throws SolrServerException, IOException { - NamedList res = new SimpleOrderedMap(); ModifiableSolrParams params = new ModifiableSolrParams(); params.add("q", query); params.add("sort","id desc"); QueryResponse qres = s.query(params); - - res = qres.getResponse(); - - return res; + return qres.getResponse(); } /** will sleep up to 30 seconds, looking for expectedDocCount */ @@ -304,7 +300,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { assertNotNull("slave has slave section", details.get("slave")); // SOLR-2677: assert not false negatives - Object timesFailed = ((NamedList)details.get("slave")).get(SnapPuller.TIMES_FAILED); + Object timesFailed = ((NamedList)details.get("slave")).get(IndexFetcher.TIMES_FAILED); assertEquals("slave has fetch error count", null, timesFailed); @@ -513,7 +509,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { slaveClient.close(); slaveClient = createNewSolrClient(slaveJetty.getLocalPort()); - //add a doc with new field and commit on master to trigger snappull from slave. + //add a doc with new field and commit on master to trigger index fetch from slave. index(masterClient, "id", "2000", "name", "name = " + 2000, "newname", "newname = " + 2000); masterClient.commit(); @@ -581,7 +577,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { } @Test - public void doTestSnapPullWithMasterUrl() throws Exception { + public void doTestIndexFetchWithMasterUrl() throws Exception { //change solrconfig on slave //this has no entry for pollinginterval slave.copyConfigFile(CONF_DIR + "solrconfig-slave1.xml", "solrconfig.xml"); @@ -608,7 +604,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { SolrDocumentList masterQueryResult = (SolrDocumentList) masterQueryRsp.get("response"); assertEquals(nDocs, masterQueryResult.getNumFound()); - // snappull + // index fetch String masterUrl = buildUrl(slaveJetty.getLocalPort()) + "/" + DEFAULT_TEST_CORENAME + "/replication?command=fetchindex&masterUrl="; masterUrl += buildUrl(masterJetty.getLocalPort()) + "/" + DEFAULT_TEST_CORENAME + "/replication"; URL url = new URL(masterUrl); @@ -623,7 +619,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { String cmp = BaseDistributedSearchTestCase.compare(masterQueryResult, slaveQueryResult, 0, null); assertEquals(null, cmp); - // snappull from the slave to the master + // index fetch from the slave to the master for (int i = nDocs; i < nDocs + 3; i++) index(slaveClient, "id", i, "name", "name = " + i); @@ -765,7 +761,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { .get("response"); assertEquals(totalDocs, masterQueryResult.getNumFound()); - // snappull + // index fetch Date slaveCoreStart = watchCoreStartAt(slaveClient, 30*1000, null); pullFromMasterToSlave(); if (confCoreReload) { @@ -1219,7 +1215,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 { // record collection1's start time on slave final Date slaveStartTime = watchCoreStartAt(slaveClient, 30*1000, null); - //add a doc with new field and commit on master to trigger snappull from slave. + //add a doc with new field and commit on master to trigger index fetch from slave. index(masterClient, "id", "2000", "name", "name = " + 2000, "newname", "n2000"); masterClient.commit(); rQuery(1, "newname:n2000", masterClient); // sanity check diff --git a/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java b/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java index d76b7093d6a..7f07f859ea7 100644 --- a/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java +++ b/solr/test-framework/src/java/org/apache/solr/core/MockDirectoryFactory.java @@ -69,7 +69,7 @@ public class MockDirectoryFactory extends EphemeralDirectoryFactory { // already been created. mockDirWrapper.setPreventDoubleWrite(false); - // snappuller & co don't seem ready for this: + // IndexFetcher & co don't seem ready for this: mockDirWrapper.setEnableVirusScanner(false); if (allowReadingFilesStillOpenForWrite) {