Revert "Remove Replay Tag"

This reverts commit f723ffde7f.
This commit is contained in:
Jeffrey Zhong 2014-12-01 11:17:24 -08:00
parent f723ffde7f
commit 0733450473
10 changed files with 87 additions and 27 deletions

View File

@ -68,6 +68,19 @@ public class CellComparator implements Comparator<Cell>, Serializable {
if (!ignoreSequenceid) { if (!ignoreSequenceid) {
// Negate following comparisons so later edits show up first // Negate following comparisons so later edits show up first
// compare log replay tag value if there is any
// when either keyvalue tagged with log replay sequence number, we need to compare them:
// 1) when both keyvalues have the tag, then use the tag values for comparison
// 2) when one has and the other doesn't have, the one without the log
// replay tag wins because
// it means the edit isn't from recovery but new one coming from clients during recovery
// 3) when both doesn't have, then skip to the next mvcc comparison
long leftChangeSeqNum = getReplaySeqNum(a);
long RightChangeSeqNum = getReplaySeqNum(b);
if (leftChangeSeqNum != Long.MAX_VALUE || RightChangeSeqNum != Long.MAX_VALUE) {
return Longs.compare(RightChangeSeqNum, leftChangeSeqNum);
}
// mvccVersion: later sorts first // mvccVersion: later sorts first
return Longs.compare(b.getMvccVersion(), a.getMvccVersion()); return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
} else { } else {
@ -75,6 +88,22 @@ public class CellComparator implements Comparator<Cell>, Serializable {
} }
} }
/**
* Return replay log sequence number for the cell
*
* @param c
* @return Long.MAX_VALUE if there is no LOG_REPLAY_TAG
*/
private static long getReplaySeqNum(final Cell c) {
Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
TagType.LOG_REPLAY_TAG_TYPE);
if (tag != null) {
return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
}
return Long.MAX_VALUE;
}
public static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) { public static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
return findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength() return findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
- rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset() - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()

View File

@ -746,7 +746,6 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(), c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength()); c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
this.seqId = c.getSequenceId();
} }
/** /**
@ -962,8 +961,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
checkForTagsLength(tagsLength); checkForTagsLength(tagsLength);
// Allocate right-sized byte array. // Allocate right-sized byte array.
int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength); int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, byte [] bytes =
tagsLength)]; new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
// Write key, value and key row length. // Write key, value and key row length.
int pos = 0; int pos = 0;
pos = Bytes.putInt(bytes, pos, keyLength); pos = Bytes.putInt(bytes, pos, keyLength);
@ -2493,8 +2492,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* Compare two keys assuming that the first n bytes are the same. * Compare two keys assuming that the first n bytes are the same.
* @param commonPrefix How many bytes are the same. * @param commonPrefix How many bytes are the same.
*/ */
int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength, int compareIgnoringPrefix(
byte[] right, int roffset, int rlength int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
); );
} }

View File

@ -26,7 +26,7 @@ public final class TagType {
// Please declare new Tag Types here to avoid step on pre-existing tag types. // Please declare new Tag Types here to avoid step on pre-existing tag types.
public static final byte ACL_TAG_TYPE = (byte) 1; public static final byte ACL_TAG_TYPE = (byte) 1;
public static final byte VISIBILITY_TAG_TYPE = (byte) 2; public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
// public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // deprecated public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4; public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
// String based tag type used in replication // String based tag type used in replication
public static final byte STRING_VIS_TAG_TYPE = (byte) 7; public static final byte STRING_VIS_TAG_TYPE = (byte) 7;

View File

@ -773,8 +773,8 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
public void setRecoveryMode(boolean isForInitialization) throws IOException { public void setRecoveryMode(boolean isForInitialization) throws IOException {
synchronized(this) { synchronized(this) {
if (this.isDrainingDone) { if (this.isDrainingDone) {
// when there is no outstanding splitlogtask after master start up, we already have up to // when there is no outstanding splitlogtask after master start up, we already have up to date
// date recovery mode // recovery mode
return; return;
} }
} }
@ -866,10 +866,12 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
boolean dlr = boolean dlr =
conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG); HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Distributed log replay=" + dlr); LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
} }
return dlr; // For distributed log replay, hfile version must be 3 at least; we need tag support.
return dlr && (version >= 3);
} }
private boolean resubmit(ServerName serverName, String path, int version) { private boolean resubmit(ServerName serverName, String path, int version) {

View File

@ -2684,7 +2684,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
continue; continue;
} }
doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay); addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
} }
// ------------------------------------ // ------------------------------------
@ -3189,13 +3189,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* @param localizedWriteEntry The WriteEntry of the MVCC for this transaction. * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
* If null, then this method internally creates a mvcc transaction. * If null, then this method internally creates a mvcc transaction.
* @param output newly added KVs into memstore * @param output newly added KVs into memstore
* @param isInReplay true when adding replayed KVs into memstore
* @return the additional memory usage of the memstore caused by the * @return the additional memory usage of the memstore caused by the
* new entries. * new entries.
* @throws IOException * @throws IOException
*/ */
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException { long mvccNum, List<Cell> memstoreCells) throws IOException {
long size = 0; long size = 0;
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@ -3210,10 +3209,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
Pair<Long, Cell> ret = store.add(cell); Pair<Long, Cell> ret = store.add(cell);
size += ret.getFirst(); size += ret.getFirst();
memstoreCells.add(ret.getSecond()); memstoreCells.add(ret.getSecond());
if(isInReplay) {
// set memstore newly added cells with replay mvcc number
CellUtil.setSequenceId(ret.getSecond(), mvccNum);
}
} }
} }
@ -3412,8 +3407,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
} }
try { try {
// replay the edits. Replay can return -1 if everything is skipped, only update // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
// if seqId is greater
seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter)); seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
} catch (IOException e) { } catch (IOException e) {
boolean skipErrors = conf.getBoolean( boolean skipErrors = conf.getBoolean(

View File

@ -1430,6 +1430,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
? region.getCoprocessorHost() ? region.getCoprocessorHost()
: null; // do not invoke coprocessors if this is a secondary region replica : null; // do not invoke coprocessors if this is a secondary region replica
List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>(); List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
// when tag is enabled, we need tag replay edits with log sequence number
boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
// Skip adding the edits to WAL if this is a secondary region replica // Skip adding the edits to WAL if this is a secondary region replica
boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
@ -1450,7 +1452,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<WALKey, WALEdit>(); new Pair<WALKey, WALEdit>();
List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry, List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
cells, walEntry, durability); cells, walEntry, needAddReplayTag, durability);
if (coprocessorHost != null) { if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue. // KeyValue.

View File

@ -92,9 +92,8 @@ public class DefaultCompactor extends Compactor {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true; cleanSeqId = true;
} }
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
true, fd.maxTagsLength > 0); fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId); boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
if (!finished) { if (!finished) {
writer.close(); writer.close();

View File

@ -112,13 +112,13 @@ public class StripeCompactor extends Compactor {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true; cleanSeqId = true;
} }
final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
final Compression.Algorithm compression = store.getFamily().getCompactionCompression(); final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() { StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
@Override @Override
public Writer createWriter() throws IOException { public Writer createWriter() throws IOException {
return store.createWriterInTmp( return store.createWriterInTmp(
fd.maxKeyCount, compression, true, true, fd.maxTagsLength > 0); fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
} }
}; };

View File

@ -2135,6 +2135,34 @@ public class WALSplitter {
public final long nonce; public final long nonce;
} }
/**
* Tag original sequence number for each edit to be replayed
* @param seqId
* @param cell
*/
private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
// Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
boolean needAddRecoveryTag = true;
if (cell.getTagsLength() > 0) {
Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
TagType.LOG_REPLAY_TAG_TYPE);
if (tmpTag != null) {
// found an existing log replay tag so reuse it
needAddRecoveryTag = false;
}
}
if (needAddRecoveryTag) {
List<Tag> newTags = new ArrayList<Tag>();
Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
newTags.add(replayTag);
if (cell.getTagsLength() > 0) {
newTags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
}
return new TagRewriteCell(cell, Tag.fromList(newTags));
}
return cell;
}
/** /**
* This function is used to construct mutations from a WALEntry. It also reconstructs WALKey & * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
* WALEdit from the passed in WALEntry * WALEdit from the passed in WALEntry
@ -2142,11 +2170,12 @@ public class WALSplitter {
* @param cells * @param cells
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
* extracted from the passed in WALEntry. * extracted from the passed in WALEntry.
* @param addLogReplayTag
* @return list of Pair<MutationType, Mutation> to be replayed * @return list of Pair<MutationType, Mutation> to be replayed
* @throws IOException * @throws IOException
*/ */
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells, public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
Pair<WALKey, WALEdit> logEntry, Durability durability) Pair<WALKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
throws IOException { throws IOException {
if (entry == null) { if (entry == null) {
@ -2194,7 +2223,11 @@ public class WALSplitter {
if (CellUtil.isDelete(cell)) { if (CellUtil.isDelete(cell)) {
((Delete) m).addDeleteMarker(cell); ((Delete) m).addDeleteMarker(cell);
} else { } else {
((Put) m).add(cell); Cell tmpNewCell = cell;
if (addLogReplayTag) {
tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
}
((Put) m).add(tmpNewCell);
} }
m.setDurability(durability); m.setDurability(durability);
previousCell = cell; previousCell = cell;

View File

@ -1188,6 +1188,7 @@ public class TestDistributedLogSplitting {
LOG.info("testSameVersionUpdatesRecovery"); LOG.info("testSameVersionUpdatesRecovery");
conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024); conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
conf.setInt("hfile.format.version", 3);
startCluster(NUM_RS); startCluster(NUM_RS);
final AtomicLong sequenceId = new AtomicLong(100); final AtomicLong sequenceId = new AtomicLong(100);
final int NUM_REGIONS_TO_CREATE = 40; final int NUM_REGIONS_TO_CREATE = 40;
@ -1281,10 +1282,11 @@ public class TestDistributedLogSplitting {
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024); conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
conf.setInt("hbase.hstore.compactionThreshold", 3); conf.setInt("hbase.hstore.compactionThreshold", 3);
conf.setInt("hfile.format.version", 3);
startCluster(NUM_RS); startCluster(NUM_RS);
final AtomicLong sequenceId = new AtomicLong(100); final AtomicLong sequenceId = new AtomicLong(100);
final int NUM_REGIONS_TO_CREATE = 40; final int NUM_REGIONS_TO_CREATE = 40;
final int NUM_LOG_LINES = 2000; final int NUM_LOG_LINES = 1000;
// turn off load balancing to prevent regions from moving around otherwise // turn off load balancing to prevent regions from moving around otherwise
// they will consume recovered.edits // they will consume recovered.edits
master.balanceSwitch(false); master.balanceSwitch(false);