From ab316bbc91c273b13c851a38ad5d14ef64ab3eec Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Mon, 4 Jun 2018 11:32:31 +0700 Subject: [PATCH] SOLR-9922: Write buffering updates to another tlog --- SOLR-9922.patch | 1294 +++++++++++++++++ solr/CHANGES.txt | 2 + .../apache/solr/cloud/RecoveryStrategy.java | 29 +- .../solr/cloud/ReplicateFromLeader.java | 2 +- .../solr/update/CdcrTransactionLog.java | 20 +- .../org/apache/solr/update/CdcrUpdateLog.java | 3 - .../solr/update/HdfsTransactionLog.java | 18 +- .../org/apache/solr/update/HdfsUpdateLog.java | 84 +- .../apache/solr/update/TransactionLog.java | 56 +- .../org/apache/solr/update/UpdateLog.java | 259 ++-- .../org/apache/solr/search/TestRecovery.java | 58 +- .../apache/solr/search/TestRecoveryHdfs.java | 46 +- .../solr/update/TransactionLogTest.java | 2 +- 13 files changed, 1555 insertions(+), 318 deletions(-) create mode 100644 SOLR-9922.patch diff --git a/SOLR-9922.patch b/SOLR-9922.patch new file mode 100644 index 00000000000..052abf4041a --- /dev/null +++ b/SOLR-9922.patch @@ -0,0 +1,1294 @@ +diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +index c8f5ae8..966497b 100644 +--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java ++++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +@@ -449,7 +449,6 @@ public class RecoveryStrategy implements Runnable, Closeable { + + // TODO: perhaps make this grab a new core each time through the loop to handle core reloads? + final public void doSyncOrReplicateRecovery(SolrCore core) throws Exception { +- boolean replayed = false; + boolean successfulRecovery = false; + + UpdateLog ulog; +@@ -500,8 +499,7 @@ public class RecoveryStrategy implements Runnable, Closeable { + // when we went down. We may have received updates since then. + recentVersions = startingVersions; + try { +- if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) { +- // last operation at the time of startup had the GAP flag set... ++ if (ulog.existOldBufferLog()) { + // this means we were previously doing a full index replication + // that probably didn't complete and buffering updates in the + // meantime. +@@ -542,9 +540,9 @@ public class RecoveryStrategy implements Runnable, Closeable { + } + + LOG.info("Begin buffering updates. core=[{}]", coreName); ++ // recalling buffer updates will drop the old buffer tlog + ulog.bufferUpdates(); +- replayed = false; +- ++ + LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(), + ourUrl); + zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING); +@@ -603,8 +601,7 @@ public class RecoveryStrategy implements Runnable, Closeable { + + LOG.info("Replaying updates buffered during PeerSync."); + replay(core); +- replayed = true; +- ++ + // sync success + successfulRecovery = true; + return; +@@ -630,8 +627,7 @@ public class RecoveryStrategy implements Runnable, Closeable { + } + + replayFuture = replay(core); +- replayed = true; +- ++ + if (isClosed()) { + LOG.info("RecoveryStrategy has been closed"); + break; +@@ -650,21 +646,6 @@ public class RecoveryStrategy implements Runnable, Closeable { + } catch (Exception e) { + SolrException.log(LOG, "Error while trying to recover. 
core=" + coreName, e); + } finally { +- if (!replayed) { +- // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates +- // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date. +- // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will +- // reset our starting point for playback. +- LOG.info("Replay not started, or was not successful... still buffering updates."); +- +- /** this prev code is retained in case we want to switch strategies. +- try { +- ulog.dropBufferedUpdates(); +- } catch (Exception e) { +- SolrException.log(log, "", e); +- } +- **/ +- } + if (successfulRecovery) { + LOG.info("Registering as Active after recovery."); + try { +diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java +index 0a742e3..aa648dd 100644 +--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java ++++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java +@@ -97,7 +97,7 @@ public class ReplicateFromLeader { + new ModifiableSolrParams()); + CommitUpdateCommand cuc = new CommitUpdateCommand(req, false); + cuc.setVersion(Long.parseLong(commitVersion)); +- updateLog.copyOverOldUpdates(cuc); ++ updateLog.commitAndSwitchToNewTlog(cuc); + lastVersion = Long.parseLong(commitVersion); + } + }); +diff --git a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java +index 3534f62..f668540 100644 +--- a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java ++++ b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java +@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory; + * methods {@link #incref()}, {@link #close()} and {@link #reopenOutputStream()}. + *
  • encode the number of records in the tlog file in the last commit record. The number of records will be
+ * decoded and reused if the tlog file is reopened. This is achieved by extending the constructor, and the
+- * methods {@link #writeCommit(CommitUpdateCommand, int)} and {@link #getReader(long)}.
  • ++ * methods {@link #writeCommit(CommitUpdateCommand)} and {@link #getReader(long)}. + * + */ + public class CdcrTransactionLog extends TransactionLog { +@@ -108,7 +108,7 @@ public class CdcrTransactionLog extends TransactionLog { + } + + @Override +- public long write(AddUpdateCommand cmd, long prevPointer, int flags) { ++ public long write(AddUpdateCommand cmd, long prevPointer) { + assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer))); + + LogCodec codec = new LogCodec(resolver); +@@ -125,7 +125,7 @@ public class CdcrTransactionLog extends TransactionLog { + codec.init(out); + if (cmd.isInPlaceUpdate()) { + codec.writeTag(JavaBinCodec.ARR, 6); +- codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte ++ codec.writeInt(UpdateLog.UPDATE_INPLACE); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeLong(prevPointer); + codec.writeLong(cmd.prevVersion); +@@ -141,7 +141,7 @@ public class CdcrTransactionLog extends TransactionLog { + + } else { + codec.writeTag(JavaBinCodec.ARR, 4); +- codec.writeInt(UpdateLog.ADD | flags); // should just take one byte ++ codec.writeInt(UpdateLog.ADD); // should just take one byte + codec.writeLong(cmd.getVersion()); + if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) { + // if the update is received via cdcr source; add extra boolean entry +@@ -179,7 +179,7 @@ public class CdcrTransactionLog extends TransactionLog { + } + + @Override +- public long writeDelete(DeleteUpdateCommand cmd, int flags) { ++ public long writeDelete(DeleteUpdateCommand cmd) { + LogCodec codec = new LogCodec(resolver); + + try { +@@ -190,7 +190,7 @@ public class CdcrTransactionLog extends TransactionLog { + MemOutputStream out = new MemOutputStream(new byte[20 + br.length]); + codec.init(out); + codec.writeTag(JavaBinCodec.ARR, 4); +- codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte ++ codec.writeInt(UpdateLog.DELETE); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeByteArray(br.bytes, br.offset, br.length); + if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) { +@@ -217,7 +217,7 @@ public class CdcrTransactionLog extends TransactionLog { + } + + @Override +- public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) { ++ public long writeDeleteByQuery(DeleteUpdateCommand cmd) { + LogCodec codec = new LogCodec(resolver); + try { + checkWriteHeader(codec, null); +@@ -225,7 +225,7 @@ public class CdcrTransactionLog extends TransactionLog { + MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]); + codec.init(out); + codec.writeTag(JavaBinCodec.ARR, 4); +- codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte ++ codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeStr(cmd.query); + if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) { +@@ -249,7 +249,7 @@ public class CdcrTransactionLog extends TransactionLog { + } + + @Override +- public long writeCommit(CommitUpdateCommand cmd, int flags) { ++ public long writeCommit(CommitUpdateCommand cmd) { + LogCodec codec = new LogCodec(resolver); + synchronized (this) { + try { +@@ -261,7 +261,7 @@ public class CdcrTransactionLog extends TransactionLog { + } + codec.init(fos); + codec.writeTag(JavaBinCodec.ARR, 4); +- codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte ++ 
codec.writeInt(UpdateLog.COMMIT); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeTag(JavaBinCodec.INT); // Enforce the encoding of a plain integer, to simplify decoding + fos.writeInt(numRecords + 1); // the number of records in the file - +1 to account for the commit operation being written +diff --git a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java +index 6b20204..bff1612 100644 +--- a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java ++++ b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java +@@ -352,7 +352,6 @@ public class CdcrUpdateLog extends UpdateLog { + long latestVersion = startingUpdates.getMaxRecentVersion(); + try { + startingVersions = startingUpdates.getVersions(numRecordsToKeep); +- startingOperation = startingUpdates.getLatestOperation(); + + // populate recent deletes list (since we can't get that info from the index) + for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) { +@@ -389,9 +388,7 @@ public class CdcrUpdateLog extends UpdateLog { + */ + private void copyBufferedUpdates(File tlogSrc, long offsetSrc, long latestVersion) { + recoveryInfo = new RecoveryInfo(); +- recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot(); + state = State.BUFFERING; +- operationFlags |= FLAG_GAP; + + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString()); +diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java +index 0f89016..8ed7d7a 100644 +--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java ++++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java +@@ -166,20 +166,6 @@ public class HdfsTransactionLog extends TransactionLog { + } + return true; + } +- +- // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup. +- // This should only be used to roll back buffered updates, not actually applied updates. +- @Override +- public void rollback(long pos) throws IOException { +- synchronized (this) { +- assert snapshot_size == pos; +- ensureFlushed(); +- // TODO: how do we rollback with hdfs?? 
We need HDFS-3107 +- fos.setWritten(pos); +- assert fos.size() == pos; +- numRecords = snapshot_numRecords; +- } +- } + + private void readHeader(FastInputStream fis) throws IOException { + // read existing header +@@ -210,7 +196,7 @@ public class HdfsTransactionLog extends TransactionLog { + } + + @Override +- public long writeCommit(CommitUpdateCommand cmd, int flags) { ++ public long writeCommit(CommitUpdateCommand cmd) { + LogCodec codec = new LogCodec(resolver); + synchronized (this) { + try { +@@ -223,7 +209,7 @@ public class HdfsTransactionLog extends TransactionLog { + + codec.init(fos); + codec.writeTag(JavaBinCodec.ARR, 3); +- codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte ++ codec.writeInt(UpdateLog.COMMIT); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file + +diff --git a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java +index 7bb74d0..8ca4b1c 100644 +--- a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java ++++ b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java +@@ -65,37 +65,6 @@ public class HdfsUpdateLog extends UpdateLog { + this.confDir = confDir; + } + +- // HACK +- // while waiting for HDFS-3107, instead of quickly +- // dropping, we slowly apply +- // This is somewhat brittle, but current usage +- // allows for it +- @Override +- public boolean dropBufferedUpdates() { +- versionInfo.blockUpdates(); +- try { +- if (state != State.BUFFERING) return false; +- +- if (log.isInfoEnabled()) { +- log.info("Dropping buffered updates " + this); +- } +- +- // since we blocked updates, this synchronization shouldn't strictly be +- // necessary. +- synchronized (this) { +- if (tlog != null) { +- // tlog.rollback(recoveryInfo.positionOfStart); +- } +- } +- +- state = State.ACTIVE; +- operationFlags &= ~FLAG_GAP; +- } finally { +- versionInfo.unblockUpdates(); +- } +- return true; +- } +- + @Override + public void init(PluginInfo info) { + super.init(info); +@@ -186,6 +155,11 @@ public class HdfsUpdateLog extends UpdateLog { + throw new RuntimeException("Problem creating directory: " + tlogDir, e); + } + } ++ ++ String[] oldBufferTlog = getBufferLogList(fs, tlogDir); ++ if (oldBufferTlog != null && oldBufferTlog.length != 0) { ++ existOldBufferLog = true; ++ } + + tlogFiles = getLogList(fs, tlogDir); + id = getLastLogId() + 1; // add 1 since we will create a new log for the +@@ -241,7 +215,6 @@ public class HdfsUpdateLog extends UpdateLog { + // non-complete tlogs. 
+ try (RecentUpdates startingUpdates = getRecentUpdates()) { + startingVersions = startingUpdates.getVersions(getNumRecordsToKeep()); +- startingOperation = startingUpdates.getLatestOperation(); + + // populate recent deletes list (since we can't get that info from the + // index) +@@ -269,6 +242,23 @@ public class HdfsUpdateLog extends UpdateLog { + public String getLogDir() { + return tlogDir.toUri().toString(); + } ++ ++ public static String[] getBufferLogList(FileSystem fs, Path tlogDir) { ++ final String prefix = BUFFER_TLOG_NAME+'.'; ++ assert fs != null; ++ FileStatus[] fileStatuses; ++ try { ++ fileStatuses = fs.listStatus(tlogDir, path -> path.getName().startsWith(prefix)); ++ } catch (IOException e) { ++ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed on listing old buffer tlog", e); ++ } ++ ++ String[] names = new String[fileStatuses.length]; ++ for (int i = 0; i < fileStatuses.length; i++) { ++ names[i] = fileStatuses[i].getPath().getName(); ++ } ++ return names; ++ } + + public static String[] getLogList(FileSystem fs, Path tlogDir) { + final String prefix = TLOG_NAME + '.'; +@@ -307,7 +297,35 @@ public class HdfsUpdateLog extends UpdateLog { + IOUtils.closeQuietly(fs); + } + } +- ++ ++ @Override ++ protected void ensureBufferTlog() { ++ if (bufferTlog != null) return; ++ String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime()); ++ bufferTlog = new HdfsTransactionLog(fs, new Path(tlogDir, newLogName), ++ globalStrings, tlogDfsReplication); ++ } ++ ++ @Override ++ protected void deleteBufferLogs() { ++ // Delete old buffer logs ++ String[] oldBufferTlog = getBufferLogList(fs, tlogDir); ++ if (oldBufferTlog != null && oldBufferTlog.length != 0) { ++ for (String oldBufferLogName : oldBufferTlog) { ++ Path f = new Path(tlogDir, oldBufferLogName); ++ try { ++ boolean s = fs.delete(f, false); ++ if (!s) { ++ log.error("Could not remove old buffer tlog file:" + f); ++ } ++ } catch (IOException e) { ++ // No need to bubble up this exception, because it won't cause any problems on recovering ++ log.error("Could not remove old buffer tlog file:" + f, e); ++ } ++ } ++ } ++ } ++ + @Override + protected void ensureLog() { + if (tlog == null) { +diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java +index 96a928c..2a23896 100644 +--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java ++++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java +@@ -85,9 +85,6 @@ public class TransactionLog implements Closeable { + Map globalStringMap = new HashMap<>(); + List globalStringList = new ArrayList<>(); + +- long snapshot_size; +- int snapshot_numRecords; +- + // write a BytesRef as a byte array + static final JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() { + @Override +@@ -153,7 +150,7 @@ public class TransactionLog implements Closeable { + + // Parse tlog id from the filename + String filename = tlogFile.getName(); +- id = Long.parseLong(filename.substring(filename.indexOf('.') + 1, filename.indexOf('.') + 20)); ++ id = Long.parseLong(filename.substring(filename.lastIndexOf('.')+1)); + + this.tlogFile = tlogFile; + raf = new RandomAccessFile(this.tlogFile, "rw"); +@@ -233,29 +230,6 @@ public class TransactionLog implements Closeable { + return true; + } + +- /** takes a snapshot of the current position and number of records +- * for later possible rollback, and returns the position */ +- public long 
snapshot() { +- synchronized (this) { +- snapshot_size = fos.size(); +- snapshot_numRecords = numRecords; +- return snapshot_size; +- } +- } +- +- // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup. +- // This should only be used to roll back buffered updates, not actually applied updates. +- public void rollback(long pos) throws IOException { +- synchronized (this) { +- assert snapshot_size == pos; +- fos.flush(); +- raf.setLength(pos); +- fos.setWritten(pos); +- assert fos.size() == pos; +- numRecords = snapshot_numRecords; +- } +- } +- + public long writeData(Object o) { + @SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver); + try { +@@ -346,17 +320,16 @@ public class TransactionLog implements Closeable { + + /** + * Writes an add update command to the transaction log. This is not applicable for +- * in-place updates; use {@link #write(AddUpdateCommand, long, int)}. ++ * in-place updates; use {@link #write(AddUpdateCommand, long)}. + * (The previous pointer (applicable for in-place updates) is set to -1 while writing + * the command to the transaction log.) + * @param cmd The add update command to be written +- * @param flags Options for writing the command to the transaction log + * @return Returns the position pointer of the written update command + * +- * @see #write(AddUpdateCommand, long, int) ++ * @see #write(AddUpdateCommand, long) + */ +- public long write(AddUpdateCommand cmd, int flags) { +- return write(cmd, -1, flags); ++ public long write(AddUpdateCommand cmd) { ++ return write(cmd, -1); + } + + /** +@@ -365,10 +338,9 @@ public class TransactionLog implements Closeable { + * @param cmd The add update command to be written + * @param prevPointer The pointer in the transaction log which this update depends + * on (applicable for in-place updates) +- * @param flags Options for writing the command to the transaction log + * @return Returns the position pointer of the written update command + */ +- public long write(AddUpdateCommand cmd, long prevPointer, int flags) { ++ public long write(AddUpdateCommand cmd, long prevPointer) { + assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer))); + + LogCodec codec = new LogCodec(resolver); +@@ -386,14 +358,14 @@ public class TransactionLog implements Closeable { + codec.init(out); + if (cmd.isInPlaceUpdate()) { + codec.writeTag(JavaBinCodec.ARR, 5); +- codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte ++ codec.writeInt(UpdateLog.UPDATE_INPLACE); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeLong(prevPointer); + codec.writeLong(cmd.prevVersion); + codec.writeSolrInputDocument(cmd.getSolrInputDocument()); + } else { + codec.writeTag(JavaBinCodec.ARR, 3); +- codec.writeInt(UpdateLog.ADD | flags); // should just take one byte ++ codec.writeInt(UpdateLog.ADD); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeSolrInputDocument(cmd.getSolrInputDocument()); + } +@@ -422,7 +394,7 @@ public class TransactionLog implements Closeable { + } + } + +- public long writeDelete(DeleteUpdateCommand cmd, int flags) { ++ public long writeDelete(DeleteUpdateCommand cmd) { + LogCodec codec = new LogCodec(resolver); + + try { +@@ -433,7 +405,7 @@ public class TransactionLog implements Closeable { + MemOutputStream out = new MemOutputStream(new byte[20 + br.length]); + codec.init(out); + codec.writeTag(JavaBinCodec.ARR, 3); +- codec.writeInt(UpdateLog.DELETE | 
flags); // should just take one byte ++ codec.writeInt(UpdateLog.DELETE); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeByteArray(br.bytes, br.offset, br.length); + +@@ -452,7 +424,7 @@ public class TransactionLog implements Closeable { + + } + +- public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) { ++ public long writeDeleteByQuery(DeleteUpdateCommand cmd) { + LogCodec codec = new LogCodec(resolver); + try { + checkWriteHeader(codec, null); +@@ -460,7 +432,7 @@ public class TransactionLog implements Closeable { + MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]); + codec.init(out); + codec.writeTag(JavaBinCodec.ARR, 3); +- codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte ++ codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeStr(cmd.query); + +@@ -478,7 +450,7 @@ public class TransactionLog implements Closeable { + } + + +- public long writeCommit(CommitUpdateCommand cmd, int flags) { ++ public long writeCommit(CommitUpdateCommand cmd) { + LogCodec codec = new LogCodec(resolver); + synchronized (this) { + try { +@@ -490,7 +462,7 @@ public class TransactionLog implements Closeable { + } + codec.init(fos); + codec.writeTag(JavaBinCodec.ARR, 3); +- codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte ++ codec.writeInt(UpdateLog.COMMIT); // should just take one byte + codec.writeLong(cmd.getVersion()); + codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file + +diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java +index 7f821ea..1bda23f 100644 +--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java ++++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java +@@ -96,6 +96,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + private static final long STATUS_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS); + public static String LOG_FILENAME_PATTERN = "%s.%019d"; + public static String TLOG_NAME="tlog"; ++ public static String BUFFER_TLOG_NAME="buffer.tlog"; + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private boolean debug = log.isDebugEnabled(); +@@ -139,11 +140,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + public static final int DELETE_BY_QUERY = 0x03; + public static final int COMMIT = 0x04; + public static final int UPDATE_INPLACE = 0x08; +- // Flag indicating that this is a buffered operation, and that a gap exists before buffering started. +- // for example, if full index replication starts and we are buffering updates, then this flag should +- // be set to indicate that replaying the log would not bring us into sync (i.e. peersync should +- // fail if this flag is set on the last update in the tlog). +- public static final int FLAG_GAP = 0x10; ++ // For backward-compatibility, we should delete this field in 9.0 + public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation + + /** +@@ -186,8 +183,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + + long id = -1; + protected State state = State.ACTIVE; +- protected int operationFlags; // flags to write in the transaction log with operations (i.e. 
FLAG_GAP) + ++ protected TransactionLog bufferTlog; + protected TransactionLog tlog; + protected TransactionLog prevTlog; + protected final Deque logs = new LinkedList<>(); // list of recent logs, newest first +@@ -206,6 +203,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + protected int maxNumLogsToKeep; + protected int numVersionBuckets; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two. + protected Long maxVersionFromIndex = null; ++ protected boolean existOldBufferLog = false; + + // keep track of deletes only... this is not updated on an add + protected LinkedHashMap oldDeletes = new LinkedHashMap(numDeletesToKeep) { +@@ -244,7 +242,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + volatile UpdateHandler uhandler; // a core reload can change this reference! + protected volatile boolean cancelApplyBufferUpdate; + List startingVersions; +- int startingOperation; // last operation in the logs on startup + + // metrics + protected Gauge bufferedOpsGauge; +@@ -378,6 +375,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id); + } + ++ String[] oldBufferTlog = getBufferLogList(tlogDir); ++ if (oldBufferTlog != null && oldBufferTlog.length != 0) { ++ existOldBufferLog = true; ++ } + TransactionLog oldLog = null; + for (String oldLogName : tlogFiles) { + File f = new File(tlogDir, oldLogName); +@@ -408,7 +409,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs. 
+ try (RecentUpdates startingUpdates = getRecentUpdates()) { + startingVersions = startingUpdates.getVersions(numRecordsToKeep); +- startingOperation = startingUpdates.getLatestOperation(); + + // populate recent deletes list (since we can't get that info from the index) + for (int i = startingUpdates.deleteList.size() - 1; i >= 0; i--) { +@@ -434,14 +434,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + this.metricManager = manager; + this.registryName = registry; + bufferedOpsGauge = () -> { ++ if (state == State.BUFFERING) { ++ if (bufferTlog == null) return 0; ++ // numRecords counts header as a record ++ return bufferTlog.numRecords() - 1; ++ } + if (tlog == null) { + return 0; + } else if (state == State.APPLYING_BUFFERED) { + // numRecords counts header as a record + return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors; +- } else if (state == State.BUFFERING) { +- // numRecords counts header as a record +- return tlog.numRecords() - 1; + } else { + return 0; + } +@@ -472,8 +474,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + return startingVersions; + } + +- public int getStartingOperation() { +- return startingOperation; ++ public boolean existOldBufferLog() { ++ return existOldBufferLog; + } + + /* Takes over ownership of the log, keeping it until no longer needed +@@ -509,6 +511,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + logs.addFirst(oldLog); + } + ++ public String[] getBufferLogList(File directory) { ++ final String prefix = BUFFER_TLOG_NAME+'.'; ++ return directory.list((dir, name) -> name.startsWith(prefix)); ++ } ++ ++ /** ++ * Does update from old tlogs (not from buffer tlog)? ++ * If yes we must skip writing {@code cmd} to current tlog ++ */ ++ private boolean updateFromOldTlogs(UpdateCommand cmd) { ++ return (cmd.getFlags() & UpdateCommand.REPLAY) != 0 && state == State.REPLAYING; ++ } ++ + public String[] getLogList(File directory) { + final String prefix = TLOG_NAME+'.'; + String[] names = directory.list(new FilenameFilter() { +@@ -541,14 +556,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return; + + synchronized (this) { +- long pos = -1; ++ if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) { ++ ensureBufferTlog(); ++ bufferTlog.write(cmd); ++ return; ++ } + ++ long pos = -1; + long prevPointer = getPrevPointerForUpdate(cmd); + + // don't log if we are replaying from another log +- if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ++ if (!updateFromOldTlogs(cmd)) { + ensureLog(); +- pos = tlog.write(cmd, prevPointer, operationFlags); ++ pos = tlog.write(cmd, prevPointer); + } + + if (!clearCaches) { +@@ -556,10 +576,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + // Only currently would be useful for RTG while in recovery mode though. 
+ LogPtr ptr = new LogPtr(pos, cmd.getVersion(), prevPointer); + +- // only update our map if we're not buffering +- if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) { +- map.put(cmd.getIndexedId(), ptr); +- } ++ map.put(cmd.getIndexedId(), ptr); + + if (trace) { + log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); +@@ -606,22 +623,21 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + BytesRef br = cmd.getIndexedId(); + + synchronized (this) { +- long pos = -1; ++ if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) { ++ ensureBufferTlog(); ++ bufferTlog.writeDelete(cmd); ++ return; ++ } + +- // don't log if we are replaying from another log +- if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ++ long pos = -1; ++ if (!updateFromOldTlogs(cmd)) { + ensureLog(); +- pos = tlog.writeDelete(cmd, operationFlags); ++ pos = tlog.writeDelete(cmd); + } + + LogPtr ptr = new LogPtr(pos, cmd.version); +- +- // only update our map if we're not buffering +- if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) { +- map.put(br, ptr); +- +- oldDeletes.put(br, ptr); +- } ++ map.put(br, ptr); ++ oldDeletes.put(br, ptr); + + if (trace) { + log.trace("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map)); +@@ -631,15 +647,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + + public void deleteByQuery(DeleteUpdateCommand cmd) { + synchronized (this) { ++ if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) { ++ ensureBufferTlog(); ++ bufferTlog.writeDeleteByQuery(cmd); ++ return; ++ } ++ + long pos = -1; +- // don't log if we are replaying from another log +- if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ++ if (!updateFromOldTlogs(cmd)) { + ensureLog(); +- pos = tlog.writeDeleteByQuery(cmd, operationFlags); ++ pos = tlog.writeDeleteByQuery(cmd); + } + +- // only change our caches if we are not buffering +- if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) { ++ // skip purge our caches in case of tlog replica ++ if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) { + // given that we just did a delete-by-query, we don't know what documents were + // affected and hence we must purge our caches. + openRealtimeSearcher(); +@@ -802,7 +823,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + if (prevTlog != null) { + // if we made it through the commit, write a commit command to the log + // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup. 
+- prevTlog.writeCommit(cmd, operationFlags); ++ prevTlog.writeCommit(cmd); + + addOldLog(prevTlog, true); + // the old log list will decref when no longer needed +@@ -1152,9 +1173,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + public void copyOverBufferingUpdates(CommitUpdateCommand cuc) { + versionInfo.blockUpdates(); + try { +- operationFlags &= ~FLAG_GAP; +- state = State.ACTIVE; +- copyAndSwitchToNewTlog(cuc); ++ synchronized (this) { ++ state = State.ACTIVE; ++ if (bufferTlog == null) { ++ return; ++ } ++ // by calling this, we won't switch to new tlog (compared to applyBufferedUpdates()) ++ // if we switch to new tlog we can possible lose updates on the next fetch ++ copyOverOldUpdates(cuc.getVersion(), bufferTlog); ++ dropBufferTlog(); ++ } + } finally { + versionInfo.unblockUpdates(); + } +@@ -1165,33 +1193,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + * So any updates which hasn't made it to the index is preserved in the current tlog + * @param cuc any updates that have version larger than the version of cuc will be copied over + */ +- public void copyOverOldUpdates(CommitUpdateCommand cuc) { ++ public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) { + versionInfo.blockUpdates(); + try { +- copyAndSwitchToNewTlog(cuc); ++ synchronized (this) { ++ if (tlog == null) { ++ return; ++ } ++ preCommit(cuc); ++ try { ++ copyOverOldUpdates(cuc.getVersion()); ++ } finally { ++ postCommit(cuc); ++ } ++ } + } finally { + versionInfo.unblockUpdates(); + } + } + +- protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) { +- synchronized (this) { +- if (tlog == null) { +- return; +- } +- preCommit(cuc); +- try { +- copyOverOldUpdates(cuc.getVersion()); +- } finally { +- postCommit(cuc); +- } +- } +- } +- +- /** +- * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog +- * @param commitVersion any updates that have version larger than the commitVersion will be copied over +- */ + public void copyOverOldUpdates(long commitVersion) { + TransactionLog oldTlog = prevTlog; + if (oldTlog == null && !logs.isEmpty()) { +@@ -1207,6 +1227,14 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + log.warn("Exception reading log", e); + return; + } ++ copyOverOldUpdates(commitVersion, oldTlog); ++ } ++ ++ /** ++ * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog ++ * @param commitVersion any updates that have version larger than the commitVersion will be copied over ++ */ ++ public void copyOverOldUpdates(long commitVersion, TransactionLog oldTlog) { + copyOverOldUpdatesMeter.mark(); + + SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, +@@ -1270,6 +1298,22 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + } + } + ++ protected void ensureBufferTlog() { ++ if (bufferTlog != null) return; ++ String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime()); ++ bufferTlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false); ++ } ++ ++ // Cleanup old buffer tlogs ++ protected void deleteBufferLogs() { ++ String[] oldBufferTlog = getBufferLogList(tlogDir); ++ if (oldBufferTlog != null && oldBufferTlog.length != 0) { ++ for (String oldBufferLogName : oldBufferTlog) { ++ deleteFile(new File(tlogDir, oldBufferLogName)); ++ } ++ } ++ } ++ + + protected void ensureLog() { + if (tlog == null) { +@@ -1285,7 +1329,7 @@ public class UpdateLog 
implements PluginInfoInitialized, SolrMetricProducer { + // record a commit + log.info("Recording current closed for " + uhandler.core + " log=" + theLog); + CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false); +- theLog.writeCommit(cmd, operationFlags); ++ theLog.writeCommit(cmd); + } + + theLog.deleteOnClose = false; +@@ -1314,6 +1358,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + log.forceClose(); + } + ++ if (bufferTlog != null) { ++ // should not delete bufferTlog on close, existing bufferTlog is a sign for skip peerSync ++ bufferTlog.deleteOnClose = false; ++ bufferTlog.decref(); ++ bufferTlog.forceClose(); ++ } ++ + try { + ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor); + } catch (Exception e) { +@@ -1347,7 +1398,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + HashMap updates; + List deleteByQueryList; + List deleteList; +- int latestOperation; + + public RecentUpdates(Deque logList) { + this.logList = logList; +@@ -1401,11 +1451,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + return result; + } + +- public int getLatestOperation() { +- return latestOperation; +- } +- +- + private void update() { + int numUpdates = 0; + updateList = new ArrayList<>(logList.size()); +@@ -1431,9 +1476,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + + // TODO: refactor this out so we get common error handling + int opAndFlags = (Integer)entry.get(UpdateLog.FLAGS_IDX); +- if (latestOperation == 0) { +- latestOperation = opAndFlags; +- } + int oper = opAndFlags & UpdateLog.OPERATION_MASK; + long version = (Long) entry.get(UpdateLog.VERSION_IDX); + +@@ -1525,6 +1567,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + tlog.incref(); + logList.addFirst(tlog); + } ++ if (bufferTlog != null) { ++ bufferTlog.incref(); ++ logList.addFirst(bufferTlog); ++ } + } + + // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and +@@ -1542,13 +1588,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + // reading state and acting on it in the distributed update processor + versionInfo.blockUpdates(); + try { +- if (state == State.BUFFERING) { +- log.info("Restarting buffering. previous=" + recoveryInfo); +- } else if (state != State.ACTIVE) { ++ if (state != State.ACTIVE && state != State.BUFFERING) { + // we don't currently have support for handling other states + log.warn("Unexpected state for bufferUpdates: " + state + ", Ignoring request."); + return; + } ++ dropBufferTlog(); ++ deleteBufferLogs(); + + recoveryInfo = new RecoveryInfo(); + +@@ -1556,15 +1602,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + log.info("Starting to buffer updates. " + this); + } + +- // since we blocked updates, this synchronization shouldn't strictly be necessary. +- synchronized (this) { +- recoveryInfo.positionOfStart = tlog == null ? 
0 : tlog.snapshot(); +- } +- + state = State.BUFFERING; +- +- // currently, buffering is only called by recovery, meaning that there is most likely a gap in updates +- operationFlags |= FLAG_GAP; + } finally { + versionInfo.unblockUpdates(); + } +@@ -1580,25 +1618,24 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + log.info("Dropping buffered updates " + this); + } + +- // since we blocked updates, this synchronization shouldn't strictly be necessary. +- synchronized (this) { +- if (tlog != null) { +- tlog.rollback(recoveryInfo.positionOfStart); +- } +- } ++ dropBufferTlog(); + + state = State.ACTIVE; +- operationFlags &= ~FLAG_GAP; +- } catch (IOException e) { +- SolrException.log(log,"Error attempting to roll back log", e); +- return false; +- } +- finally { ++ } finally { + versionInfo.unblockUpdates(); + } + return true; + } + ++ private void dropBufferTlog() { ++ synchronized (this) { ++ if (bufferTlog != null) { ++ bufferTlog.decref(); ++ bufferTlog = null; ++ } ++ } ++ } ++ + + /** Returns the Future to wait on, or null if no replay was needed */ + public Future applyBufferedUpdates() { +@@ -1612,27 +1649,30 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + try { + cancelApplyBufferUpdate = false; + if (state != State.BUFFERING) return null; +- operationFlags &= ~FLAG_GAP; + +- // handle case when no log was even created because no updates +- // were received. +- if (tlog == null) { +- state = State.ACTIVE; +- return null; ++ synchronized (this) { ++ // handle case when no updates were received. ++ if (bufferTlog == null) { ++ state = State.ACTIVE; ++ return null; ++ } ++ bufferTlog.incref(); + } +- tlog.incref(); ++ + state = State.APPLYING_BUFFERED; + } finally { + versionInfo.unblockUpdates(); + } + + if (recoveryExecutor.isShutdown()) { +- tlog.decref(); + throw new RuntimeException("executor is not running..."); + } + ExecutorCompletionService cs = new ExecutorCompletionService<>(recoveryExecutor); +- LogReplayer replayer = new LogReplayer(Arrays.asList(new TransactionLog[]{tlog}), true); +- return cs.submit(replayer, recoveryInfo); ++ LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true); ++ return cs.submit(() -> { ++ replayer.run(); ++ dropBufferTlog(); ++ }, recoveryInfo); + } + + public State getState() { +@@ -1903,10 +1943,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + if (!activeLog) { + // if we are replaying an old tlog file, we need to add a commit to the end + // so we don't replay it again if we restart right after. +- +- // if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it +- // as the flag on the last operation. 
+- translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK)); ++ translog.writeCommit(cmd); + } + + try { +@@ -2037,10 +2074,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { + return cmd; + } + +- public void cancelApplyBufferedUpdates() { +- this.cancelApplyBufferUpdate = true; +- } +- + ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, + Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue(), + new DefaultSolrThreadFactory("recoveryExecutor")); +diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java +index 1d62207..1b79cee 100644 +--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java ++++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java +@@ -24,7 +24,9 @@ import com.codahale.metrics.Gauge; + import com.codahale.metrics.Meter; + import com.codahale.metrics.Metric; + import com.codahale.metrics.MetricRegistry; ++import org.apache.solr.common.util.TimeSource; + import org.apache.solr.metrics.SolrMetricManager; ++import org.apache.solr.util.TimeOut; + import org.noggit.ObjectBuilder; + + import org.slf4j.Logger; +@@ -820,6 +822,7 @@ public class TestRecovery extends SolrTestCaseJ4 { + +"]" + ); + ++ // Note that the v101->v103 are dropped, therefore it does not present in RTG + assertJQ(req("qt","/get", "getVersions","6") + ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}" + ); +@@ -929,7 +932,6 @@ public class TestRecovery extends SolrTestCaseJ4 { + ,"=={'versions':["+v105+","+v104+"]}" + ); + +- // this time add some docs first before buffering starts (so tlog won't be at pos 0) + updateJ(jsonAdd(sdoc("id","c100", "_version_",v200)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","c101", "_version_",v201)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + +@@ -957,10 +959,8 @@ public class TestRecovery extends SolrTestCaseJ4 { + +"" +"]" + ); + +- // The updates that were buffered (but never applied) still appear in recent versions! +- // This is good for some uses, but may not be good for others. 
+- assertJQ(req("qt","/get", "getVersions","11") +- ,"=={'versions':["+String.join(",",v206,v205,v204,v203,v201,v200,v105,v104,v103,v102,v101)+"]}" ++ assertJQ(req("qt","/get", "getVersions","6") ++ ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}" + ); + + assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state +@@ -1008,13 +1008,9 @@ public class TestRecovery extends SolrTestCaseJ4 { + + + @Test +- public void testBufferingFlags() throws Exception { ++ public void testExistOldBufferLog() throws Exception { + + DirectUpdateHandler2.commitOnClose = false; +- final Semaphore logReplayFinish = new Semaphore(0); +- +- UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release(); +- + + SolrQueryRequest req = req(); + UpdateHandler uhandler = req.getCore().getUpdateHandler(); +@@ -1024,9 +1020,6 @@ public class TestRecovery extends SolrTestCaseJ4 { + String v101 = getNextVersion(); + String v102 = getNextVersion(); + String v103 = getNextVersion(); +- String v114 = getNextVersion(); +- String v115 = getNextVersion(); +- String v116 = getNextVersion(); + String v117 = getNextVersion(); + + clearIndex(); +@@ -1049,30 +1042,10 @@ public class TestRecovery extends SolrTestCaseJ4 { + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + +- logReplayFinish.acquire(); // wait for replay to finish +- +- assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last +- +- // +- // Try again to ensure that the previous log replay didn't wipe out our flags +- // +- +- req.close(); +- h.close(); +- createCore(); +- +- req = req(); +- uhandler = req.getCore().getUpdateHandler(); +- ulog = uhandler.getUpdateLog(); +- +- assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); +- +- // now do some normal non-buffered adds +- updateJ(jsonAdd(sdoc("id","Q4", "_version_",v114)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); +- updateJ(jsonAdd(sdoc("id","Q5", "_version_",v115)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); +- updateJ(jsonAdd(sdoc("id","Q6", "_version_",v116)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); +- assertU(commit()); ++ // the core does not replay updates from buffer tlog on startup ++ assertTrue(ulog.existOldBufferLog()); // since we died while buffering, we should see this last + ++ // buffer tlog won't be removed on restart + req.close(); + h.close(); + createCore(); +@@ -1081,10 +1054,9 @@ public class TestRecovery extends SolrTestCaseJ4 { + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + +- assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); ++ assertTrue(ulog.existOldBufferLog()); + + ulog.bufferUpdates(); +- // simulate receiving no updates + ulog.applyBufferedUpdates(); + updateJ(jsonAdd(sdoc("id","Q7", "_version_",v117)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // do another add to make sure flags are back to normal + +@@ -1096,10 +1068,12 @@ public class TestRecovery extends SolrTestCaseJ4 { + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + +- assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7 +- +- logReplayFinish.acquire(); +- assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state ++ assertFalse(ulog.existOldBufferLog()); ++ // Timeout for Q7 get replayed, because it was added on tlog, therefore it will be replayed on restart ++ TimeOut timeout 
= new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); ++ timeout.waitFor("Timeout waiting for finish replay updates", ++ () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE); ++ assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7"); + } finally { + DirectUpdateHandler2.commitOnClose = true; + UpdateLog.testing_logReplayHook = null; +diff --git a/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java b/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java +index e6bb9a6..1796319 100644 +--- a/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java ++++ b/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java +@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; + import org.apache.solr.SolrTestCaseJ4; + import org.apache.solr.cloud.hdfs.HdfsTestUtil; + import org.apache.solr.common.util.IOUtils; ++import org.apache.solr.common.util.TimeSource; + import org.apache.solr.request.SolrQueryRequest; + import org.apache.solr.update.DirectUpdateHandler2; + import org.apache.solr.update.HdfsUpdateLog; +@@ -51,6 +52,7 @@ import org.apache.solr.update.UpdateHandler; + import org.apache.solr.update.UpdateLog; + import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; + import org.apache.solr.util.BadHdfsThreadsFilter; ++import org.apache.solr.util.TimeOut; + import org.junit.AfterClass; + import org.junit.BeforeClass; + import org.junit.Ignore; +@@ -515,13 +517,9 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 { + + + @Test +- public void testBufferingFlags() throws Exception { ++ public void testExistOldBufferLog() throws Exception { + + DirectUpdateHandler2.commitOnClose = false; +- final Semaphore logReplayFinish = new Semaphore(0); +- +- UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release(); +- + + SolrQueryRequest req = req(); + UpdateHandler uhandler = req.getCore().getUpdateHandler(); +@@ -548,14 +546,10 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 { + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + +- logReplayFinish.acquire(); // wait for replay to finish +- +- assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last +- +- // +- // Try again to ensure that the previous log replay didn't wipe out our flags +- // ++ // the core no longer replay updates from buffer tlog on startup ++ assertTrue(ulog.existOldBufferLog()); // since we died while buffering, we should see this last + ++ // buffer tlog won't be removed on restart + req.close(); + h.close(); + createCore(); +@@ -564,23 +558,7 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 { + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + +- assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); +- +- // now do some normal non-buffered adds +- updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); +- updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); +- updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); +- assertU(commit()); +- +- req.close(); +- h.close(); +- createCore(); +- +- req = req(); +- uhandler = req.getCore().getUpdateHandler(); +- ulog = uhandler.getUpdateLog(); +- +- assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); ++ assertTrue(ulog.existOldBufferLog()); + + ulog.bufferUpdates(); + // 
simulate receiving no updates
+@@ -595,10 +573,12 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
+     uhandler = req.getCore().getUpdateHandler();
+     ulog = uhandler.getUpdateLog();
+ 
+-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7
+-
+-    logReplayFinish.acquire();
+-    assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
++    assertFalse(ulog.existOldBufferLog());
++    // Wait for Q7 to be replayed; it was added to the tlog, so it will be replayed on restart
++    TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
++    timeout.waitFor("Timeout waiting for finish replay updates",
++        () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE);
++    assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7");
+   } finally {
+     DirectUpdateHandler2.commitOnClose = true;
+     UpdateLog.testing_logReplayHook = null;
+diff --git a/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java b/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
+index 1bf4ad4..d2b4b26 100644
+--- a/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
++++ b/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java
+@@ -35,7 +35,7 @@ public class TransactionLogTest extends LuceneTestCase {
+     transactionLog.lastAddSize = 2000000000;
+     AddUpdateCommand updateCommand = new AddUpdateCommand(null);
+     updateCommand.solrDoc = new SolrInputDocument();
+-    transactionLog.write(updateCommand, 0);
++    transactionLog.write(updateCommand);
+   }
+ }
+
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 66d885362fd..2c2191eda5b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -325,6 +325,8 @@ Optimizations
   SolrConstantScoreQuery as well. QWF since v5.4.0 sometimes needlessly internally executed and cached the query.
   Affects ExpandComponent, ChildDocTransformer, CurrencyFieldType, TermsQParser. (David Smiley)
 
+* SOLR-9922: Write buffering updates to another tlog. (Cao Manh Dat)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index c8f5ae89fbe..966497b0938 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -449,7 +449,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
   final public void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
-    boolean replayed = false;
     boolean successfulRecovery = false;
 
     UpdateLog ulog;
@@ -500,8 +499,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
       // when we went down. We may have received updates since then.
       recentVersions = startingVersions;
       try {
-        if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
-          // last operation at the time of startup had the GAP flag set...
+        if (ulog.existOldBufferLog()) {
           // this means we were previously doing a full index replication
           // that probably didn't complete and buffering updates in the
           // meantime.
@@ -542,9 +540,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
       }
 
       LOG.info("Begin buffering updates. core=[{}]", coreName);
+      // recalling buffer updates will drop the old buffer tlog
       ulog.bufferUpdates();
-      replayed = false;
-
+
       LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
           ourUrl);
       zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
@@ -603,8 +601,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
         LOG.info("Replaying updates buffered during PeerSync.");
         replay(core);
-        replayed = true;
-
+
         // sync success
         successfulRecovery = true;
         return;
@@ -630,8 +627,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
         }
 
         replayFuture = replay(core);
-        replayed = true;
-
+
         if (isClosed()) {
           LOG.info("RecoveryStrategy has been closed");
           break;
@@ -650,21 +646,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
     } catch (Exception e) {
       SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
     } finally {
-      if (!replayed) {
-        // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
-        // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
-        // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
-        // reset our starting point for playback.
-        LOG.info("Replay not started, or was not successful... still buffering updates.");
-
-        /** this prev code is retained in case we want to switch strategies.
-        try {
-          ulog.dropBufferedUpdates();
-        } catch (Exception e) {
-          SolrException.log(log, "", e);
-        }
-        **/
-      }
       if (successfulRecovery) {
         LOG.info("Registering as Active after recovery.");
         try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 0a742e3a5ae..aa648dd8869 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -97,7 +97,7 @@ public class ReplicateFromLeader {
             new ModifiableSolrParams());
         CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
         cuc.setVersion(Long.parseLong(commitVersion));
-        updateLog.copyOverOldUpdates(cuc);
+        updateLog.commitAndSwitchToNewTlog(cuc);
         lastVersion = Long.parseLong(commitVersion);
       }
     });
diff --git a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
index 3534f622908..f668540325e 100644
--- a/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/CdcrTransactionLog.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * methods {@link #incref()}, {@link #close()} and {@link #reopenOutputStream()}.
  * <li>
  * encode the number of records in the tlog file in the last commit record. The number of records will be
  * decoded and reuse if the tlog file is reopened. This is achieved by extending the constructor, and the
- * methods {@link #writeCommit(CommitUpdateCommand, int)} and {@link #getReader(long)}.
+ * methods {@link #writeCommit(CommitUpdateCommand)} and {@link #getReader(long)}.
  *
  */
 public class CdcrTransactionLog extends TransactionLog {
@@ -108,7 +108,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
+  public long write(AddUpdateCommand cmd, long prevPointer) {
     assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
 
     LogCodec codec = new LogCodec(resolver);
@@ -125,7 +125,7 @@ public class CdcrTransactionLog extends TransactionLog {
       codec.init(out);
       if (cmd.isInPlaceUpdate()) {
         codec.writeTag(JavaBinCodec.ARR, 6);
-        codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte
+        codec.writeInt(UpdateLog.UPDATE_INPLACE); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeLong(prevPointer);
         codec.writeLong(cmd.prevVersion);
@@ -141,7 +141,7 @@ public class CdcrTransactionLog extends TransactionLog {
 
       } else {
         codec.writeTag(JavaBinCodec.ARR, 4);
-        codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
+        codec.writeInt(UpdateLog.ADD); // should just take one byte
         codec.writeLong(cmd.getVersion());
         if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
           // if the update is received via cdcr source; add extra boolean entry
@@ -179,7 +179,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
+  public long writeDelete(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
 
     try {
@@ -190,7 +190,7 @@ public class CdcrTransactionLog extends TransactionLog {
       MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 4);
-      codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
+      codec.writeInt(UpdateLog.DELETE); // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeByteArray(br.bytes, br.offset, br.length);
       if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
@@ -217,7 +217,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     try {
       checkWriteHeader(codec, null);
@@ -225,7 +225,7 @@ public class CdcrTransactionLog extends TransactionLog {
       MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 4);
-      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
+      codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeStr(cmd.query);
       if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
@@ -249,7 +249,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -261,7 +261,7 @@ public class CdcrTransactionLog extends TransactionLog {
         }
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 4);
-        codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeTag(JavaBinCodec.INT); // Enforce the encoding of a plain integer, to simplify decoding
         fos.writeInt(numRecords + 1); // the number of records in the file - +1 to account for the commit operation being written
diff --git a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
index 6b202044d76..bff16122ecf 100644
--- a/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/CdcrUpdateLog.java
@@ -352,7 +352,6 @@ public class CdcrUpdateLog extends UpdateLog {
     long latestVersion = startingUpdates.getMaxRecentVersion();
     try {
       startingVersions = startingUpdates.getVersions(numRecordsToKeep);
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the index)
       for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
@@ -389,9 +388,7 @@ public class CdcrUpdateLog extends UpdateLog {
    */
   private void copyBufferedUpdates(File tlogSrc, long offsetSrc, long latestVersion) {
     recoveryInfo = new RecoveryInfo();
-    recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
     state = State.BUFFERING;
-    operationFlags |= FLAG_GAP;
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 0f89016a107..8ed7d7ad65a 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -166,20 +166,6 @@ public class HdfsTransactionLog extends TransactionLog {
     }
     return true;
   }
-
-  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
-  // This should only be used to roll back buffered updates, not actually applied updates.
-  @Override
-  public void rollback(long pos) throws IOException {
-    synchronized (this) {
-      assert snapshot_size == pos;
-      ensureFlushed();
-      // TODO: how do we rollback with hdfs?? We need HDFS-3107
-      fos.setWritten(pos);
-      assert fos.size() == pos;
-      numRecords = snapshot_numRecords;
-    }
-  }
 
   private void readHeader(FastInputStream fis) throws IOException {
     // read existing header
@@ -210,7 +196,7 @@ public class HdfsTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -223,7 +209,7 @@ public class HdfsTransactionLog extends TransactionLog {
 
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
 
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
index 7bb74d05bf9..8ca4b1cb3e5 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java
@@ -65,37 +65,6 @@ public class HdfsUpdateLog extends UpdateLog {
     this.confDir = confDir;
   }
 
-  // HACK
-  // while waiting for HDFS-3107, instead of quickly
-  // dropping, we slowly apply
-  // This is somewhat brittle, but current usage
-  // allows for it
-  @Override
-  public boolean dropBufferedUpdates() {
-    versionInfo.blockUpdates();
-    try {
-      if (state != State.BUFFERING) return false;
-
-      if (log.isInfoEnabled()) {
-        log.info("Dropping buffered updates " + this);
-      }
-
-      // since we blocked updates, this synchronization shouldn't strictly be
-      // necessary.
-      synchronized (this) {
-        if (tlog != null) {
-          // tlog.rollback(recoveryInfo.positionOfStart);
-        }
-      }
-
-      state = State.ACTIVE;
-      operationFlags &= ~FLAG_GAP;
-    } finally {
-      versionInfo.unblockUpdates();
-    }
-    return true;
-  }
-
   @Override
   public void init(PluginInfo info) {
     super.init(info);
@@ -186,6 +155,11 @@ public class HdfsUpdateLog extends UpdateLog {
         throw new RuntimeException("Problem creating directory: " + tlogDir, e);
       }
     }
+
+    String[] oldBufferTlog = getBufferLogList(fs, tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      existOldBufferLog = true;
+    }
 
     tlogFiles = getLogList(fs, tlogDir);
     id = getLastLogId() + 1; // add 1 since we will create a new log for the
@@ -241,7 +215,6 @@
     // non-complete tlogs.
     try (RecentUpdates startingUpdates = getRecentUpdates()) {
       startingVersions = startingUpdates.getVersions(getNumRecordsToKeep());
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the
       // index)
@@ -269,6 +242,23 @@
   public String getLogDir() {
     return tlogDir.toUri().toString();
   }
+
+  public static String[] getBufferLogList(FileSystem fs, Path tlogDir) {
+    final String prefix = BUFFER_TLOG_NAME+'.';
+    assert fs != null;
+    FileStatus[] fileStatuses;
+    try {
+      fileStatuses = fs.listStatus(tlogDir, path -> path.getName().startsWith(prefix));
+    } catch (IOException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed on listing old buffer tlog", e);
+    }
+
+    String[] names = new String[fileStatuses.length];
+    for (int i = 0; i < fileStatuses.length; i++) {
+      names[i] = fileStatuses[i].getPath().getName();
+    }
+    return names;
+  }
 
   public static String[] getLogList(FileSystem fs, Path tlogDir) {
     final String prefix = TLOG_NAME + '.';
@@ -307,7 +297,35 @@
       IOUtils.closeQuietly(fs);
     }
   }
-
+
+  @Override
+  protected void ensureBufferTlog() {
+    if (bufferTlog != null) return;
+    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
+    bufferTlog = new HdfsTransactionLog(fs, new Path(tlogDir, newLogName),
+        globalStrings, tlogDfsReplication);
+  }
+
+  @Override
+  protected void deleteBufferLogs() {
+    // Delete old buffer logs
+    String[] oldBufferTlog = getBufferLogList(fs, tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      for (String oldBufferLogName : oldBufferTlog) {
+        Path f = new Path(tlogDir, oldBufferLogName);
+        try {
+          boolean s = fs.delete(f, false);
+          if (!s) {
+            log.error("Could not remove old buffer tlog file:" + f);
+          }
+        } catch (IOException e) {
+          // No need to bubble up this exception, because it won't cause any problems on recovering
+          log.error("Could not remove old buffer tlog file:" + f, e);
+        }
+      }
+    }
+  }
+
   @Override
   protected void ensureLog() {
     if (tlog == null) {
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 96a928cc1a8..2a23896d491 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -85,9 +85,6 @@ public class TransactionLog implements Closeable {
   Map<String,Integer> globalStringMap = new HashMap<>();
   List<String> globalStringList = new ArrayList<>();
 
-  long snapshot_size;
-  int snapshot_numRecords;
-
   // write a BytesRef as a byte array
   static final JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
     @Override
@@ -153,7 +150,7 @@
 
       // Parse tlog id from the filename
       String filename = tlogFile.getName();
-      id = Long.parseLong(filename.substring(filename.indexOf('.') + 1, filename.indexOf('.') + 20));
+      id = Long.parseLong(filename.substring(filename.lastIndexOf('.')+1));
 
       this.tlogFile = tlogFile;
       raf = new RandomAccessFile(this.tlogFile, "rw");
@@ -233,29 +230,6 @@
     return true;
   }
 
-  /** takes a snapshot of the current position and number of records
-   * for later possible rollback, and returns the position */
-  public long snapshot() {
-    synchronized (this) {
-      snapshot_size = fos.size();
-      snapshot_numRecords = numRecords;
-      return snapshot_size;
-    }
-  }
-
-  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
-  // This should only be used to roll back buffered updates, not actually applied updates.
-  public void rollback(long pos) throws IOException {
-    synchronized (this) {
-      assert snapshot_size == pos;
-      fos.flush();
-      raf.setLength(pos);
-      fos.setWritten(pos);
-      assert fos.size() == pos;
-      numRecords = snapshot_numRecords;
-    }
-  }
-
   public long writeData(Object o) {
     @SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver);
     try {
@@ -346,17 +320,16 @@
 
   /**
    * Writes an add update command to the transaction log. This is not applicable for
-   * in-place updates; use {@link #write(AddUpdateCommand, long, int)}.
+   * in-place updates; use {@link #write(AddUpdateCommand, long)}.
    * (The previous pointer (applicable for in-place updates) is set to -1 while writing
    * the command to the transaction log.)
    * @param cmd The add update command to be written
-   * @param flags Options for writing the command to the transaction log
    * @return Returns the position pointer of the written update command
    *
-   * @see #write(AddUpdateCommand, long, int)
+   * @see #write(AddUpdateCommand, long)
    */
-  public long write(AddUpdateCommand cmd, int flags) {
-    return write(cmd, -1, flags);
+  public long write(AddUpdateCommand cmd) {
+    return write(cmd, -1);
   }
 
   /**
@@ -365,10 +338,9 @@
    * @param cmd The add update command to be written
    * @param prevPointer The pointer in the transaction log which this update depends
    * on (applicable for in-place updates)
-   * @param flags Options for writing the command to the transaction log
    * @return Returns the position pointer of the written update command
    */
-  public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
+  public long write(AddUpdateCommand cmd, long prevPointer) {
     assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
 
     LogCodec codec = new LogCodec(resolver);
@@ -386,14 +358,14 @@
       codec.init(out);
       if (cmd.isInPlaceUpdate()) {
         codec.writeTag(JavaBinCodec.ARR, 5);
-        codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte
+        codec.writeInt(UpdateLog.UPDATE_INPLACE); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeLong(prevPointer);
         codec.writeLong(cmd.prevVersion);
         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
       } else {
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
+        codec.writeInt(UpdateLog.ADD); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
       }
@@ -422,7 +394,7 @@
     }
   }
 
-  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
+  public long writeDelete(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
 
     try {
@@ -433,7 +405,7 @@
       MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 3);
-      codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
+      codec.writeInt(UpdateLog.DELETE); // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeByteArray(br.bytes, br.offset, br.length);
 
@@ -452,7 +424,7 @@
   }
 
-  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     try {
       checkWriteHeader(codec, null);
@@ -460,7 +432,7 @@
       MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 3);
-      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
+      codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeStr(cmd.query);
 
@@ -478,7 +450,7 @@
   }
 
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -490,7 +462,7 @@
         }
         codec.init(fos);
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
 
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 7f821eafc0e..1bda23fc038 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -96,6 +96,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   private static final long STATUS_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
   public static String LOG_FILENAME_PATTERN = "%s.%019d";
   public static String TLOG_NAME="tlog";
+  public static String BUFFER_TLOG_NAME="buffer.tlog";
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private boolean debug = log.isDebugEnabled();
@@ -139,11 +140,7 @@
   public static final int DELETE_BY_QUERY = 0x03;
   public static final int COMMIT = 0x04;
   public static final int UPDATE_INPLACE = 0x08;
-  // Flag indicating that this is a buffered operation, and that a gap exists before buffering started.
-  // for example, if full index replication starts and we are buffering updates, then this flag should
-  // be set to indicate that replaying the log would not bring us into sync (i.e. peersync should
-  // fail if this flag is set on the last update in the tlog).
-  public static final int FLAG_GAP = 0x10;
+  // For backward-compatibility, we should delete this field in 9.0
   public static final int OPERATION_MASK = 0x0f;  // mask off flags to get the operation
 
   /**
@@ -186,8 +183,8 @@
   long id = -1;
   protected State state = State.ACTIVE;
-  protected int operationFlags; // flags to write in the transaction log with operations (i.e. FLAG_GAP)
 
+  protected TransactionLog bufferTlog;
   protected TransactionLog tlog;
   protected TransactionLog prevTlog;
   protected final Deque<TransactionLog> logs = new LinkedList<>(); // list of recent logs, newest first
@@ -206,6 +203,7 @@
   protected int maxNumLogsToKeep;
   protected int numVersionBuckets; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
   protected Long maxVersionFromIndex = null;
+  protected boolean existOldBufferLog = false;
 
   // keep track of deletes only... this is not updated on an add
   protected LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
@@ -244,7 +242,6 @@
   volatile UpdateHandler uhandler;    // a core reload can change this reference!
   protected volatile boolean cancelApplyBufferUpdate;
   List<Long> startingVersions;
-  int startingOperation;  // last operation in the logs on startup
 
   // metrics
   protected Gauge<Integer> bufferedOpsGauge;
@@ -378,6 +375,10 @@
       log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
     }
 
+    String[] oldBufferTlog = getBufferLogList(tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      existOldBufferLog = true;
+    }
     TransactionLog oldLog = null;
     for (String oldLogName : tlogFiles) {
       File f = new File(tlogDir, oldLogName);
@@ -408,7 +409,6 @@
     // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
     try (RecentUpdates startingUpdates = getRecentUpdates()) {
       startingVersions = startingUpdates.getVersions(numRecordsToKeep);
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the index)
       for (int i = startingUpdates.deleteList.size() - 1; i >= 0; i--) {
@@ -434,14 +434,16 @@
     this.metricManager = manager;
     this.registryName = registry;
     bufferedOpsGauge = () -> {
+      if (state == State.BUFFERING) {
+        if (bufferTlog == null) return 0;
+        // numRecords counts header as a record
+        return bufferTlog.numRecords() - 1;
+      }
       if (tlog == null) {
         return 0;
       } else if (state == State.APPLYING_BUFFERED) {
         // numRecords counts header as a record
         return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors;
-      } else if (state == State.BUFFERING) {
-        // numRecords counts header as a record
-        return tlog.numRecords() - 1;
       } else {
         return 0;
       }
@@ -472,8 +474,8 @@
     return startingVersions;
   }
 
-  public int getStartingOperation() {
-    return startingOperation;
+  public boolean existOldBufferLog() {
+    return existOldBufferLog;
   }
 
   /* Takes over ownership of the log, keeping it until no longer needed
@@ -509,6 +511,19 @@
     logs.addFirst(oldLog);
   }
 
+  public String[] getBufferLogList(File directory) {
+    final String prefix = BUFFER_TLOG_NAME+'.';
+    return directory.list((dir, name) -> name.startsWith(prefix));
+  }
+
+  /**
+   * Is this update from the old tlogs (not from the buffer tlog)?
+   * If so, we must skip writing {@code cmd} to the current tlog
+   */
+  private boolean updateFromOldTlogs(UpdateCommand cmd) {
+    return (cmd.getFlags() & UpdateCommand.REPLAY) != 0 && state == State.REPLAYING;
+  }
+
   public String[] getLogList(File directory) {
     final String prefix = TLOG_NAME+'.';
     String[] names = directory.list(new FilenameFilter() {
@@ -541,14 +556,19 @@
     // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
 
     synchronized (this) {
-      long pos = -1;
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.write(cmd);
+        return;
+      }
 
+      long pos = -1;
       long prevPointer = getPrevPointerForUpdate(cmd);
 
       // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      if (!updateFromOldTlogs(cmd)) {
        ensureLog();
-        pos = tlog.write(cmd, prevPointer, operationFlags);
+        pos = tlog.write(cmd, prevPointer);
       }
 
       if (!clearCaches) {
@@ -556,10 +576,7 @@
         // Only currently would be useful for RTG while in recovery mode though.
         LogPtr ptr = new LogPtr(pos, cmd.getVersion(), prevPointer);
 
-        // only update our map if we're not buffering
-        if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
-          map.put(cmd.getIndexedId(), ptr);
-        }
+        map.put(cmd.getIndexedId(), ptr);
 
         if (trace) {
           log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
@@ -606,22 +623,21 @@
     BytesRef br = cmd.getIndexedId();
 
     synchronized (this) {
-      long pos = -1;
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.writeDelete(cmd);
+        return;
+      }
 
-      // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      long pos = -1;
+      if (!updateFromOldTlogs(cmd)) {
         ensureLog();
-        pos = tlog.writeDelete(cmd, operationFlags);
+        pos = tlog.writeDelete(cmd);
       }
 
       LogPtr ptr = new LogPtr(pos, cmd.version);
-
-      // only update our map if we're not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
-        map.put(br, ptr);
-
-        oldDeletes.put(br, ptr);
-      }
+      map.put(br, ptr);
+      oldDeletes.put(br, ptr);
 
       if (trace) {
         log.trace("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
@@ -631,15 +647,20 @@
 
   public void deleteByQuery(DeleteUpdateCommand cmd) {
     synchronized (this) {
-      long pos = -1;
-      // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
-        ensureLog();
-        pos = tlog.writeDeleteByQuery(cmd, operationFlags);
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.writeDeleteByQuery(cmd);
+        return;
       }
 
-      // only change our caches if we are not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
+      long pos = -1;
+      if (!updateFromOldTlogs(cmd)) {
+        ensureLog();
+        pos = tlog.writeDeleteByQuery(cmd);
+      }
+
+      // skip purging our caches in the case of a tlog replica
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
        // given that we just did a delete-by-query, we don't know what documents were
        // affected and hence we must purge our caches.
        openRealtimeSearcher();
@@ -802,7 +823,7 @@
     if (prevTlog != null) {
       // if we made it through the commit, write a commit command to the log
       // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
-      prevTlog.writeCommit(cmd, operationFlags);
+      prevTlog.writeCommit(cmd);
 
       addOldLog(prevTlog, true); // the old log list will decref when no longer needed
 
@@ -1152,9 +1173,16 @@
   public void copyOverBufferingUpdates(CommitUpdateCommand cuc) {
     versionInfo.blockUpdates();
     try {
-      operationFlags &= ~FLAG_GAP;
-      state = State.ACTIVE;
-      copyAndSwitchToNewTlog(cuc);
+      synchronized (this) {
+        state = State.ACTIVE;
+        if (bufferTlog == null) {
+          return;
+        }
+        // by calling this, we won't switch to a new tlog (compared to applyBufferedUpdates());
+        // if we switched to a new tlog we could possibly lose updates on the next fetch
+        copyOverOldUpdates(cuc.getVersion(), bufferTlog);
+        dropBufferTlog();
+      }
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -1165,33 +1193,25 @@
   * So any updates which hasn't made it to the index is preserved in the current tlog
   * @param cuc any updates that have version larger than the version of cuc will be copied over
   */
-  public void copyOverOldUpdates(CommitUpdateCommand cuc) {
+  public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) {
     versionInfo.blockUpdates();
     try {
-      copyAndSwitchToNewTlog(cuc);
+      synchronized (this) {
+        if (tlog == null) {
+          return;
+        }
+        preCommit(cuc);
+        try {
+          copyOverOldUpdates(cuc.getVersion());
+        } finally {
+          postCommit(cuc);
+        }
+      }
     } finally {
       versionInfo.unblockUpdates();
     }
   }
 
-  protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
-    synchronized (this) {
-      if (tlog == null) {
-        return;
-      }
-      preCommit(cuc);
-      try {
-        copyOverOldUpdates(cuc.getVersion());
-      } finally {
-        postCommit(cuc);
-      }
-    }
-  }
-
-  /**
-   * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
-   * @param commitVersion any updates that have version larger than the commitVersion will be copied over
-   */
   public void copyOverOldUpdates(long commitVersion) {
     TransactionLog oldTlog = prevTlog;
     if (oldTlog == null && !logs.isEmpty()) {
@@ -1207,6 +1227,14 @@
       log.warn("Exception reading log", e);
       return;
     }
+    copyOverOldUpdates(commitVersion, oldTlog);
+  }
+
+  /**
+   * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
+   * @param commitVersion any updates that have version larger than the commitVersion will be copied over
+   */
+  public void copyOverOldUpdates(long commitVersion, TransactionLog oldTlog) {
     copyOverOldUpdatesMeter.mark();
 
     SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
@@ -1270,6 +1298,22 @@
     }
   }
 
+  protected void ensureBufferTlog() {
+    if (bufferTlog != null) return;
+    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
+    bufferTlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false);
+  }
+
+  // Cleanup old buffer tlogs
+  protected void deleteBufferLogs() {
+    String[] oldBufferTlog = getBufferLogList(tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      for (String oldBufferLogName : oldBufferTlog) {
+        deleteFile(new File(tlogDir, oldBufferLogName));
+      }
+    }
+  }
+
   protected void ensureLog() {
     if (tlog == null) {
@@ -1285,7 +1329,7 @@
       // record a commit
       log.info("Recording current closed for " + uhandler.core + " log=" + theLog);
       CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
-      theLog.writeCommit(cmd, operationFlags);
+      theLog.writeCommit(cmd);
     }
 
     theLog.deleteOnClose = false;
@@ -1314,6 +1358,13 @@
       log.forceClose();
     }
 
+    if (bufferTlog != null) {
+      // should not delete bufferTlog on close; an existing bufferTlog is a sign to skip peerSync
+      bufferTlog.deleteOnClose = false;
+      bufferTlog.decref();
+      bufferTlog.forceClose();
+    }
+
     try {
       ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
     } catch (Exception e) {
@@ -1347,7 +1398,6 @@
     HashMap<BytesRef, Update> updates;
     List<Update> deleteByQueryList;
     List<DeleteUpdate> deleteList;
-    int latestOperation;
 
     public RecentUpdates(Deque<TransactionLog> logList) {
       this.logList = logList;
@@ -1401,11 +1451,6 @@
       return result;
     }
 
-    public int getLatestOperation() {
-      return latestOperation;
-    }
-
-
     private void update() {
       int numUpdates = 0;
       updateList = new ArrayList<>(logList.size());
@@ -1431,9 +1476,6 @@
             // TODO: refactor this out so we get common error handling
             int opAndFlags = (Integer)entry.get(UpdateLog.FLAGS_IDX);
-            if (latestOperation == 0) {
-              latestOperation = opAndFlags;
-            }
             int oper = opAndFlags & UpdateLog.OPERATION_MASK;
             long version = (Long) entry.get(UpdateLog.VERSION_IDX);
@@ -1525,6 +1567,10 @@
         tlog.incref();
         logList.addFirst(tlog);
       }
+      if (bufferTlog != null) {
+        bufferTlog.incref();
+        logList.addFirst(bufferTlog);
+      }
     }
 
     // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
@@ -1542,13 +1588,13 @@
     // reading state and acting on it in the distributed update processor
     versionInfo.blockUpdates();
     try {
-      if (state == State.BUFFERING) {
-        log.info("Restarting buffering. previous=" + recoveryInfo);
-      } else if (state != State.ACTIVE) {
+      if (state != State.ACTIVE && state != State.BUFFERING) {
         // we don't currently have support for handling other states
         log.warn("Unexpected state for bufferUpdates: " + state + ", Ignoring request.");
         return;
       }
+      dropBufferTlog();
+      deleteBufferLogs();
 
       recoveryInfo = new RecoveryInfo();
@@ -1556,15 +1602,7 @@
       if (debug) {
         log.info("Starting to buffer updates. " + this);
       }
 
-      // since we blocked updates, this synchronization shouldn't strictly be necessary.
-      synchronized (this) {
-        recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
-      }
-
       state = State.BUFFERING;
-
-      // currently, buffering is only called by recovery, meaning that there is most likely a gap in updates
-      operationFlags |= FLAG_GAP;
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -1580,25 +1618,24 @@
         log.info("Dropping buffered updates " + this);
       }
 
-      // since we blocked updates, this synchronization shouldn't strictly be necessary.
-      synchronized (this) {
-        if (tlog != null) {
-          tlog.rollback(recoveryInfo.positionOfStart);
-        }
-      }
+      dropBufferTlog();
 
       state = State.ACTIVE;
-      operationFlags &= ~FLAG_GAP;
-    } catch (IOException e) {
-      SolrException.log(log,"Error attempting to roll back log", e);
-      return false;
-    }
-    finally {
+    } finally {
       versionInfo.unblockUpdates();
     }
     return true;
   }
 
+  private void dropBufferTlog() {
+    synchronized (this) {
+      if (bufferTlog != null) {
+        bufferTlog.decref();
+        bufferTlog = null;
+      }
+    }
+  }
+
   /** Returns the Future to wait on, or null if no replay was needed */
   public Future<RecoveryInfo> applyBufferedUpdates() {
@@ -1612,27 +1649,30 @@
     try {
       cancelApplyBufferUpdate = false;
       if (state != State.BUFFERING) return null;
-      operationFlags &= ~FLAG_GAP;
 
-      // handle case when no log was even created because no updates
-      // were received.
-      if (tlog == null) {
-        state = State.ACTIVE;
-        return null;
+      synchronized (this) {
+        // handle case when no updates were received.
+        if (bufferTlog == null) {
+          state = State.ACTIVE;
+          return null;
+        }
+        bufferTlog.incref();
       }
-      tlog.incref();
+
       state = State.APPLYING_BUFFERED;
     } finally {
       versionInfo.unblockUpdates();
     }
 
     if (recoveryExecutor.isShutdown()) {
-      tlog.decref();
       throw new RuntimeException("executor is not running...");
     }
 
     ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
-    LogReplayer replayer = new LogReplayer(Arrays.asList(new TransactionLog[]{tlog}), true);
-    return cs.submit(replayer, recoveryInfo);
+    LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
+    return cs.submit(() -> {
+      replayer.run();
+      dropBufferTlog();
+    }, recoveryInfo);
   }
 
   public State getState() {
@@ -1903,10 +1943,7 @@
       if (!activeLog) {
         // if we are replaying an old tlog file, we need to add a commit to the end
        // so we don't replay it again if we restart right after.
-
-        // if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it
-        // as the flag on the last operation.
-        translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK));
+        translog.writeCommit(cmd);
       }
 
       try {
@@ -2037,10 +2074,6 @@
     return cmd;
   }
 
-  public void cancelApplyBufferedUpdates() {
-    this.cancelApplyBufferUpdate = true;
-  }
-
   ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Integer.MAX_VALUE, 1, TimeUnit.SECONDS,
       new SynchronousQueue<Runnable>(), new DefaultSolrThreadFactory("recoveryExecutor"));
 
diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
index 1d622076c99..1b79cee61c1 100644
--- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java
+++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java
@@ -24,7 +24,9 @@ import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.util.TimeOut;
 import org.noggit.ObjectBuilder;
 
 import org.slf4j.Logger;
@@ -820,6 +822,7 @@
             +"]"
         );
 
+    // Note that v101->v103 are dropped, therefore they are not present in RTG
     assertJQ(req("qt","/get", "getVersions","6")
         ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}"
     );
@@ -929,7 +932,6 @@
         ,"=={'versions':["+v105+","+v104+"]}"
     );
 
-    // this time add some docs first before buffering starts (so tlog won't be at pos 0)
     updateJ(jsonAdd(sdoc("id","c100", "_version_",v200)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
     updateJ(jsonAdd(sdoc("id","c101", "_version_",v201)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
@@ -957,10 +959,8 @@
             +""
             +"]"
         );
-    // The updates that were buffered (but never applied) still appear in recent versions!
-    // This is good for some uses, but may not be good for others.
- assertJQ(req("qt","/get", "getVersions","11") - ,"=={'versions':["+String.join(",",v206,v205,v204,v203,v201,v200,v105,v104,v103,v102,v101)+"]}" + assertJQ(req("qt","/get", "getVersions","6") + ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}" ); assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state @@ -1008,13 +1008,9 @@ public class TestRecovery extends SolrTestCaseJ4 { @Test - public void testBufferingFlags() throws Exception { + public void testExistOldBufferLog() throws Exception { DirectUpdateHandler2.commitOnClose = false; - final Semaphore logReplayFinish = new Semaphore(0); - - UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release(); - SolrQueryRequest req = req(); UpdateHandler uhandler = req.getCore().getUpdateHandler(); @@ -1024,9 +1020,6 @@ public class TestRecovery extends SolrTestCaseJ4 { String v101 = getNextVersion(); String v102 = getNextVersion(); String v103 = getNextVersion(); - String v114 = getNextVersion(); - String v115 = getNextVersion(); - String v116 = getNextVersion(); String v117 = getNextVersion(); clearIndex(); @@ -1049,14 +1042,10 @@ public class TestRecovery extends SolrTestCaseJ4 { uhandler = req.getCore().getUpdateHandler(); ulog = uhandler.getUpdateLog(); - logReplayFinish.acquire(); // wait for replay to finish - - assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last - - // - // Try again to ensure that the previous log replay didn't wipe out our flags - // + // the core does not replay updates from buffer tlog on startup + assertTrue(ulog.existOldBufferLog()); // since we died while buffering, we should see this last + // buffer tlog won't be removed on restart req.close(); h.close(); createCore(); @@ -1065,26 +1054,9 @@ public class TestRecovery extends SolrTestCaseJ4 { uhandler = req.getCore().getUpdateHandler(); ulog = uhandler.getUpdateLog(); - assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); - - // now do some normal non-buffered adds - updateJ(jsonAdd(sdoc("id","Q4", "_version_",v114)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); - updateJ(jsonAdd(sdoc("id","Q5", "_version_",v115)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); - updateJ(jsonAdd(sdoc("id","Q6", "_version_",v116)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); - assertU(commit()); - - req.close(); - h.close(); - createCore(); - - req = req(); - uhandler = req.getCore().getUpdateHandler(); - ulog = uhandler.getUpdateLog(); - - assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); + assertTrue(ulog.existOldBufferLog()); ulog.bufferUpdates(); - // simulate receiving no updates ulog.applyBufferedUpdates(); updateJ(jsonAdd(sdoc("id","Q7", "_version_",v117)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // do another add to make sure flags are back to normal @@ -1096,10 +1068,12 @@ public class TestRecovery extends SolrTestCaseJ4 { uhandler = req.getCore().getUpdateHandler(); ulog = uhandler.getUpdateLog(); - assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7 - - logReplayFinish.acquire(); - assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state + assertFalse(ulog.existOldBufferLog()); + // Timeout for Q7 get replayed, because it was added on tlog, therefore it will be replayed on restart + TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); + timeout.waitFor("Timeout waiting for finish replay updates", + 
() -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE); + assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7"); } finally { DirectUpdateHandler2.commitOnClose = true; UpdateLog.testing_logReplayHook = null; diff --git a/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java b/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java index e6bb9a6edb0..1796319295d 100644 --- a/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java +++ b/solr/core/src/test/org/apache/solr/search/TestRecoveryHdfs.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.hdfs.HdfsTestUtil; import org.apache.solr.common.util.IOUtils; +import org.apache.solr.common.util.TimeSource; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.update.DirectUpdateHandler2; import org.apache.solr.update.HdfsUpdateLog; @@ -51,6 +52,7 @@ import org.apache.solr.update.UpdateHandler; import org.apache.solr.update.UpdateLog; import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; import org.apache.solr.util.BadHdfsThreadsFilter; +import org.apache.solr.util.TimeOut; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; @@ -515,13 +517,9 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 { @Test - public void testBufferingFlags() throws Exception { + public void testExistOldBufferLog() throws Exception { DirectUpdateHandler2.commitOnClose = false; - final Semaphore logReplayFinish = new Semaphore(0); - - UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release(); - SolrQueryRequest req = req(); UpdateHandler uhandler = req.getCore().getUpdateHandler(); @@ -548,14 +546,10 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 { uhandler = req.getCore().getUpdateHandler(); ulog = uhandler.getUpdateLog(); - logReplayFinish.acquire(); // wait for replay to finish - - assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last - - // - // Try again to ensure that the previous log replay didn't wipe out our flags - // + // the core no longer replay updates from buffer tlog on startup + assertTrue(ulog.existOldBufferLog()); // since we died while buffering, we should see this last + // buffer tlog won't be removed on restart req.close(); h.close(); createCore(); @@ -564,23 +558,7 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 { uhandler = req.getCore().getUpdateHandler(); ulog = uhandler.getUpdateLog(); - assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); - - // now do some normal non-buffered adds - updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); - updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); - updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); - assertU(commit()); - - req.close(); - h.close(); - createCore(); - - req = req(); - uhandler = req.getCore().getUpdateHandler(); - ulog = uhandler.getUpdateLog(); - - assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); + assertTrue(ulog.existOldBufferLog()); ulog.bufferUpdates(); // simulate receiving no updates @@ -595,10 +573,12 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 { uhandler = req.getCore().getUpdateHandler(); ulog = uhandler.getUpdateLog(); - assertTrue((ulog.getStartingOperation() & 
UpdateLog.FLAG_GAP) == 0); // check flags on Q7 - - logReplayFinish.acquire(); - assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state + assertFalse(ulog.existOldBufferLog()); + // Timeout for Q7 get replayed, because it was added on tlog, therefore it will be replayed on restart + TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); + timeout.waitFor("Timeout waiting for finish replay updates", + () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE); + assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7"); } finally { DirectUpdateHandler2.commitOnClose = true; UpdateLog.testing_logReplayHook = null; diff --git a/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java b/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java index 1bf4ad41978..d2b4b26df01 100644 --- a/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java +++ b/solr/core/src/test/org/apache/solr/update/TransactionLogTest.java @@ -35,7 +35,7 @@ public class TransactionLogTest extends LuceneTestCase { transactionLog.lastAddSize = 2000000000; AddUpdateCommand updateCommand = new AddUpdateCommand(null); updateCommand.solrDoc = new SolrInputDocument(); - transactionLog.write(updateCommand, 0); + transactionLog.write(updateCommand); } }