HBASE-12600: Remove REPLAY tag dependency in Distributed Replay Mode
This commit is contained in:
parent
0733450473
commit
2c280e6253
|
@ -68,19 +68,6 @@ public class CellComparator implements Comparator<Cell>, Serializable {
|
|||
|
||||
if (!ignoreSequenceid) {
|
||||
// Negate following comparisons so later edits show up first
|
||||
|
||||
// compare log replay tag value if there is any
|
||||
// when either keyvalue tagged with log replay sequence number, we need to compare them:
|
||||
// 1) when both keyvalues have the tag, then use the tag values for comparison
|
||||
// 2) when one has and the other doesn't have, the one without the log
|
||||
// replay tag wins because
|
||||
// it means the edit isn't from recovery but new one coming from clients during recovery
|
||||
// 3) when both doesn't have, then skip to the next mvcc comparison
|
||||
long leftChangeSeqNum = getReplaySeqNum(a);
|
||||
long RightChangeSeqNum = getReplaySeqNum(b);
|
||||
if (leftChangeSeqNum != Long.MAX_VALUE || RightChangeSeqNum != Long.MAX_VALUE) {
|
||||
return Longs.compare(RightChangeSeqNum, leftChangeSeqNum);
|
||||
}
|
||||
// mvccVersion: later sorts first
|
||||
return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
|
||||
} else {
|
||||
|
@ -88,22 +75,6 @@ public class CellComparator implements Comparator<Cell>, Serializable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return replay log sequence number for the cell
|
||||
*
|
||||
* @param c
|
||||
* @return Long.MAX_VALUE if there is no LOG_REPLAY_TAG
|
||||
*/
|
||||
private static long getReplaySeqNum(final Cell c) {
|
||||
Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
|
||||
TagType.LOG_REPLAY_TAG_TYPE);
|
||||
|
||||
if (tag != null) {
|
||||
return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
|
||||
}
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
public static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
|
||||
return findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
|
||||
- rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
|
||||
|
|
|
@ -746,6 +746,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
|
|||
c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
|
||||
c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
|
||||
c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
|
||||
this.seqId = c.getSequenceId();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -961,8 +962,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
|
|||
checkForTagsLength(tagsLength);
|
||||
// Allocate right-sized byte array.
|
||||
int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
|
||||
byte [] bytes =
|
||||
new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
|
||||
byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
|
||||
tagsLength)];
|
||||
// Write key, value and key row length.
|
||||
int pos = 0;
|
||||
pos = Bytes.putInt(bytes, pos, keyLength);
|
||||
|
@ -2492,8 +2493,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
|
|||
* Compare two keys assuming that the first n bytes are the same.
|
||||
* @param commonPrefix How many bytes are the same.
|
||||
*/
|
||||
int compareIgnoringPrefix(
|
||||
int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
|
||||
int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength,
|
||||
byte[] right, int roffset, int rlength
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ public final class TagType {
|
|||
// Please declare new Tag Types here to avoid step on pre-existing tag types.
|
||||
public static final byte ACL_TAG_TYPE = (byte) 1;
|
||||
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
|
||||
public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
|
||||
// public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // deprecated
|
||||
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
|
||||
// String based tag type used in replication
|
||||
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
|
||||
|
|
|
@ -773,8 +773,8 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
|
|||
public void setRecoveryMode(boolean isForInitialization) throws IOException {
|
||||
synchronized(this) {
|
||||
if (this.isDrainingDone) {
|
||||
// when there is no outstanding splitlogtask after master start up, we already have up to date
|
||||
// recovery mode
|
||||
// when there is no outstanding splitlogtask after master start up, we already have up to
|
||||
// date recovery mode
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -866,12 +866,10 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
|
|||
boolean dlr =
|
||||
conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
|
||||
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
|
||||
int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
|
||||
LOG.debug("Distributed log replay=" + dlr);
|
||||
}
|
||||
// For distributed log replay, hfile version must be 3 at least; we need tag support.
|
||||
return dlr && (version >= 3);
|
||||
return dlr;
|
||||
}
|
||||
|
||||
private boolean resubmit(ServerName serverName, String path, int version) {
|
||||
|
|
|
@ -2684,7 +2684,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
|||
continue;
|
||||
}
|
||||
doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
|
||||
addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
|
||||
addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
|
||||
}
|
||||
|
||||
// ------------------------------------
|
||||
|
@ -3189,12 +3189,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
|||
* @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
|
||||
* If null, then this method internally creates a mvcc transaction.
|
||||
* @param output newly added KVs into memstore
|
||||
* @param isInReplay true when adding replayed KVs into memstore
|
||||
* @return the additional memory usage of the memstore caused by the
|
||||
* new entries.
|
||||
* @throws IOException
|
||||
*/
|
||||
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
|
||||
long mvccNum, List<Cell> memstoreCells) throws IOException {
|
||||
long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
|
||||
long size = 0;
|
||||
|
||||
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
|
||||
|
@ -3209,6 +3210,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
|||
Pair<Long, Cell> ret = store.add(cell);
|
||||
size += ret.getFirst();
|
||||
memstoreCells.add(ret.getSecond());
|
||||
if(isInReplay) {
|
||||
// set memstore newly added cells with replay mvcc number
|
||||
CellUtil.setSequenceId(ret.getSecond(), mvccNum);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3407,7 +3412,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
|||
}
|
||||
|
||||
try {
|
||||
// replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
|
||||
// replay the edits. Replay can return -1 if everything is skipped, only update
|
||||
// if seqId is greater
|
||||
seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
|
||||
} catch (IOException e) {
|
||||
boolean skipErrors = conf.getBoolean(
|
||||
|
|
|
@ -1430,8 +1430,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
? region.getCoprocessorHost()
|
||||
: null; // do not invoke coprocessors if this is a secondary region replica
|
||||
List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
|
||||
// when tag is enabled, we need tag replay edits with log sequence number
|
||||
boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
|
||||
|
||||
// Skip adding the edits to WAL if this is a secondary region replica
|
||||
boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
|
||||
|
@ -1452,7 +1450,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
|
||||
new Pair<WALKey, WALEdit>();
|
||||
List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
|
||||
cells, walEntry, needAddReplayTag, durability);
|
||||
cells, walEntry, durability);
|
||||
if (coprocessorHost != null) {
|
||||
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
|
||||
// KeyValue.
|
||||
|
|
|
@ -92,8 +92,9 @@ public class DefaultCompactor extends Compactor {
|
|||
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
|
||||
cleanSeqId = true;
|
||||
}
|
||||
|
||||
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
|
||||
fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
|
||||
true, fd.maxTagsLength > 0);
|
||||
boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
|
||||
if (!finished) {
|
||||
writer.close();
|
||||
|
|
|
@ -112,13 +112,13 @@ public class StripeCompactor extends Compactor {
|
|||
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
|
||||
cleanSeqId = true;
|
||||
}
|
||||
final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
|
||||
|
||||
final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
|
||||
StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
|
||||
@Override
|
||||
public Writer createWriter() throws IOException {
|
||||
return store.createWriterInTmp(
|
||||
fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
|
||||
fd.maxKeyCount, compression, true, true, fd.maxTagsLength > 0);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -2135,34 +2135,6 @@ public class WALSplitter {
|
|||
public final long nonce;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tag original sequence number for each edit to be replayed
|
||||
* @param seqId
|
||||
* @param cell
|
||||
*/
|
||||
private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
|
||||
// Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
|
||||
boolean needAddRecoveryTag = true;
|
||||
if (cell.getTagsLength() > 0) {
|
||||
Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
|
||||
TagType.LOG_REPLAY_TAG_TYPE);
|
||||
if (tmpTag != null) {
|
||||
// found an existing log replay tag so reuse it
|
||||
needAddRecoveryTag = false;
|
||||
}
|
||||
}
|
||||
if (needAddRecoveryTag) {
|
||||
List<Tag> newTags = new ArrayList<Tag>();
|
||||
Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
|
||||
newTags.add(replayTag);
|
||||
if (cell.getTagsLength() > 0) {
|
||||
newTags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
|
||||
}
|
||||
return new TagRewriteCell(cell, Tag.fromList(newTags));
|
||||
}
|
||||
return cell;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
|
||||
* WALEdit from the passed in WALEntry
|
||||
|
@ -2170,12 +2142,11 @@ public class WALSplitter {
|
|||
* @param cells
|
||||
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
|
||||
* extracted from the passed in WALEntry.
|
||||
* @param addLogReplayTag
|
||||
* @return list of Pair<MutationType, Mutation> to be replayed
|
||||
* @throws IOException
|
||||
*/
|
||||
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
|
||||
Pair<WALKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
|
||||
Pair<WALKey, WALEdit> logEntry, Durability durability)
|
||||
throws IOException {
|
||||
|
||||
if (entry == null) {
|
||||
|
@ -2223,11 +2194,7 @@ public class WALSplitter {
|
|||
if (CellUtil.isDelete(cell)) {
|
||||
((Delete) m).addDeleteMarker(cell);
|
||||
} else {
|
||||
Cell tmpNewCell = cell;
|
||||
if (addLogReplayTag) {
|
||||
tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
|
||||
}
|
||||
((Put) m).add(tmpNewCell);
|
||||
((Put) m).add(cell);
|
||||
}
|
||||
m.setDurability(durability);
|
||||
previousCell = cell;
|
||||
|
|
|
@ -1188,7 +1188,6 @@ public class TestDistributedLogSplitting {
|
|||
LOG.info("testSameVersionUpdatesRecovery");
|
||||
conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
|
||||
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
startCluster(NUM_RS);
|
||||
final AtomicLong sequenceId = new AtomicLong(100);
|
||||
final int NUM_REGIONS_TO_CREATE = 40;
|
||||
|
@ -1282,11 +1281,10 @@ public class TestDistributedLogSplitting {
|
|||
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
|
||||
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
|
||||
conf.setInt("hbase.hstore.compactionThreshold", 3);
|
||||
conf.setInt("hfile.format.version", 3);
|
||||
startCluster(NUM_RS);
|
||||
final AtomicLong sequenceId = new AtomicLong(100);
|
||||
final int NUM_REGIONS_TO_CREATE = 40;
|
||||
final int NUM_LOG_LINES = 1000;
|
||||
final int NUM_LOG_LINES = 2000;
|
||||
// turn off load balancing to prevent regions from moving around otherwise
|
||||
// they will consume recovered.edits
|
||||
master.balanceSwitch(false);
|
||||
|
|
Loading…
Reference in New Issue