HBase-11315: Keeping MVCC for configurable longer time
This commit is contained in:
parent
738bf63f12
commit
cc4f54770b
|
@ -143,6 +143,12 @@ public class TestPayloadCarryingRpcController {
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
// unused
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValueArray() {
|
||||
return Bytes.toBytes(this.i);
|
||||
|
|
|
@ -142,14 +142,25 @@ public interface Cell {
|
|||
//6) MvccVersion
|
||||
|
||||
/**
|
||||
* Internal use only. A region-specific sequence ID given to each operation. It always exists for
|
||||
* cells in the memstore but is not retained forever. It may survive several flushes, but
|
||||
* generally becomes irrelevant after the cell's row is no longer involved in any operations that
|
||||
* require strict consistency.
|
||||
*
|
||||
* @return mvccVersion (always >= 0 if exists), or 0 if it no longer exists
|
||||
* @deprecated as of 1.0, use {@link Cell#getSequenceId()}
|
||||
*/
|
||||
@Deprecated
|
||||
long getMvccVersion();
|
||||
|
||||
/**
|
||||
* A region-specific unique monotonically increasing sequence ID given to each Cell. It always
|
||||
* exists for cells in the memstore but is not retained forever. It will be kept for
|
||||
* {@link HConstants#KEEP_SEQID_PERIOD} days, but generally becomes irrelevant after the cell's
|
||||
* row is no longer involved in any operations that require strict consistency.
|
||||
* @return seqId (always > 0 if exists), or 0 if it no longer exists
|
||||
*/
|
||||
long getSequenceId();
|
||||
|
||||
//7) Value
|
||||
|
||||
|
|
|
@ -167,7 +167,7 @@ public final class CellUtil {
|
|||
final long timestamp, final byte type, final byte[] value, final long memstoreTS) {
|
||||
KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
|
||||
KeyValue.Type.codeToType(type), value);
|
||||
keyValue.setMvccVersion(memstoreTS);
|
||||
keyValue.setSequenceId(memstoreTS);
|
||||
return keyValue;
|
||||
}
|
||||
|
||||
|
@ -175,7 +175,7 @@ public final class CellUtil {
|
|||
final long timestamp, final byte type, final byte[] value, byte[] tags, final long memstoreTS) {
|
||||
KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp,
|
||||
KeyValue.Type.codeToType(type), value, tags);
|
||||
keyValue.setMvccVersion(memstoreTS);
|
||||
keyValue.setSequenceId(memstoreTS);
|
||||
return keyValue;
|
||||
}
|
||||
|
||||
|
|
|
@ -353,6 +353,11 @@ public final class HConstants {
|
|||
|
||||
/** Default value for cluster ID */
|
||||
public static final String CLUSTER_ID_DEFAULT = "default-cluster";
|
||||
|
||||
/** Parameter name for # days to keep MVCC values during a major compaction */
|
||||
public static final String KEEP_SEQID_PERIOD = "hbase.hstore.compaction.keep.seqId.period";
|
||||
/** Minimum number of days (5) to keep MVCC values in HFiles */
|
||||
public static final int MIN_KEEP_SEQID_PERIOD = 5;
|
||||
|
||||
// Always store the location of the root table's HRegion.
|
||||
// This HRegion is never split.
|
||||
|
|
|
@ -284,15 +284,23 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
|
|||
// used to achieve atomic operations in the memstore.
|
||||
@Override
|
||||
public long getMvccVersion() {
|
||||
return mvcc;
|
||||
return this.getSequenceId();
|
||||
}
|
||||
|
||||
public void setMvccVersion(long mvccVersion){
|
||||
this.mvcc = mvccVersion;
|
||||
/**
|
||||
* used to achieve atomic operations in the memstore.
|
||||
*/
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
return seqId;
|
||||
}
|
||||
|
||||
public void setSequenceId(long seqId) {
|
||||
this.seqId = seqId;
|
||||
}
|
||||
|
||||
// multi-version concurrency control version. default value is 0, aka do not care.
|
||||
private long mvcc = 0; // this value is not part of a serialized KeyValue (not in HFiles)
|
||||
private long seqId = 0;
|
||||
|
||||
/** Dragon time over, return to normal business */
|
||||
|
||||
|
@ -1083,7 +1091,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
|
|||
// Important to clone the memstoreTS as well - otherwise memstore's
|
||||
// update-in-place methods (eg increment) will end up creating
|
||||
// new entries
|
||||
ret.setMvccVersion(mvcc);
|
||||
ret.setSequenceId(seqId);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -1094,7 +1102,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
|
|||
*/
|
||||
public KeyValue shallowCopy() {
|
||||
KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
|
||||
shallowCopy.setMvccVersion(this.mvcc);
|
||||
shallowCopy.setSequenceId(this.seqId);
|
||||
return shallowCopy;
|
||||
}
|
||||
|
||||
|
@ -1108,8 +1116,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
|
|||
if (this.bytes == null || this.bytes.length == 0) {
|
||||
return "empty";
|
||||
}
|
||||
return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) +
|
||||
"/vlen=" + getValueLength() + "/mvcc=" + mvcc;
|
||||
return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
|
||||
+ getValueLength() + "/seqid=" + seqId;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -75,7 +75,7 @@ public class KeyValueUtil {
|
|||
public static KeyValue copyToNewKeyValue(final Cell cell) {
|
||||
byte[] bytes = copyToNewByteArray(cell);
|
||||
KeyValue kvCell = new KeyValue(bytes, 0, bytes.length);
|
||||
kvCell.setMvccVersion(cell.getMvccVersion());
|
||||
kvCell.setSequenceId(cell.getMvccVersion());
|
||||
return kvCell;
|
||||
}
|
||||
|
||||
|
@ -175,7 +175,7 @@ public class KeyValueUtil {
|
|||
keyValue = new KeyValue(bb.array(), underlyingArrayOffset, kvLength);
|
||||
if (includesMvccVersion) {
|
||||
long mvccVersion = ByteBufferUtils.readVLong(bb);
|
||||
keyValue.setMvccVersion(mvccVersion);
|
||||
keyValue.setSequenceId(mvccVersion);
|
||||
}
|
||||
return keyValue;
|
||||
}
|
||||
|
|
|
@ -231,6 +231,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
return memstoreTS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
return memstoreTS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValueArray() {
|
||||
return currentBuffer.array();
|
||||
|
@ -421,6 +426,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
return memstoreTS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
return memstoreTS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getValueArray() {
|
||||
return currentBuffer.array();
|
||||
|
|
|
@ -125,7 +125,7 @@ public class EncodedDataBlock {
|
|||
(int) KeyValue.getKeyValueDataStructureSize(klen, vlen, tagsLen));
|
||||
if (meta.isIncludesMvcc()) {
|
||||
long mvccVersion = ByteBufferUtils.readVLong(decompressedData);
|
||||
kv.setMvccVersion(mvccVersion);
|
||||
kv.setSequenceId(mvccVersion);
|
||||
}
|
||||
return kv;
|
||||
}
|
||||
|
@ -244,7 +244,7 @@ public class EncodedDataBlock {
|
|||
}
|
||||
kv = new KeyValue(in.array(), kvOffset, (int) KeyValue.getKeyValueDataStructureSize(
|
||||
klength, vlength, tagsLength));
|
||||
kv.setMvccVersion(memstoreTS);
|
||||
kv.setSequenceId(memstoreTS);
|
||||
this.dataBlockEncoder.encode(kv, encodingCtx, out);
|
||||
}
|
||||
BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
|
||||
|
|
|
@ -70,7 +70,7 @@ public class TestCellCodec {
|
|||
Codec.Encoder encoder = codec.getEncoder(dos);
|
||||
final KeyValue kv =
|
||||
new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
|
||||
kv.setMvccVersion(Long.MAX_VALUE);
|
||||
kv.setSequenceId(Long.MAX_VALUE);
|
||||
encoder.write(kv);
|
||||
encoder.flush();
|
||||
dos.close();
|
||||
|
|
|
@ -50,7 +50,7 @@ public class TestByteRangeWithKVSerialization {
|
|||
long mvcc = pbr.getVLong();
|
||||
KeyValue kv = new KeyValue(pbr.getBytes(), kvStartPos,
|
||||
(int) KeyValue.getKeyValueDataStructureSize(keyLen, valLen, tagsLen));
|
||||
kv.setMvccVersion(mvcc);
|
||||
kv.setSequenceId(mvcc);
|
||||
return kv;
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,7 @@ public class TestByteRangeWithKVSerialization {
|
|||
Tag[] tags = new Tag[] { new Tag((byte) 1, "tag1") };
|
||||
for (int i = 0; i < kvCount; i++) {
|
||||
KeyValue kv = new KeyValue(Bytes.toBytes(i), FAMILY, QUALIFIER, i, VALUE, tags);
|
||||
kv.setMvccVersion(i);
|
||||
kv.setSequenceId(i);
|
||||
kvs.add(kv);
|
||||
totalSize += kv.getLength() + Bytes.SIZEOF_LONG;
|
||||
}
|
||||
|
|
|
@ -122,6 +122,11 @@ public class PrefixTreeCell implements Cell, Comparable<Cell> {
|
|||
return mvccVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
return getMvccVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueLength() {
|
||||
return valueLength;
|
||||
|
|
|
@ -44,21 +44,21 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{
|
|||
static List<KeyValue> d = Lists.newArrayList();
|
||||
static{
|
||||
KeyValue kv0 = new KeyValue(Arow, cf, cq0, 0L, v0);
|
||||
kv0.setMvccVersion(123456789L);
|
||||
kv0.setSequenceId(123456789L);
|
||||
d.add(kv0);
|
||||
|
||||
KeyValue kv1 = new KeyValue(Arow, cf, cq1, 1L, v0);
|
||||
kv1.setMvccVersion(3L);
|
||||
kv1.setSequenceId(3L);
|
||||
d.add(kv1);
|
||||
|
||||
KeyValue kv2 = new KeyValue(Brow, cf, cq0, 12345678L, v0);
|
||||
kv2.setMvccVersion(65537L);
|
||||
kv2.setSequenceId(65537L);
|
||||
d.add(kv2);
|
||||
|
||||
//watch out... Long.MAX_VALUE comes back as 1332221664203, even with other encoders
|
||||
// d.add(new KeyValue(Brow, cf, cq1, Long.MAX_VALUE, v0));
|
||||
KeyValue kv3 = new KeyValue(Brow, cf, cq1, Long.MAX_VALUE-1, v0);
|
||||
kv3.setMvccVersion(1L);
|
||||
kv3.setSequenceId(1L);
|
||||
d.add(kv3);
|
||||
|
||||
KeyValue kv4 = new KeyValue(Brow, cf, cq1, 999999999, v0);
|
||||
|
@ -66,7 +66,7 @@ public class TestRowDataDifferentTimestamps extends BaseTestRowData{
|
|||
d.add(kv4);
|
||||
|
||||
KeyValue kv5 = new KeyValue(Brow, cf, cq1, 12345, v0);
|
||||
kv5.setMvccVersion(0L);
|
||||
kv5.setSequenceId(0L);
|
||||
d.add(kv5);
|
||||
}
|
||||
|
||||
|
|
|
@ -897,6 +897,16 @@ public final class WALProtos {
|
|||
* <code>optional uint64 nonce = 10;</code>
|
||||
*/
|
||||
long getNonce();
|
||||
|
||||
// optional uint64 orig_sequence_number = 11;
|
||||
/**
|
||||
* <code>optional uint64 orig_sequence_number = 11;</code>
|
||||
*/
|
||||
boolean hasOrigSequenceNumber();
|
||||
/**
|
||||
* <code>optional uint64 orig_sequence_number = 11;</code>
|
||||
*/
|
||||
long getOrigSequenceNumber();
|
||||
}
|
||||
/**
|
||||
* Protobuf type {@code WALKey}
|
||||
|
@ -1017,6 +1027,11 @@ public final class WALProtos {
|
|||
nonce_ = input.readUInt64();
|
||||
break;
|
||||
}
|
||||
case 88: {
|
||||
bitField0_ |= 0x00000100;
|
||||
origSequenceNumber_ = input.readUInt64();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
|
||||
|
@ -1323,6 +1338,22 @@ public final class WALProtos {
|
|||
return nonce_;
|
||||
}
|
||||
|
||||
// optional uint64 orig_sequence_number = 11;
|
||||
public static final int ORIG_SEQUENCE_NUMBER_FIELD_NUMBER = 11;
|
||||
private long origSequenceNumber_;
|
||||
/**
|
||||
* <code>optional uint64 orig_sequence_number = 11;</code>
|
||||
*/
|
||||
public boolean hasOrigSequenceNumber() {
|
||||
return ((bitField0_ & 0x00000100) == 0x00000100);
|
||||
}
|
||||
/**
|
||||
* <code>optional uint64 orig_sequence_number = 11;</code>
|
||||
*/
|
||||
public long getOrigSequenceNumber() {
|
||||
return origSequenceNumber_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
|
||||
tableName_ = com.google.protobuf.ByteString.EMPTY;
|
||||
|
@ -1334,6 +1365,7 @@ public final class WALProtos {
|
|||
clusterIds_ = java.util.Collections.emptyList();
|
||||
nonceGroup_ = 0L;
|
||||
nonce_ = 0L;
|
||||
origSequenceNumber_ = 0L;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -1411,6 +1443,9 @@ public final class WALProtos {
|
|||
if (((bitField0_ & 0x00000080) == 0x00000080)) {
|
||||
output.writeUInt64(10, nonce_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000100) == 0x00000100)) {
|
||||
output.writeUInt64(11, origSequenceNumber_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -1460,6 +1495,10 @@ public final class WALProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt64Size(10, nonce_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000100) == 0x00000100)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt64Size(11, origSequenceNumber_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -1527,6 +1566,11 @@ public final class WALProtos {
|
|||
result = result && (getNonce()
|
||||
== other.getNonce());
|
||||
}
|
||||
result = result && (hasOrigSequenceNumber() == other.hasOrigSequenceNumber());
|
||||
if (hasOrigSequenceNumber()) {
|
||||
result = result && (getOrigSequenceNumber()
|
||||
== other.getOrigSequenceNumber());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -1580,6 +1624,10 @@ public final class WALProtos {
|
|||
hash = (37 * hash) + NONCE_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashLong(getNonce());
|
||||
}
|
||||
if (hasOrigSequenceNumber()) {
|
||||
hash = (37 * hash) + ORIG_SEQUENCE_NUMBER_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashLong(getOrigSequenceNumber());
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
memoizedHashCode = hash;
|
||||
return hash;
|
||||
|
@ -1728,6 +1776,8 @@ public final class WALProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000100);
|
||||
nonce_ = 0L;
|
||||
bitField0_ = (bitField0_ & ~0x00000200);
|
||||
origSequenceNumber_ = 0L;
|
||||
bitField0_ = (bitField0_ & ~0x00000400);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -1810,6 +1860,10 @@ public final class WALProtos {
|
|||
to_bitField0_ |= 0x00000080;
|
||||
}
|
||||
result.nonce_ = nonce_;
|
||||
if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
|
||||
to_bitField0_ |= 0x00000100;
|
||||
}
|
||||
result.origSequenceNumber_ = origSequenceNumber_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -1902,6 +1956,9 @@ public final class WALProtos {
|
|||
if (other.hasNonce()) {
|
||||
setNonce(other.getNonce());
|
||||
}
|
||||
if (other.hasOrigSequenceNumber()) {
|
||||
setOrigSequenceNumber(other.getOrigSequenceNumber());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -2977,6 +3034,39 @@ public final class WALProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional uint64 orig_sequence_number = 11;
|
||||
private long origSequenceNumber_ ;
|
||||
/**
|
||||
* <code>optional uint64 orig_sequence_number = 11;</code>
|
||||
*/
|
||||
public boolean hasOrigSequenceNumber() {
|
||||
return ((bitField0_ & 0x00000400) == 0x00000400);
|
||||
}
|
||||
/**
|
||||
* <code>optional uint64 orig_sequence_number = 11;</code>
|
||||
*/
|
||||
public long getOrigSequenceNumber() {
|
||||
return origSequenceNumber_;
|
||||
}
|
||||
/**
|
||||
* <code>optional uint64 orig_sequence_number = 11;</code>
|
||||
*/
|
||||
public Builder setOrigSequenceNumber(long value) {
|
||||
bitField0_ |= 0x00000400;
|
||||
origSequenceNumber_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
* <code>optional uint64 orig_sequence_number = 11;</code>
|
||||
*/
|
||||
public Builder clearOrigSequenceNumber() {
|
||||
bitField0_ = (bitField0_ & ~0x00000400);
|
||||
origSequenceNumber_ = 0L;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:WALKey)
|
||||
}
|
||||
|
||||
|
@ -5176,24 +5266,24 @@ public final class WALProtos {
|
|||
java.lang.String[] descriptorData = {
|
||||
"\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
|
||||
"\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
|
||||
"\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" +
|
||||
"\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\240\002\n\006" +
|
||||
"WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
|
||||
"able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
|
||||
" \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
|
||||
" \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
|
||||
"Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
|
||||
"ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
|
||||
"\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil",
|
||||
"y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
|
||||
"\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
|
||||
" \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" +
|
||||
"ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
|
||||
"\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
|
||||
"e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
|
||||
"\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
|
||||
"ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
|
||||
"base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
|
||||
"\001\001"
|
||||
"\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
|
||||
"\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
|
||||
"\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
|
||||
"tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
|
||||
"coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
|
||||
" \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
|
||||
"tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
|
||||
"\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
|
||||
"ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
|
||||
"LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
|
||||
"buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -5211,7 +5301,7 @@ public final class WALProtos {
|
|||
internal_static_WALKey_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_WALKey_descriptor,
|
||||
new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", });
|
||||
new java.lang.String[] { "EncodedRegionName", "TableName", "LogSequenceNumber", "WriteTime", "ClusterId", "Scopes", "FollowingKvCount", "ClusterIds", "NonceGroup", "Nonce", "OrigSequenceNumber", });
|
||||
internal_static_FamilyScope_descriptor =
|
||||
getDescriptor().getMessageTypes().get(2);
|
||||
internal_static_FamilyScope_fieldAccessorTable = new
|
||||
|
|
|
@ -54,6 +54,7 @@ message WALKey {
|
|||
|
||||
optional uint64 nonceGroup = 9;
|
||||
optional uint64 nonce = 10;
|
||||
optional uint64 orig_sequence_number = 11;
|
||||
|
||||
/*
|
||||
optional CustomEntryType custom_entry_type = 9;
|
||||
|
|
|
@ -765,7 +765,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
|
|||
KeyValue ret = new KeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
|
||||
+ blockBuffer.position(), getCellBufSize());
|
||||
if (this.reader.shouldIncludeMemstoreTS()) {
|
||||
ret.setMvccVersion(currMemstoreTS);
|
||||
ret.setSequenceId(currMemstoreTS);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -106,6 +106,9 @@ public class ReplicationProtbufUtil {
|
|||
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
|
||||
keyBuilder.addClusterIds(uuidBuilder.build());
|
||||
}
|
||||
if(key.getOrigLogSeqNum() > 0) {
|
||||
keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
|
||||
}
|
||||
WALEdit edit = entry.getEdit();
|
||||
NavigableMap<byte[], Integer> scopes = key.getScopes();
|
||||
if (scopes != null && !scopes.isEmpty()) {
|
||||
|
|
|
@ -271,7 +271,7 @@ public class DefaultMemStore implements MemStore {
|
|||
assert alloc.getBytes() != null;
|
||||
alloc.put(0, kv.getBuffer(), kv.getOffset(), len);
|
||||
KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
|
||||
newKv.setMvccVersion(kv.getMvccVersion());
|
||||
newKv.setSequenceId(kv.getMvccVersion());
|
||||
return newKv;
|
||||
}
|
||||
|
||||
|
|
|
@ -2155,6 +2155,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/** This method is potentially expensive and should only be used for non-replay CP path. */
|
||||
public abstract Mutation[] getMutationsForCoprocs();
|
||||
public abstract boolean isInReplay();
|
||||
public abstract long getReplaySequenceId();
|
||||
|
||||
public boolean isDone() {
|
||||
return nextIndexToProcess == operations.length;
|
||||
|
@ -2194,11 +2195,18 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
public boolean isInReplay() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReplaySequenceId() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ReplayBatch extends BatchOperationInProgress<HLogSplitter.MutationReplay> {
|
||||
public ReplayBatch(MutationReplay[] operations) {
|
||||
private long replaySeqId = 0;
|
||||
public ReplayBatch(MutationReplay[] operations, long seqId) {
|
||||
super(operations);
|
||||
this.replaySeqId = seqId;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2226,6 +2234,11 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
public boolean isInReplay() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReplaySequenceId() {
|
||||
return this.replaySeqId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2252,13 +2265,14 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* Replay a batch of mutations.
|
||||
* @param mutations mutations to replay.
|
||||
* @param replaySeqId SeqId for current mutations
|
||||
* @return an array of OperationStatus which internally contains the
|
||||
* OperationStatusCode and the exceptionMessage if any.
|
||||
* @throws IOException
|
||||
*/
|
||||
public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations)
|
||||
public OperationStatus[] batchReplay(HLogSplitter.MutationReplay[] mutations, long replaySeqId)
|
||||
throws IOException {
|
||||
return batchMutate(new ReplayBatch(mutations));
|
||||
return batchMutate(new ReplayBatch(mutations, replaySeqId));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2475,7 +2489,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// ------------------------------------
|
||||
// STEP 2. Update any LATEST_TIMESTAMP timestamps
|
||||
// ----------------------------------
|
||||
for (int i = firstIndex; i < lastIndexExclusive; i++) {
|
||||
for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
|
||||
// skip invalid
|
||||
if (batchOp.retCodeDetails[i].getOperationStatusCode()
|
||||
!= OperationStatusCode.NOT_RUN) continue;
|
||||
|
@ -2485,16 +2499,18 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
updateKVTimestamps(familyMaps[i].values(), byteNow);
|
||||
noOfPuts++;
|
||||
} else {
|
||||
if (!isInReplay) {
|
||||
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
|
||||
}
|
||||
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
|
||||
noOfDeletes++;
|
||||
}
|
||||
}
|
||||
|
||||
lock(this.updatesLock.readLock(), numReadyToWrite);
|
||||
locked = true;
|
||||
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
|
||||
if(isInReplay) {
|
||||
mvccNum = batchOp.getReplaySequenceId();
|
||||
} else {
|
||||
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
|
||||
}
|
||||
//
|
||||
// ------------------------------------
|
||||
// Acquire the latest mvcc number
|
||||
|
@ -2591,6 +2607,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
|
||||
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
|
||||
mutation.getClusterIds(), currentNonceGroup, currentNonce);
|
||||
if(isInReplay) {
|
||||
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
|
||||
}
|
||||
txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
|
||||
getSequenceId(), true, memstoreCells);
|
||||
}
|
||||
|
@ -2952,7 +2971,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
Store store = getStore(family);
|
||||
for (Cell cell: cells) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
kv.setMvccVersion(mvccNum);
|
||||
kv.setSequenceId(mvccNum);
|
||||
Pair<Long, Cell> ret = store.add(kv);
|
||||
size += ret.getFirst();
|
||||
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
|
||||
|
@ -3213,6 +3232,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
try {
|
||||
reader = HLogFactory.createReader(fs, edits, conf);
|
||||
long currentEditSeqId = -1;
|
||||
long currentReplaySeqId = -1;
|
||||
long firstSeqIdInLog = -1;
|
||||
long skippedEdits = 0;
|
||||
long editsCount = 0;
|
||||
|
@ -3275,6 +3295,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
firstSeqIdInLog = key.getLogSeqNum();
|
||||
}
|
||||
currentEditSeqId = key.getLogSeqNum();
|
||||
currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
|
||||
key.getOrigLogSeqNum() : currentEditSeqId;
|
||||
boolean flush = false;
|
||||
for (KeyValue kv: val.getKeyValues()) {
|
||||
// Check this edit is for me. Also, guard against writing the special
|
||||
|
@ -3309,6 +3331,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
skippedEdits++;
|
||||
continue;
|
||||
}
|
||||
kv.setSequenceId(currentReplaySeqId);
|
||||
// Once we are over the limit, restoreEdit will keep returning true to
|
||||
// flush -- but don't flush until we've played all the kvs that make up
|
||||
// the WALEdit.
|
||||
|
@ -4922,7 +4945,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
|
||||
// 6. Apply to memstore
|
||||
for (KeyValue kv : mutations) {
|
||||
kv.setMvccVersion(mvccNum);
|
||||
kv.setSequenceId(mvccNum);
|
||||
Store store = getStore(kv);
|
||||
if (store == null) {
|
||||
checkFamily(CellUtil.cloneFamily(kv));
|
||||
|
@ -5168,7 +5191,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// so only need to update the timestamp to 'now'
|
||||
newKV.updateLatestStamp(Bytes.toBytes(now));
|
||||
}
|
||||
newKV.setMvccVersion(mvccNum);
|
||||
newKV.setSequenceId(mvccNum);
|
||||
// Give coprocessors a chance to update the new cell
|
||||
if (coprocessorHost != null) {
|
||||
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
|
||||
|
@ -5382,7 +5405,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
|
||||
newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
|
||||
}
|
||||
newKV.setMvccVersion(mvccNum);
|
||||
newKV.setSequenceId(mvccNum);
|
||||
// Give coprocessors a chance to update the new cell
|
||||
if (coprocessorHost != null) {
|
||||
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
|
||||
|
@ -6220,4 +6243,14 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Explicitly sync the WAL.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void syncWal() throws IOException {
|
||||
if(this.log != null) {
|
||||
this.log.sync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -246,7 +246,7 @@ public class MultiVersionConsistencyControl {
|
|||
|
||||
public static class WriteEntry {
|
||||
private long writeNumber;
|
||||
private boolean completed = false;
|
||||
private volatile boolean completed = false;
|
||||
|
||||
WriteEntry(long writeNumber) {
|
||||
this.writeNumber = writeNumber;
|
||||
|
|
|
@ -633,12 +633,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
* constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
|
||||
* @param region
|
||||
* @param mutations
|
||||
* @param replaySeqId
|
||||
* @return an array of OperationStatus which internally contains the OperationStatusCode and the
|
||||
* exceptionMessage if any
|
||||
* @throws IOException
|
||||
*/
|
||||
private OperationStatus [] doReplayBatchOp(final HRegion region,
|
||||
final List<HLogSplitter.MutationReplay> mutations) throws IOException {
|
||||
final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
|
||||
HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
|
||||
|
||||
long before = EnvironmentEdgeManager.currentTimeMillis();
|
||||
|
@ -657,7 +658,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
if (!region.getRegionInfo().isMetaTable()) {
|
||||
regionServer.cacheFlusher.reclaimMemStoreMemory();
|
||||
}
|
||||
return region.batchReplay(mArray);
|
||||
return region.batchReplay(mArray, replaySeqId);
|
||||
} finally {
|
||||
if (regionServer.metricsRegionServer != null) {
|
||||
long after = EnvironmentEdgeManager.currentTimeMillis();
|
||||
|
@ -1330,7 +1331,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
|
||||
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
|
||||
List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
|
||||
// when tags are enabled, we need to tag replay edits with the log sequence number
|
||||
boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
|
||||
for (WALEntry entry : entries) {
|
||||
|
@ -1354,18 +1354,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
walEntries.add(walEntry);
|
||||
}
|
||||
mutations.addAll(edits);
|
||||
}
|
||||
|
||||
if (!mutations.isEmpty()) {
|
||||
OperationStatus[] result = doReplayBatchOp(region, mutations);
|
||||
// check if it's a partial success
|
||||
for (int i = 0; result != null && i < result.length; i++) {
|
||||
if (result[i] != OperationStatus.SUCCESS) {
|
||||
throw new IOException(result[i].getExceptionMsg());
|
||||
if(edits!=null && !edits.isEmpty()) {
|
||||
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
|
||||
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
|
||||
OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
|
||||
// check if it's a partial success
|
||||
for (int i = 0; result != null && i < result.length; i++) {
|
||||
if (result[i] != OperationStatus.SUCCESS) {
|
||||
throw new IOException(result[i].getExceptionMsg());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//sync wal at the end because ASYNC_WAL is used above
|
||||
region.syncWal();
|
||||
|
||||
if (coprocessorHost != null) {
|
||||
for (Pair<HLogKey, WALEdit> wal : walEntries) {
|
||||
coprocessorHost.postWALRestore(region.getRegionInfo(), wal.getFirst(),
|
||||
|
|
|
@ -211,15 +211,6 @@ public class StoreFileScanner implements KeyValueScanner {
|
|||
return false;
|
||||
}
|
||||
|
||||
// For the optimisation in HBASE-4346, we set the KV's memstoreTS to
|
||||
// 0, if it is older than all the scanners' read points. It is possible
|
||||
// that a newer KV's memstoreTS was reset to 0. But, there is an
|
||||
// older KV which was not reset to 0 (because it was
|
||||
// not old enough during flush). Make sure that we set it correctly now,
|
||||
// so that the comparison order does not change.
|
||||
if (cur.getMvccVersion() <= readPt) {
|
||||
KeyValueUtil.ensureKeyValue(cur).setMvccVersion(0);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -58,6 +58,9 @@ public abstract class Compactor {
|
|||
|
||||
private int compactionKVMax;
|
||||
protected Compression.Algorithm compactionCompression;
|
||||
|
||||
/** specify how many days to keep MVCC values during major compaction **/
|
||||
protected int keepSeqIdPeriod;
|
||||
|
||||
//TODO: depending on Store is not good but, realistically, all compactors currently do.
|
||||
Compactor(final Configuration conf, final Store store) {
|
||||
|
@ -67,6 +70,8 @@ public abstract class Compactor {
|
|||
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
|
||||
this.compactionCompression = (this.store.getFamily() == null) ?
|
||||
Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
|
||||
this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
|
||||
HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -92,19 +97,30 @@ public abstract class Compactor {
|
|||
public long maxMVCCReadpoint = 0;
|
||||
/** Max tags length**/
|
||||
public int maxTagsLength = 0;
|
||||
/** Min SeqId to keep during a major compaction **/
|
||||
public long minSeqIdToKeep = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts some details about the files to compact that are commonly needed by compactors.
|
||||
* @param filesToCompact Files.
|
||||
* @param calculatePutTs Whether earliest put TS is needed.
|
||||
* @param allFiles Whether all files are included for compaction
|
||||
* @return The result.
|
||||
*/
|
||||
protected FileDetails getFileDetails(
|
||||
Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
|
||||
Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
|
||||
FileDetails fd = new FileDetails();
|
||||
long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
|
||||
(1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
|
||||
|
||||
for (StoreFile file : filesToCompact) {
|
||||
if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
|
||||
// when allFiles is true, all files are compacted so we can calculate the smallest
|
||||
// MVCC value to keep
|
||||
if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
|
||||
fd.minSeqIdToKeep = file.getMaxMemstoreTS();
|
||||
}
|
||||
}
|
||||
long seqNum = file.getMaxSequenceId();
|
||||
fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
|
||||
StoreFile.Reader r = file.getReader();
|
||||
|
@ -130,7 +146,7 @@ public abstract class Compactor {
|
|||
// If required, calculate the earliest put timestamp of all involved storefiles.
|
||||
// This is used to remove family delete marker during compaction.
|
||||
long earliestPutTs = 0;
|
||||
if (calculatePutTs) {
|
||||
if (allFiles) {
|
||||
tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
|
||||
if (tmp == null) {
|
||||
// There's a file with no information, must be an old one
|
||||
|
@ -148,7 +164,7 @@ public abstract class Compactor {
|
|||
", size=" + StringUtils.humanReadableInt(r.length()) +
|
||||
", encoding=" + r.getHFileReader().getDataBlockEncoding() +
|
||||
", seqNum=" + seqNum +
|
||||
(calculatePutTs ? ", earliestPutTs=" + earliestPutTs: ""));
|
||||
(allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
|
||||
}
|
||||
}
|
||||
return fd;
|
||||
|
@ -202,10 +218,11 @@ public abstract class Compactor {
|
|||
* @param scanner Where to read from.
|
||||
* @param writer Where to write to.
|
||||
* @param smallestReadPoint Smallest read point.
|
||||
* @param cleanSeqId When true, reset to 0 any seqId (formerly mvcc) that is <= smallestReadPoint
|
||||
* @return Whether compaction ended; false if it was interrupted for some reason.
|
||||
*/
|
||||
protected boolean performCompaction(InternalScanner scanner,
|
||||
CellSink writer, long smallestReadPoint) throws IOException {
|
||||
CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {
|
||||
int bytesWritten = 0;
|
||||
// Since scanner.next() can return 'false' but still be delivering data,
|
||||
// we have to use a do/while loop.
|
||||
|
@ -218,8 +235,8 @@ public abstract class Compactor {
|
|||
// output to writer:
|
||||
for (Cell c : kvs) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
|
||||
if (kv.getMvccVersion() <= smallestReadPoint) {
|
||||
kv.setMvccVersion(0);
|
||||
if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
|
||||
kv.setSequenceId(0);
|
||||
}
|
||||
writer.append(kv);
|
||||
++progress.currentCompactedKVs;
|
||||
|
|
|
@ -54,6 +54,7 @@ public class DefaultCompactor extends Compactor {
|
|||
|
||||
StoreFile.Writer writer = null;
|
||||
List<Path> newFiles = new ArrayList<Path>();
|
||||
boolean cleanSeqId = false;
|
||||
try {
|
||||
InternalScanner scanner = null;
|
||||
try {
|
||||
|
@ -71,9 +72,13 @@ public class DefaultCompactor extends Compactor {
|
|||
}
|
||||
// Create the writer even if no kv(Empty store file is also ok),
|
||||
// because we need record the max seq id for the store file, see HBASE-6059
|
||||
if(fd.minSeqIdToKeep > 0) {
|
||||
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
|
||||
cleanSeqId = true;
|
||||
}
|
||||
writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
|
||||
fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
|
||||
boolean finished = performCompaction(scanner, writer, smallestReadPoint);
|
||||
boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
|
||||
if (!finished) {
|
||||
writer.close();
|
||||
store.getFileSystem().delete(writer.getPath(), false);
|
||||
|
|
|
@ -90,6 +90,7 @@ public class StripeCompactor extends Compactor {
|
|||
|
||||
boolean finished = false;
|
||||
InternalScanner scanner = null;
|
||||
boolean cleanSeqId = false;
|
||||
try {
|
||||
// Get scanner to use.
|
||||
ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
|
||||
|
@ -108,6 +109,10 @@ public class StripeCompactor extends Compactor {
|
|||
}
|
||||
|
||||
// Create the writer factory for compactions.
|
||||
if(fd.minSeqIdToKeep > 0) {
|
||||
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
|
||||
cleanSeqId = true;
|
||||
}
|
||||
final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
|
||||
final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
|
||||
StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
|
||||
|
@ -122,7 +127,7 @@ public class StripeCompactor extends Compactor {
|
|||
// It is ok here if storeScanner is null.
|
||||
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
|
||||
mw.init(storeScanner, factory, store.getComparator());
|
||||
finished = performCompaction(scanner, mw, smallestReadPoint);
|
||||
finished = performCompaction(scanner, mw, smallestReadPoint, cleanSeqId);
|
||||
if (!finished) {
|
||||
throw new InterruptedIOException( "Aborting compaction of store " + store +
|
||||
" in region " + store.getRegionInfo().getRegionNameAsString() +
|
||||
|
|
|
@ -91,9 +91,9 @@ class FSWALEntry extends HLog.Entry {
|
|||
*/
|
||||
long stampRegionSequenceId() {
|
||||
long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
|
||||
if(memstoreKVs != null && !memstoreKVs.isEmpty()) {
|
||||
if (!this.getEdit().isReplay() && memstoreKVs != null && !memstoreKVs.isEmpty()) {
|
||||
for(KeyValue kv : this.memstoreKVs){
|
||||
kv.setMvccVersion(regionSequenceId);
|
||||
kv.setSequenceId(regionSequenceId);
|
||||
}
|
||||
}
|
||||
HLogKey key = getKey();
|
||||
|
|
|
@ -119,6 +119,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
|
|||
private byte [] encodedRegionName;
|
||||
private TableName tablename;
|
||||
private long logSeqNum;
|
||||
private long origLogSeqNum = 0;
|
||||
private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
|
||||
// Time at which this edit was written.
|
||||
private long writeTime;
|
||||
|
@ -255,6 +256,22 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
|
|||
this.seqNumAssignedLatch.countDown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the original sequence id of this HLogKey; used during WAL replay.
|
||||
* @param seqId the original sequence id to record in {@code origLogSeqNum}
|
||||
*/
|
||||
public void setOrigLogSeqNum(final long seqId) {
|
||||
this.origLogSeqNum = seqId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the original sequence number recorded on this key. The value is meant to be a
|
||||
* positive long when the current HLogKey was created from a replay edit.
|
||||
* @return the current value of {@code origLogSeqNum}
|
||||
*/
|
||||
public long getOrigLogSeqNum() {
|
||||
return this.origLogSeqNum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the sequence number is assigned and return the assigned value
|
||||
* @return long the new assigned sequence number
|
||||
|
@ -536,6 +553,9 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
|
|||
}
|
||||
builder.setLogSequenceNumber(this.logSeqNum);
|
||||
builder.setWriteTime(writeTime);
|
||||
if(this.origLogSeqNum > 0) {
|
||||
builder.setOrigSequenceNumber(this.origLogSeqNum);
|
||||
}
|
||||
if (this.nonce != HConstants.NO_NONCE) {
|
||||
builder.setNonce(nonce);
|
||||
}
|
||||
|
@ -599,5 +619,8 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
|
|||
}
|
||||
this.logSeqNum = walKey.getLogSequenceNumber();
|
||||
this.writeTime = walKey.getWriteTime();
|
||||
if(walKey.hasOrigSequenceNumber()) {
|
||||
this.origLogSeqNum = walKey.getOrigSequenceNumber();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Tag;
|
|||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
|
@ -1863,6 +1864,10 @@ public class HLogSplitter {
|
|||
/**
|
||||
* Wraps a mutation that is to be replayed, together with its type and nonce information.
|
||||
* Side effect: unless the mutation's durability is SKIP_WAL, the durability of the passed-in
|
||||
* mutation object itself is changed to ASYNC_WAL.
|
||||
* @param type the kind of mutation being replayed
|
||||
* @param mutation the mutation to replay; stored by reference and possibly modified (see above)
|
||||
* @param nonceGroup nonce group stored with this replay
|
||||
* @param nonce nonce stored with this replay
|
||||
*/
|
||||
public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
|
||||
this.type = type;
|
||||
|
||||
this.mutation = mutation;
|
||||
if(this.mutation.getDurability() != Durability.SKIP_WAL) {
|
||||
// using ASYNC_WAL for replay
|
||||
this.mutation.setDurability(Durability.ASYNC_WAL);
|
||||
}
|
||||
this.nonceGroup = nonceGroup;
|
||||
this.nonce = nonce;
|
||||
}
|
||||
|
@ -1875,10 +1880,10 @@ public class HLogSplitter {
|
|||
|
||||
/**
|
||||
* Tag original sequence number for each edit to be replayed
|
||||
* @param entry
|
||||
* @param seqId
|
||||
* @param cell
|
||||
*/
|
||||
private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
|
||||
private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
|
||||
// Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
|
||||
boolean needAddRecoveryTag = true;
|
||||
if (cell.getTagsLength() > 0) {
|
||||
|
@ -1891,8 +1896,7 @@ public class HLogSplitter {
|
|||
}
|
||||
if (needAddRecoveryTag) {
|
||||
List<Tag> newTags = new ArrayList<Tag>();
|
||||
Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
|
||||
.getLogSequenceNumber()));
|
||||
Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
|
||||
newTags.add(replayTag);
|
||||
return KeyValue.cloneAndAddTags(cell, newTags);
|
||||
}
|
||||
|
@ -1918,6 +1922,8 @@ public class HLogSplitter {
|
|||
return new ArrayList<MutationReplay>();
|
||||
}
|
||||
|
||||
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
|
||||
entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
|
||||
int count = entry.getAssociatedCellCount();
|
||||
List<MutationReplay> mutations = new ArrayList<MutationReplay>();
|
||||
Cell previousCell = null;
|
||||
|
@ -1958,7 +1964,7 @@ public class HLogSplitter {
|
|||
} else {
|
||||
Cell tmpNewCell = cell;
|
||||
if (addLogReplayTag) {
|
||||
tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
|
||||
tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
|
||||
}
|
||||
((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
|
||||
}
|
||||
|
@ -1973,8 +1979,8 @@ public class HLogSplitter {
|
|||
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
|
||||
}
|
||||
key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
|
||||
.getTableName().toByteArray()), walKey.getLogSequenceNumber(),
|
||||
walKey.getWriteTime(), clusterIds, walKey.getNonceGroup(), walKey.getNonce());
|
||||
.getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
|
||||
walKey.getNonceGroup(), walKey.getNonce());
|
||||
logEntry.setFirst(key);
|
||||
logEntry.setSecond(val);
|
||||
}
|
||||
|
|
|
@ -1906,7 +1906,7 @@ public class AccessController extends BaseRegionObserver
|
|||
newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(),
|
||||
tags);
|
||||
// Preserve mvcc data
|
||||
rewriteKv.setMvccVersion(newKv.getMvccVersion());
|
||||
rewriteKv.setSequenceId(newKv.getMvccVersion());
|
||||
return rewriteKv;
|
||||
}
|
||||
|
||||
|
|
|
@ -1255,7 +1255,7 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
|
|||
newKv.getTimestamp(), KeyValue.Type.codeToType(newKv.getTypeByte()),
|
||||
newKv.getValueArray(), newKv.getValueOffset(), newKv.getValueLength(), tags);
|
||||
// Preserve mvcc data
|
||||
rewriteKv.setMvccVersion(newKv.getMvccVersion());
|
||||
rewriteKv.setSequenceId(newKv.getMvccVersion());
|
||||
return rewriteKv;
|
||||
}
|
||||
|
||||
|
|
|
@ -3055,7 +3055,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
}
|
||||
}
|
||||
|
||||
private static <T> String safeGetAsStr(List<T> lst, int i) {
|
||||
public static <T> String safeGetAsStr(List<T> lst, int i) {
|
||||
if (0 <= i && i < lst.size()) {
|
||||
return lst.get(i).toString();
|
||||
} else {
|
||||
|
|
|
@ -176,7 +176,7 @@ public class TestHFileBlock {
|
|||
totalSize += kv.getLength();
|
||||
if (includesMemstoreTS) {
|
||||
long memstoreTS = randomizer.nextLong();
|
||||
kv.setMvccVersion(memstoreTS);
|
||||
kv.setSequenceId(memstoreTS);
|
||||
totalSize += WritableUtils.getVIntSize(memstoreTS);
|
||||
}
|
||||
hbw.write(kv);
|
||||
|
|
|
@ -241,7 +241,7 @@ public class TestDefaultMemStore extends TestCase {
|
|||
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
|
||||
|
||||
KeyValue kv1 = new KeyValue(row, f, q1, v);
|
||||
kv1.setMvccVersion(w.getWriteNumber());
|
||||
kv1.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv1);
|
||||
|
||||
KeyValueScanner s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
|
||||
|
@ -254,7 +254,7 @@ public class TestDefaultMemStore extends TestCase {
|
|||
|
||||
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
|
||||
KeyValue kv2 = new KeyValue(row, f, q2, v);
|
||||
kv2.setMvccVersion(w.getWriteNumber());
|
||||
kv2.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv2);
|
||||
|
||||
s = this.memstore.getScanners(mvcc.memstoreReadPoint()).get(0);
|
||||
|
@ -285,11 +285,11 @@ public class TestDefaultMemStore extends TestCase {
|
|||
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
|
||||
|
||||
KeyValue kv11 = new KeyValue(row, f, q1, v1);
|
||||
kv11.setMvccVersion(w.getWriteNumber());
|
||||
kv11.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv11);
|
||||
|
||||
KeyValue kv12 = new KeyValue(row, f, q2, v1);
|
||||
kv12.setMvccVersion(w.getWriteNumber());
|
||||
kv12.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv12);
|
||||
mvcc.completeMemstoreInsert(w);
|
||||
|
||||
|
@ -300,11 +300,11 @@ public class TestDefaultMemStore extends TestCase {
|
|||
// START INSERT 2: Write both columns val2
|
||||
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
|
||||
KeyValue kv21 = new KeyValue(row, f, q1, v2);
|
||||
kv21.setMvccVersion(w.getWriteNumber());
|
||||
kv21.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv21);
|
||||
|
||||
KeyValue kv22 = new KeyValue(row, f, q2, v2);
|
||||
kv22.setMvccVersion(w.getWriteNumber());
|
||||
kv22.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv22);
|
||||
|
||||
// BEFORE COMPLETING INSERT 2, SEE FIRST KVS
|
||||
|
@ -337,11 +337,11 @@ public class TestDefaultMemStore extends TestCase {
|
|||
mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
|
||||
|
||||
KeyValue kv11 = new KeyValue(row, f, q1, v1);
|
||||
kv11.setMvccVersion(w.getWriteNumber());
|
||||
kv11.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv11);
|
||||
|
||||
KeyValue kv12 = new KeyValue(row, f, q2, v1);
|
||||
kv12.setMvccVersion(w.getWriteNumber());
|
||||
kv12.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv12);
|
||||
mvcc.completeMemstoreInsert(w);
|
||||
|
||||
|
@ -353,7 +353,7 @@ public class TestDefaultMemStore extends TestCase {
|
|||
w = mvcc.beginMemstoreInsertWithSeqNum(this.startSeqNum.incrementAndGet());
|
||||
KeyValue kvDel = new KeyValue(row, f, q2, kv11.getTimestamp(),
|
||||
KeyValue.Type.DeleteColumn);
|
||||
kvDel.setMvccVersion(w.getWriteNumber());
|
||||
kvDel.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kvDel);
|
||||
|
||||
// BEFORE COMPLETING DELETE, SEE FIRST KVS
|
||||
|
@ -414,7 +414,7 @@ public class TestDefaultMemStore extends TestCase {
|
|||
byte[] v = Bytes.toBytes(i);
|
||||
|
||||
KeyValue kv = new KeyValue(row, f, q1, i, v);
|
||||
kv.setMvccVersion(w.getWriteNumber());
|
||||
kv.setSequenceId(w.getWriteNumber());
|
||||
memstore.add(kv);
|
||||
mvcc.completeMemstoreInsert(w);
|
||||
|
||||
|
@ -827,7 +827,7 @@ public class TestDefaultMemStore extends TestCase {
|
|||
KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
|
||||
KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
|
||||
|
||||
kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1);
|
||||
kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1);
|
||||
l.add(kv1); l.add(kv2); l.add(kv3);
|
||||
|
||||
this.memstore.upsert(l, 2);// readpoint is 2
|
||||
|
@ -835,7 +835,7 @@ public class TestDefaultMemStore extends TestCase {
|
|||
assert(newSize > oldSize);
|
||||
|
||||
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
|
||||
kv4.setMvccVersion(1);
|
||||
kv4.setSequenceId(1);
|
||||
l.clear(); l.add(kv4);
|
||||
this.memstore.upsert(l, 3);
|
||||
assertEquals(newSize, this.memstore.size.get());
|
||||
|
@ -877,7 +877,7 @@ public class TestDefaultMemStore extends TestCase {
|
|||
// test the case that the timeOfOldestEdit is updated after a KV upsert
|
||||
List<Cell> l = new ArrayList<Cell>();
|
||||
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
|
||||
kv1.setMvccVersion(100);
|
||||
kv1.setSequenceId(100);
|
||||
l.add(kv1);
|
||||
memstore.upsert(l, 1000);
|
||||
t = memstore.timeOfOldestEdit();
|
||||
|
|
|
@ -528,6 +528,7 @@ public class TestHRegion {
|
|||
}
|
||||
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
|
||||
assertEquals(maxSeqId, seqId);
|
||||
region.getMVCC().initialize(seqId);
|
||||
Get get = new Get(row);
|
||||
Result result = region.get(get);
|
||||
for (long i = minSeqId; i <= maxSeqId; i += 10) {
|
||||
|
@ -579,6 +580,7 @@ public class TestHRegion {
|
|||
}
|
||||
long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
|
||||
assertEquals(maxSeqId, seqId);
|
||||
region.getMVCC().initialize(seqId);
|
||||
Get get = new Get(row);
|
||||
Result result = region.get(get);
|
||||
for (long i = minSeqId; i <= maxSeqId; i += 10) {
|
||||
|
|
|
@ -689,7 +689,7 @@ public class TestReversibleScanners {
|
|||
/**
|
||||
* Builds the test KeyValue for ROWS[rowNum] / familyName / QUALS[cqNum] at timestamp TS with
|
||||
* value VALUES[rowNum % VALUESIZE], stamped with makeMVCC(rowNum, cqNum).
|
||||
* NOTE(review): the setMvccVersion and setSequenceId rows below look like the removed and
|
||||
* added sides of a single changed line - confirm before treating both as live code.
|
||||
*/
|
||||
private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
|
||||
KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
|
||||
VALUES[rowNum % VALUESIZE]);
|
||||
kv.setMvccVersion(makeMVCC(rowNum, cqNum));
|
||||
kv.setSequenceId(makeMVCC(rowNum, cqNum));
|
||||
return kv;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.HBaseTestingUtility.assertKVListsEqual;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -449,5 +448,32 @@ public class TestSeekOptimizations {
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Asserts that two cell lists have the same length and that cells at the same index compare
|
||||
* equal under KeyValue.COMPARATOR.compareOnlyKeyPortion.
|
||||
* @param additionalMsg optional text appended to the failure message; may be null or empty
|
||||
* @param expected the expected cells
|
||||
* @param actual the cells actually produced
|
||||
* @throws AssertionError if the lengths differ or some position compares non-zero
|
||||
*/
|
||||
public void assertKVListsEqual(String additionalMsg,
|
||||
final List<? extends Cell> expected,
|
||||
|
||||
final List<? extends Cell> actual) {
|
||||
final int eLen = expected.size();
|
||||
|
||||
final int aLen = actual.size();
|
||||
final int minLen = Math.min(eLen, aLen);
|
||||
|
||||
|
||||
// Empty-bodied loop: advances i to the first index whose key portions differ, or to minLen.
|
||||
int i;
|
||||
for (i = 0; i < minLen
|
||||
&& KeyValue.COMPARATOR.compareOnlyKeyPortion(expected.get(i), actual.get(i)) == 0;
|
||||
|
||||
++i) {}
|
||||
|
||||
|
||||
// Normalise the optional message so it can be appended directly to the error text.
|
||||
if (additionalMsg == null) {
|
||||
additionalMsg = "";
|
||||
}
|
||||
if (!additionalMsg.isEmpty()) {
|
||||
additionalMsg = ". " + additionalMsg;
|
||||
|
||||
}
|
||||
|
||||
|
||||
// Fail when the lengths differ or the scan above stopped before reaching minLen.
|
||||
if (eLen != aLen || i != minLen) {
|
||||
throw new AssertionError(
|
||||
"Expected and actual KV arrays differ at position " + i + ": " +
|
||||
HBaseTestingUtility.safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
|
||||
HBaseTestingUtility.safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue