SOLR-8263: Tlog replication could interfere with the replay of buffered updates

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1718142 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Erick Erickson 2015-12-06 01:29:21 +00:00
parent 8a148abb4e
commit f5bb03e6c6
7 changed files with 372 additions and 37 deletions

View File

@ -88,6 +88,7 @@ import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesInputStream;
@ -1046,18 +1047,46 @@ public class IndexFetcher {
}
/**
* <p>
* Copy all the tlog files from the temp tlog dir to the actual tlog dir, and reset
* the {@link UpdateLog}. The copy will try to preserve the original tlog directory
* if the copy fails.
* </p>
* <p>
* This assumes that the tlog files transferred from the leader are in synch with the
* index files transferred from the leader. The reset of the update log relies on the version
* of the latest operations found in the tlog files. If the tlogs are ahead of the latest commit
* point, it will not copy all the needed buffered updates for the replay and it will miss
* some operations.
* </p>
*/
private boolean moveTlogFiles(File tmpTlogDir) {
UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
// reset the update log before copying the new tlog directory, it will be reinitialized
// during the core reload
((CdcrUpdateLog) ulog).reset();
VersionInfo vinfo = ulog.getVersionInfo();
vinfo.blockUpdates(); // block updates until the new update log is initialised
try {
// reset the update log before copying the new tlog directory
CdcrUpdateLog.BufferedUpdates bufferedUpdates = ((CdcrUpdateLog) ulog).resetForRecovery();
// try to move the temp tlog files to the tlog directory
if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
// reinitialise the update log and copy the buffered updates
if (bufferedUpdates.tlog != null) {
// map file path to its new backup location
File parentDir = FileSystems.getDefault().getPath(solrCore.getUpdateHandler().getUpdateLog().getLogDir()).getParent().toFile();
File backupTlogDir = new File(parentDir, tmpTlogDir.getName());
bufferedUpdates.tlog = new File(backupTlogDir, bufferedUpdates.tlog.getName());
}
// init the update log with the new set of tlog files, and copy the buffered updates
((CdcrUpdateLog) ulog).initForRecovery(bufferedUpdates.tlog, bufferedUpdates.offset);
}
catch (Exception e) {
LOG.error("Unable to copy tlog files", e);
return false;
}
finally {
vinfo.unblockUpdates();
}
return true;
}

View File

@ -42,7 +42,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@ -60,6 +59,7 @@ import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@ -88,6 +88,7 @@ import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.NumberUtils;
import org.apache.solr.util.PropertiesInputStream;
@ -544,6 +545,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add("status", "invalid index generation");
return;
}
// reserve the indexcommit for sometime
core.getDeletionPolicy().setReserveDuration(gen, reserveCommitDuration);
List<Map<String, Object>> result = new ArrayList<>();
@ -553,7 +555,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
SegmentInfos infos = SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
for (SegmentCommitInfo commitInfo : infos) {
for (String file : commitInfo.files()) {
Map<String,Object> fileMeta = new HashMap<>();
Map<String, Object> fileMeta = new HashMap<>();
fileMeta.put(NAME, file);
fileMeta.put(SIZE, dir.fileLength(file));
@ -561,7 +563,7 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
try {
long checksum = CodecUtil.retrieveChecksum(in);
fileMeta.put(CHECKSUM, checksum);
} catch(Exception e) {
} catch (Exception e) {
LOG.warn("Could not read checksum from index file: " + file, e);
}
}
@ -572,14 +574,14 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
// add the segments_N file
Map<String,Object> fileMeta = new HashMap<>();
Map<String, Object> fileMeta = new HashMap<>();
fileMeta.put(NAME, infos.getSegmentsFileName());
fileMeta.put(SIZE, dir.fileLength(infos.getSegmentsFileName()));
if (infos.getId() != null) {
try (final IndexInput in = dir.openInput(infos.getSegmentsFileName(), IOContext.READONCE)) {
try {
fileMeta.put(CHECKSUM, CodecUtil.retrieveChecksum(in));
} catch(Exception e) {
} catch (Exception e) {
LOG.warn("Could not read checksum from index file: " + infos.getSegmentsFileName(), e);
}
}
@ -602,10 +604,17 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
// fetch list of tlog files only if cdcr is activated
if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
List<Map<String, Object>> tlogfiles = getTlogFileList();
try {
List<Map<String, Object>> tlogfiles = getTlogFileList(commit);
LOG.info("Adding tlog files to list: " + tlogfiles);
rsp.add(TLOG_FILES, tlogfiles);
}
catch (IOException e) {
rsp.add("status", "unable to get tlog file names for given index generation");
rsp.add(EXCEPTION, e);
LOG.error("Unable to get tlog file names for indexCommit generation: " + gen, e);
}
}
if (confFileNameAlias.size() < 1 || core.getCoreDescriptor().getCoreContainer().isZooKeeperAware())
return;
@ -614,19 +623,39 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
rsp.add(CONF_FILES, getConfFileInfoFromCache(confFileNameAlias, confFileInfoCache));
}
List<Map<String, Object>> getTlogFileList() {
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
/**
* Retrieves the list of tlog files associated to a commit point.
*/
List<Map<String, Object>> getTlogFileList(IndexCommit commit) throws IOException {
long maxVersion = this.getMaxVersion(commit);
CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
String[] logList = ulog.getLogList(new File(ulog.getLogDir()));
List<Map<String, Object>> tlogFiles = new ArrayList<>();
for (String fileName : logList) {
// filter out tlogs that are older than the current index commit generation, so that the list of tlog files is
// in synch with the latest index commit point
long startVersion = Math.abs(Long.parseLong(fileName.substring(fileName.lastIndexOf('.') + 1)));
if (startVersion < maxVersion) {
Map<String, Object> fileMeta = new HashMap<>();
fileMeta.put(NAME, fileName);
fileMeta.put(SIZE, new File(ulog.getLogDir(), fileName).length());
tlogFiles.add(fileMeta);
}
}
return tlogFiles;
}
/**
* Retrieves the maximum version number from an index commit.
*/
private long getMaxVersion(IndexCommit commit) throws IOException {
try (DirectoryReader reader = DirectoryReader.open(commit)) {
IndexSearcher searcher = new IndexSearcher(reader);
VersionInfo vinfo = core.getUpdateHandler().getUpdateLog().getVersionInfo();
return Math.abs(vinfo.getMaxVersionFromIndex(searcher));
}
}
/**
* For configuration files, checksum of the file is included because, unlike index files, they may have same content
* but different timestamps.

View File

@ -53,6 +53,7 @@ public class CdcrTransactionLog extends TransactionLog {
super(tlogFile, globalStrings);
// The starting version number will be used to seek more efficiently tlogs
// and to filter out tlog files during replication (in ReplicationHandler#getTlogFileList)
String filename = tlogFile.getName();
startVersion = Math.abs(Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1)));

View File

@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
@ -28,8 +29,15 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -240,13 +248,23 @@ public class CdcrUpdateLog extends UpdateLog {
}
/**
* expert: Reset the update log before initialisation. This is needed by the IndexFetcher during a
* expert: Reset the update log before initialisation. This is called by
* {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during a
* a Recovery operation in order to re-initialise the UpdateLog with a new set of tlog files.
* @see #initForRecovery(File, long)
*/
public void reset() {
synchronized (this) {
public BufferedUpdates resetForRecovery() {
synchronized (this) { // since we blocked updates in IndexFetcher, this synchronization shouldn't strictly be necessary.
// If we are buffering, we need to return the related information to the index fetcher
// for properly initialising the new update log - SOLR-8263
BufferedUpdates bufferedUpdates = new BufferedUpdates();
if (state == State.BUFFERING && tlog != null) {
bufferedUpdates.tlog = tlog.tlogFile; // file to keep
bufferedUpdates.offset = this.recoveryInfo.positionOfStart;
}
// Close readers
for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
for (CdcrLogReader reader : logPointers.keySet()) {
reader.close();
}
logPointers.clear();
@ -269,13 +287,176 @@ public class CdcrUpdateLog extends UpdateLog {
if (prevMap != null) prevMap.clear();
if (prevMap2 != null) prevMap2.clear();
tlogFiles = null;
numOldRecords = 0;
oldDeletes.clear();
deleteByQueries.clear();
// reset lastDataDir for triggering full #init()
lastDataDir = null;
return bufferedUpdates;
}
}
public static class BufferedUpdates {
public File tlog;
public long offset;
}
/**
* <p>
* expert: Initialise the update log with a tlog file containing buffered updates. This is called by
* {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during a Recovery operation.
* This is mainly a copy of the original {@link UpdateLog#init(UpdateHandler, SolrCore)} method, but modified
* to:
* <ul>
* <li>preserve the same {@link VersionInfo} instance in order to not "unblock" updates, since the
* {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} acquired a write lock from this instance.</li>
* <li>copy the buffered updates.</li>
* </ul>
* @see #resetForRecovery()
*/
public void initForRecovery(File bufferedTlog, long offset) {
tlogFiles = getLogList(tlogDir);
id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
if (debug) {
log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
}
TransactionLog oldLog = null;
for (String oldLogName : tlogFiles) {
File f = new File(tlogDir, oldLogName);
try {
oldLog = newTransactionLog(f, null, true);
addOldLog(oldLog, false); // don't remove old logs on startup since more than one may be uncapped.
} catch (Exception e) {
SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
deleteFile(f);
}
}
// Record first two logs (oldest first) at startup for potential tlog recovery.
// It's possible that at abnormal close both "tlog" and "prevTlog" were uncapped.
for (TransactionLog ll : logs) {
newestLogsOnStartup.addFirst(ll);
if (newestLogsOnStartup.size() >= 2) break;
}
// TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
UpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
long latestVersion = startingUpdates.getMaxRecentVersion();
try {
startingVersions = startingUpdates.getVersions(numRecordsToKeep);
startingOperation = startingUpdates.getLatestOperation();
// populate recent deletes list (since we can't get that info from the index)
for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
DeleteUpdate du = startingUpdates.deleteList.get(i);
oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
}
// populate recent deleteByQuery commands
for (int i=startingUpdates.deleteByQueryList.size()-1; i>=0; i--) {
Update update = startingUpdates.deleteByQueryList.get(i);
List<Object> dbq = (List<Object>) update.log.lookup(update.pointer);
long version = (Long) dbq.get(1);
String q = (String) dbq.get(2);
trackDeleteByQuery(q, version);
}
} finally {
startingUpdates.close();
}
// Copy buffered updates
if (bufferedTlog != null) {
this.copyBufferedUpdates(bufferedTlog, offset, latestVersion);
}
}
/**
* <p>
* Read the entries from the given tlog file and replay them as buffered updates.
* The buffered tlog that we are trying to copy might contain duplicate operations with the
* current update log. During the tlog replication process, the replica might buffer update operations
* that will be present also in the tlog files downloaded from the leader. In order to remove these
* duplicates, it will skip any operations with a version inferior to the latest know version.
*/
private void copyBufferedUpdates(File tlogSrc, long offsetSrc, long latestVersion) {
recoveryInfo = new RecoveryInfo();
recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
state = State.BUFFERING;
operationFlags |= FLAG_GAP;
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
CdcrTransactionLog src = new CdcrTransactionLog(tlogSrc, null, true);
TransactionLog.LogReader tlogReader = src.getReader(offsetSrc);
try {
int operationAndFlags = 0;
for (; ; ) {
Object o = tlogReader.next();
if (o == null) break; // we reached the end of the tlog
// should currently be a List<Oper,Ver,Doc/Id>
List entry = (List) o;
operationAndFlags = (Integer) entry.get(0);
int oper = operationAndFlags & OPERATION_MASK;
long version = (Long) entry.get(1);
if (version <= latestVersion) {
// probably a buffered update that is also present in a tlog file coming from the leader,
// skip it.
log.debug("Dropping buffered operation - version {} < {}", version, latestVersion);
continue;
}
switch (oper) {
case UpdateLog.ADD: {
SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.solrDoc = sdoc;
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.BUFFERING);
this.add(cmd);
break;
}
case UpdateLog.DELETE: {
byte[] idBytes = (byte[]) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.setIndexedId(new BytesRef(idBytes));
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.BUFFERING);
this.delete(cmd);
break;
}
case UpdateLog.DELETE_BY_QUERY: {
String query = (String) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.query = query;
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.BUFFERING);
this.deleteByQuery(cmd);
break;
}
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Invalid Operation! " + oper);
}
}
}
catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy buffered updates", e);
}
finally {
try {
tlogReader.close();
}
finally {
this.doClose(src);
}
}
}

View File

@ -23,15 +23,13 @@ import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.StoredDocument;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.index.Terms;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
@ -222,7 +220,7 @@ public class VersionInfo {
}
}
public Long getMaxVersionFromIndex(SolrIndexSearcher searcher) throws IOException {
public Long getMaxVersionFromIndex(IndexSearcher searcher) throws IOException {
String versionFieldName = versionField.getName();
@ -231,7 +229,8 @@ public class VersionInfo {
// if indexed, then we have terms to get the max from
if (versionField.indexed()) {
Terms versionTerms = searcher.getLeafReader().terms(versionFieldName);
LeafReader leafReader = SlowCompositeReaderWrapper.wrap(searcher.getIndexReader());
Terms versionTerms = leafReader.terms(versionFieldName);
Long max = (versionTerms != null) ? NumericUtils.getMaxLong(versionTerms) : null;
if (max != null) {
maxVersionInIndex = max.longValue();

View File

@ -262,7 +262,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
Thread.sleep(500);
}
}
throw new AssertionError("Timeout while trying to assert number of documents on collection: " + collection, lastAssertionError);
throw new AssertionError("Timeout while trying to assert number of documents @ " + collection, lastAssertionError);
} finally {
client.close();
}

View File

@ -18,19 +18,37 @@ package org.apache.solr.cloud;
*/
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* This class is testing the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
* {@link org.apache.solr.handler.IndexFetcher}.
*/
@Nightly
public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
public void distribSetUp() throws Exception {
schemaString = "schema15.xml"; // we need a string id
@ -190,6 +208,84 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
}
/**
* Test the scenario where the slave is killed while the leader is still receiving updates.
* The slave should buffer updates while in recovery, then replay them at the end of the recovery.
* If updates were properly buffered and replayed, then the slave should have the same number of documents
* than the leader. This checks if cdcr tlog replication interferes with buffered updates - SOLR-8263.
*/
@Test
@ShardsFixed(num = 2)
public void testReplicationWithBufferedUpdates() throws Exception {
List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
AtomicInteger numDocs = new AtomicInteger(0);
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-test-update-scheduler"));
executor.scheduleWithFixedDelay(new UpdateThread(numDocs), 10, 10, TimeUnit.MILLISECONDS);
// Restart the slave node to trigger Replication strategy
this.restartServer(slaves.get(0));
// shutdown the update thread and wait for its completion
executor.shutdown();
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
// check that we have the expected number of documents in the cluster
assertNumDocs(numDocs.get(), SOURCE_COLLECTION);
// check that we have the expected number of documents on the slave
assertNumDocs(numDocs.get(), slaves.get(0));
}
private void assertNumDocs(int expectedNumDocs, CloudJettyRunner jetty)
throws InterruptedException, IOException, SolrServerException {
SolrClient client = createNewSolrServer(jetty.url);
try {
int cnt = 30; // timeout after 15 seconds
AssertionError lastAssertionError = null;
while (cnt > 0) {
try {
assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
return;
}
catch (AssertionError e) {
lastAssertionError = e;
cnt--;
Thread.sleep(500);
}
}
throw new AssertionError("Timeout while trying to assert number of documents @ " + jetty.url, lastAssertionError);
} finally {
client.close();
}
}
private class UpdateThread implements Runnable {
private AtomicInteger numDocs;
private UpdateThread(AtomicInteger numDocs) {
this.numDocs = numDocs;
}
@Override
public void run() {
try {
List<SolrInputDocument> docs = new ArrayList<>();
for (int j = numDocs.get(); j < (numDocs.get() + 10); j++) {
docs.add(getDoc(id, Integer.toString(j)));
}
index(SOURCE_COLLECTION, docs);
numDocs.getAndAdd(10);
log.info("Sent batch of {} updates - numDocs:{}", docs.size(), numDocs);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
List<CloudJettyRunner> jetties = new ArrayList<>(shardToJetty.get(collection).get(shard));
CloudJettyRunner leader = shardToLeaderJetty.get(collection).get(shard);