mirror of https://github.com/apache/lucene.git
SOLR-8263: Tlog replication could interfere with the replay of buffered updates
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1716229 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d3401ccc66
commit
0f53ac705a
|
@ -87,6 +87,7 @@ import org.apache.solr.search.SolrIndexSearcher;
|
|||
import org.apache.solr.update.CdcrUpdateLog;
|
||||
import org.apache.solr.update.CommitUpdateCommand;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
import org.apache.solr.update.VersionInfo;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.solr.util.FileUtils;
|
||||
import org.apache.solr.util.PropertiesInputStream;
|
||||
|
@ -1052,11 +1053,30 @@ public class IndexFetcher {
|
|||
private boolean moveTlogFiles(File tmpTlogDir) {
|
||||
UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
|
||||
|
||||
// reset the update log before copying the new tlog directory, it will be reinitialized
|
||||
// during the core reload
|
||||
((CdcrUpdateLog) ulog).reset();
|
||||
// try to move the temp tlog files to the tlog directory
|
||||
if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
|
||||
VersionInfo vinfo = ulog.getVersionInfo();
|
||||
vinfo.blockUpdates(); // block updates until the new update log is initialised
|
||||
try {
|
||||
// reset the update log before copying the new tlog directory
|
||||
CdcrUpdateLog.BufferedUpdates bufferedUpdates = ((CdcrUpdateLog) ulog).resetForRecovery();
|
||||
// try to move the temp tlog files to the tlog directory
|
||||
if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
|
||||
// reinitialise the update log and copy the buffered updates
|
||||
if (bufferedUpdates.tlog != null) {
|
||||
// map file path to its new backup location
|
||||
File parentDir = FileSystems.getDefault().getPath(solrCore.getUpdateHandler().getUpdateLog().getLogDir()).getParent().toFile();
|
||||
File backupTlogDir = new File(parentDir, tmpTlogDir.getName());
|
||||
bufferedUpdates.tlog = new File(backupTlogDir, bufferedUpdates.tlog.getName());
|
||||
}
|
||||
// init the update log with the new set of tlog files, and copy the buffered updates
|
||||
((CdcrUpdateLog) ulog).initForRecovery(bufferedUpdates.tlog, bufferedUpdates.offset);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.error("Unable to copy tlog files", e);
|
||||
return false;
|
||||
}
|
||||
finally {
|
||||
vinfo.unblockUpdates();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.solr.update;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -27,8 +28,15 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.request.LocalSolrQueryRequest;
|
||||
import org.apache.solr.request.SolrQueryRequest;
|
||||
import org.apache.solr.update.processor.DistributedUpdateProcessor;
|
||||
import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -239,13 +247,23 @@ public class CdcrUpdateLog extends UpdateLog {
|
|||
}
|
||||
|
||||
/**
|
||||
* expert: Reset the update log before initialisation. This is needed by the IndexFetcher during a
|
||||
* expert: Reset the update log before initialisation. This is called by
|
||||
* {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during a
|
||||
* Recovery operation in order to re-initialise the UpdateLog with a new set of tlog files.
|
||||
* @see #initForRecovery(File, long)
|
||||
*/
|
||||
public void reset() {
|
||||
synchronized (this) {
|
||||
public BufferedUpdates resetForRecovery() {
|
||||
synchronized (this) { // since we blocked updates in IndexFetcher, this synchronization shouldn't strictly be necessary.
|
||||
// If we are buffering, we need to return the related information to the index fetcher
|
||||
// for properly initialising the new update log - SOLR-8263
|
||||
BufferedUpdates bufferedUpdates = new BufferedUpdates();
|
||||
if (state == State.BUFFERING && tlog != null) {
|
||||
bufferedUpdates.tlog = tlog.tlogFile; // file to keep
|
||||
bufferedUpdates.offset = this.recoveryInfo.positionOfStart;
|
||||
}
|
||||
|
||||
// Close readers
|
||||
for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
|
||||
for (CdcrLogReader reader : logPointers.keySet()) {
|
||||
reader.close();
|
||||
}
|
||||
logPointers.clear();
|
||||
|
@ -268,13 +286,163 @@ public class CdcrUpdateLog extends UpdateLog {
|
|||
if (prevMap != null) prevMap.clear();
|
||||
if (prevMap2 != null) prevMap2.clear();
|
||||
|
||||
tlogFiles = null;
|
||||
numOldRecords = 0;
|
||||
|
||||
oldDeletes.clear();
|
||||
deleteByQueries.clear();
|
||||
|
||||
// reset lastDataDir for triggering full #init()
|
||||
lastDataDir = null;
|
||||
return bufferedUpdates;
|
||||
}
|
||||
}
|
||||
|
||||
public static class BufferedUpdates {
|
||||
public File tlog;
|
||||
public long offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* expert: Initialise the update log with a tlog file containing buffered updates. This is called by
|
||||
* {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during a Recovery operation.
|
||||
* </p>
|
||||
*
|
||||
* This is mainly a copy of the original {@link UpdateLog#init(UpdateHandler, SolrCore)} method, but modified
|
||||
* to:
|
||||
* <ul>
|
||||
* <li>preserve the same {@link VersionInfo} instance in order to not "unblock" updates, since the
|
||||
* {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} acquired a write lock from this instance.</li>
|
||||
* <li>copy the buffered updates.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @see #resetForRecovery()
|
||||
*/
|
||||
public void initForRecovery(File bufferedTlog, long offset) {
|
||||
tlogFiles = getLogList(tlogDir);
|
||||
id = getLastLogId() + 1; // add 1 since we will create a new log for the next update
|
||||
|
||||
if (debug) {
|
||||
log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
|
||||
}
|
||||
|
||||
TransactionLog oldLog = null;
|
||||
for (String oldLogName : tlogFiles) {
|
||||
File f = new File(tlogDir, oldLogName);
|
||||
try {
|
||||
oldLog = newTransactionLog(f, null, true);
|
||||
addOldLog(oldLog, false); // don't remove old logs on startup since more than one may be uncapped.
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
|
||||
deleteFile(f);
|
||||
}
|
||||
}
|
||||
|
||||
// Record first two logs (oldest first) at startup for potential tlog recovery.
|
||||
// It's possible that at abnormal close both "tlog" and "prevTlog" were uncapped.
|
||||
for (TransactionLog ll : logs) {
|
||||
newestLogsOnStartup.addFirst(ll);
|
||||
if (newestLogsOnStartup.size() >= 2) break;
|
||||
}
|
||||
|
||||
// TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
|
||||
UpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
|
||||
try {
|
||||
startingVersions = startingUpdates.getVersions(numRecordsToKeep);
|
||||
startingOperation = startingUpdates.getLatestOperation();
|
||||
|
||||
// populate recent deletes list (since we can't get that info from the index)
|
||||
for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
|
||||
DeleteUpdate du = startingUpdates.deleteList.get(i);
|
||||
oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
|
||||
}
|
||||
|
||||
// populate recent deleteByQuery commands
|
||||
for (int i=startingUpdates.deleteByQueryList.size()-1; i>=0; i--) {
|
||||
Update update = startingUpdates.deleteByQueryList.get(i);
|
||||
List<Object> dbq = (List<Object>) update.log.lookup(update.pointer);
|
||||
long version = (Long) dbq.get(1);
|
||||
String q = (String) dbq.get(2);
|
||||
trackDeleteByQuery(q, version);
|
||||
}
|
||||
|
||||
} finally {
|
||||
startingUpdates.close();
|
||||
}
|
||||
|
||||
// Copy buffered updates
|
||||
if (bufferedTlog != null) {
|
||||
this.copyBufferedUpdates(bufferedTlog, offset);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the entries from the given tlog file and replay them as buffered updates.
|
||||
*/
|
||||
private void copyBufferedUpdates(File tlogSrc, long offsetSrc) {
|
||||
recoveryInfo = new RecoveryInfo();
|
||||
recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
|
||||
state = State.BUFFERING;
|
||||
operationFlags |= FLAG_GAP;
|
||||
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
|
||||
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
|
||||
|
||||
CdcrTransactionLog src = new CdcrTransactionLog(tlogSrc, null, true);
|
||||
TransactionLog.LogReader tlogReader = src.getReader(offsetSrc);
|
||||
try {
|
||||
int operationAndFlags = 0;
|
||||
for (; ; ) {
|
||||
Object o = tlogReader.next();
|
||||
if (o == null) break; // we reached the end of the tlog
|
||||
// should currently be a List<Oper,Ver,Doc/Id>
|
||||
List entry = (List) o;
|
||||
operationAndFlags = (Integer) entry.get(0);
|
||||
int oper = operationAndFlags & OPERATION_MASK;
|
||||
long version = (Long) entry.get(1);
|
||||
|
||||
switch (oper) {
|
||||
case UpdateLog.ADD: {
|
||||
SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
|
||||
AddUpdateCommand cmd = new AddUpdateCommand(req);
|
||||
cmd.solrDoc = sdoc;
|
||||
cmd.setVersion(version);
|
||||
cmd.setFlags(UpdateCommand.BUFFERING);
|
||||
this.add(cmd);
|
||||
break;
|
||||
}
|
||||
case UpdateLog.DELETE: {
|
||||
byte[] idBytes = (byte[]) entry.get(2);
|
||||
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
|
||||
cmd.setIndexedId(new BytesRef(idBytes));
|
||||
cmd.setVersion(version);
|
||||
cmd.setFlags(UpdateCommand.BUFFERING);
|
||||
this.delete(cmd);
|
||||
break;
|
||||
}
|
||||
|
||||
case UpdateLog.DELETE_BY_QUERY: {
|
||||
String query = (String) entry.get(2);
|
||||
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
|
||||
cmd.query = query;
|
||||
cmd.setVersion(version);
|
||||
cmd.setFlags(UpdateCommand.BUFFERING);
|
||||
this.deleteByQuery(cmd);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Invalid Operation! " + oper);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy buffered updates", e);
|
||||
}
|
||||
finally {
|
||||
tlogReader.close();
|
||||
src.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -257,7 +257,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
|
|||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
throw new AssertionError("Timeout while trying to assert number of documents on collection: " + collection, lastAssertionError);
|
||||
throw new AssertionError("Timeout while trying to assert number of documents @ " + collection, lastAssertionError);
|
||||
} finally {
|
||||
client.close();
|
||||
}
|
||||
|
|
|
@ -17,17 +17,31 @@ package org.apache.solr.cloud;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Repeat;
|
||||
import org.apache.lucene.util.LuceneTestCase.Nightly;
|
||||
import org.apache.solr.client.solrj.SolrClient;
|
||||
import org.apache.solr.client.solrj.SolrQuery;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* This class is testing the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
|
||||
* {@link org.apache.solr.handler.IndexFetcher}.
|
||||
*/
|
||||
@Nightly
|
||||
public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
||||
|
||||
|
@ -190,6 +204,84 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
|
|||
this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the scenario where the slave is killed while the leader is still receiving updates.
|
||||
* The slave should buffer updates while in recovery, then replay them at the end of the recovery.
|
||||
* If updates were properly buffered and replayed, then the slave should have the same number of documents
|
||||
* than the leader. This checks if cdcr tlog replication interferes with buffered updates - SOLR-8263.
|
||||
*/
|
||||
@Test
|
||||
@ShardsFixed(num = 2)
|
||||
public void testReplicationWithBufferedUpdates() throws Exception {
|
||||
List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
|
||||
|
||||
AtomicInteger numDocs = new AtomicInteger(0);
|
||||
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-test-update-scheduler"));
|
||||
executor.scheduleWithFixedDelay(new UpdateThread(numDocs), 10, 10, TimeUnit.MILLISECONDS);
|
||||
|
||||
// Restart the slave node to trigger Replication strategy
|
||||
this.restartServer(slaves.get(0));
|
||||
|
||||
// shutdown the update thread and wait for its completion
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
|
||||
|
||||
// check that we have the expected number of documents in the cluster
|
||||
assertNumDocs(numDocs.get(), SOURCE_COLLECTION);
|
||||
|
||||
// check that we have the expected number of documents on the slave
|
||||
assertNumDocs(numDocs.get(), slaves.get(0));
|
||||
}
|
||||
|
||||
private void assertNumDocs(int expectedNumDocs, CloudJettyRunner jetty)
|
||||
throws InterruptedException, IOException, SolrServerException {
|
||||
SolrClient client = createNewSolrServer(jetty.url);
|
||||
try {
|
||||
int cnt = 30; // timeout after 15 seconds
|
||||
AssertionError lastAssertionError = null;
|
||||
while (cnt > 0) {
|
||||
try {
|
||||
assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
|
||||
return;
|
||||
}
|
||||
catch (AssertionError e) {
|
||||
lastAssertionError = e;
|
||||
cnt--;
|
||||
Thread.sleep(500);
|
||||
}
|
||||
}
|
||||
throw new AssertionError("Timeout while trying to assert number of documents @ " + jetty.url, lastAssertionError);
|
||||
} finally {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
private class UpdateThread implements Runnable {
|
||||
|
||||
private AtomicInteger numDocs;
|
||||
|
||||
private UpdateThread(AtomicInteger numDocs) {
|
||||
this.numDocs = numDocs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
List<SolrInputDocument> docs = new ArrayList<>();
|
||||
for (int j = numDocs.get(); j < (numDocs.get() + 10); j++) {
|
||||
docs.add(getDoc(id, Integer.toString(j)));
|
||||
}
|
||||
index(SOURCE_COLLECTION, docs);
|
||||
numDocs.getAndAdd(10);
|
||||
log.info("Sent batch of {} updates - numDocs:{}", docs.size(), numDocs);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
|
||||
List<CloudJettyRunner> jetties = new ArrayList<>(shardToJetty.get(collection).get(shard));
|
||||
CloudJettyRunner leader = shardToLeaderJetty.get(collection).get(shard);
|
||||
|
|
Loading…
Reference in New Issue