HBase-11315: Keeping MVCC for configurable longer time

This commit is contained in:
Jeffrey Zhong 2014-07-06 18:25:18 -07:00
parent a04e0b703f
commit d07bc87cd6
35 changed files with 359 additions and 108 deletions

View File

@ -143,6 +143,12 @@ public class TestPayloadCarryingRpcController {
return 0;
}
@Override
public long getSequenceId() {
// unused
return 0;
}
@Override
public byte[] getValueArray() {
return Bytes.toBytes(this.i);

View File

@ -142,14 +142,25 @@ public interface Cell {
//6) MvccVersion
/**
* @deprecated as of 1.0, use {@link Cell#getSequenceId()}
*
* Internal use only. A region-specific sequence ID given to each operation. It always exists for
* cells in the memstore but is not retained forever. It may survive several flushes, but
* generally becomes irrelevant after the cell's row is no longer involved in any operations that
* require strict consistency.
* @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists
*/
@Deprecated
long getMvccVersion();
/**
* A region-specific unique monotonically increasing sequence ID given to each Cell. It always
* exists for cells in the memstore but is not retained forever. It will be kept for
* {@link HConstants#KEEP_SEQID_PERIOD} days, but generally becomes irrelevant after the cell's
* row is no longer involved in any operations that require strict consistency.
* @return seqId (always > 0 if exists), or 0 if it no longer exists
*/
long getSequenceId();
//7) Value

View File

@ -167,7 +167,7 @@ public final class CellUtil {
final long timestamp, final byte type, final byte[] value, final long memstoreTS) {
KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
KeyValue.Type.codeToType(type), value);
keyValue.setMvccVersion(memstoreTS);
keyValue.setSequenceId(memstoreTS);
return keyValue;
}
@ -175,7 +175,7 @@ public final class CellUtil {
final long timestamp, final byte type, final byte[] value, byte[] tags, final long memstoreTS) {
KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
KeyValue.Type.codeToType(type), value, tags);
keyValue.setMvccVersion(memstoreTS);
keyValue.setSequenceId(memstoreTS);
return keyValue;
}

View File

@ -354,6 +354,11 @@ public final class HConstants {
/** Default value for cluster ID */
public static final String CLUSTER_ID_DEFAULT = "default-cluster";
/** Parameter name for # days to keep MVCC values during a major compaction */
public static final String KEEP_SEQID_PERIOD = "hbase.hstore.compaction.keep.seqId.period";
/** At least to keep MVCC values in hfiles for 5 days */
public static final int MIN_KEEP_SEQID_PERIOD = 5;
// Always store the location of the root table's HRegion.
// This HRegion is never split.

View File

@ -284,15 +284,23 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
// used to achieve atomic operations in the memstore.
@Override
public long getMvccVersion() {
return mvcc;
return this.getSequenceId();
}
public void setMvccVersion(long mvccVersion){
this.mvcc = mvccVersion;
/**
* used to achieve atomic operations in the memstore.
*/
@Override
public long getSequenceId() {
return seqId;
}
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
// multi-version concurrency control version. default value is 0, aka do not care.
private long mvcc = 0; // this value is not part of a serialized KeyValue (not in HFiles)
private long seqId = 0;
/** Dragon time over, return to normal business */
@ -1083,7 +1091,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
// Important to clone the memstoreTS as well - otherwise memstore's
// update-in-place methods (eg increment) will end up creating
// new entries
ret.setMvccVersion(mvcc);
ret.setSequenceId(seqId);
return ret;
}
@ -1094,7 +1102,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
*/
public KeyValue shallowCopy() {
KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
shallowCopy.setMvccVersion(this.mvcc);
shallowCopy.setSequenceId(this.seqId);
return shallowCopy;
}
@ -1108,8 +1116,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
if (this.bytes == null || this.bytes.length == 0) {
return "empty";
}
return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
"/vlen=" + getValueLength() + "/mvcc=" + mvcc;
return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
+ getValueLength() + "/seqid=" + seqId;
}
/**

View File

@ -75,7 +75,7 @@ public class KeyValueUtil {
public static KeyValue copyToNewKeyValue(final Cell cell) {
byte[] bytes = copyToNewByteArray(cell);
KeyValue kvCell = new KeyValue(bytes, 0, bytes.length);
kvCell.setMvccVersion(cell.getMvccVersion());
kvCell.setSequenceId(cell.getMvccVersion());
return kvCell;
}
@ -175,7 +175,7 @@ public class KeyValueUtil {
keyValue = new KeyValue(bb.array(), underlyingArrayOffset, kvLength);
if (includesMvccVersion) {
long mvccVersion = ByteBufferUtils.readVLong(bb);
keyValue.setMvccVersion(mvccVersion);
keyValue.setSequenceId(mvccVersion);
}
return keyValue;
}

View File

@ -231,6 +231,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
return memstoreTS;
}
@Override
public long getSequenceId() {
return memstoreTS;
}
@Override
public byte[] getValueArray() {
return currentBuffer.array();
@ -421,6 +426,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
return memstoreTS;
}
@Override
public long getSequenceId() {
return memstoreTS;
}
@Override
public byte[] getValueArray() {
return currentBuffer.array();

View File

@ -125,7 +125,7 @@ public class EncodedDataBlock {
(int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
if (meta.isIncludesMvcc()) {
long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
kv.setMvccVersion(mvccVersion);
kv.setSequenceId(mvccVersion);
}
return kv;
}
@ -244,7 +244,7 @@ public class EncodedDataBlock {
}
kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
klength, vlength, tagsLength));
kv.setMvccVersion(memstoreTS);
kv.setSequenceId(memstoreTS);
this.dataBlockEncoder.encode(kv, encodingCtx, out);
}
BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();

View File

@ -70,7 +70,7 @@ public class TestCellCodec {
Codec.Encoder encoder = codec.getEncoder(dos);
final KeyValue kv =
new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
kv.setMvccVersion(Long.MAX_VALUE);
kv.setSequenceId(Long.MAX_VALUE);
encoder.write(kv);
encoder.flush();
dos.close();

View File

@ -50,7 +50,7 @@ public class TestByteRangeWithKVSerialization {
long mvcc = pbr.getVLong();
KeyValue kv = new KeyValue(pbr.getBytes(), kvStartPos,
(int) KeyValue.getKeyValueDataStructureSize(keyLen, valLen, tagsLen));
kv.setMvccVersion(mvcc);
kv.setSequenceId(mvcc);
return kv;
}
@ -65,7 +65,7 @@ public class TestByteRangeWithKVSerialization {
Tag[] tags = new Tag[] { new Tag((byte) 1, "tag1") };
for (int i = 0; i < kvCount; i++) {
KeyValue kv = new KeyValue(Bytes.toBytes(i), FAMILY, QUALIFIER, i, VALUE, tags);
kv.setMvccVersion(i);
kv.setSequenceId(i);
kvs.add(kv);
totalSize += kv.getLength() + Bytes.SIZEOF_LONG;
}

View File

@ -122,6 +122,11 @@ public class PrefixTreeCell implements Cell, Comparable<Cell> {
return mvccVersion;
}
@Override
public long getSequenceId() {
return getMvccVersion();
}
@Override
public int getValueLength() {
return valueLength;

View File

@ -44,21 +44,21 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{
static List<KeyValue> d = Lists.newArrayList();
static{
KeyValue kv0 = new KeyValue(Arow, cf, cq0, 0L, v0);
kv0.setMvccVersion(123456789L);
kv0.setSequenceId(123456789L);
d.add(kv0);
KeyValue kv1 = new KeyValue(Arow, cf, cq1, 1L, v0);
kv1.setMvccVersion(3L);
kv1.setSequenceId(3L);
d.add(kv1);
KeyValue kv2 = new KeyValue(Brow, cf, cq0, 12345678L, v0);
kv2.setMvccVersion(65537L);
kv2.setSequenceId(65537L);
d.add(kv2);
//watch out... Long.MAX_VALUE comes back as 1332221664203, even with other encoders
// d.add(new KeyValue(Brow, cf, cq1, Long.MAX_VALUE, v0));
KeyValue kv3 = new KeyValue(Brow, cf, cq1, Long.MAX_VALUE-1, v0);
kv3.setMvccVersion(1L);
kv3.setSequenceId(1L);
d.add(kv3);
KeyValue kv4 = new KeyValue(Brow, cf, cq1, 999999999, v0);
@ -66,7 +66,7 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{
d.add(kv4);
KeyValue kv5 = new KeyValue(Brow, cf, cq1, 12345, v0);
kv5.setMvccVersion(0L);
kv5.setSequenceId(0L);
d.add(kv5);
}

View File

@ -897,6 +897,16 @@ public final class WALProtos {
* <code>optional uint64 nonce = 10;</code>
*/
long getNonce();
// optional uint64 orig_sequence_number = 11;
/**
* <code>optional uint64 orig_sequence_number = 11;</code>
*/
boolean hasOrigSequenceNumber();
/**
* <code>optional uint64 orig_sequence_number = 11;</code>
*/
long getOrigSequenceNumber();
}
/**
* Protobuf type {@code WALKey}
@ -1017,6 +1027,11 @@ public final class WALProtos {
nonce_ = input.readUInt64();
break;
}
case 88: {
bitField0_ |= 0x00000100;
origSequenceNumber_ = input.readUInt64();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -1323,6 +1338,22 @@ public final class WALProtos {
return nonce_;
}
// optional uint64 orig_sequence_number = 11;
public static final int ORIG_SEQUENCE_NUMBER_FIELD_NUMBER = 11;
private long origSequenceNumber_;
/**
* <code>optional uint64 orig_sequence_number = 11;</code>
*/
public boolean hasOrigSequenceNumber() {
return ((bitField0_ & 0x00000100) == 0x00000100);
}
/**
* <code>optional uint64 orig_sequence_number = 11;</code>
*/
public long getOrigSequenceNumber() {
return origSequenceNumber_;
}
private void initFields() {
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
tableName_ = com.google.protobuf.ByteString.EMPTY;
@ -1334,6 +1365,7 @@ public final class WALProtos {
clusterIds_ = java.util.Collections.emptyList();
nonceGroup_ = 0L;
nonce_ = 0L;
origSequenceNumber_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -1411,6 +1443,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000080) == 0x00000080)) {
output.writeUInt64(10, nonce_);
}
if (((bitField0_ & 0x00000100) == 0x00000100)) {
output.writeUInt64(11, origSequenceNumber_);
}
getUnknownFields().writeTo(output);
}
@ -1460,6 +1495,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(10, nonce_);
}
if (((bitField0_ & 0x00000100) == 0x00000100)) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(11, origSequenceNumber_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -1527,6 +1566,11 @@ public final class WALProtos {
result = result && (getNonce()
== other.getNonce());
}
result = result && (hasOrigSequenceNumber() == other.hasOrigSequenceNumber());
if (hasOrigSequenceNumber()) {
result = result && (getOrigSequenceNumber()
== other.getOrigSequenceNumber());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -1580,6 +1624,10 @@ public final class WALProtos {
hash = (37 * hash) + NONCE_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getNonce());
}
if (hasOrigSequenceNumber()) {
hash = (37 * hash) + ORIG_SEQUENCE_NUMBER_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getOrigSequenceNumber());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -1728,6 +1776,8 @@ public final class WALProtos {
bitField0_ = (bitField0_ & ~0x00000100);
nonce_ = 0L;
bitField0_ = (bitField0_ & ~0x00000200);
origSequenceNumber_ = 0L;
bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@ -1810,6 +1860,10 @@ public final class WALProtos {
to_bitField0_ |= 0x00000080;
}
result.nonce_ = nonce_;
if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
to_bitField0_ |= 0x00000100;
}
result.origSequenceNumber_ = origSequenceNumber_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -1902,6 +1956,9 @@ public final class WALProtos {
if (other.hasNonce()) {
setNonce(other.getNonce());
}
if (other.hasOrigSequenceNumber()) {
setOrigSequenceNumber(other.getOrigSequenceNumber());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -2977,6 +3034,39 @@ public final class WALProtos {
return this;
}
// optional uint64 orig_sequence_number = 11;
private long origSequenceNumber_ ;
/**
* <code>optional uint64 orig_sequence_number = 11;</code>
*/
public boolean hasOrigSequenceNumber() {
return ((bitField0_ & 0x00000400) == 0x00000400);
}
/**
* <code>optional uint64 orig_sequence_number = 11;</code>
*/
public long getOrigSequenceNumber() {
return origSequenceNumber_;
}
/**
* <code>optional uint64 orig_sequence_number = 11;</code>
*/
public Builder setOrigSequenceNumber(long value) {
bitField0_ |= 0x00000400;
origSequenceNumber_ = value;
onChanged();
return this;
}
/**
* <code>optional uint64 orig_sequence_number = 11;</code>
*/
public Builder clearOrigSequenceNumber() {
bitField0_ = (bitField0_ & ~0x00000400);
origSequenceNumber_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:WALKey)
}
@ -5176,24 +5266,24 @@ public final class WALProtos {
java.lang.String[] descriptorData = {
"\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
"\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
"\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" +
"\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\240\002\n\006" +
"WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
"able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
" \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
" \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
"Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
"ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
"\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil",
"y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
"\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
" \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" +
"ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
"\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
"e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
"\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
"ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
"base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
"\001\001"
"\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
"\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
"\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
"tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
"coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
" \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
"tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
"\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
"ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
"LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
"buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -5211,7 +5301,7 @@ public final class WALProtos {
internal_static_WALKey_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_WALKey_descriptor,
new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", });
new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", "OrigSequenceNumber", });
internal_static_FamilyScope_descriptor =
getDescriptor().getMessageTypes().get(2);
internal_static_FamilyScope_fieldAccessorTable = new

View File

@ -54,6 +54,7 @@ message WALKey {
optional uint64 nonceGroup = 9;
optional uint64 nonce = 10;
optional uint64 orig_sequence_number = 11;
/*
optional CustomEntryType custom_entry_type = 9;

View File

@ -765,7 +765,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
+ blockBuffer.position(), getCellBufSize());
if (this.reader.shouldIncludeMemstoreTS()) {
ret.setMvccVersion(currMemstoreTS);
ret.setSequenceId(currMemstoreTS);
}
return ret;
}

View File

@ -106,6 +106,9 @@ public class ReplicationProtbufUtil {
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
keyBuilder.addClusterIds(uuidBuilder.build());
}
if(key.getOrigLogSeqNum() > 0) {
keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
}
WALEdit edit = entry.getEdit();
NavigableMap<byte[], Integer> scopes = key.getScopes();
if (scopes != null && !scopes.isEmpty()) {

View File

@ -271,7 +271,7 @@ public class DefaultMemStore implements MemStore {
assert alloc.getBytes() != null;
alloc.put(0, kv.getBuffer(), kv.getOffset(), len);
KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
newKv.setMvccVersion(kv.getMvccVersion());
newKv.setSequenceId(kv.getMvccVersion());
return newKv;
}

View File

@ -2155,6 +2155,7 @@ public class HRegion implements HeapSize { // , Writable{
/** This method is potentially expensive and should only be used for non-replay CP path. */
public abstract Mutation[] getMutationsForCoprocs();
public abstract boolean isInReplay();
public abstract long getReplaySequenceId();
public boolean isDone() {
return nextIndexToProcess == operations.length;
@ -2194,11 +2195,18 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isInReplay() {
return false;
}
@Override
public long getReplaySequenceId() {
return 0;
}
}
private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
public ReplayBatch(MutationReplay[] operations) {
private long replaySeqId = 0;
public ReplayBatch(MutationReplay[] operations, long seqId) {
super(operations);
this.replaySeqId = seqId;
}
@Override
@ -2226,6 +2234,11 @@ public class HRegion implements HeapSize { // , Writable{
public boolean isInReplay() {
return true;
}
@Override
public long getReplaySequenceId() {
return this.replaySeqId;
}
}
/**
@ -2252,13 +2265,14 @@ public class HRegion implements HeapSize { // , Writable{
/**
* Replay a batch of mutations.
* @param mutations mutations to replay.
* @param replaySeqId SeqId for current mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
* @throws IOException
*/
public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
throws IOException {
return batchMutate(new ReplayBatch(mutations));
return batchMutate(new ReplayBatch(mutations, replaySeqId));
}
/**
@ -2475,7 +2489,7 @@ public class HRegion implements HeapSize { // , Writable{
// ------------------------------------
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// ----------------------------------
for (int i = firstIndex; i < lastIndexExclusive; i++) {
for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
// skip invalid
if (batchOp.retCodeDetails[i].getOperationStatusCode()
!= OperationStatusCode.NOT_RUN) continue;
@ -2485,16 +2499,18 @@ public class HRegion implements HeapSize { // , Writable{
updateKVTimestamps(familyMaps[i].values(), byteNow);
noOfPuts++;
} else {
if (!isInReplay) {
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
}
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
noOfDeletes++;
}
}
lock(this.updatesLock.readLock(), numReadyToWrite);
locked = true;
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
if(isInReplay) {
mvccNum = batchOp.getReplaySequenceId();
} else {
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
}
//
// ------------------------------------
// Acquire the latest mvcc number
@ -2591,6 +2607,9 @@ public class HRegion implements HeapSize { // , Writable{
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
if(isInReplay) {
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
}
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
@ -2952,7 +2971,7 @@ public class HRegion implements HeapSize { // , Writable{
Store store = getStore(family);
for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kv.setMvccVersion(mvccNum);
kv.setSequenceId(mvccNum);
Pair<Long, Cell> ret = store.add(kv);
size += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
@ -3213,6 +3232,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
reader = HLogFactory.createReader(fs, edits, conf);
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
@ -3275,6 +3295,8 @@ public class HRegion implements HeapSize { // , Writable{
firstSeqIdInLog = key.getLogSeqNum();
}
currentEditSeqId = key.getLogSeqNum();
currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
key.getOrigLogSeqNum() : currentEditSeqId;
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
@ -3309,6 +3331,7 @@ public class HRegion implements HeapSize { // , Writable{
skippedEdits++;
continue;
}
kv.setSequenceId(currentReplaySeqId);
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
@ -4922,7 +4945,7 @@ public class HRegion implements HeapSize { // , Writable{
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// 6. Apply to memstore
for (KeyValue kv : mutations) {
kv.setMvccVersion(mvccNum);
kv.setSequenceId(mvccNum);
Store store = getStore(kv);
if (store == null) {
checkFamily(CellUtil.cloneFamily(kv));
@ -5168,7 +5191,7 @@ public class HRegion implements HeapSize { // , Writable{
// so only need to update the timestamp to 'now'
newKV.updateLatestStamp(Bytes.toBytes(now));
}
newKV.setMvccVersion(mvccNum);
newKV.setSequenceId(mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@ -5382,7 +5405,7 @@ public class HRegion implements HeapSize { // , Writable{
System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
}
newKV.setMvccVersion(mvccNum);
newKV.setSequenceId(mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
@ -6220,4 +6243,14 @@ public class HRegion implements HeapSize { // , Writable{
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
return key;
}
/**
* Explictly sync wal
* @throws IOException
*/
public void syncWal() throws IOException {
if(this.log != null) {
this.log.sync();
}
}
}

View File

@ -246,7 +246,7 @@ public class MultiVersionConsistencyControl {
public static class WriteEntry {
private long writeNumber;
private boolean completed = false;
private volatile boolean completed = false;
WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;

View File

@ -633,12 +633,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
* @param region
* @param mutations
* @param replaySeqId
* @return an array of OperationStatus which internally contains the OperationStatusCode and the
* exceptionMessage if any
* @throws IOException
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
final List<HLogSplitter.MutationReplay> mutations) throws IOException {
final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
@ -657,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!region.getRegionInfo().isMetaTable()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
return region.batchReplay(mArray);
return region.batchReplay(mArray, replaySeqId);
} finally {
if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTimeMillis();
@ -1330,7 +1331,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
// when tag is enabled, we need tag replay edits with log sequence number
boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
for (WALEntry entry : entries) {
@ -1354,18 +1354,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
walEntries.add(walEntry);
}
mutations.addAll(edits);
}
if (!mutations.isEmpty()) {
OperationStatus[] result = doReplayBatchOp(region, mutations);
// check if it's a partial success
for (int i = 0; result != null && i < result.length; i++) {
if (result[i] != OperationStatus.SUCCESS) {
throw new IOException(result[i].getExceptionMsg());
if(edits!=null && !edits.isEmpty()) {
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
// check if it's a partial success
for (int i = 0; result != null && i < result.length; i++) {
if (result[i] != OperationStatus.SUCCESS) {
throw new IOException(result[i].getExceptionMsg());
}
}
}
}
//sync wal at the end because ASYNC_WAL is used above
region.syncWal();
if (coprocessorHost != null) {
for (Pair<HLogKey, WALEdit> wal : walEntries) {
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),

View File

@ -211,15 +211,6 @@ public class StoreFileScanner implements KeyValueScanner {
return false;
}
// For the optimisation in HBASE-4346, we set the KV's memstoreTS to
// 0, if it is older than all the scanners' read points. It is possible
// that a newer KV's memstoreTS was reset to 0. But, there is an
// older KV which was not reset to 0 (because it was
// not old enough during flush). Make sure that we set it correctly now,
// so that the comparision order does not change.
if (cur.getMvccVersion() <= readPt) {
KeyValueUtil.ensureKeyValue(cur).setMvccVersion(0);
}
return true;
}

View File

@ -59,6 +59,9 @@ public abstract class Compactor {
private int compactionKVMax;
protected Compression.Algorithm compactionCompression;
/** specify how many days to keep MVCC values during major compaction **/
protected int keepSeqIdPeriod;
//TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(final Configuration conf, final Store store) {
this.conf = conf;
@ -67,6 +70,8 @@ public abstract class Compactor {
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
this.compactionCompression = (this.store.getFamily() == null) ?
Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
}
/**
@ -92,19 +97,30 @@ public abstract class Compactor {
public long maxMVCCReadpoint = 0;
/** Max tags length**/
public int maxTagsLength = 0;
/** Min SeqId to keep during a major compaction **/
public long minSeqIdToKeep = 0;
}
/**
* Extracts some details about the files to compact that are commonly needed by compactors.
* @param filesToCompact Files.
* @param calculatePutTs Whether earliest put TS is needed.
* @param allFiles Whether all files are included for compaction
* @return The result.
*/
protected FileDetails getFileDetails(
Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
FileDetails fd = new FileDetails();
long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
(1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
for (StoreFile file : filesToCompact) {
if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
// when isAllFiles is true, all files are compacted so we can calculate the smallest
// MVCC value to keep
if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
fd.minSeqIdToKeep = file.getMaxMemstoreTS();
}
}
long seqNum = file.getMaxSequenceId();
fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
StoreFile.Reader r = file.getReader();
@ -130,7 +146,7 @@ public abstract class Compactor {
// If required, calculate the earliest put timestamp of all involved storefiles.
// This is used to remove family delete marker during compaction.
long earliestPutTs = 0;
if (calculatePutTs) {
if (allFiles) {
tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
if (tmp == null) {
// There's a file with no information, must be an old one
@ -148,7 +164,7 @@ public abstract class Compactor {
", size=" + StringUtils.humanReadableInt(r.length()) +
", encoding=" + r.getHFileReader().getDataBlockEncoding() +
", seqNum=" + seqNum +
(calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
(allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
}
}
return fd;
@ -202,10 +218,11 @@ public abstract class Compactor {
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(InternalScanner scanner,
CellSink writer, long smallestReadPoint) throws IOException {
CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
int bytesWritten = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
@ -218,8 +235,8 @@ public abstract class Compactor {
// output to writer:
for (Cell c : kvs) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
if (kv.getMvccVersion() <= smallestReadPoint) {
kv.setMvccVersion(0);
if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
kv.setSequenceId(0);
}
writer.append(kv);
++progress.currentCompactedKVs;

View File

@ -54,6 +54,7 @@ public class DefaultCompactor extends Compactor {
StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>();
boolean cleanSeqId = false;
try {
InternalScanner scanner = null;
try {
@ -71,9 +72,13 @@ public class DefaultCompactor extends Compactor {
}
// Create the writer even if no kv(Empty store file is also ok),
// because we need record the max seq id for the store file, see HBASE-6059
if(fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
boolean finished = performCompaction(scanner, writer, smallestReadPoint);
boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
if (!finished) {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);

View File

@ -90,6 +90,7 @@ public class StripeCompactor extends Compactor {
boolean finished = false;
InternalScanner scanner = null;
boolean cleanSeqId = false;
try {
// Get scanner to use.
ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
@ -108,6 +109,10 @@ public class StripeCompactor extends Compactor {
}
// Create the writer factory for compactions.
if(fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
@ -122,7 +127,7 @@ public class StripeCompactor extends Compactor {
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
finished = performCompaction(scanner, mw, smallestReadPoint);
finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +

View File

@ -91,9 +91,9 @@ class FSWALEntry extends HLog.Entry {
*/
long stampRegionSequenceId() {
long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
if(memstoreKVs != null && !memstoreKVs.isEmpty()) {
if (!this.getEdit().isReplay() && memstoreKVs != null && !memstoreKVs.isEmpty()) {
for(KeyValue kv : this.memstoreKVs){
kv.setMvccVersion(regionSequenceId);
kv.setSequenceId(regionSequenceId);
}
}
HLogKey key = getKey();

View File

@ -119,6 +119,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
private byte [] encodedRegionName;
private TableName tablename;
private long logSeqNum;
private long origLogSeqNum = 0;
private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
// Time at which this edit was written.
private long writeTime;
@ -255,6 +256,22 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
this.seqNumAssignedLatch.countDown();
}
/**
* Used to set original seq Id for HLogKey during wal replay
* @param seqId
*/
public void setOrigLogSeqNum(final long seqId) {
this.origLogSeqNum = seqId;
}
/**
* Return a positive long if current HLogKey is created from a replay edit
* @return original sequence number of the WALEdit
*/
public long getOrigLogSeqNum() {
return this.origLogSeqNum;
}
/**
* Wait for sequence number is assigned & return the assigned value
* @return long the new assigned sequence number
@ -536,6 +553,9 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
}
builder.setLogSequenceNumber(this.logSeqNum);
builder.setWriteTime(writeTime);
if(this.origLogSeqNum > 0) {
builder.setOrigSequenceNumber(this.origLogSeqNum);
}
if (this.nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
@ -599,5 +619,8 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
}
this.logSeqNum = walKey.getLogSequenceNumber();
this.writeTime = walKey.getWriteTime();
if(walKey.hasOrigSequenceNumber()) {
this.origLogSeqNum = walKey.getOrigSequenceNumber();
}
}
}

View File

@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Mutation;
@ -1863,6 +1864,10 @@ public class HLogSplitter {
public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
this.type = type;
this.mutation = mutation;
if(this.mutation.getDurability() != Durability.SKIP_WAL) {
// using ASYNC_WAL for relay
this.mutation.setDurability(Durability.ASYNC_WAL);
}
this.nonceGroup = nonceGroup;
this.nonce = nonce;
}
@ -1875,10 +1880,10 @@ public class HLogSplitter {
/**
* Tag original sequence number for each edit to be replayed
* @param entry
* @param seqId
* @param cell
*/
private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
// Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
boolean needAddRecoveryTag = true;
if (cell.getTagsLength() > 0) {
@ -1891,8 +1896,7 @@ public class HLogSplitter {
}
if (needAddRecoveryTag) {
List<Tag> newTags = new ArrayList<Tag>();
Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
.getLogSequenceNumber()));
Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
newTags.add(replayTag);
return KeyValue.cloneAndAddTags(cell, newTags);
}
@ -1918,6 +1922,8 @@ public class HLogSplitter {
return new ArrayList<MutationReplay>();
}
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
int count = entry.getAssociatedCellCount();
List<MutationReplay> mutations = new ArrayList<MutationReplay>();
Cell previousCell = null;
@ -1958,7 +1964,7 @@ public class HLogSplitter {
} else {
Cell tmpNewCell = cell;
if (addLogReplayTag) {
tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
}
((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
}
@ -1973,8 +1979,8 @@ public class HLogSplitter {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
.getTableName().toByteArray()), walKey.getLogSequenceNumber(),
walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
.getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
walKey.getNonceGroup(), walKey.getNonce());
logEntry.setFirst(key);
logEntry.setSecond(val);
}

View File

@ -1906,7 +1906,7 @@ public class AccessController extends BaseRegionObserver
newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
tags);
// Preserve mvcc data
rewriteKv.setMvccVersion(newKv.getMvccVersion());
rewriteKv.setSequenceId(newKv.getMvccVersion());
return rewriteKv;
}

View File

@ -1255,7 +1255,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags);
// Preserve mvcc data
rewriteKv.setMvccVersion(newKv.getMvccVersion());
rewriteKv.setSequenceId(newKv.getMvccVersion());
return rewriteKv;
}

View File

@ -3055,7 +3055,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
}
private static <T> String safeGetAsStr(List<T> lst, int i) {
public static <T> String safeGetAsStr(List<T> lst, int i) {
if (0 <= i && i < lst.size()) {
return lst.get(i).toString();
} else {

View File

@ -176,7 +176,7 @@ public class TestHFileBlock {
totalSize += kv.getLength();
if (includesMemstoreTS) {
long memstoreTS = randomizer.nextLong();
kv.setMvccVersion(memstoreTS);
kv.setSequenceId(memstoreTS);
totalSize += WritableUtils.getVIntSize(memstoreTS);
}
hbw.write(kv);

View File

@ -241,7 +241,7 @@ public class TestDefaultMemStore extends TestCase {
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv1 = new KeyValue(row, f, q1, v);
kv1.setMvccVersion(w.getWriteNumber());
kv1.setSequenceId(w.getWriteNumber());
memstore.add(kv1);
KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
@ -254,7 +254,7 @@ public class TestDefaultMemStore extends TestCase {
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv2 = new KeyValue(row, f, q2, v);
kv2.setMvccVersion(w.getWriteNumber());
kv2.setSequenceId(w.getWriteNumber());
memstore.add(kv2);
s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
@ -285,11 +285,11 @@ public class TestDefaultMemStore extends TestCase {
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMvccVersion(w.getWriteNumber());
kv11.setSequenceId(w.getWriteNumber());
memstore.add(kv11);
KeyValue kv12 = new KeyValue(row, f, q2, v1);
kv12.setMvccVersion(w.getWriteNumber());
kv12.setSequenceId(w.getWriteNumber());
memstore.add(kv12);
mvcc.completeMemstoreInsert(w);
@ -300,11 +300,11 @@ public class TestDefaultMemStore extends TestCase {
// START INSERT 2: Write both columns val2
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv21 = new KeyValue(row, f, q1, v2);
kv21.setMvccVersion(w.getWriteNumber());
kv21.setSequenceId(w.getWriteNumber());
memstore.add(kv21);
KeyValue kv22 = new KeyValue(row, f, q2, v2);
kv22.setMvccVersion(w.getWriteNumber());
kv22.setSequenceId(w.getWriteNumber());
memstore.add(kv22);
// BEFORE COMPLETING INSERT 2, SEE FIRST KVS
@ -337,11 +337,11 @@ public class TestDefaultMemStore extends TestCase {
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kv11 = new KeyValue(row, f, q1, v1);
kv11.setMvccVersion(w.getWriteNumber());
kv11.setSequenceId(w.getWriteNumber());
memstore.add(kv11);
KeyValue kv12 = new KeyValue(row, f, q2, v1);
kv12.setMvccVersion(w.getWriteNumber());
kv12.setSequenceId(w.getWriteNumber());
memstore.add(kv12);
mvcc.completeMemstoreInsert(w);
@ -353,7 +353,7 @@ public class TestDefaultMemStore extends TestCase {
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
KeyValue.Type.DeleteColumn);
kvDel.setMvccVersion(w.getWriteNumber());
kvDel.setSequenceId(w.getWriteNumber());
memstore.add(kvDel);
// BEFORE COMPLETING DELETE, SEE FIRST KVS
@ -414,7 +414,7 @@ public class TestDefaultMemStore extends TestCase {
byte[] v = Bytes.toBytes(i);
KeyValue kv = new KeyValue(row, f, q1, i, v);
kv.setMvccVersion(w.getWriteNumber());
kv.setSequenceId(w.getWriteNumber());
memstore.add(kv);
mvcc.completeMemstoreInsert(w);
@ -827,7 +827,7 @@ public class TestDefaultMemStore extends TestCase {
KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1);
kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1);
l.add(kv1); l.add(kv2); l.add(kv3);
this.memstore.upsert(l, 2);// readpoint is 2
@ -835,7 +835,7 @@ public class TestDefaultMemStore extends TestCase {
assert(newSize > oldSize);
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
kv4.setMvccVersion(1);
kv4.setSequenceId(1);
l.clear(); l.add(kv4);
this.memstore.upsert(l, 3);
assertEquals(newSize, this.memstore.size.get());
@ -877,7 +877,7 @@ public class TestDefaultMemStore extends TestCase {
// test the case that the timeOfOldestEdit is updated after a KV upsert
List<Cell> l = new ArrayList<Cell>();
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
kv1.setMvccVersion(100);
kv1.setSequenceId(100);
l.add(kv1);
memstore.upsert(l, 1000);
t = memstore.timeOfOldestEdit();

View File

@ -528,6 +528,7 @@ public class TestHRegion {
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().initialize(seqId);
Get get = new Get(row);
Result result = region.get(get);
for (long i = minSeqId; i <= maxSeqId; i += 10) {
@ -579,6 +580,7 @@ public class TestHRegion {
}
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
region.getMVCC().initialize(seqId);
Get get = new Get(row);
Result result = region.get(get);
for (long i = minSeqId; i <= maxSeqId; i += 10) {

View File

@ -689,7 +689,7 @@ public class TestReversibleScanners {
private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
VALUES[rowNum % VALUESIZE]);
kv.setMvccVersion(makeMVCC(rowNum, cqNum));
kv.setSequenceId(makeMVCC(rowNum, cqNum));
return kv;
}

View File

@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.assertKVListsEqual;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@ -449,5 +448,32 @@ public class TestSeekOptimizations {
}
public void assertKVListsEqual(String additionalMsg,
final List<? extends Cell> expected,
final List<? extends Cell> actual) {
final int eLen = expected.size();
final int aLen = actual.size();
final int minLen = Math.min(eLen, aLen);
int i;
for (i = 0; i < minLen
&& KeyValue.COMPARATOR.compareOnlyKeyPortion(expected.get(i), actual.get(i)) == 0;
++i) {}
if (additionalMsg == null) {
additionalMsg = "";
}
if (!additionalMsg.isEmpty()) {
additionalMsg = ". " + additionalMsg;
}
if (eLen != aLen || i != minLen) {
throw new AssertionError(
"Expected and actual KV arrays differ at position " + i + ": " +
HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
}
}
}