SOLR-6804: Untangle SnapPuller and ReplicationHandler

This closes #110


git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1664126 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ramkumar Aiyengar 2015-03-04 19:45:09 +00:00
parent 82e72c8d19
commit de381d2816
8 changed files with 249 additions and 267 deletions

View File

@ -83,9 +83,9 @@ import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.handler.IndexFetcher;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.SnapPuller;
import org.apache.solr.handler.admin.ShowFileRequestHandler;
import org.apache.solr.handler.component.DebugComponent;
import org.apache.solr.handler.component.ExpandComponent;
@ -291,7 +291,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
dir = getDirectoryFactory().get(getDataDir(), DirContext.META_DATA, getSolrConfig().indexConfig.lockType);
IndexInput input;
try {
input = dir.openInput(SnapPuller.INDEX_PROPERTIES, IOContext.DEFAULT);
input = dir.openInput(IndexFetcher.INDEX_PROPERTIES, IOContext.DEFAULT);
} catch (FileNotFoundException | NoSuchFileException e) {
input = null;
}
@ -307,7 +307,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
}
} catch (Exception e) {
log.error("Unable to load " + SnapPuller.INDEX_PROPERTIES, e);
log.error("Unable to load " + IndexFetcher.INDEX_PROPERTIES, e);
} finally {
IOUtils.closeQuietly(is);
}

View File

@ -67,11 +67,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.InflaterInputStream;
@ -94,7 +90,6 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.DirectoryFactory;
@ -121,24 +116,16 @@ import org.slf4j.LoggerFactory;
*
* @since solr 1.4
*/
public class SnapPuller {
public class IndexFetcher {
private static final int _100K = 100000;
public static final String INDEX_PROPERTIES = "index.properties";
private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName());
private static final Logger LOG = LoggerFactory.getLogger(IndexFetcher.class.getName());
private final String masterUrl;
private final ReplicationHandler replicationHandler;
private final Integer pollInterval;
private String pollIntervalStr;
private ScheduledExecutorService executorService;
private volatile long executorStartTime;
final ReplicationHandler replicationHandler;
private volatile long replicationStartTime;
@ -166,11 +153,6 @@ public class SnapPuller {
private boolean useExternal = false;
/**
* Disable the timer task for polling
*/
private AtomicBoolean pollDisabled = new AtomicBoolean(false);
private final HttpClient myHttpClient;
private static HttpClient createHttpClient(SolrCore core, String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
@ -184,7 +166,7 @@ public class SnapPuller {
return HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
}
public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
solrCore = sc;
String masterUrl = (String) initArgs.get(MASTER_URL);
if (masterUrl == null)
@ -197,8 +179,6 @@ public class SnapPuller {
this.masterUrl = masterUrl;
this.replicationHandler = handler;
pollIntervalStr = (String) initArgs.get(POLL_INTERVAL);
pollInterval = readInterval(pollIntervalStr);
String compress = (String) initArgs.get(COMPRESSION);
useInternal = INTERNAL.equals(compress);
useExternal = EXTERNAL.equals(compress);
@ -207,35 +187,6 @@ public class SnapPuller {
String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
myHttpClient = createHttpClient(solrCore, connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal);
if (pollInterval != null && pollInterval > 0) {
startExecutorService();
} else {
LOG.info(" No value set for 'pollInterval'. Timer Task not started.");
}
}
private void startExecutorService() {
Runnable task = new Runnable() {
@Override
public void run() {
if (pollDisabled.get()) {
LOG.info("Poll disabled");
return;
}
try {
LOG.debug("Polling for index modifications");
executorStartTime = System.currentTimeMillis();
replicationHandler.doFetch(null, false);
} catch (Exception e) {
LOG.error("Exception in fetching index", e);
}
}
};
executorService = Executors.newSingleThreadScheduledExecutor(
new DefaultSolrThreadFactory("snapPuller"));
long initialDelay = pollInterval - (System.currentTimeMillis() % pollInterval);
executorService.scheduleAtFixedRate(task, initialDelay, pollInterval, TimeUnit.MILLISECONDS);
LOG.info("Poll Scheduled at an interval of " + pollInterval + "ms");
}
/**
@ -427,13 +378,13 @@ public class SnapPuller {
Thread.sleep(1000);
c++;
if (c >= 30) {
LOG.warn("SnapPuller unable to cleanup unused lucene index files so we must do a full copy instead");
LOG.warn("IndexFetcher unable to cleanup unused lucene index files so we must do a full copy instead");
isFullCopyNeeded = true;
break;
}
}
if (c > 0) {
LOG.info("SnapPuller slept for " + (c * 1000) + "ms for unused lucene index files to be delete-able");
LOG.info("IndexFetcher slept for " + (c * 1000) + "ms for unused lucene index files to be delete-able");
}
} finally {
writer.decref();
@ -634,7 +585,7 @@ public class SnapPuller {
props.setProperty(TIMES_CONFIG_REPLICATED, String.valueOf(confFilesCount));
}
props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED, String.valueOf(getTotalBytesDownloaded(this)));
props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED, String.valueOf(getTotalBytesDownloaded()));
if (!successfulInstall) {
int numFailures = 1;
if (props.containsKey(TIMES_FAILED)) {
@ -663,20 +614,20 @@ public class SnapPuller {
}
}
static long getTotalBytesDownloaded(SnapPuller snappuller) {
long getTotalBytesDownloaded() {
long bytesDownloaded = 0;
//get size from list of files to download
for (Map<String, Object> file : snappuller.getFilesDownloaded()) {
for (Map<String, Object> file : getFilesDownloaded()) {
bytesDownloaded += (Long) file.get(SIZE);
}
//get size from list of conf files to download
for (Map<String, Object> file : snappuller.getConfFilesDownloaded()) {
for (Map<String, Object> file : getConfFilesDownloaded()) {
bytesDownloaded += (Long) file.get(SIZE);
}
//get size from current file being downloaded
Map<String, Object> currentFile = snappuller.getCurrentFile();
Map<String, Object> currentFile = getCurrentFile();
if (currentFile != null) {
if (currentFile.containsKey("bytesDownloaded")) {
bytesDownloaded += (Long) currentFile.get("bytesDownloaded");
@ -1053,33 +1004,33 @@ public class SnapPuller {
Directory dir = null;
try {
dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), DirContext.META_DATA, solrCore.getSolrConfig().indexConfig.lockType);
if (slowFileExists(dir, SnapPuller.INDEX_PROPERTIES)){
final IndexInput input = dir.openInput(SnapPuller.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE);
if (slowFileExists(dir, IndexFetcher.INDEX_PROPERTIES)){
final IndexInput input = dir.openInput(IndexFetcher.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE);
final InputStream is = new PropertiesInputStream(input);
try {
p.load(new InputStreamReader(is, StandardCharsets.UTF_8));
} catch (Exception e) {
LOG.error("Unable to load " + SnapPuller.INDEX_PROPERTIES, e);
LOG.error("Unable to load " + IndexFetcher.INDEX_PROPERTIES, e);
} finally {
IOUtils.closeQuietly(is);
}
}
try {
dir.deleteFile(SnapPuller.INDEX_PROPERTIES);
dir.deleteFile(IndexFetcher.INDEX_PROPERTIES);
} catch (IOException e) {
// no problem
}
final IndexOutput out = dir.createOutput(SnapPuller.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE);
final IndexOutput out = dir.createOutput(IndexFetcher.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE);
p.put("index", tmpIdxDirName);
Writer os = null;
try {
os = new OutputStreamWriter(new PropertiesOutputStream(out), StandardCharsets.UTF_8);
p.store(os, SnapPuller.INDEX_PROPERTIES);
p.store(os, IndexFetcher.INDEX_PROPERTIES);
dir.sync(Collections.singleton(INDEX_PROPERTIES));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to write " + SnapPuller.INDEX_PROPERTIES, e);
"Unable to write " + IndexFetcher.INDEX_PROPERTIES, e);
} finally {
IOUtils.closeQuietly(os);
}
@ -1161,25 +1112,9 @@ public class SnapPuller {
}
/**
* Disable periodic polling
* Stops the ongoing fetch
*/
void disablePoll() {
pollDisabled.set(true);
LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled);
}
/**
* Enable periodic polling
*/
void enablePoll() {
pollDisabled.set(false);
LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
}
/**
* Stops the ongoing pull
*/
void abortPull() {
void abortFetch() {
stop = true;
}
@ -1187,6 +1122,13 @@ public class SnapPuller {
return replicationStartTime;
}
/**
 * Returns the number of seconds elapsed since the current replication
 * started, or 0 when no replication start time has been recorded.
 */
long getReplicationTimeElapsed() {
  long startTime = getReplicationStartTime();
  if (startTime <= 0) {
    return 0;
  }
  return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime);
}
List<Map<String, Object>> getConfFilesToDownload() {
//make a copy first because it can be null later
List<Map<String, Object>> tmp = confFilesToDownload;
@ -1224,17 +1166,6 @@ public class SnapPuller {
return tmp;
}
boolean isPollingDisabled() {
return pollDisabled.get();
}
Long getNextScheduledExecTime() {
Long nextTime = null;
if (executorStartTime > 0)
nextTime = executorStartTime + pollInterval;
return nextTime;
}
private static class ReplicationHandlerException extends InterruptedException {
public ReplicationHandlerException(String message) {
super(message);
@ -1586,55 +1517,14 @@ public class SnapPuller {
}
}
static Integer readInterval(String interval) {
if (interval == null)
return null;
int result = 0;
Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
if (m.find()) {
String hr = m.group(1);
String min = m.group(2);
String sec = m.group(3);
result = 0;
try {
if (sec != null && sec.length() > 0)
result += Integer.parseInt(sec);
if (min != null && min.length() > 0)
result += (60 * Integer.parseInt(min));
if (hr != null && hr.length() > 0)
result += (60 * 60 * Integer.parseInt(hr));
result *= 1000;
} catch (NumberFormatException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
}
} else {
throw new SolrException(ErrorCode.SERVER_ERROR, INTERVAL_ERR_MSG);
}
return result;
}
public void destroy() {
try {
if (executorService != null) executorService.shutdown();
} finally {
try {
abortPull();
} finally {
if (executorService != null) ExecutorUtil
.shutdownNowAndAwaitTermination(executorService);
}
}
abortFetch();
}
String getMasterUrl() {
return masterUrl;
}
String getPollInterval() {
return pollIntervalStr;
}
private static final int MAX_RETRIES = 5;
private static final int NO_CONTENT = 1;
@ -1643,12 +1533,6 @@ public class SnapPuller {
public static final String REPLICATION_PROPERTIES = "replication.properties";
public static final String POLL_INTERVAL = "pollInterval";
public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
static final String INDEX_REPLICATED_AT = "indexReplicatedAt";
static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated";

View File

@ -36,9 +36,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.DeflaterOutputStream;
@ -60,6 +64,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@ -75,6 +80,7 @@ import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.NumberUtils;
import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.RefCounted;
@ -90,8 +96,8 @@ import org.slf4j.LoggerFactory;
* file (command=filecontent&amp;file=&lt;FILE_NAME&gt;) You can optionally specify an offset and length to get that
* chunk of the file. You can request a configuration file by using "cf" parameter instead of the "file" parameter.</li>
* <li>Get status/statistics (command=details)</li> </ol> <p>When running on the slave, it provides the following
* commands <ol> <li>Perform a snap pull now (command=snappull)</li> <li>Get status/statistics (command=details)</li>
* <li>Abort a snap pull (command=abort)</li> <li>Enable/Disable polling the master for new versions (command=enablepoll
* commands <ol> <li>Perform an index fetch now (command=snappull)</li> <li>Get status/statistics (command=details)</li>
* <li>Abort an index fetch (command=abort)</li> <li>Enable/Disable polling the master for new versions (command=enablepoll
* or command=disablepoll)</li> </ol>
*
*
@ -134,9 +140,9 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}
}
private SnapPuller snapPuller;
private IndexFetcher pollingIndexFetcher;
private ReentrantLock snapPullLock = new ReentrantLock();
private ReentrantLock indexFetchLock = new ReentrantLock();
private String includeConfFiles;
@ -151,14 +157,18 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private boolean replicateOnCommit = false;
private boolean replicateOnStart = false;
private ScheduledExecutorService executorService;
private volatile long executorStartTime;
private int numberBackupsToKeep = 0; //zero: do not delete old backups
private int numTimesReplicated = 0;
private final Map<String, FileInfo> confFileInfoCache = new HashMap<>();
private Integer reserveCommitDuration = SnapPuller.readInterval("00:00:10");
private Integer reserveCommitDuration = readInterval("00:00:10");
volatile IndexCommit indexCommitPoint;
@ -166,6 +176,19 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private AtomicBoolean replicationEnabled = new AtomicBoolean(true);
private Integer pollInterval;
private String pollIntervalStr;
/**
* Disable the timer task for polling
*/
private AtomicBoolean pollDisabled = new AtomicBoolean(false);
// Raw poll interval string as configured (expected "HH:mm:ss" form, see
// INTERVAL_ERR_MSG); null when polling has not been configured.
String getPollInterval() {
return pollIntervalStr;
}
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
rsp.setHttpCaching(false);
@ -221,38 +244,38 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return;
}
final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);
Thread puller = new Thread("explicit-fetchindex-cmd") {
Thread fetchThread = new Thread("explicit-fetchindex-cmd") {
@Override
public void run() {
doFetch(paramsCopy, false);
}
};
puller.setDaemon(false);
puller.start();
fetchThread.setDaemon(false);
fetchThread.start();
if (solrParams.getBool(WAIT, false)) {
puller.join();
fetchThread.join();
}
rsp.add(STATUS, OK_STATUS);
} else if (command.equalsIgnoreCase(CMD_DISABLE_POLL)) {
if (snapPuller != null){
snapPuller.disablePoll();
if (pollingIndexFetcher != null){
disablePoll();
rsp.add(STATUS, OK_STATUS);
} else {
rsp.add(STATUS, ERR_STATUS);
rsp.add("message","No slave configured");
}
} else if (command.equalsIgnoreCase(CMD_ENABLE_POLL)) {
if (snapPuller != null){
snapPuller.enablePoll();
if (pollingIndexFetcher != null){
enablePoll();
rsp.add(STATUS, OK_STATUS);
}else {
rsp.add(STATUS,ERR_STATUS);
rsp.add("message","No slave configured");
}
} else if (command.equalsIgnoreCase(CMD_ABORT_FETCH)) {
SnapPuller temp = tempSnapPuller;
if (temp != null){
temp.abortPull();
IndexFetcher fetcher = currentIndexFetcher;
if (fetcher != null){
fetcher.abortFetch();
rsp.add(STATUS, OK_STATUS);
} else {
rsp.add(STATUS,ERR_STATUS);
@ -321,38 +344,35 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return null;
}
private volatile SnapPuller tempSnapPuller;
private volatile IndexFetcher currentIndexFetcher;
public boolean doFetch(SolrParams solrParams, boolean forceReplication) {
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
if (!snapPullLock.tryLock())
if (!indexFetchLock.tryLock())
return false;
try {
if (masterUrl != null) {
if (tempSnapPuller != null && tempSnapPuller != snapPuller) {
tempSnapPuller.destroy();
if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
currentIndexFetcher.destroy();
}
NamedList<Object> nl = solrParams.toNamedList();
nl.remove(SnapPuller.POLL_INTERVAL);
tempSnapPuller = new SnapPuller(nl, this, core);
currentIndexFetcher = new IndexFetcher(solrParams.toNamedList(), this, core);
} else {
tempSnapPuller = snapPuller;
currentIndexFetcher = pollingIndexFetcher;
}
return tempSnapPuller.fetchLatestIndex(core, forceReplication);
return currentIndexFetcher.fetchLatestIndex(core, forceReplication);
} catch (Exception e) {
SolrException.log(LOG, "SnapPull failed ", e);
SolrException.log(LOG, "Index fetch failed ", e);
} finally {
if (snapPuller != null) {
tempSnapPuller = snapPuller;
if (pollingIndexFetcher != null) {
currentIndexFetcher = pollingIndexFetcher;
}
snapPullLock.unlock();
indexFetchLock.unlock();
}
return false;
}
boolean isReplicating() {
return snapPullLock.isLocked();
return indexFetchLock.isLocked();
}
private void doSnapShoot(SolrParams params, SolrQueryResponse rsp,
@ -390,10 +410,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
/**
* This method adds an Object of FileStream to the response . The FileStream implements a custom protocol which is
* understood by SnapPuller.FileFetcher
* understood by IndexFetcher.FileFetcher
*
* @see org.apache.solr.handler.SnapPuller.LocalFsFileFetcher
* @see org.apache.solr.handler.SnapPuller.DirectoryFileFetcher
* @see IndexFetcher.LocalFsFileFetcher
* @see IndexFetcher.DirectoryFileFetcher
*/
private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
@ -538,18 +558,28 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}
void disablePoll() {
if (isSlave)
snapPuller.disablePoll();
if (isSlave) {
pollDisabled.set(true);
LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled);
}
}
void enablePoll() {
if (isSlave)
snapPuller.enablePoll();
if (isSlave) {
pollDisabled.set(false);
LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
}
}
boolean isPollingDisabled() {
if (snapPuller == null) return true;
return snapPuller.isPollingDisabled();
return pollDisabled.get();
}
/**
 * Returns the absolute time (epoch millis) at which the next poll is due,
 * or null when no poll has run yet (executorStartTime is still 0).
 */
Long getNextScheduledExecTime() {
  if (executorStartTime <= 0) {
    return null;
  }
  return executorStartTime + pollInterval;
}
int getTimesReplicatedSinceStartup() {
@ -611,31 +641,31 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
list.add("isMaster", String.valueOf(isMaster));
list.add("isSlave", String.valueOf(isSlave));
SnapPuller snapPuller = tempSnapPuller;
if (snapPuller != null) {
list.add(MASTER_URL, snapPuller.getMasterUrl());
if (snapPuller.getPollInterval() != null) {
list.add(SnapPuller.POLL_INTERVAL, snapPuller.getPollInterval());
IndexFetcher fetcher = currentIndexFetcher;
if (fetcher != null) {
list.add(MASTER_URL, fetcher.getMasterUrl());
if (getPollInterval() != null) {
list.add(POLL_INTERVAL, getPollInterval());
}
list.add("isPollingDisabled", String.valueOf(isPollingDisabled()));
list.add("isReplicating", String.valueOf(isReplicating()));
long elapsed = getTimeElapsed(snapPuller);
long val = SnapPuller.getTotalBytesDownloaded(snapPuller);
long elapsed = fetcher.getReplicationTimeElapsed();
long val = fetcher.getTotalBytesDownloaded();
if (elapsed > 0) {
list.add("timeElapsed", elapsed);
list.add("bytesDownloaded", val);
list.add("downloadSpeed", val / elapsed);
}
Properties props = loadReplicationProperties();
addVal(list, SnapPuller.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
addVal(list, SnapPuller.INDEX_REPLICATED_AT, props, Date.class);
addVal(list, SnapPuller.CONF_FILES_REPLICATED_AT, props, Date.class);
addVal(list, SnapPuller.REPLICATION_FAILED_AT, props, Date.class);
addVal(list, SnapPuller.TIMES_FAILED, props, Integer.class);
addVal(list, SnapPuller.TIMES_INDEX_REPLICATED, props, Integer.class);
addVal(list, SnapPuller.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class);
addVal(list, SnapPuller.TIMES_CONFIG_REPLICATED, props, Integer.class);
addVal(list, SnapPuller.CONF_FILES_REPLICATED, props, String.class);
addVal(list, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
addVal(list, IndexFetcher.INDEX_REPLICATED_AT, props, Date.class);
addVal(list, IndexFetcher.CONF_FILES_REPLICATED_AT, props, Date.class);
addVal(list, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class);
addVal(list, IndexFetcher.TIMES_FAILED, props, Integer.class);
addVal(list, IndexFetcher.TIMES_INDEX_REPLICATED, props, Integer.class);
addVal(list, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class);
addVal(list, IndexFetcher.TIMES_CONFIG_REPLICATED, props, Integer.class);
addVal(list, IndexFetcher.CONF_FILES_REPLICATED, props, String.class);
}
if (isMaster) {
if (includeConfFiles != null) list.add("confFilesToReplicate", includeConfFiles);
@ -677,12 +707,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
master.add("replicableGeneration", repCommitInfo.generation);
}
SnapPuller snapPuller = tempSnapPuller;
if (snapPuller != null) {
IndexFetcher fetcher = currentIndexFetcher;
if (fetcher != null) {
Properties props = loadReplicationProperties();
if (showSlaveDetails) {
try {
NamedList nl = snapPuller.getDetails();
NamedList nl = fetcher.getDetails();
slave.add("masterDetails", nl.get(CMD_DETAILS));
} catch (Exception e) {
LOG.warn(
@ -691,26 +721,26 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
slave.add(ERR_STATUS, "invalid_master");
}
}
slave.add(MASTER_URL, snapPuller.getMasterUrl());
if (snapPuller.getPollInterval() != null) {
slave.add(SnapPuller.POLL_INTERVAL, snapPuller.getPollInterval());
slave.add(MASTER_URL, fetcher.getMasterUrl());
if (getPollInterval() != null) {
slave.add(POLL_INTERVAL, getPollInterval());
}
if (snapPuller.getNextScheduledExecTime() != null && !isPollingDisabled()) {
slave.add(NEXT_EXECUTION_AT, new Date(snapPuller.getNextScheduledExecTime()).toString());
if (getNextScheduledExecTime() != null && !isPollingDisabled()) {
slave.add(NEXT_EXECUTION_AT, new Date(getNextScheduledExecTime()).toString());
} else if (isPollingDisabled()) {
slave.add(NEXT_EXECUTION_AT, "Polling disabled");
}
addVal(slave, SnapPuller.INDEX_REPLICATED_AT, props, Date.class);
addVal(slave, SnapPuller.INDEX_REPLICATED_AT_LIST, props, List.class);
addVal(slave, SnapPuller.REPLICATION_FAILED_AT_LIST, props, List.class);
addVal(slave, SnapPuller.TIMES_INDEX_REPLICATED, props, Integer.class);
addVal(slave, SnapPuller.CONF_FILES_REPLICATED, props, Integer.class);
addVal(slave, SnapPuller.TIMES_CONFIG_REPLICATED, props, Integer.class);
addVal(slave, SnapPuller.CONF_FILES_REPLICATED_AT, props, Integer.class);
addVal(slave, SnapPuller.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class);
addVal(slave, SnapPuller.TIMES_FAILED, props, Integer.class);
addVal(slave, SnapPuller.REPLICATION_FAILED_AT, props, Date.class);
addVal(slave, SnapPuller.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
addVal(slave, IndexFetcher.INDEX_REPLICATED_AT, props, Date.class);
addVal(slave, IndexFetcher.INDEX_REPLICATED_AT_LIST, props, List.class);
addVal(slave, IndexFetcher.REPLICATION_FAILED_AT_LIST, props, List.class);
addVal(slave, IndexFetcher.TIMES_INDEX_REPLICATED, props, Integer.class);
addVal(slave, IndexFetcher.CONF_FILES_REPLICATED, props, Integer.class);
addVal(slave, IndexFetcher.TIMES_CONFIG_REPLICATED, props, Integer.class);
addVal(slave, IndexFetcher.CONF_FILES_REPLICATED_AT, props, Integer.class);
addVal(slave, IndexFetcher.LAST_CYCLE_BYTES_DOWNLOADED, props, Long.class);
addVal(slave, IndexFetcher.TIMES_FAILED, props, Integer.class);
addVal(slave, IndexFetcher.REPLICATION_FAILED_AT, props, Date.class);
addVal(slave, IndexFetcher.PREVIOUS_CYCLE_TIME_TAKEN, props, Long.class);
slave.add("currentDate", new Date().toString());
slave.add("isPollingDisabled", String.valueOf(isPollingDisabled()));
@ -720,13 +750,13 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
try {
long bytesToDownload = 0;
List<String> filesToDownload = new ArrayList<>();
for (Map<String, Object> file : snapPuller.getFilesToDownload()) {
for (Map<String, Object> file : fetcher.getFilesToDownload()) {
filesToDownload.add((String) file.get(NAME));
bytesToDownload += (Long) file.get(SIZE);
}
//get list of conf files to download
for (Map<String, Object> file : snapPuller.getConfFilesToDownload()) {
for (Map<String, Object> file : fetcher.getConfFilesToDownload()) {
filesToDownload.add((String) file.get(NAME));
bytesToDownload += (Long) file.get(SIZE);
}
@ -737,18 +767,18 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
long bytesDownloaded = 0;
List<String> filesDownloaded = new ArrayList<>();
for (Map<String, Object> file : snapPuller.getFilesDownloaded()) {
for (Map<String, Object> file : fetcher.getFilesDownloaded()) {
filesDownloaded.add((String) file.get(NAME));
bytesDownloaded += (Long) file.get(SIZE);
}
//get list of conf files downloaded
for (Map<String, Object> file : snapPuller.getConfFilesDownloaded()) {
for (Map<String, Object> file : fetcher.getConfFilesDownloaded()) {
filesDownloaded.add((String) file.get(NAME));
bytesDownloaded += (Long) file.get(SIZE);
}
Map<String, Object> currentFile = snapPuller.getCurrentFile();
Map<String, Object> currentFile = fetcher.getCurrentFile();
String currFile = null;
long currFileSize = 0, currFileSizeDownloaded = 0;
float percentDownloaded = 0;
@ -767,10 +797,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
long estimatedTimeRemaining = 0;
if (snapPuller.getReplicationStartTime() > 0) {
slave.add("replicationStartTime", new Date(snapPuller.getReplicationStartTime()).toString());
if (fetcher.getReplicationStartTime() > 0) {
slave.add("replicationStartTime", new Date(fetcher.getReplicationStartTime()).toString());
}
long elapsed = getTimeElapsed(snapPuller);
long elapsed = fetcher.getReplicationTimeElapsed();
slave.add("timeElapsed", String.valueOf(elapsed) + "s");
if (bytesDownloaded > 0)
@ -840,13 +870,6 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return replicateAfter;
}
private long getTimeElapsed(SnapPuller snapPuller) {
long timeElapsed = 0;
if (snapPuller.getReplicationStartTime() > 0)
timeElapsed = TimeUnit.SECONDS.convert(System.currentTimeMillis() - snapPuller.getReplicationStartTime(), TimeUnit.MILLISECONDS);
return timeElapsed;
}
Properties loadReplicationProperties() {
Directory dir = null;
try {
@ -856,7 +879,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
IndexInput input;
try {
input = dir.openInput(
SnapPuller.REPLICATION_PROPERTIES, IOContext.DEFAULT);
IndexFetcher.REPLICATION_PROPERTIES, IOContext.DEFAULT);
} catch (FileNotFoundException | NoSuchFileException e) {
return new Properties();
}
@ -887,6 +910,37 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
// }
// }
/**
 * Parses the configured poll interval and, when it is a positive number of
 * milliseconds, schedules a recurring task that fetches the index from the
 * master. When the interval is absent or non-positive, no task is scheduled.
 *
 * @param intervalStr interval in "HH:mm:ss" form, or null to disable polling
 */
private void setupPolling(String intervalStr) {
  pollIntervalStr = intervalStr;
  pollInterval = readInterval(pollIntervalStr);
  if (pollInterval != null && pollInterval > 0) {
    executorService = Executors.newSingleThreadScheduledExecutor(
        new DefaultSolrThreadFactory("indexFetcher"));
    Runnable pollTask = new Runnable() {
      @Override
      public void run() {
        // honor runtime enable/disable of polling without cancelling the task
        if (pollDisabled.get()) {
          LOG.info("Poll disabled");
          return;
        }
        try {
          LOG.debug("Polling for index modifications");
          executorStartTime = System.currentTimeMillis();
          doFetch(null, false);
        } catch (Exception e) {
          LOG.error("Exception in fetching index", e);
        }
      }
    };
    // align the first poll to the interval boundary
    long firstDelay = pollInterval - (System.currentTimeMillis() % pollInterval);
    executorService.scheduleAtFixedRate(pollTask, firstDelay, pollInterval, TimeUnit.MILLISECONDS);
    LOG.info("Poll Scheduled at an interval of " + pollInterval + "ms");
  } else {
    LOG.info(" No value set for 'pollInterval'. Timer Task not started.");
  }
}
@Override
@SuppressWarnings("unchecked")
public void inform(SolrCore core) {
@ -901,7 +955,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
NamedList slave = (NamedList) initArgs.get("slave");
boolean enableSlave = isEnabled( slave );
if (enableSlave) {
tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);
currentIndexFetcher = pollingIndexFetcher = new IndexFetcher(slave, this, core);
setupPolling((String) slave.get(POLL_INTERVAL));
isSlave = true;
}
NamedList master = (NamedList) initArgs.get("master");
@ -1005,7 +1060,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
}
String reserve = (String) master.get(RESERVE);
if (reserve != null && !reserve.trim().equals("")) {
reserveCommitDuration = SnapPuller.readInterval(reserve);
reserveCommitDuration = readInterval(reserve);
}
LOG.info("Commits will be reserved for " + reserveCommitDuration);
isMaster = true;
@ -1029,11 +1084,20 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
if (snapPuller != null) {
snapPuller.destroy();
try {
if (executorService != null) executorService.shutdown();
} finally {
try {
if (pollingIndexFetcher != null) {
pollingIndexFetcher.destroy();
}
} finally {
if (executorService != null) ExecutorUtil
.shutdownNowAndAwaitTermination(executorService);
}
}
if (tempSnapPuller != null && tempSnapPuller != snapPuller) {
tempSnapPuller.destroy();
if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
currentIndexFetcher.destroy();
}
}
@ -1307,8 +1371,40 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
releaseCommitPointAndExtendReserve();
}
}
}
}
/**
 * Parses a poll/reserve interval of the form {@code 'HH:mm:ss'} into milliseconds.
 * Each field may be empty (e.g. {@code "::30"} is 30 seconds); missing fields count as zero.
 *
 * @param interval the interval string, or null
 * @return the interval in milliseconds, or null if {@code interval} is null
 * @throws SolrException if the string does not match the expected format or a field
 *         is not a valid integer
 */
static Integer readInterval(String interval) {
  if (interval == null)
    return null;
  // Fields accumulate into seconds first, then convert to millis at the end.
  int result = 0;
  Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
  if (m.find()) {
    String hr = m.group(1);
    String min = m.group(2);
    String sec = m.group(3);
    try {
      if (sec != null && sec.length() > 0)
        result += Integer.parseInt(sec);
      if (min != null && min.length() > 0)
        result += (60 * Integer.parseInt(min));
      if (hr != null && hr.length() > 0)
        result += (60 * 60 * Integer.parseInt(hr));
      result *= 1000;
    } catch (NumberFormatException e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          INTERVAL_ERR_MSG);
    }
  } else {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        INTERVAL_ERR_MSG);
  }
  return result;
}
public static final String MASTER_URL = "masterUrl";
public static final String STATUS = "status";
@ -1369,6 +1465,12 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
public static final String FILE_STREAM = "filestream";
public static final String POLL_INTERVAL = "pollInterval";
public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
public static final int PACKET_SZ = 1024 * 1024; // 1MB
public static final String RESERVE = "commitReserveDuration";

View File

@ -144,7 +144,7 @@ public class SnapShooter {
details.add("snapshotName", snapshotName);
LOG.info("Done creating backup snapshot: " + (snapshotName == null ? "<not named>" : snapshotName));
} catch (Exception e) {
SnapPuller.delTree(snapShotDir);
IndexFetcher.delTree(snapShotDir);
LOG.error("Exception while creating snapshot", e);
details.add("snapShootException", e.getMessage());
} finally {
@ -170,7 +170,7 @@ public class SnapShooter {
int i=1;
for (OldBackupDirectory dir : dirs) {
if (i++ > numberToKeep) {
SnapPuller.delTree(dir.dir);
IndexFetcher.delTree(dir.dir);
}
}
}
@ -181,7 +181,7 @@ public class SnapShooter {
NamedList<Object> details = new NamedList<>();
boolean isSuccess;
File f = new File(snapDir, "snapshot." + snapshotName);
isSuccess = SnapPuller.delTree(f);
isSuccess = IndexFetcher.delTree(f);
if(isSuccess) {
details.add("status", "success");

View File

@ -25,7 +25,7 @@ log4j.logger.org.apache.solr.hadoop=INFO
#log4j.logger.org.apache.solr.cloud.ChaosMonkey=DEBUG
#log4j.logger.org.apache.solr.update.TransactionLog=DEBUG
#log4j.logger.org.apache.solr.handler.ReplicationHandler=DEBUG
#log4j.logger.org.apache.solr.handler.SnapPuller=DEBUG
#log4j.logger.org.apache.solr.handler.IndexFetcher=DEBUG
#log4j.logger.org.apache.solr.common.cloud.ClusterStateUtil=DEBUG
#log4j.logger.org.apache.solr.cloud.OverseerAutoReplicaFailoverThread=DEBUG

View File

@ -36,7 +36,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.handler.SnapPuller;
import org.apache.solr.handler.IndexFetcher;
import org.apache.solr.util.AbstractSolrTestCase;
import org.apache.solr.util.TestHarness;
import org.junit.AfterClass;
@ -93,7 +93,7 @@ public class TestArbitraryIndexDir extends AbstractSolrTestCase{
assertU(adoc("id", String.valueOf(1),
"name", "name"+String.valueOf(1)));
//create a new index dir and index.properties file
File idxprops = new File(h.getCore().getDataDir() + SnapPuller.INDEX_PROPERTIES);
File idxprops = new File(h.getCore().getDataDir() + IndexFetcher.INDEX_PROPERTIES);
Properties p = new Properties();
File newDir = new File(h.getCore().getDataDir() + "index_temp");
newDir.mkdirs();
@ -104,7 +104,7 @@ public class TestArbitraryIndexDir extends AbstractSolrTestCase{
p.store(os, "index properties");
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to write " + SnapPuller.INDEX_PROPERTIES, e);
"Unable to write " + IndexFetcher.INDEX_PROPERTIES, e);
} finally {
IOUtils.closeWhileHandlingException(os);
}

View File

@ -172,17 +172,13 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
}
NamedList query(String query, SolrClient s) throws SolrServerException, IOException {
NamedList res = new SimpleOrderedMap();
ModifiableSolrParams params = new ModifiableSolrParams();
params.add("q", query);
params.add("sort","id desc");
QueryResponse qres = s.query(params);
res = qres.getResponse();
return res;
return qres.getResponse();
}
/** will sleep up to 30 seconds, looking for expectedDocCount */
@ -304,7 +300,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
assertNotNull("slave has slave section",
details.get("slave"));
// SOLR-2677: assert not false negatives
Object timesFailed = ((NamedList)details.get("slave")).get(SnapPuller.TIMES_FAILED);
Object timesFailed = ((NamedList)details.get("slave")).get(IndexFetcher.TIMES_FAILED);
assertEquals("slave has fetch error count",
null, timesFailed);
@ -513,7 +509,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
slaveClient.close();
slaveClient = createNewSolrClient(slaveJetty.getLocalPort());
//add a doc with new field and commit on master to trigger snappull from slave.
//add a doc with new field and commit on master to trigger index fetch from slave.
index(masterClient, "id", "2000", "name", "name = " + 2000, "newname", "newname = " + 2000);
masterClient.commit();
@ -581,7 +577,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
}
@Test
public void doTestSnapPullWithMasterUrl() throws Exception {
public void doTestIndexFetchWithMasterUrl() throws Exception {
//change solrconfig on slave
//this has no entry for pollinginterval
slave.copyConfigFile(CONF_DIR + "solrconfig-slave1.xml", "solrconfig.xml");
@ -608,7 +604,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
SolrDocumentList masterQueryResult = (SolrDocumentList) masterQueryRsp.get("response");
assertEquals(nDocs, masterQueryResult.getNumFound());
// snappull
// index fetch
String masterUrl = buildUrl(slaveJetty.getLocalPort()) + "/" + DEFAULT_TEST_CORENAME + "/replication?command=fetchindex&masterUrl=";
masterUrl += buildUrl(masterJetty.getLocalPort()) + "/" + DEFAULT_TEST_CORENAME + "/replication";
URL url = new URL(masterUrl);
@ -623,7 +619,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
String cmp = BaseDistributedSearchTestCase.compare(masterQueryResult, slaveQueryResult, 0, null);
assertEquals(null, cmp);
// snappull from the slave to the master
// index fetch from the slave to the master
for (int i = nDocs; i < nDocs + 3; i++)
index(slaveClient, "id", i, "name", "name = " + i);
@ -765,7 +761,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
.get("response");
assertEquals(totalDocs, masterQueryResult.getNumFound());
// snappull
// index fetch
Date slaveCoreStart = watchCoreStartAt(slaveClient, 30*1000, null);
pullFromMasterToSlave();
if (confCoreReload) {
@ -1219,7 +1215,7 @@ public class TestReplicationHandler extends SolrTestCaseJ4 {
// record collection1's start time on slave
final Date slaveStartTime = watchCoreStartAt(slaveClient, 30*1000, null);
//add a doc with new field and commit on master to trigger snappull from slave.
//add a doc with new field and commit on master to trigger index fetch from slave.
index(masterClient, "id", "2000", "name", "name = " + 2000, "newname", "n2000");
masterClient.commit();
rQuery(1, "newname:n2000", masterClient); // sanity check

View File

@ -69,7 +69,7 @@ public class MockDirectoryFactory extends EphemeralDirectoryFactory {
// already been created.
mockDirWrapper.setPreventDoubleWrite(false);
// snappuller & co don't seem ready for this:
// IndexFetcher & co don't seem ready for this:
mockDirWrapper.setEnableVirusScanner(false);
if (allowReadingFilesStillOpenForWrite) {