SOLR-8263: Reverting commit, missed latest patch from Renaud

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1716233 13f79535-47bb-0310-9956-ffa450edef68
Erick Erickson 2015-11-24 18:50:14 +00:00
parent 0f53ac705a
commit d9129621aa
4 changed files with 12 additions and 292 deletions

solr/core/src/java/org/apache/solr/handler/IndexFetcher.java

@@ -87,7 +87,6 @@ import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.CdcrUpdateLog;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.VersionInfo;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.FileUtils;
 import org.apache.solr.util.PropertiesInputStream;
@@ -1053,30 +1052,11 @@ public class IndexFetcher {
   private boolean moveTlogFiles(File tmpTlogDir) {
     UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
 
-    VersionInfo vinfo = ulog.getVersionInfo();
-    vinfo.blockUpdates(); // block updates until the new update log is initialised
-    try {
-      // reset the update log before copying the new tlog directory
-      CdcrUpdateLog.BufferedUpdates bufferedUpdates = ((CdcrUpdateLog) ulog).resetForRecovery();
-      // try to move the temp tlog files to the tlog directory
-      if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
-      // reinitialise the update log and copy the buffered updates
-      if (bufferedUpdates.tlog != null) {
-        // map file path to its new backup location
-        File parentDir = FileSystems.getDefault().getPath(solrCore.getUpdateHandler().getUpdateLog().getLogDir()).getParent().toFile();
-        File backupTlogDir = new File(parentDir, tmpTlogDir.getName());
-        bufferedUpdates.tlog = new File(backupTlogDir, bufferedUpdates.tlog.getName());
-      }
-      // init the update log with the new set of tlog files, and copy the buffered updates
-      ((CdcrUpdateLog) ulog).initForRecovery(bufferedUpdates.tlog, bufferedUpdates.offset);
-    }
-    catch (Exception e) {
-      LOG.error("Unable to copy tlog files", e);
-      return false;
-    }
-    finally {
-      vinfo.unblockUpdates();
-    }
+    // reset the update log before copying the new tlog directory, it will be reinitialized
+    // during the core reload
+    ((CdcrUpdateLog) ulog).reset();
+    // try to move the temp tlog files to the tlog directory
+    if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
 
     return true;
   }
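For context on the hunk above: the patch being reverted blocked updates around the whole tlog swap so that any buffered updates could be carried over into the re-initialised update log, whereas the restored code simply resets the log and defers re-initialisation to the core reload. A condensed sketch of the reverted pattern follows, using only the types and helpers shown in the diff (it assumes the enclosing IndexFetcher's solrCore and LOG fields; a sketch, not a drop-in replacement):

// Sketch only: the update-blocking recovery pattern from the reverted patch.
// VersionInfo, CdcrUpdateLog and copyTmpTlogFiles2Tlog are the types/helpers
// from the hunk above; backup-directory handling is trimmed for brevity.
private boolean moveTlogFilesWithBufferedUpdates(File tmpTlogDir) {
  UpdateLog ulog = solrCore.getUpdateHandler().getUpdateLog();
  VersionInfo vinfo = ulog.getVersionInfo();
  vinfo.blockUpdates(); // no new updates while the tlog directory is swapped
  try {
    // capture the buffered-updates tlog (if any) before wiping the log state
    CdcrUpdateLog.BufferedUpdates buffered = ((CdcrUpdateLog) ulog).resetForRecovery();
    if (!copyTmpTlogFiles2Tlog(tmpTlogDir)) return false;
    // re-open the log on the new tlog files and replay the buffered updates
    ((CdcrUpdateLog) ulog).initForRecovery(buffered.tlog, buffered.offset);
    return true;
  } catch (Exception e) {
    LOG.error("Unable to copy tlog files", e);
    return false;
  } finally {
    vinfo.unblockUpdates(); // always release writers, even on failure
  }
}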

solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java

@@ -20,7 +20,6 @@ package org.apache.solr.update;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
@@ -28,15 +27,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.lucene.util.BytesRef;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.DistributingUpdateProcessorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -247,23 +239,13 @@ public class CdcrUpdateLog extends UpdateLog {
   }
 
   /**
-   * expert: Reset the update log before initialisation. This is called by
-   * {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during a
+   * expert: Reset the update log before initialisation. This is needed by the IndexFetcher during a
    * a Recovery operation in order to re-initialise the UpdateLog with a new set of tlog files.
-   * @see #initForRecovery(File, long)
    */
-  public BufferedUpdates resetForRecovery() {
-    synchronized (this) { // since we blocked updates in IndexFetcher, this synchronization shouldn't strictly be necessary.
-      // If we are buffering, we need to return the related information to the index fetcher
-      // for properly initialising the new update log - SOLR-8263
-      BufferedUpdates bufferedUpdates = new BufferedUpdates();
-      if (state == State.BUFFERING && tlog != null) {
-        bufferedUpdates.tlog = tlog.tlogFile; // file to keep
-        bufferedUpdates.offset = this.recoveryInfo.positionOfStart;
-      }
-
+  public void reset() {
+    synchronized (this) {
       // Close readers
-      for (CdcrLogReader reader : logPointers.keySet()) {
+      for (CdcrLogReader reader : new ArrayList<>(logPointers.keySet())) {
         reader.close();
       }
       logPointers.clear();
@@ -286,163 +268,13 @@ public class CdcrUpdateLog extends UpdateLog {
       if (prevMap != null) prevMap.clear();
       if (prevMap2 != null) prevMap2.clear();
 
       tlogFiles = null;
       numOldRecords = 0;
       oldDeletes.clear();
       deleteByQueries.clear();
-
-      return bufferedUpdates;
     }
   }
 
-  public static class BufferedUpdates {
-    public File tlog;
-    public long offset;
-  }
-
-  /**
-   * <p>
-   *   expert: Initialise the update log with a tlog file containing buffered updates. This is called by
-   *   {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} during a Recovery operation.
-   * </p>
-   *
-   * This is mainly a copy of the original {@link UpdateLog#init(UpdateHandler, SolrCore)} method, but modified
-   * to:
-   * <ul>
-   *   <li>preserve the same {@link VersionInfo} instance in order to not "unblock" updates, since the
-   *   {@link org.apache.solr.handler.IndexFetcher#moveTlogFiles(File)} acquired a write lock from this instance.</li>
-   *   <li>copy the buffered updates.</li>
-   * </ul>
-   *
-   * @see #resetForRecovery()
-   */
-  public void initForRecovery(File bufferedTlog, long offset) {
-    tlogFiles = getLogList(tlogDir);
-    id = getLastLogId() + 1;   // add 1 since we will create a new log for the next update
-
-    if (debug) {
-      log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
-    }
-
-    TransactionLog oldLog = null;
-    for (String oldLogName : tlogFiles) {
-      File f = new File(tlogDir, oldLogName);
-      try {
-        oldLog = newTransactionLog(f, null, true);
-        addOldLog(oldLog, false);  // don't remove old logs on startup since more than one may be uncapped.
-      } catch (Exception e) {
-        SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
-        deleteFile(f);
-      }
-    }
-
-    // Record first two logs (oldest first) at startup for potential tlog recovery.
-    // It's possible that at abnormal close both "tlog" and "prevTlog" were uncapped.
-    for (TransactionLog ll : logs) {
-      newestLogsOnStartup.addFirst(ll);
-      if (newestLogsOnStartup.size() >= 2) break;
-    }
-
-    // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
-    UpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
-    try {
-      startingVersions = startingUpdates.getVersions(numRecordsToKeep);
-      startingOperation = startingUpdates.getLatestOperation();
-
-      // populate recent deletes list (since we can't get that info from the index)
-      for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
-        DeleteUpdate du = startingUpdates.deleteList.get(i);
-        oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
-      }
-
-      // populate recent deleteByQuery commands
-      for (int i=startingUpdates.deleteByQueryList.size()-1; i>=0; i--) {
-        Update update = startingUpdates.deleteByQueryList.get(i);
-        List<Object> dbq = (List<Object>) update.log.lookup(update.pointer);
-        long version = (Long) dbq.get(1);
-        String q = (String) dbq.get(2);
-        trackDeleteByQuery(q, version);
-      }
-    } finally {
-      startingUpdates.close();
-    }
-
-    // Copy buffered updates
-    if (bufferedTlog != null) {
-      this.copyBufferedUpdates(bufferedTlog, offset);
-    }
-  }
-
-  /**
-   * Read the entries from the given tlog file and replay them as buffered updates.
-   */
-  private void copyBufferedUpdates(File tlogSrc, long offsetSrc) {
-    recoveryInfo = new RecoveryInfo();
-    recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
-    state = State.BUFFERING;
-    operationFlags |= FLAG_GAP;
-
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
-    SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
-
-    CdcrTransactionLog src = new CdcrTransactionLog(tlogSrc, null, true);
-    TransactionLog.LogReader tlogReader = src.getReader(offsetSrc);
-    try {
-      int operationAndFlags = 0;
-      for (; ; ) {
-        Object o = tlogReader.next();
-        if (o == null) break; // we reached the end of the tlog
-        // should currently be a List<Oper,Ver,Doc/Id>
-        List entry = (List) o;
-        operationAndFlags = (Integer) entry.get(0);
-        int oper = operationAndFlags & OPERATION_MASK;
-        long version = (Long) entry.get(1);
-
-        switch (oper) {
-          case UpdateLog.ADD: {
-            SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
-            AddUpdateCommand cmd = new AddUpdateCommand(req);
-            cmd.solrDoc = sdoc;
-            cmd.setVersion(version);
-            cmd.setFlags(UpdateCommand.BUFFERING);
-            this.add(cmd);
-            break;
-          }
-          case UpdateLog.DELETE: {
-            byte[] idBytes = (byte[]) entry.get(2);
-            DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
-            cmd.setIndexedId(new BytesRef(idBytes));
-            cmd.setVersion(version);
-            cmd.setFlags(UpdateCommand.BUFFERING);
-            this.delete(cmd);
-            break;
-          }
-          case UpdateLog.DELETE_BY_QUERY: {
-            String query = (String) entry.get(2);
-            DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
-            cmd.query = query;
-            cmd.setVersion(version);
-            cmd.setFlags(UpdateCommand.BUFFERING);
-            this.deleteByQuery(cmd);
-            break;
-          }
-          default:
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Invalid Operation! " + oper);
-        }
-      }
-    }
-    catch (Exception e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy buffered updates", e);
-    }
-    finally {
-      tlogReader.close();
-      src.close();
-      // reset lastDataDir for triggering full #init()
-      lastDataDir = null;
-    }
-  }
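The replay loop removed above treats each raw tlog entry as a List: element 0 packs the operation code and flags, element 1 is the version, and the remainder is an operation-specific payload. A minimal decoding sketch under those assumptions (UpdateLog.OPERATION_MASK, ADD, DELETE and DELETE_BY_QUERY are the real Solr constants; the enclosing method is hypothetical):

// Sketch only: decoding one raw tlog entry, following the layout read by the
// removed copyBufferedUpdates loop: [0] = operation|flags, [1] = version, payload after.
void decodeEntry(List entry) { // raw list as returned by TransactionLog.LogReader.next()
  int oper = ((Integer) entry.get(0)) & UpdateLog.OPERATION_MASK;
  long version = (Long) entry.get(1); // per-update version number
  switch (oper) {
    case UpdateLog.ADD: {
      // payload: the SolrInputDocument, always the last element
      SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
      break;
    }
    case UpdateLog.DELETE: {
      // payload: the indexed id as raw bytes
      byte[] idBytes = (byte[]) entry.get(2);
      break;
    }
    case UpdateLog.DELETE_BY_QUERY: {
      // payload: the delete query string
      String query = (String) entry.get(2);
      break;
    }
    default:
      throw new IllegalStateException("Invalid Operation! " + oper);
  }
}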

solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java

@@ -257,7 +257,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
           Thread.sleep(500);
         }
       }
-      throw new AssertionError("Timeout while trying to assert number of documents @ " + collection, lastAssertionError);
+      throw new AssertionError("Timeout while trying to assert number of documents on collection: " + collection, lastAssertionError);
     } finally {
       client.close();
     }
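The message changed in this hunk is thrown by a retry-until-timeout assertion helper; the same pattern appears in full in the test diff below. A minimal sketch of that shape, assuming the caller supplies the SolrClient and that JUnit's assertEquals is statically imported:

// Sketch only: retry-until-timeout assertion, as used by assertNumDocs.
// 30 attempts x 500 ms sleep gives the 15 second budget noted in the code.
private void assertNumDocs(int expectedNumDocs, SolrClient client, String collection)
    throws InterruptedException, IOException, SolrServerException {
  int cnt = 30; // timeout after 15 seconds
  AssertionError lastAssertionError = null;
  while (cnt-- > 0) {
    try {
      assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
      return; // assertion passed within the budget
    } catch (AssertionError e) {
      lastAssertionError = e; // remember the most recent failure
      Thread.sleep(500);      // back off and retry
    }
  }
  throw new AssertionError("Timeout while trying to assert number of documents on collection: " + collection, lastAssertionError);
}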

solr/core/src/test/org/apache/solr/cloud/CdcrReplicationHandlerTest.java

@@ -17,31 +17,17 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
 import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import org.apache.lucene.util.LuceneTestCase.Nightly;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class is testing the cdcr extension to the {@link org.apache.solr.handler.ReplicationHandler} and
  * {@link org.apache.solr.handler.IndexFetcher}.
  */
 @Nightly
 public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
@@ -204,84 +190,6 @@ public class CdcrReplicationHandlerTest extends BaseCdcrDistributedZkTest {
     this.assertUpdateLogsEquals(SOURCE_COLLECTION, 15);
   }
 
-  /**
-   * Test the scenario where the slave is killed while the leader is still receiving updates.
-   * The slave should buffer updates while in recovery, then replay them at the end of the recovery.
-   * If updates were properly buffered and replayed, then the slave should have the same number of documents
-   * than the leader. This checks if cdcr tlog replication interferes with buffered updates - SOLR-8263.
-   */
-  @Test
-  @ShardsFixed(num = 2)
-  public void testReplicationWithBufferedUpdates() throws Exception {
-    List<CloudJettyRunner> slaves = this.getShardToSlaveJetty(SOURCE_COLLECTION, SHARD1);
-
-    AtomicInteger numDocs = new AtomicInteger(0);
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("cdcr-test-update-scheduler"));
-    executor.scheduleWithFixedDelay(new UpdateThread(numDocs), 10, 10, TimeUnit.MILLISECONDS);
-
-    // Restart the slave node to trigger Replication strategy
-    this.restartServer(slaves.get(0));
-
-    // shutdown the update thread and wait for its completion
-    executor.shutdown();
-    executor.awaitTermination(500, TimeUnit.MILLISECONDS);
-
-    // check that we have the expected number of documents in the cluster
-    assertNumDocs(numDocs.get(), SOURCE_COLLECTION);
-
-    // check that we have the expected number of documents on the slave
-    assertNumDocs(numDocs.get(), slaves.get(0));
-  }
-
-  private void assertNumDocs(int expectedNumDocs, CloudJettyRunner jetty)
-      throws InterruptedException, IOException, SolrServerException {
-    SolrClient client = createNewSolrServer(jetty.url);
-    try {
-      int cnt = 30; // timeout after 15 seconds
-      AssertionError lastAssertionError = null;
-      while (cnt > 0) {
-        try {
-          assertEquals(expectedNumDocs, client.query(new SolrQuery("*:*")).getResults().getNumFound());
-          return;
-        }
-        catch (AssertionError e) {
-          lastAssertionError = e;
-          cnt--;
-          Thread.sleep(500);
-        }
-      }
-      throw new AssertionError("Timeout while trying to assert number of documents @ " + jetty.url, lastAssertionError);
-    } finally {
-      client.close();
-    }
-  }
-
-  private class UpdateThread implements Runnable {
-
-    private AtomicInteger numDocs;
-
-    private UpdateThread(AtomicInteger numDocs) {
-      this.numDocs = numDocs;
-    }
-
-    @Override
-    public void run() {
-      try {
-        List<SolrInputDocument> docs = new ArrayList<>();
-        for (int j = numDocs.get(); j < (numDocs.get() + 10); j++) {
-          docs.add(getDoc(id, Integer.toString(j)));
-        }
-        index(SOURCE_COLLECTION, docs);
-        numDocs.getAndAdd(10);
-        log.info("Sent batch of {} updates - numDocs:{}", docs.size(), numDocs);
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
   private List<CloudJettyRunner> getShardToSlaveJetty(String collection, String shard) {
     List<CloudJettyRunner> jetties = new ArrayList<>(shardToJetty.get(collection).get(shard));
     CloudJettyRunner leader = shardToLeaderJetty.get(collection).get(shard);