mirror of https://github.com/apache/lucene.git
SOLR-9922: Write buffering updates to another tlog
This commit is contained in: parent 3dc4fa199c / commit ab316bbc91
File diff suppressed because it is too large
@@ -325,6 +325,8 @@ Optimizations
   SolrConstantScoreQuery as well. QWF since v5.4.0 sometimes needlessly internally executed and cached the query.
   Affects ExpandComponent, ChildDocTransformer, CurrencyFieldType, TermsQParser. (David Smiley)
 
+* SOLR-9922: Write buffering updates to another tlog. (Cao Manh Dat)
+
 Other Changes
 ----------------------
@@ -449,7 +449,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
   final public void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
-    boolean replayed = false;
     boolean successfulRecovery = false;
 
     UpdateLog ulog;
@@ -500,8 +499,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
       // when we went down. We may have received updates since then.
       recentVersions = startingVersions;
       try {
-        if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
-          // last operation at the time of startup had the GAP flag set...
+        if (ulog.existOldBufferLog()) {
           // this means we were previously doing a full index replication
           // that probably didn't complete and buffering updates in the
           // meantime.
@@ -542,9 +540,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
      }
 
      LOG.info("Begin buffering updates. core=[{}]", coreName);
+      // recalling buffer updates will drop the old buffer tlog
      ulog.bufferUpdates();
-      replayed = false;
 
      LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
          ourUrl);
      zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
@@ -603,8 +601,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
          LOG.info("Replaying updates buffered during PeerSync.");
          replay(core);
-          replayed = true;
 
          // sync success
          successfulRecovery = true;
          return;
@@ -630,8 +627,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
        }
 
        replayFuture = replay(core);
-        replayed = true;
 
        if (isClosed()) {
          LOG.info("RecoveryStrategy has been closed");
          break;
@@ -650,21 +646,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
      } catch (Exception e) {
        SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
      } finally {
-        if (!replayed) {
-          // dropBufferedUpdate()s currently only supports returning to ACTIVE state, which risks additional updates
-          // being added w/o UpdateLog.FLAG_GAP, hence losing the info on restart that we are not up-to-date.
-          // For now, ulog will simply remain in BUFFERING state, and an additional call to bufferUpdates() will
-          // reset our starting point for playback.
-          LOG.info("Replay not started, or was not successful... still buffering updates.");
-
-          /** this prev code is retained in case we want to switch strategies.
-          try {
-            ulog.dropBufferedUpdates();
-          } catch (Exception e) {
-            SolrException.log(log, "", e);
-          }
-          **/
-        }
        if (successfulRecovery) {
          LOG.info("Registering as Active after recovery.");
          try {
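Editor's note: the RecoveryStrategy hunks above replace the old FLAG_GAP bit check (decoded from the last tlog record) with a simple file-existence check: a leftover buffer tlog on disk means the node died while buffering during an incomplete full replication. The sketch below is illustrative only and is not Solr code; the names bufferLogExists and recover are hypothetical.

```java
// Minimal sketch, assuming the "buffer.tlog." filename prefix from this commit.
import java.io.File;

public class RecoveryGateSketch {
  // A buffer tlog left on disk means the node died while buffering updates
  // during a (probably incomplete) full index replication.
  static boolean bufferLogExists(File tlogDir) {
    String[] names = tlogDir.list((dir, name) -> name.startsWith("buffer.tlog."));
    return names != null && names.length != 0;
  }

  static void recover(File tlogDir) {
    if (bufferLogExists(tlogDir)) {
      System.out.println("old buffer tlog found -> do full replication");
    } else {
      System.out.println("no buffer tlog -> try peersync first");
    }
  }

  public static void main(String[] args) {
    recover(new File(args.length > 0 ? args[0] : "."));
  }
}
```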
@@ -97,7 +97,7 @@ public class ReplicateFromLeader {
           new ModifiableSolrParams());
       CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
       cuc.setVersion(Long.parseLong(commitVersion));
-      updateLog.copyOverOldUpdates(cuc);
+      updateLog.commitAndSwitchToNewTlog(cuc);
       lastVersion = Long.parseLong(commitVersion);
     }
   });
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * methods {@link #incref()}, {@link #close()} and {@link #reopenOutputStream()}.</li>
  * <li>encode the number of records in the tlog file in the last commit record. The number of records will be
  * decoded and reused if the tlog file is reopened. This is achieved by extending the constructor, and the
- * methods {@link #writeCommit(CommitUpdateCommand, int)} and {@link #getReader(long)}.</li>
+ * methods {@link #writeCommit(CommitUpdateCommand)} and {@link #getReader(long)}.</li>
  * </ul>
  */
 public class CdcrTransactionLog extends TransactionLog {
@@ -108,7 +108,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
+  public long write(AddUpdateCommand cmd, long prevPointer) {
     assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
 
     LogCodec codec = new LogCodec(resolver);
@@ -125,7 +125,7 @@ public class CdcrTransactionLog extends TransactionLog {
       codec.init(out);
       if (cmd.isInPlaceUpdate()) {
         codec.writeTag(JavaBinCodec.ARR, 6);
-        codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte
+        codec.writeInt(UpdateLog.UPDATE_INPLACE); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeLong(prevPointer);
         codec.writeLong(cmd.prevVersion);
@@ -141,7 +141,7 @@ public class CdcrTransactionLog extends TransactionLog {
 
       } else {
         codec.writeTag(JavaBinCodec.ARR, 4);
-        codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
+        codec.writeInt(UpdateLog.ADD); // should just take one byte
         codec.writeLong(cmd.getVersion());
         if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
           // if the update is received via cdcr source; add extra boolean entry
@@ -179,7 +179,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
+  public long writeDelete(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
 
     try {
@@ -190,7 +190,7 @@ public class CdcrTransactionLog extends TransactionLog {
       MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 4);
-      codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
+      codec.writeInt(UpdateLog.DELETE); // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeByteArray(br.bytes, br.offset, br.length);
       if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
@@ -217,7 +217,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     try {
       checkWriteHeader(codec, null);
@@ -225,7 +225,7 @@ public class CdcrTransactionLog extends TransactionLog {
       MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 4);
-      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
+      codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
       codec.writeLong(cmd.getVersion());
       codec.writeStr(cmd.query);
       if (cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE)) {
@@ -249,7 +249,7 @@ public class CdcrTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -261,7 +261,7 @@ public class CdcrTransactionLog extends TransactionLog {
        }
        codec.init(fos);
        codec.writeTag(JavaBinCodec.ARR, 4);
-        codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT); // should just take one byte
        codec.writeLong(cmd.getVersion());
        codec.writeTag(JavaBinCodec.INT); // Enforce the encoding of a plain integer, to simplify decoding
        fos.writeInt(numRecords + 1); // the number of records in the file - +1 to account for the commit operation being written
@@ -352,7 +352,6 @@ public class CdcrUpdateLog extends UpdateLog {
     long latestVersion = startingUpdates.getMaxRecentVersion();
     try {
       startingVersions = startingUpdates.getVersions(numRecordsToKeep);
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the index)
       for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
@@ -389,9 +388,7 @@ public class CdcrUpdateLog extends UpdateLog {
   */
  private void copyBufferedUpdates(File tlogSrc, long offsetSrc, long latestVersion) {
    recoveryInfo = new RecoveryInfo();
-    recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
    state = State.BUFFERING;
-    operationFlags |= FLAG_GAP;
 
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, DistributedUpdateProcessor.DistribPhase.FROMLEADER.toString());
@@ -166,20 +166,6 @@ public class HdfsTransactionLog extends TransactionLog {
     }
     return true;
   }
 
-  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
-  // This should only be used to roll back buffered updates, not actually applied updates.
-  @Override
-  public void rollback(long pos) throws IOException {
-    synchronized (this) {
-      assert snapshot_size == pos;
-      ensureFlushed();
-      // TODO: how do we rollback with hdfs?? We need HDFS-3107
-      fos.setWritten(pos);
-      assert fos.size() == pos;
-      numRecords = snapshot_numRecords;
-    }
-  }
-
   private void readHeader(FastInputStream fis) throws IOException {
     // read existing header
@@ -210,7 +196,7 @@ public class HdfsTransactionLog extends TransactionLog {
   }
 
   @Override
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -223,7 +209,7 @@ public class HdfsTransactionLog extends TransactionLog {
 
        codec.init(fos);
        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte
+        codec.writeInt(UpdateLog.COMMIT); // should just take one byte
        codec.writeLong(cmd.getVersion());
        codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
 
@@ -65,37 +65,6 @@ public class HdfsUpdateLog extends UpdateLog {
     this.confDir = confDir;
   }
 
-  // HACK
-  // while waiting for HDFS-3107, instead of quickly
-  // dropping, we slowly apply
-  // This is somewhat brittle, but current usage
-  // allows for it
-  @Override
-  public boolean dropBufferedUpdates() {
-    versionInfo.blockUpdates();
-    try {
-      if (state != State.BUFFERING) return false;
-
-      if (log.isInfoEnabled()) {
-        log.info("Dropping buffered updates " + this);
-      }
-
-      // since we blocked updates, this synchronization shouldn't strictly be
-      // necessary.
-      synchronized (this) {
-        if (tlog != null) {
-          // tlog.rollback(recoveryInfo.positionOfStart);
-        }
-      }
-
-      state = State.ACTIVE;
-      operationFlags &= ~FLAG_GAP;
-    } finally {
-      versionInfo.unblockUpdates();
-    }
-    return true;
-  }
-
   @Override
   public void init(PluginInfo info) {
     super.init(info);
@@ -186,6 +155,11 @@ public class HdfsUpdateLog extends UpdateLog {
         throw new RuntimeException("Problem creating directory: " + tlogDir, e);
       }
     }
 
+    String[] oldBufferTlog = getBufferLogList(fs, tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      existOldBufferLog = true;
+    }
+
     tlogFiles = getLogList(fs, tlogDir);
     id = getLastLogId() + 1; // add 1 since we will create a new log for the
@@ -241,7 +215,6 @@ public class HdfsUpdateLog extends UpdateLog {
     // non-complete tlogs.
     try (RecentUpdates startingUpdates = getRecentUpdates()) {
       startingVersions = startingUpdates.getVersions(getNumRecordsToKeep());
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the
       // index)
@@ -269,6 +242,23 @@ public class HdfsUpdateLog extends UpdateLog {
   public String getLogDir() {
     return tlogDir.toUri().toString();
   }
 
+  public static String[] getBufferLogList(FileSystem fs, Path tlogDir) {
+    final String prefix = BUFFER_TLOG_NAME+'.';
+    assert fs != null;
+    FileStatus[] fileStatuses;
+    try {
+      fileStatuses = fs.listStatus(tlogDir, path -> path.getName().startsWith(prefix));
+    } catch (IOException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed on listing old buffer tlog", e);
+    }
+
+    String[] names = new String[fileStatuses.length];
+    for (int i = 0; i < fileStatuses.length; i++) {
+      names[i] = fileStatuses[i].getPath().getName();
+    }
+    return names;
+  }
+
   public static String[] getLogList(FileSystem fs, Path tlogDir) {
     final String prefix = TLOG_NAME + '.';
@@ -307,7 +297,35 @@ public class HdfsUpdateLog extends UpdateLog {
       IOUtils.closeQuietly(fs);
     }
   }
 
+  @Override
+  protected void ensureBufferTlog() {
+    if (bufferTlog != null) return;
+    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
+    bufferTlog = new HdfsTransactionLog(fs, new Path(tlogDir, newLogName),
+        globalStrings, tlogDfsReplication);
+  }
+
+  @Override
+  protected void deleteBufferLogs() {
+    // Delete old buffer logs
+    String[] oldBufferTlog = getBufferLogList(fs, tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      for (String oldBufferLogName : oldBufferTlog) {
+        Path f = new Path(tlogDir, oldBufferLogName);
+        try {
+          boolean s = fs.delete(f, false);
+          if (!s) {
+            log.error("Could not remove old buffer tlog file:" + f);
+          }
+        } catch (IOException e) {
+          // No need to bubble up this exception, because it won't cause any problems on recovering
+          log.error("Could not remove old buffer tlog file:" + f, e);
+        }
+      }
+    }
+  }
+
   @Override
   protected void ensureLog() {
     if (tlog == null) {
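Editor's note: both the local and the HDFS update logs name the new buffer tlog with the same LOG_FILENAME_PATTERN used for regular tlogs, substituting BUFFER_TLOG_NAME and a nanosecond timestamp, and rediscover leftovers by prefix on startup. A minimal self-contained sketch of that naming and discovery, using only the constants visible in the diff:

```java
// Illustrative sketch only (not Solr code); mirrors LOG_FILENAME_PATTERN
// ("%s.%019d") and BUFFER_TLOG_NAME ("buffer.tlog") from the hunks above.
import java.io.File;
import java.util.Arrays;
import java.util.Locale;

public class BufferTlogNamesSketch {
  static final String LOG_FILENAME_PATTERN = "%s.%019d";
  static final String BUFFER_TLOG_NAME = "buffer.tlog";

  public static void main(String[] args) {
    // System.nanoTime() keeps successive buffer tlog names unique within a run
    String name = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
    System.out.println("would create: " + name); // e.g. buffer.tlog.0000001234567890123

    // discovery on startup: any file with the buffer prefix marks an old buffer log
    File tlogDir = new File(args.length > 0 ? args[0] : ".");
    final String prefix = BUFFER_TLOG_NAME + '.';
    String[] old = tlogDir.list((dir, n) -> n.startsWith(prefix));
    System.out.println("old buffer tlogs: " + Arrays.toString(old));
  }
}
```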
@@ -85,9 +85,6 @@ public class TransactionLog implements Closeable {
   Map<String,Integer> globalStringMap = new HashMap<>();
   List<String> globalStringList = new ArrayList<>();
 
-  long snapshot_size;
-  int snapshot_numRecords;
-
   // write a BytesRef as a byte array
   static final JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
     @Override
@@ -153,7 +150,7 @@ public class TransactionLog implements Closeable {
 
     // Parse tlog id from the filename
     String filename = tlogFile.getName();
-    id = Long.parseLong(filename.substring(filename.indexOf('.') + 1, filename.indexOf('.') + 20));
+    id = Long.parseLong(filename.substring(filename.lastIndexOf('.')+1));
 
     this.tlogFile = tlogFile;
     raf = new RandomAccessFile(this.tlogFile, "rw");
@@ -233,29 +230,6 @@ public class TransactionLog implements Closeable {
     return true;
   }
 
-  /** takes a snapshot of the current position and number of records
-   * for later possible rollback, and returns the position */
-  public long snapshot() {
-    synchronized (this) {
-      snapshot_size = fos.size();
-      snapshot_numRecords = numRecords;
-      return snapshot_size;
-    }
-  }
-
-  // This could mess with any readers or reverse readers that are open, or anything that might try to do a log lookup.
-  // This should only be used to roll back buffered updates, not actually applied updates.
-  public void rollback(long pos) throws IOException {
-    synchronized (this) {
-      assert snapshot_size == pos;
-      fos.flush();
-      raf.setLength(pos);
-      fos.setWritten(pos);
-      assert fos.size() == pos;
-      numRecords = snapshot_numRecords;
-    }
-  }
-
   public long writeData(Object o) {
     @SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver);
     try {
@@ -346,17 +320,16 @@ public class TransactionLog implements Closeable {
 
   /**
    * Writes an add update command to the transaction log. This is not applicable for
-   * in-place updates; use {@link #write(AddUpdateCommand, long, int)}.
+   * in-place updates; use {@link #write(AddUpdateCommand, long)}.
    * (The previous pointer (applicable for in-place updates) is set to -1 while writing
    * the command to the transaction log.)
    * @param cmd The add update command to be written
-   * @param flags Options for writing the command to the transaction log
    * @return Returns the position pointer of the written update command
    *
-   * @see #write(AddUpdateCommand, long, int)
+   * @see #write(AddUpdateCommand, long)
    */
-  public long write(AddUpdateCommand cmd, int flags) {
-    return write(cmd, -1, flags);
+  public long write(AddUpdateCommand cmd) {
+    return write(cmd, -1);
   }
 
   /**
@@ -365,10 +338,9 @@ public class TransactionLog implements Closeable {
    * @param cmd The add update command to be written
    * @param prevPointer The pointer in the transaction log which this update depends
    * on (applicable for in-place updates)
-   * @param flags Options for writing the command to the transaction log
    * @return Returns the position pointer of the written update command
    */
-  public long write(AddUpdateCommand cmd, long prevPointer, int flags) {
+  public long write(AddUpdateCommand cmd, long prevPointer) {
     assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
 
     LogCodec codec = new LogCodec(resolver);
@@ -386,14 +358,14 @@ public class TransactionLog implements Closeable {
       codec.init(out);
       if (cmd.isInPlaceUpdate()) {
         codec.writeTag(JavaBinCodec.ARR, 5);
-        codec.writeInt(UpdateLog.UPDATE_INPLACE | flags); // should just take one byte
+        codec.writeInt(UpdateLog.UPDATE_INPLACE); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeLong(prevPointer);
         codec.writeLong(cmd.prevVersion);
         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
       } else {
         codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
+        codec.writeInt(UpdateLog.ADD); // should just take one byte
         codec.writeLong(cmd.getVersion());
         codec.writeSolrInputDocument(cmd.getSolrInputDocument());
       }
@@ -422,7 +394,7 @@ public class TransactionLog implements Closeable {
     }
   }
 
-  public long writeDelete(DeleteUpdateCommand cmd, int flags) {
+  public long writeDelete(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
 
     try {
@@ -433,7 +405,7 @@ public class TransactionLog implements Closeable {
       MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 3);
-      codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
+      codec.writeInt(UpdateLog.DELETE); // should just take one byte
       codec.writeLong(cmd.getVersion());
      codec.writeByteArray(br.bytes, br.offset, br.length);
 
@@ -452,7 +424,7 @@ public class TransactionLog implements Closeable {
 
   }
 
-  public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
+  public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     try {
       checkWriteHeader(codec, null);
@@ -460,7 +432,7 @@ public class TransactionLog implements Closeable {
       MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
       codec.init(out);
       codec.writeTag(JavaBinCodec.ARR, 3);
-      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
+      codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
      codec.writeLong(cmd.getVersion());
      codec.writeStr(cmd.query);
 
@@ -478,7 +450,7 @@ public class TransactionLog implements Closeable {
   }
 
 
-  public long writeCommit(CommitUpdateCommand cmd, int flags) {
+  public long writeCommit(CommitUpdateCommand cmd) {
     LogCodec codec = new LogCodec(resolver);
     synchronized (this) {
       try {
@@ -490,7 +462,7 @@ public class TransactionLog implements Closeable {
       }
       codec.init(fos);
      codec.writeTag(JavaBinCodec.ARR, 3);
-      codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte
+      codec.writeInt(UpdateLog.COMMIT); // should just take one byte
      codec.writeLong(cmd.getVersion());
      codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
 
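Editor's note: every writer above drops the `| flags` OR when encoding the operation byte, yet OPERATION_MASK stays. That is deliberate: old tlogs on disk may still contain opcodes OR'd with FLAG_GAP, so readers keep masking. A minimal sketch of why the mask makes old and new records decode identically (illustrative only; constants copied from the diff):

```java
// Illustrative sketch only, not Solr code.
public class OpcodeMaskSketch {
  static final int ADD = 0x01, DELETE = 0x02, DELETE_BY_QUERY = 0x03, COMMIT = 0x04;
  static final int FLAG_GAP = 0x10;        // legacy flag, no longer written
  static final int OPERATION_MASK = 0x0f;  // mask off flags to get the operation

  static int decodeOperation(int stored) {
    return stored & OPERATION_MASK;        // works for old and new records alike
  }

  public static void main(String[] args) {
    int legacy = ADD | FLAG_GAP;           // as an old tlog might have written it
    int modern = ADD;                      // as written after this commit
    System.out.println(decodeOperation(legacy) == decodeOperation(modern)); // true
  }
}
```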
@@ -96,6 +96,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   private static final long STATUS_TIME = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
   public static String LOG_FILENAME_PATTERN = "%s.%019d";
   public static String TLOG_NAME="tlog";
+  public static String BUFFER_TLOG_NAME="buffer.tlog";
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private boolean debug = log.isDebugEnabled();
@@ -139,11 +140,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   public static final int DELETE_BY_QUERY = 0x03;
   public static final int COMMIT = 0x04;
   public static final int UPDATE_INPLACE = 0x08;
-  // Flag indicating that this is a buffered operation, and that a gap exists before buffering started.
-  // for example, if full index replication starts and we are buffering updates, then this flag should
-  // be set to indicate that replaying the log would not bring us into sync (i.e. peersync should
-  // fail if this flag is set on the last update in the tlog).
-  public static final int FLAG_GAP = 0x10;
+  // For backward-compatibility, we should delete this field in 9.0
+  public static final int FLAG_GAP = 0x10;
   public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation
 
   /**
@@ -186,8 +183,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
   long id = -1;
   protected State state = State.ACTIVE;
-  protected int operationFlags; // flags to write in the transaction log with operations (i.e. FLAG_GAP)
 
+  protected TransactionLog bufferTlog;
   protected TransactionLog tlog;
   protected TransactionLog prevTlog;
   protected final Deque<TransactionLog> logs = new LinkedList<>(); // list of recent logs, newest first
@@ -206,6 +203,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   protected int maxNumLogsToKeep;
   protected int numVersionBuckets; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
   protected Long maxVersionFromIndex = null;
+  protected boolean existOldBufferLog = false;
 
   // keep track of deletes only... this is not updated on an add
   protected LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
@@ -244,7 +242,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   volatile UpdateHandler uhandler; // a core reload can change this reference!
   protected volatile boolean cancelApplyBufferUpdate;
   List<Long> startingVersions;
-  int startingOperation; // last operation in the logs on startup
 
   // metrics
   protected Gauge<Integer> bufferedOpsGauge;
@@ -378,6 +375,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       log.debug("UpdateHandler init: tlogDir=" + tlogDir + ", existing tlogs=" + Arrays.asList(tlogFiles) + ", next id=" + id);
     }
 
+    String[] oldBufferTlog = getBufferLogList(tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      existOldBufferLog = true;
+    }
     TransactionLog oldLog = null;
     for (String oldLogName : tlogFiles) {
       File f = new File(tlogDir, oldLogName);
@@ -408,7 +409,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
     try (RecentUpdates startingUpdates = getRecentUpdates()) {
       startingVersions = startingUpdates.getVersions(numRecordsToKeep);
-      startingOperation = startingUpdates.getLatestOperation();
 
       // populate recent deletes list (since we can't get that info from the index)
       for (int i = startingUpdates.deleteList.size() - 1; i >= 0; i--) {
@@ -434,14 +434,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     this.metricManager = manager;
     this.registryName = registry;
     bufferedOpsGauge = () -> {
+      if (state == State.BUFFERING) {
+        if (bufferTlog == null) return 0;
+        // numRecords counts header as a record
+        return bufferTlog.numRecords() - 1;
+      }
       if (tlog == null) {
         return 0;
       } else if (state == State.APPLYING_BUFFERED) {
         // numRecords counts header as a record
         return tlog.numRecords() - 1 - recoveryInfo.adds - recoveryInfo.deleteByQuery - recoveryInfo.deletes - recoveryInfo.errors;
-      } else if (state == State.BUFFERING) {
-        // numRecords counts header as a record
-        return tlog.numRecords() - 1;
       } else {
         return 0;
       }
@@ -472,8 +474,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     return startingVersions;
   }
 
-  public int getStartingOperation() {
-    return startingOperation;
+  public boolean existOldBufferLog() {
+    return existOldBufferLog;
   }
 
   /* Takes over ownership of the log, keeping it until no longer needed
@@ -509,6 +511,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     logs.addFirst(oldLog);
   }
 
+  public String[] getBufferLogList(File directory) {
+    final String prefix = BUFFER_TLOG_NAME+'.';
+    return directory.list((dir, name) -> name.startsWith(prefix));
+  }
+
+  /**
+   * Is this update a replay from an old tlog (rather than from the buffer tlog)?
+   * If so we must skip writing {@code cmd} to the current tlog
+   */
+  private boolean updateFromOldTlogs(UpdateCommand cmd) {
+    return (cmd.getFlags() & UpdateCommand.REPLAY) != 0 && state == State.REPLAYING;
+  }
+
   public String[] getLogList(File directory) {
     final String prefix = TLOG_NAME+'.';
     String[] names = directory.list(new FilenameFilter() {
@@ -541,14 +556,19 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     // if ((cmd.getFlags() & UpdateCommand.REPLAY) != 0) return;
 
     synchronized (this) {
-      long pos = -1;
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.write(cmd);
+        return;
+      }
 
+      long pos = -1;
       long prevPointer = getPrevPointerForUpdate(cmd);
 
-      // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      if (!updateFromOldTlogs(cmd)) {
         ensureLog();
-        pos = tlog.write(cmd, prevPointer, operationFlags);
+        pos = tlog.write(cmd, prevPointer);
       }
 
       if (!clearCaches) {
@@ -556,10 +576,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
         // Only currently would be useful for RTG while in recovery mode though.
         LogPtr ptr = new LogPtr(pos, cmd.getVersion(), prevPointer);
 
-        // only update our map if we're not buffering
-        if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
-          map.put(cmd.getIndexedId(), ptr);
-        }
+        map.put(cmd.getIndexedId(), ptr);
 
        if (trace) {
          log.trace("TLOG: added id " + cmd.getPrintableId() + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
@@ -606,22 +623,21 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     BytesRef br = cmd.getIndexedId();
 
     synchronized (this) {
-      long pos = -1;
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.writeDelete(cmd);
+        return;
+      }
 
-      // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+      long pos = -1;
+      if (!updateFromOldTlogs(cmd)) {
         ensureLog();
-        pos = tlog.writeDelete(cmd, operationFlags);
+        pos = tlog.writeDelete(cmd);
       }
 
       LogPtr ptr = new LogPtr(pos, cmd.version);
-
-      // only update our map if we're not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
-        map.put(br, ptr);
-
-        oldDeletes.put(br, ptr);
-      }
+      map.put(br, ptr);
+      oldDeletes.put(br, ptr);
 
       if (trace) {
        log.trace("TLOG: added delete for id " + cmd.id + " to " + tlog + " " + ptr + " map=" + System.identityHashCode(map));
@@ -631,15 +647,20 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
   public void deleteByQuery(DeleteUpdateCommand cmd) {
     synchronized (this) {
-      long pos = -1;
-      // don't log if we are replaying from another log
-      if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
-        ensureLog();
-        pos = tlog.writeDeleteByQuery(cmd, operationFlags);
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) != 0) {
+        ensureBufferTlog();
+        bufferTlog.writeDeleteByQuery(cmd);
+        return;
       }
 
-      // only change our caches if we are not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
+      long pos = -1;
+      if (!updateFromOldTlogs(cmd)) {
+        ensureLog();
+        pos = tlog.writeDeleteByQuery(cmd);
+      }
+
+      // skip purging our caches in case of a tlog replica
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
        // given that we just did a delete-by-query, we don't know what documents were
        // affected and hence we must purge our caches.
        openRealtimeSearcher();
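Editor's note: the three hunks above give add(), delete() and deleteByQuery() the same dispatch shape: a command carrying the BUFFERING flag is written to the dedicated buffer tlog and the method returns early, so the regular tlog and the realtime-get maps are never touched by buffered updates. The sketch below models only that shape; BufferDispatchSketch and Cmd are hypothetical stand-ins, not Solr types.

```java
// Minimal illustrative sketch of the buffered-vs-regular dispatch.
import java.util.ArrayList;
import java.util.List;

public class BufferDispatchSketch {
  static final int BUFFERING = 0x1;

  static class Cmd {
    final int flags; final String id;
    Cmd(int flags, String id) { this.flags = flags; this.id = id; }
  }

  final List<String> bufferTlog = new ArrayList<>(); // stands in for the buffer tlog
  final List<String> tlog = new ArrayList<>();       // stands in for the regular tlog

  void add(Cmd cmd) {
    synchronized (this) {
      if ((cmd.flags & BUFFERING) != 0) {
        bufferTlog.add(cmd.id); // buffered updates bypass the regular tlog
        return;                 // and are not added to the realtime-get map
      }
      tlog.add(cmd.id);
    }
  }

  public static void main(String[] args) {
    BufferDispatchSketch ulog = new BufferDispatchSketch();
    ulog.add(new Cmd(BUFFERING, "doc1"));
    ulog.add(new Cmd(0, "doc2"));
    System.out.println("buffer=" + ulog.bufferTlog + " tlog=" + ulog.tlog);
  }
}
```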
@@ -802,7 +823,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       if (prevTlog != null) {
         // if we made it through the commit, write a commit command to the log
         // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
-        prevTlog.writeCommit(cmd, operationFlags);
+        prevTlog.writeCommit(cmd);
 
         addOldLog(prevTlog, true);
         // the old log list will decref when no longer needed
@@ -1152,9 +1173,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   public void copyOverBufferingUpdates(CommitUpdateCommand cuc) {
     versionInfo.blockUpdates();
     try {
-      operationFlags &= ~FLAG_GAP;
-      state = State.ACTIVE;
-      copyAndSwitchToNewTlog(cuc);
+      synchronized (this) {
+        state = State.ACTIVE;
+        if (bufferTlog == null) {
+          return;
+        }
+        // by calling this, we won't switch to a new tlog (compared to applyBufferedUpdates());
+        // if we switched to a new tlog we could possibly lose updates on the next fetch
+        copyOverOldUpdates(cuc.getVersion(), bufferTlog);
+        dropBufferTlog();
+      }
     } finally {
       versionInfo.unblockUpdates();
     }
@@ -1165,33 +1193,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
    * So any updates that haven't made it to the index are preserved in the current tlog
    * @param cuc any updates that have a version larger than the version of cuc will be copied over
    */
-  public void copyOverOldUpdates(CommitUpdateCommand cuc) {
+  public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) {
     versionInfo.blockUpdates();
     try {
-      copyAndSwitchToNewTlog(cuc);
+      synchronized (this) {
+        if (tlog == null) {
+          return;
+        }
+        preCommit(cuc);
+        try {
+          copyOverOldUpdates(cuc.getVersion());
+        } finally {
+          postCommit(cuc);
+        }
+      }
     } finally {
       versionInfo.unblockUpdates();
     }
   }
 
-  protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
-    synchronized (this) {
-      if (tlog == null) {
-        return;
-      }
-      preCommit(cuc);
-      try {
-        copyOverOldUpdates(cuc.getVersion());
-      } finally {
-        postCommit(cuc);
-      }
-    }
-  }
-
   /**
    * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
    * @param commitVersion any updates that have a version larger than the commitVersion will be copied over
    */
   public void copyOverOldUpdates(long commitVersion) {
     TransactionLog oldTlog = prevTlog;
     if (oldTlog == null && !logs.isEmpty()) {
@@ -1207,6 +1227,14 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       log.warn("Exception reading log", e);
       return;
     }
+    copyOverOldUpdates(commitVersion, oldTlog);
+  }
+
+  /**
+   * Copy over updates from the given tlog to a new tlog
+   * @param commitVersion any updates that have a version larger than the commitVersion will be copied over
+   */
+  public void copyOverOldUpdates(long commitVersion, TransactionLog oldTlog) {
     copyOverOldUpdatesMeter.mark();
 
     SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
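Editor's note: copyOverOldUpdates carries forward only the updates whose version is newer than the commit point, since everything older is already durable in the index. The sketch below models just that filter; it is illustrative only, and the use of Math.abs is an assumption based on Solr recording deletes with negated versions, not something shown in this diff.

```java
// Minimal illustrative sketch of a version-filtered copy, assuming
// delete entries carry negated versions (hence Math.abs).
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyOverSketch {
  static class Entry {
    final long version; final String id;
    Entry(long version, String id) { this.version = version; this.id = id; }
    @Override public String toString() { return id + "@" + version; }
  }

  static List<Entry> copyOver(List<Entry> oldTlog, long commitVersion) {
    List<Entry> kept = new ArrayList<>();
    for (Entry e : oldTlog) {
      // keep only updates newer than the commit point
      if (Math.abs(e.version) > commitVersion) kept.add(e);
    }
    return kept;
  }

  public static void main(String[] args) {
    List<Entry> old = Arrays.asList(new Entry(100, "a"), new Entry(105, "b"), new Entry(-107, "c"));
    System.out.println(copyOver(old, 104)); // [b@105, c@-107]
  }
}
```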
@@ -1270,6 +1298,22 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     }
   }
 
+  protected void ensureBufferTlog() {
+    if (bufferTlog != null) return;
+    String newLogName = String.format(Locale.ROOT, LOG_FILENAME_PATTERN, BUFFER_TLOG_NAME, System.nanoTime());
+    bufferTlog = newTransactionLog(new File(tlogDir, newLogName), globalStrings, false);
+  }
+
+  // Cleanup old buffer tlogs
+  protected void deleteBufferLogs() {
+    String[] oldBufferTlog = getBufferLogList(tlogDir);
+    if (oldBufferTlog != null && oldBufferTlog.length != 0) {
+      for (String oldBufferLogName : oldBufferTlog) {
+        deleteFile(new File(tlogDir, oldBufferLogName));
+      }
+    }
+  }
+
   protected void ensureLog() {
     if (tlog == null) {
@@ -1285,7 +1329,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       // record a commit
       log.info("Recording current closed for " + uhandler.core + " log=" + theLog);
       CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
-      theLog.writeCommit(cmd, operationFlags);
+      theLog.writeCommit(cmd);
     }
 
     theLog.deleteOnClose = false;
@@ -1314,6 +1358,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       log.forceClose();
     }
 
+    if (bufferTlog != null) {
+      // we should not delete the bufferTlog on close; an existing bufferTlog is a signal to skip peerSync
+      bufferTlog.deleteOnClose = false;
+      bufferTlog.decref();
+      bufferTlog.forceClose();
+    }
+
     try {
       ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
     } catch (Exception e) {
@@ -1347,7 +1398,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     HashMap<Long, Update> updates;
     List<Update> deleteByQueryList;
     List<DeleteUpdate> deleteList;
-    int latestOperation;
 
     public RecentUpdates(Deque<TransactionLog> logList) {
       this.logList = logList;
@@ -1401,11 +1451,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       return result;
     }
 
-    public int getLatestOperation() {
-      return latestOperation;
-    }
-
-
     private void update() {
       int numUpdates = 0;
       updateList = new ArrayList<>(logList.size());
@@ -1431,9 +1476,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
             // TODO: refactor this out so we get common error handling
             int opAndFlags = (Integer)entry.get(UpdateLog.FLAGS_IDX);
-            if (latestOperation == 0) {
-              latestOperation = opAndFlags;
-            }
            int oper = opAndFlags & UpdateLog.OPERATION_MASK;
            long version = (Long) entry.get(UpdateLog.VERSION_IDX);
 
@@ -1525,6 +1567,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
        tlog.incref();
        logList.addFirst(tlog);
      }
+      if (bufferTlog != null) {
+        bufferTlog.incref();
+        logList.addFirst(bufferTlog);
+      }
    }
 
    // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
@@ -1542,13 +1588,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
    // reading state and acting on it in the distributed update processor
    versionInfo.blockUpdates();
    try {
-      if (state == State.BUFFERING) {
-        log.info("Restarting buffering. previous=" + recoveryInfo);
-      } else if (state != State.ACTIVE) {
+      if (state != State.ACTIVE && state != State.BUFFERING) {
        // we don't currently have support for handling other states
        log.warn("Unexpected state for bufferUpdates: " + state + ", Ignoring request.");
        return;
      }
+      dropBufferTlog();
+      deleteBufferLogs();
 
      recoveryInfo = new RecoveryInfo();
@@ -1556,15 +1602,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
        log.info("Starting to buffer updates. " + this);
      }
 
-      // since we blocked updates, this synchronization shouldn't strictly be necessary.
-      synchronized (this) {
-        recoveryInfo.positionOfStart = tlog == null ? 0 : tlog.snapshot();
-      }
-
      state = State.BUFFERING;
-
-      // currently, buffering is only called by recovery, meaning that there is most likely a gap in updates
-      operationFlags |= FLAG_GAP;
    } finally {
      versionInfo.unblockUpdates();
    }
@@ -1580,25 +1618,24 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
        log.info("Dropping buffered updates " + this);
      }
 
-      // since we blocked updates, this synchronization shouldn't strictly be necessary.
-      synchronized (this) {
-        if (tlog != null) {
-          tlog.rollback(recoveryInfo.positionOfStart);
-        }
-      }
+      dropBufferTlog();
 
      state = State.ACTIVE;
-      operationFlags &= ~FLAG_GAP;
-    } catch (IOException e) {
-      SolrException.log(log,"Error attempting to roll back log", e);
-      return false;
-    }
-    finally {
+    } finally {
      versionInfo.unblockUpdates();
    }
    return true;
  }
 
+  private void dropBufferTlog() {
+    synchronized (this) {
+      if (bufferTlog != null) {
+        bufferTlog.decref();
+        bufferTlog = null;
+      }
+    }
+  }
+
 
  /** Returns the Future to wait on, or null if no replay was needed */
  public Future<RecoveryInfo> applyBufferedUpdates() {
@@ -1612,27 +1649,30 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     try {
       cancelApplyBufferUpdate = false;
       if (state != State.BUFFERING) return null;
-      operationFlags &= ~FLAG_GAP;
 
-      // handle case when no log was even created because no updates
-      // were received.
-      if (tlog == null) {
-        state = State.ACTIVE;
-        return null;
+      synchronized (this) {
+        // handle case when no updates were received.
+        if (bufferTlog == null) {
+          state = State.ACTIVE;
+          return null;
+        }
+        bufferTlog.incref();
       }
-      tlog.incref();
+
       state = State.APPLYING_BUFFERED;
     } finally {
       versionInfo.unblockUpdates();
     }
 
     if (recoveryExecutor.isShutdown()) {
-      tlog.decref();
       throw new RuntimeException("executor is not running...");
     }
     ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
-    LogReplayer replayer = new LogReplayer(Arrays.asList(new TransactionLog[]{tlog}), true);
-    return cs.submit(replayer, recoveryInfo);
+    LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
+    return cs.submit(() -> {
+      replayer.run();
+      dropBufferTlog();
+    }, recoveryInfo);
   }
 
   public State getState() {
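Editor's note: applyBufferedUpdates() now replays the buffer tlog alone on the recovery executor and discards it once replay completes, instead of replaying a suffix of the regular tlog. A minimal self-contained sketch of that submit-then-drop shape (illustrative only; ApplyBufferedSketch and its Runnable stand-in are hypothetical, not Solr types):

```java
// Minimal illustrative sketch of replay-then-drop on an executor.
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ApplyBufferedSketch {
  private volatile Runnable bufferTlog = () -> System.out.println("replaying buffered updates");

  private synchronized void dropBufferTlog() {
    bufferTlog = null; // the real code decrefs; nulling out is the sketch equivalent
  }

  Future<String> applyBufferedUpdates(ExecutorService recoveryExecutor) {
    Runnable replayer = bufferTlog;
    if (replayer == null) return null; // nothing was buffered
    ExecutorCompletionService<String> cs = new ExecutorCompletionService<>(recoveryExecutor);
    return cs.submit(() -> {
      replayer.run();   // replay only the buffer tlog
      dropBufferTlog(); // then discard it, like the lambda in the diff
    }, "recoveryInfo");
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    System.out.println(new ApplyBufferedSketch().applyBufferedUpdates(pool).get());
    pool.shutdown();
  }
}
```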
@@ -1903,10 +1943,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       if (!activeLog) {
         // if we are replaying an old tlog file, we need to add a commit to the end
         // so we don't replay it again if we restart right after.
-
-        // if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it
-        // as the flag on the last operation.
-        translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK));
+        translog.writeCommit(cmd);
       }
 
       try {
@@ -2037,10 +2074,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     return cmd;
   }
 
-  public void cancelApplyBufferedUpdates() {
-    this.cancelApplyBufferUpdate = true;
-  }
-
   ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0,
       Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
       new DefaultSolrThreadFactory("recoveryExecutor"));
@@ -24,7 +24,9 @@ import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.util.TimeOut;
 import org.noggit.ObjectBuilder;
 
 import org.slf4j.Logger;
@@ -820,6 +822,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
         +"]"
     );
 
+    // Note that v101..v103 were dropped, therefore they are not present in RTG
     assertJQ(req("qt","/get", "getVersions","6")
         ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}"
     );
@@ -929,7 +932,6 @@ public class TestRecovery extends SolrTestCaseJ4 {
         ,"=={'versions':["+v105+","+v104+"]}"
     );
 
-    // this time add some docs first before buffering starts (so tlog won't be at pos 0)
     updateJ(jsonAdd(sdoc("id","c100", "_version_",v200)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
     updateJ(jsonAdd(sdoc("id","c101", "_version_",v201)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
@@ -957,10 +959,8 @@ public class TestRecovery extends SolrTestCaseJ4 {
         +"" +"]"
     );
 
-    // The updates that were buffered (but never applied) still appear in recent versions!
-    // This is good for some uses, but may not be good for others.
-    assertJQ(req("qt","/get", "getVersions","11")
-        ,"=={'versions':["+String.join(",",v206,v205,v204,v203,v201,v200,v105,v104,v103,v102,v101)+"]}"
+    assertJQ(req("qt","/get", "getVersions","6")
+        ,"=={'versions':["+String.join(",",v206,v205,v201,v200,v105,v104)+"]}"
     );
 
     assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
@@ -1008,13 +1008,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
 
 
   @Test
-  public void testBufferingFlags() throws Exception {
-
+  public void testExistOldBufferLog() throws Exception {
     DirectUpdateHandler2.commitOnClose = false;
     final Semaphore logReplayFinish = new Semaphore(0);
-
     UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release();
-
 
     SolrQueryRequest req = req();
     UpdateHandler uhandler = req.getCore().getUpdateHandler();
@@ -1024,9 +1020,6 @@ public class TestRecovery extends SolrTestCaseJ4 {
     String v101 = getNextVersion();
     String v102 = getNextVersion();
     String v103 = getNextVersion();
-    String v114 = getNextVersion();
-    String v115 = getNextVersion();
-    String v116 = getNextVersion();
     String v117 = getNextVersion();
 
     clearIndex();
@@ -1049,14 +1042,10 @@ public class TestRecovery extends SolrTestCaseJ4 {
     uhandler = req.getCore().getUpdateHandler();
     ulog = uhandler.getUpdateLog();
 
-    logReplayFinish.acquire(); // wait for replay to finish
-
-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last
-
-    //
-    // Try again to ensure that the previous log replay didn't wipe out our flags
-    //
+    // the core does not replay updates from the buffer tlog on startup
+    assertTrue(ulog.existOldBufferLog()); // since we died while buffering, we should see this last
 
+    // the buffer tlog won't be removed on restart
     req.close();
     h.close();
     createCore();
@@ -1065,26 +1054,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
     uhandler = req.getCore().getUpdateHandler();
     ulog = uhandler.getUpdateLog();
 
-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
-
-    // now do some normal non-buffered adds
-    updateJ(jsonAdd(sdoc("id","Q4", "_version_",v114)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-    updateJ(jsonAdd(sdoc("id","Q5", "_version_",v115)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-    updateJ(jsonAdd(sdoc("id","Q6", "_version_",v116)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-    assertU(commit());
-
-    req.close();
-    h.close();
-    createCore();
-
-    req = req();
-    uhandler = req.getCore().getUpdateHandler();
-    ulog = uhandler.getUpdateLog();
-
-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0);
+    assertTrue(ulog.existOldBufferLog());
 
     ulog.bufferUpdates();
     // simulate receiving no updates
     ulog.applyBufferedUpdates();
     updateJ(jsonAdd(sdoc("id","Q7", "_version_",v117)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // do another add to make sure flags are back to normal
@@ -1096,10 +1068,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
     uhandler = req.getCore().getUpdateHandler();
     ulog = uhandler.getUpdateLog();
 
-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7
-
-    logReplayFinish.acquire();
-    assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+    assertFalse(ulog.existOldBufferLog());
+    // wait for Q7 to be replayed: it was written to the regular tlog, so it is replayed on restart
+    TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    timeout.waitFor("Timeout waiting for finish replay updates",
+        () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE);
+    assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7");
   } finally {
     DirectUpdateHandler2.commitOnClose = true;
     UpdateLog.testing_logReplayHook = null;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.cloud.hdfs.HdfsTestUtil;
 import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.update.DirectUpdateHandler2;
 import org.apache.solr.update.HdfsUpdateLog;
@@ -51,6 +52,7 @@ import org.apache.solr.update.UpdateHandler;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
 import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.apache.solr.util.TimeOut;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -515,13 +517,9 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
 
 
   @Test
-  public void testBufferingFlags() throws Exception {
-
+  public void testExistOldBufferLog() throws Exception {
     DirectUpdateHandler2.commitOnClose = false;
     final Semaphore logReplayFinish = new Semaphore(0);
-
     UpdateLog.testing_logReplayFinishHook = () -> logReplayFinish.release();
-
 
     SolrQueryRequest req = req();
     UpdateHandler uhandler = req.getCore().getUpdateHandler();
@@ -548,14 +546,10 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
     uhandler = req.getCore().getUpdateHandler();
     ulog = uhandler.getUpdateLog();
 
-    logReplayFinish.acquire(); // wait for replay to finish
-
-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last
-
-    //
-    // Try again to ensure that the previous log replay didn't wipe out our flags
-    //
+    // the core no longer replays updates from the buffer tlog on startup
+    assertTrue(ulog.existOldBufferLog()); // since we died while buffering, we should see this last
 
+    // the buffer tlog won't be removed on restart
     req.close();
     h.close();
     createCore();
@@ -564,23 +558,7 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
     uhandler = req.getCore().getUpdateHandler();
     ulog = uhandler.getUpdateLog();
 
-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
-
-    // now do some normal non-buffered adds
-    updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-    updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-    updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
-    assertU(commit());
-
-    req.close();
-    h.close();
-    createCore();
-
-    req = req();
-    uhandler = req.getCore().getUpdateHandler();
-    ulog = uhandler.getUpdateLog();
-
-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0);
+    assertTrue(ulog.existOldBufferLog());
 
     ulog.bufferUpdates();
     // simulate receiving no updates
@@ -595,10 +573,12 @@ public class TestRecoveryHdfs extends SolrTestCaseJ4 {
     uhandler = req.getCore().getUpdateHandler();
     ulog = uhandler.getUpdateLog();
 
-    assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); // check flags on Q7
-
-    logReplayFinish.acquire();
-    assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
+    assertFalse(ulog.existOldBufferLog());
+    // wait for Q7 to be replayed: it was written to the regular tlog, so it is replayed on restart
+    TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    timeout.waitFor("Timeout waiting for finish replay updates",
+        () -> h.getCore().getUpdateHandler().getUpdateLog().getState() == UpdateLog.State.ACTIVE);
+    assertJQ(req("qt","/get", "id", "Q7") ,"/doc/id==Q7");
   } finally {
     DirectUpdateHandler2.commitOnClose = true;
     UpdateLog.testing_logReplayHook = null;
@@ -35,7 +35,7 @@ public class TransactionLogTest extends LuceneTestCase {
       transactionLog.lastAddSize = 2000000000;
       AddUpdateCommand updateCommand = new AddUpdateCommand(null);
       updateCommand.solrDoc = new SolrInputDocument();
-      transactionLog.write(updateCommand, 0);
+      transactionLog.write(updateCommand);
     }
   }