mirror of https://github.com/apache/lucene.git
SOLR-561: thread safe fixes, commit point reservation fixes
git-svn-id: https://svn.apache.org/repos/asf/lucene/solr/trunk@707683 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e5371125cb
commit
87b0576d0d
|
@ -19,10 +19,10 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
* @see org.apache.lucene.index.IndexDeletionPolicy
|
||||
*/
|
||||
public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
|
||||
private IndexDeletionPolicy deletionPolicy;
|
||||
private Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>();
|
||||
private Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
|
||||
private IndexCommit latestCommit;
|
||||
private final IndexDeletionPolicy deletionPolicy;
|
||||
private volatile Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>();
|
||||
private final Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
|
||||
private volatile IndexCommit latestCommit;
|
||||
|
||||
public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) {
|
||||
this.deletionPolicy = deletionPolicy;
|
||||
|
@ -51,7 +51,20 @@ public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
|
|||
* @param reserveTime time in milliseconds for which the commit point is to be reserved
|
||||
*/
|
||||
public void setReserveDuration(Long indexVersion, long reserveTime) {
|
||||
reserves.put(indexVersion, System.currentTimeMillis() + reserveTime);
|
||||
long timeToSet = System.currentTimeMillis() + reserveTime;
|
||||
for(;;) {
|
||||
Long previousTime = reserves.put(indexVersion, timeToSet);
|
||||
|
||||
// this is the common success case: the older time didn't exist, or
|
||||
// came before the new time.
|
||||
if (previousTime == null || previousTime <= timeToSet) break;
|
||||
|
||||
// At this point, we overwrote a longer reservation, so we want to restore the older one.
|
||||
// the problem is that an even longer reservation may come in concurrently
|
||||
// and we don't want to overwrite that one too. We simply keep retrying in a loop
|
||||
// with the maximum time value we have seen.
|
||||
timeToSet = previousTime;
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanReserves() {
|
||||
|
|
|
@ -96,7 +96,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
|
||||
private Integer reserveCommitDuration = SnapPuller.readInterval("00:00:10");
|
||||
|
||||
private IndexCommit indexCommitPoint;
|
||||
private volatile IndexCommit indexCommitPoint;
|
||||
|
||||
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
|
||||
rsp.setHttpCaching(false);
|
||||
|
@ -107,9 +107,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
return;
|
||||
}
|
||||
if (command.equals(CMD_INDEX_VERSION)) {
|
||||
if (indexCommitPoint != null) {
|
||||
rsp.add(CMD_INDEX_VERSION, indexCommitPoint.getVersion());
|
||||
rsp.add(GENERATION, indexCommitPoint.getGeneration());
|
||||
IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change
|
||||
if (commitPoint != null) {
|
||||
rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
|
||||
rsp.add(GENERATION, commitPoint.getGeneration());
|
||||
} else {
|
||||
// must never happen
|
||||
rsp.add(CMD_INDEX_VERSION, 0L);
|
||||
|
@ -201,9 +202,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
void doSnapPull() {
|
||||
if (!isSlave)
|
||||
return;
|
||||
if (snapPullLock.isLocked())
|
||||
if (!snapPullLock.tryLock())
|
||||
return;
|
||||
snapPullLock.lock();
|
||||
try {
|
||||
snapPuller.fetchLatestIndex(core);
|
||||
} catch (Exception e) {
|
||||
|
@ -214,7 +214,6 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
}
|
||||
|
||||
boolean isReplicating() {
|
||||
boolean b = snapPullLock.isLocked();
|
||||
return snapPullLock.isLocked();
|
||||
}
|
||||
|
||||
|
@ -445,9 +444,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
long[] versionAndGeneration = getIndexVersion();
|
||||
details.add(CMD_INDEX_VERSION, versionAndGeneration[0]);
|
||||
details.add(GENERATION, versionAndGeneration[1]);
|
||||
if (isMaster && indexCommitPoint != null) {
|
||||
details.add("replicatable" + CMD_INDEX_VERSION, indexCommitPoint.getVersion());
|
||||
details.add("replicatable" + GENERATION, indexCommitPoint.getGeneration());
|
||||
IndexCommit commit = indexCommitPoint; // make a copy so it won't change
|
||||
if (isMaster && commit != null) {
|
||||
details.add("replicatable" + CMD_INDEX_VERSION, commit.getVersion());
|
||||
details.add("replicatable" + GENERATION, commit.getGeneration());
|
||||
}
|
||||
|
||||
if (isSlave) {
|
||||
|
|
|
@ -58,35 +58,35 @@ import java.util.zip.Checksum;
|
|||
public class SnapPuller {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName());
|
||||
|
||||
private String masterUrl;
|
||||
private final String masterUrl;
|
||||
|
||||
private ReplicationHandler replicationHandler;
|
||||
private final ReplicationHandler replicationHandler;
|
||||
|
||||
private Integer pollInterval;
|
||||
private final Integer pollInterval;
|
||||
|
||||
private String pollIntervalStr;
|
||||
|
||||
private ScheduledExecutorService executorService;
|
||||
|
||||
private long executorStartTime;
|
||||
private volatile long executorStartTime;
|
||||
|
||||
private long replicationStartTime;
|
||||
private volatile long replicationStartTime;
|
||||
|
||||
private SolrCore solrCore;
|
||||
private final SolrCore solrCore;
|
||||
|
||||
private List<Map<String, Object>> filesToDownload;
|
||||
private volatile List<Map<String, Object>> filesToDownload;
|
||||
|
||||
private List<Map<String, Object>> confFilesToDownload;
|
||||
private volatile List<Map<String, Object>> confFilesToDownload;
|
||||
|
||||
private List<Map<String, Object>> filesDownloaded;
|
||||
private volatile List<Map<String, Object>> filesDownloaded;
|
||||
|
||||
private List<Map<String, Object>> confFilesDownloaded;
|
||||
private volatile List<Map<String, Object>> confFilesDownloaded;
|
||||
|
||||
private Map<String, Object> currentFile;
|
||||
private volatile Map<String, Object> currentFile;
|
||||
|
||||
private FileFetcher fileFetcher;
|
||||
private volatile FileFetcher fileFetcher;
|
||||
|
||||
private boolean stop = false;
|
||||
private volatile boolean stop = false;
|
||||
|
||||
/**
|
||||
* Disable the timer task for polling
|
||||
|
@ -214,7 +214,7 @@ public class SnapPuller {
|
|||
IndexCommit commit;
|
||||
RefCounted<SolrIndexSearcher> searcherRefCounted = null;
|
||||
try {
|
||||
searcherRefCounted = core.getSearcher();
|
||||
searcherRefCounted = core.getNewestSearcher(false);
|
||||
commit = searcherRefCounted.get().getReader().getIndexCommit();
|
||||
} finally {
|
||||
if (searcherRefCounted != null)
|
||||
|
@ -587,6 +587,7 @@ public class SnapPuller {
|
|||
List<Map<String, Object>> getConfFilesDownloaded() {
|
||||
//make a copy first because it can be null later
|
||||
List<Map<String, Object>> tmp = confFilesDownloaded;
|
||||
// NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
|
||||
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue