HBASE-11569 Flush / Compaction handling from secondary region replicas

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
Enis Soztutar 2015-02-13 11:08:24 -08:00
parent 911432a7cb
commit 36e76c4be6
19 changed files with 2442 additions and 117 deletions

View File

@ -2586,6 +2586,7 @@ public final class ProtobufUtil {
FlushDescriptor.Builder desc = FlushDescriptor.newBuilder()
.setAction(action)
.setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
.setRegionName(ByteStringer.wrap(hri.getRegionName()))
.setFlushSequenceNumber(flushSeqId)
.setTableName(ByteStringer.wrap(hri.getTable().getName()));
@ -2611,6 +2612,7 @@ public final class ProtobufUtil {
.setEventType(eventType)
.setTableName(ByteStringer.wrap(hri.getTable().getName()))
.setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
.setRegionName(ByteStringer.wrap(hri.getRegionName()))
.setLogSequenceNumber(seqId)
.setServer(toServerName(server));

View File

@ -297,6 +297,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return seqId;
}
@Override
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
@ -1174,6 +1175,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
//
//---------------------------------------------------------------------------
@Override
public String toString() {
if (this.bytes == null || this.bytes.length == 0) {
return "empty";
@ -1523,6 +1525,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* save on allocations.
* @return Value in a new byte array.
*/
@Override
@Deprecated // use CellUtil.getValueArray()
public byte [] getValue() {
return CellUtil.cloneValue(this);
@ -1536,6 +1539,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* lengths instead.
* @return Row in a new byte array.
*/
@Override
@Deprecated // use CellUtil.getRowArray()
public byte [] getRow() {
return CellUtil.cloneRow(this);
@ -1593,6 +1597,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* lengths instead.
* @return Returns family. Makes a copy.
*/
@Override
@Deprecated // use CellUtil.getFamilyArray
public byte [] getFamily() {
return CellUtil.cloneFamily(this);
@ -1607,6 +1612,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* Use {@link #getBuffer()} with appropriate offsets and lengths instead.
* @return Returns qualifier. Makes a copy.
*/
@Override
@Deprecated // use CellUtil.getQualifierArray
public byte [] getQualifier() {
return CellUtil.cloneQualifier(this);
@ -2537,6 +2543,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
this.comparator = c;
}
@Override
public int compare(KeyValue left, KeyValue right) {
return comparator.compareRows(left, right);
}
@ -2568,6 +2575,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* TODO: With V3 consider removing this.
* @return legacy class name for FixedFileTrailer#comparatorClassName
*/
@Override
public String getLegacyKeyComparatorName() {
return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
}
@ -2575,6 +2583,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
/**
* @deprecated Since 0.99.2.
*/
@Override
@Deprecated
public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
int roffset, int rlength) {
@ -2586,6 +2595,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return compareOnlyKeyPortion(left, right);
}
@Override
@VisibleForTesting
public int compareOnlyKeyPortion(Cell left, Cell right) {
int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
@ -2612,6 +2622,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
}
@Override
public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
return firstKeyInBlock;
}

View File

@ -5522,6 +5522,24 @@ public final class WALProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptorOrBuilder getStoreFlushesOrBuilder(
int index);
// optional bytes region_name = 6;
/**
* <code>optional bytes region_name = 6;</code>
*
* <pre>
* full region name
* </pre>
*/
boolean hasRegionName();
/**
* <code>optional bytes region_name = 6;</code>
*
* <pre>
* full region name
* </pre>
*/
com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code FlushDescriptor}
@ -5613,6 +5631,11 @@ public final class WALProtos {
storeFlushes_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor.PARSER, extensionRegistry));
break;
}
case 50: {
bitField0_ |= 0x00000010;
regionName_ = input.readBytes();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -6772,12 +6795,37 @@ public final class WALProtos {
return storeFlushes_.get(index);
}
// optional bytes region_name = 6;
public static final int REGION_NAME_FIELD_NUMBER = 6;
private com.google.protobuf.ByteString regionName_;
/**
* <code>optional bytes region_name = 6;</code>
*
* <pre>
* full region name
* </pre>
*/
public boolean hasRegionName() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
* <code>optional bytes region_name = 6;</code>
*
* <pre>
* full region name
* </pre>
*/
public com.google.protobuf.ByteString getRegionName() {
return regionName_;
}
private void initFields() {
action_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction.START_FLUSH;
tableName_ = com.google.protobuf.ByteString.EMPTY;
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
flushSequenceNumber_ = 0L;
storeFlushes_ = java.util.Collections.emptyList();
regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -6824,6 +6872,9 @@ public final class WALProtos {
for (int i = 0; i < storeFlushes_.size(); i++) {
output.writeMessage(5, storeFlushes_.get(i));
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBytes(6, regionName_);
}
getUnknownFields().writeTo(output);
}
@ -6853,6 +6904,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, storeFlushes_.get(i));
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(6, regionName_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -6898,6 +6953,11 @@ public final class WALProtos {
}
result = result && getStoreFlushesList()
.equals(other.getStoreFlushesList());
result = result && (hasRegionName() == other.hasRegionName());
if (hasRegionName()) {
result = result && getRegionName()
.equals(other.getRegionName());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -6931,6 +6991,10 @@ public final class WALProtos {
hash = (37 * hash) + STORE_FLUSHES_FIELD_NUMBER;
hash = (53 * hash) + getStoreFlushesList().hashCode();
}
if (hasRegionName()) {
hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
hash = (53 * hash) + getRegionName().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -7060,6 +7124,8 @@ public final class WALProtos {
} else {
storeFlushesBuilder_.clear();
}
regionName_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@ -7113,6 +7179,10 @@ public final class WALProtos {
} else {
result.storeFlushes_ = storeFlushesBuilder_.build();
}
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
to_bitField0_ |= 0x00000010;
}
result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -7167,6 +7237,9 @@ public final class WALProtos {
}
}
}
if (other.hasRegionName()) {
setRegionName(other.getRegionName());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -7593,6 +7666,58 @@ public final class WALProtos {
return storeFlushesBuilder_;
}
// optional bytes region_name = 6;
private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
/**
* <code>optional bytes region_name = 6;</code>
*
* <pre>
* full region name
* </pre>
*/
public boolean hasRegionName() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional bytes region_name = 6;</code>
*
* <pre>
* full region name
* </pre>
*/
public com.google.protobuf.ByteString getRegionName() {
return regionName_;
}
/**
* <code>optional bytes region_name = 6;</code>
*
* <pre>
* full region name
* </pre>
*/
public Builder setRegionName(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000020;
regionName_ = value;
onChanged();
return this;
}
/**
* <code>optional bytes region_name = 6;</code>
*
* <pre>
* full region name
* </pre>
*/
public Builder clearRegionName() {
bitField0_ = (bitField0_ & ~0x00000020);
regionName_ = getDefaultInstance().getRegionName();
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:FlushDescriptor)
}
@ -9772,6 +9897,24 @@ public final class WALProtos {
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerOrBuilder();
// optional bytes region_name = 7;
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
boolean hasRegionName();
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
com.google.protobuf.ByteString getRegionName();
}
/**
* Protobuf type {@code RegionEventDescriptor}
@ -9876,6 +10019,11 @@ public final class WALProtos {
bitField0_ |= 0x00000010;
break;
}
case 58: {
bitField0_ |= 0x00000020;
regionName_ = input.readBytes();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -10135,6 +10283,30 @@ public final class WALProtos {
return server_;
}
// optional bytes region_name = 7;
public static final int REGION_NAME_FIELD_NUMBER = 7;
private com.google.protobuf.ByteString regionName_;
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public boolean hasRegionName() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public com.google.protobuf.ByteString getRegionName() {
return regionName_;
}
private void initFields() {
eventType_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType.REGION_OPEN;
tableName_ = com.google.protobuf.ByteString.EMPTY;
@ -10142,6 +10314,7 @@ public final class WALProtos {
logSequenceNumber_ = 0L;
stores_ = java.util.Collections.emptyList();
server_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
regionName_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -10197,6 +10370,9 @@ public final class WALProtos {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeMessage(6, server_);
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBytes(7, regionName_);
}
getUnknownFields().writeTo(output);
}
@ -10230,6 +10406,10 @@ public final class WALProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(6, server_);
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(7, regionName_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -10280,6 +10460,11 @@ public final class WALProtos {
result = result && getServer()
.equals(other.getServer());
}
result = result && (hasRegionName() == other.hasRegionName());
if (hasRegionName()) {
result = result && getRegionName()
.equals(other.getRegionName());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -10317,6 +10502,10 @@ public final class WALProtos {
hash = (37 * hash) + SERVER_FIELD_NUMBER;
hash = (53 * hash) + getServer().hashCode();
}
if (hasRegionName()) {
hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
hash = (53 * hash) + getRegionName().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -10453,6 +10642,8 @@ public final class WALProtos {
serverBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000020);
regionName_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@ -10514,6 +10705,10 @@ public final class WALProtos {
} else {
result.server_ = serverBuilder_.build();
}
if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
to_bitField0_ |= 0x00000020;
}
result.regionName_ = regionName_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -10571,6 +10766,9 @@ public final class WALProtos {
if (other.hasServer()) {
mergeServer(other.getServer());
}
if (other.hasRegionName()) {
setRegionName(other.getRegionName());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -11156,6 +11354,58 @@ public final class WALProtos {
return serverBuilder_;
}
// optional bytes region_name = 7;
private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public boolean hasRegionName() {
return ((bitField0_ & 0x00000040) == 0x00000040);
}
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public com.google.protobuf.ByteString getRegionName() {
return regionName_;
}
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public Builder setRegionName(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000040;
regionName_ = value;
onChanged();
return this;
}
/**
* <code>optional bytes region_name = 7;</code>
*
* <pre>
* full region name
* </pre>
*/
public Builder clearRegionName() {
bitField0_ = (bitField0_ & ~0x00000040);
regionName_ = getDefaultInstance().getRegionName();
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:RegionEventDescriptor)
}
@ -11598,32 +11848,33 @@ public final class WALProtos {
"n_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020com" +
"paction_input\030\004 \003(\t\022\031\n\021compaction_output" +
"\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region" +
"_name\030\007 \001(\014\"\353\002\n\017FlushDescriptor\022,\n\006actio" +
"_name\030\007 \001(\014\"\200\003\n\017FlushDescriptor\022,\n\006actio" +
"n\030\001 \002(\0162\034.FlushDescriptor.FlushAction\022\022\n",
"\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_name" +
"\030\003 \002(\014\022\035\n\025flush_sequence_number\030\004 \001(\004\022<\n" +
"\rstore_flushes\030\005 \003(\0132%.FlushDescriptor.S" +
"toreFlushDescriptor\032Y\n\024StoreFlushDescrip" +
"tor\022\023\n\013family_name\030\001 \002(\014\022\026\n\016store_home_d" +
"ir\030\002 \002(\t\022\024\n\014flush_output\030\003 \003(\t\"A\n\013FlushA" +
"ction\022\017\n\013START_FLUSH\020\000\022\020\n\014COMMIT_FLUSH\020\001" +
"\022\017\n\013ABORT_FLUSH\020\002\"R\n\017StoreDescriptor\022\023\n\013" +
"family_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(" +
"\t\022\022\n\nstore_file\030\003 \003(\t\"\215\001\n\022BulkLoadDescri",
"ptor\022\036\n\ntable_name\030\001 \002(\0132\n.TableName\022\033\n\023" +
"encoded_region_name\030\002 \002(\014\022 \n\006stores\030\003 \003(" +
"\0132\020.StoreDescriptor\022\030\n\020bulkload_seq_num\030" +
"\004 \002(\003\"\212\002\n\025RegionEventDescriptor\0224\n\nevent" +
"_type\030\001 \002(\0162 .RegionEventDescriptor.Even" +
"tType\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_reg" +
"ion_name\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 " +
"\001(\004\022 \n\006stores\030\005 \003(\0132\020.StoreDescriptor\022\033\n" +
"\006server\030\006 \001(\0132\013.ServerName\".\n\tEventType\022" +
"\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWA",
"LTrailer*F\n\tScopeType\022\033\n\027REPLICATION_SCO" +
"PE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001" +
"B?\n*org.apache.hadoop.hbase.protobuf.gen" +
"eratedB\tWALProtosH\001\210\001\000\240\001\001"
"toreFlushDescriptor\022\023\n\013region_name\030\006 \001(\014" +
"\032Y\n\024StoreFlushDescriptor\022\023\n\013family_name\030" +
"\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\024\n\014flush_o" +
"utput\030\003 \003(\t\"A\n\013FlushAction\022\017\n\013START_FLUS" +
"H\020\000\022\020\n\014COMMIT_FLUSH\020\001\022\017\n\013ABORT_FLUSH\020\002\"R" +
"\n\017StoreDescriptor\022\023\n\013family_name\030\001 \002(\014\022\026" +
"\n\016store_home_dir\030\002 \002(\t\022\022\n\nstore_file\030\003 \003",
"(\t\"\215\001\n\022BulkLoadDescriptor\022\036\n\ntable_name\030" +
"\001 \002(\0132\n.TableName\022\033\n\023encoded_region_name" +
"\030\002 \002(\014\022 \n\006stores\030\003 \003(\0132\020.StoreDescriptor" +
"\022\030\n\020bulkload_seq_num\030\004 \002(\003\"\237\002\n\025RegionEve" +
"ntDescriptor\0224\n\nevent_type\030\001 \002(\0162 .Regio" +
"nEventDescriptor.EventType\022\022\n\ntable_name" +
"\030\002 \002(\014\022\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023l" +
"og_sequence_number\030\004 \001(\004\022 \n\006stores\030\005 \003(\013" +
"2\020.StoreDescriptor\022\033\n\006server\030\006 \001(\0132\013.Ser" +
"verName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventTyp",
"e\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\n" +
"WALTrailer*F\n\tScopeType\022\033\n\027REPLICATION_S" +
"COPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL" +
"\020\001B?\n*org.apache.hadoop.hbase.protobuf.g" +
"eneratedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -11659,7 +11910,7 @@ public final class WALProtos {
internal_static_FlushDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_FlushDescriptor_descriptor,
new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", });
new java.lang.String[] { "Action", "TableName", "EncodedRegionName", "FlushSequenceNumber", "StoreFlushes", "RegionName", });
internal_static_FlushDescriptor_StoreFlushDescriptor_descriptor =
internal_static_FlushDescriptor_descriptor.getNestedTypes().get(0);
internal_static_FlushDescriptor_StoreFlushDescriptor_fieldAccessorTable = new
@ -11683,7 +11934,7 @@ public final class WALProtos {
internal_static_RegionEventDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RegionEventDescriptor_descriptor,
new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", });
new java.lang.String[] { "EventType", "TableName", "EncodedRegionName", "LogSequenceNumber", "Stores", "Server", "RegionName", });
internal_static_WALTrailer_descriptor =
getDescriptor().getMessageTypes().get(8);
internal_static_WALTrailer_fieldAccessorTable = new

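A note on the new parse branches above: the generated "case 50:" and "case 58:" values follow directly from the protobuf tag formula. A small worked check, illustrative only and not part of the commit:

// Protobuf tag = (field_number << 3) | wire_type; length-delimited (bytes) fields use wire type 2.
int flushDescriptorRegionNameTag = (6 << 3) | 2;  // = 50, matches "case 50:" in FlushDescriptor parsing
int regionEventRegionNameTag     = (7 << 3) | 2;  // = 58, matches "case 58:" in RegionEventDescriptor parsing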
View File

@ -122,6 +122,7 @@ message FlushDescriptor {
required bytes encoded_region_name = 3;
optional uint64 flush_sequence_number = 4;
repeated StoreFlushDescriptor store_flushes = 5;
optional bytes region_name = 6; // full region name
}
message StoreDescriptor {
@ -155,6 +156,7 @@ message RegionEventDescriptor {
optional uint64 log_sequence_number = 4;
repeated StoreDescriptor stores = 5;
optional ServerName server = 6; // Server who opened the region
optional bytes region_name = 7; // full region name
}
/**

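For orientation, a minimal sketch (not part of the commit) of how the new optional region_name field is populated when a flush marker is built, mirroring the ProtobufUtil change at the top of this diff; action, hri and flushSeqId stand in for the surrounding method's arguments:

// Sketch: the flush marker now carries the full region name in addition to the encoded name,
// letting a reader of the WAL marker identify the exact primary region that wrote it.
FlushDescriptor.Builder desc = FlushDescriptor.newBuilder()
    .setAction(action)
    .setEncodedRegionName(ByteStringer.wrap(hri.getEncodedNameAsBytes()))
    .setRegionName(ByteStringer.wrap(hri.getRegionName()))   // new optional field 6
    .setFlushSequenceNumber(flushSeqId)
    .setTableName(ByteStringer.wrap(hri.getTable().getName()));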
View File

@ -208,6 +208,11 @@ public class DefaultMemStore implements MemStore {
return this.snapshotSize > 0 ? this.snapshotSize : keySize();
}
@Override
public long getSnapshotSize() {
return this.snapshotSize;
}
/**
* Write an update
* @param cell
@ -462,6 +467,7 @@ public class DefaultMemStore implements MemStore {
* @param now
* @return Timestamp
*/
@Override
public long updateColumnValue(byte[] row,
byte[] family,
byte[] qualifier,

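The new getSnapshotSize() accessor is consumed by the region open replay path further down in this diff; a condensed sketch of that decision (paraphrasing replayWALRegionEventMarker in the HRegion changes below, with the same names):

// Sketch: after a replayed region open event, the prepared flush can only be discarded
// once no store still holds snapshotted (but uncommitted) memstore data.
boolean canDrop = true;
for (Map.Entry<byte[], StoreFlushContext> entry : prepareFlushResult.storeFlushCtxs.entrySet()) {
  Store store = getStore(entry.getKey());
  if (store != null && store.getSnapshotSize() > 0) {
    canDrop = false;   // this store's snapshot has not been dropped or committed yet
    break;
  }
}
if (canDrop) {
  writestate.flushing = false;
  this.prepareFlushResult = null;
}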
View File

@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.RandomAccess;
@ -62,7 +63,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.RandomStringUtils;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -101,6 +100,7 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
@ -134,7 +134,10 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
@ -142,6 +145,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputCont
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@ -179,6 +183,7 @@ import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.TextFormat;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@ -257,6 +262,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
private final AtomicLong sequenceId = new AtomicLong(-1L);
/**
* The sequence id of the last replayed open region event from the primary region. Edits with a
* smaller sequence id are skipped, since replayed edits can arrive out of order from
* replication.
*/
protected volatile long lastReplayedOpenRegionSeqId = -1L;
/**
* Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
* startRegionOperation to possibly invoke different checks before any region operations. Not all
@ -265,7 +277,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public enum Operation {
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
REPLAY_BATCH_MUTATE, COMPACT_REGION
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT
}
//////////////////////////////////////////////////////////////////////////////
@ -370,6 +382,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// The following map is populated when opening the region
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
/** Saved state from replaying prepare flush cache */
private PrepareFlushResult prepareFlushResult = null;
/**
* Config setting for whether to allow writes when a region is in recovering or not.
*/
@ -519,6 +534,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public boolean isCompactionNeeded() {
return result == Result.FLUSHED_COMPACTION_NEEDED;
}
@Override
public String toString() {
return new StringBuilder()
.append("flush result:").append(result).append(", ")
.append("failureReason:").append(failureReason).append(",")
.append("flush seq id").append(flushSequenceId).toString();
}
}
/** A result object from prepare flush cache stage */
@VisibleForTesting
static class PrepareFlushResult {
final FlushResult result; // indicating a failure result from prepare
final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
final TreeMap<byte[], List<Path>> committedFiles;
final long startTime;
final long flushOpSeqId;
final long flushedSeqId;
final long totalFlushableSize;
/** Constructs an early exit case */
PrepareFlushResult(FlushResult result, long flushSeqId) {
this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0);
}
/** Constructs a successful prepare flush result */
PrepareFlushResult(
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
long flushedSeqId, long totalFlushableSize) {
this(null, storeFlushCtxs, committedFiles, startTime,
flushSeqId, flushedSeqId, totalFlushableSize);
}
private PrepareFlushResult(
FlushResult result,
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
long flushedSeqId, long totalFlushableSize) {
this.result = result;
this.storeFlushCtxs = storeFlushCtxs;
this.committedFiles = committedFiles;
this.startTime = startTime;
this.flushOpSeqId = flushSeqId;
this.flushedSeqId = flushedSeqId;
this.totalFlushableSize = totalFlushableSize;
}
}
final WriteState writestate = new WriteState();
@ -776,6 +839,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeRegionStores(reporter, status);
this.lastReplayedOpenRegionSeqId = maxSeqId;
this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
this.writestate.flushRequested = false;
@ -1234,9 +1298,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
status.setStatus("Disabling compacts and flushes for region");
boolean canFlush = true;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
// region.
canFlush = !writestate.readOnly;
writestate.writesEnabled = false;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
waitForFlushesAndCompactions();
@ -1244,7 +1310,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// If we were not just flushing, is it worth doing a preflush...one
// that will clear out of the bulk of the memstore before we put up
// the close flag?
if (!abort && worthPreFlushing()) {
if (!abort && worthPreFlushing() && canFlush) {
status.setStatus("Pre-flushing region before close");
LOG.info("Running close preflush of " + this.getRegionNameAsString());
try {
@ -1267,7 +1333,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
LOG.debug("Updates disabled for region " + this);
// Don't flush the cache if we are aborting
if (!abort) {
if (!abort && canFlush) {
int flushCount = 0;
while (this.getMemstoreSize().get() > 0) {
try {
@ -1305,7 +1371,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// close each store in parallel
for (final Store store : stores.values()) {
assert abort || store.getFlushableSize() == 0;
assert abort || store.getFlushableSize() == 0 || writestate.readOnly;
completionService
.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
@Override
@ -1341,7 +1407,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
this.closed.set(true);
if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
if (!canFlush) {
addAndGetGlobalMemstoreSize(-memstoreSize.get());
} else if (memstoreSize.get() != 0) {
LOG.error("Memstore size is " + memstoreSize.get());
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-close hooks");
this.coprocessorHost.postClose(abort);
@ -1367,6 +1437,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public void waitForFlushesAndCompactions() {
synchronized (writestate) {
if (this.writestate.readOnly) {
// we should not wait for replayed flushes if we are read only (for example, in case the
// region is a secondary replica).
return;
}
boolean interrupted = false;
try {
while (writestate.compacting > 0 || writestate.flushing) {
@ -1590,6 +1665,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
/**
* This is a helper function that compacts the given store.
* It is used by utilities and testing.
*
* @throws IOException e
*/
@VisibleForTesting
void compactStore(byte[] family, CompactionThroughputController throughputController)
throws IOException {
Store s = getStore(family);
CompactionContext compaction = s.requestCompaction();
if (compaction != null) {
compact(compaction, s, throughputController);
}
}
/*
* Called by compaction thread and after region is opened to compact the
* HStores if necessary.
*
@ -1736,6 +1827,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
status.setStatus("Running coprocessor pre-flush hooks");
coprocessorHost.preFlush();
}
// TODO: this should be managed within memstore with the snapshot, updated only after a
// successful flush
if (numMutationsWithoutWAL.get() > 0) {
numMutationsWithoutWAL.set(0);
dataInMemoryWithoutWAL.set(0);
@ -1901,6 +1994,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
PrepareFlushResult result
= internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
if (result.result == null) {
return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
} else {
return result.result; // early exit due to failure from prepare stage
}
}
protected PrepareFlushResult internalPrepareFlushCache(
final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
MonitoredTask status, boolean isReplay)
throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@ -1928,10 +2035,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
w.setWriteNumber(flushSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
w = null;
return flushResult;
return new PrepareFlushResult(flushResult, myseqid);
} else {
return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
"Nothing to flush");
return new PrepareFlushResult(
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
myseqid);
}
}
} finally {
@ -1975,7 +2083,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
flushedFamilyNames.add(store.getFamily().getName());
}
List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
TreeMap<byte[], StoreFlushContext> storeFlushCtxs
= new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
Bytes.BYTES_COMPARATOR);
// The sequence id of this flush operation which is used to log FlushMarker and pass to
@ -1996,7 +2105,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
status.setStatus(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg),
myseqid);
}
flushOpSeqId = getNextSequenceId(wal);
long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
@ -2011,12 +2121,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
for (Store s : storesToFlush) {
totalFlushableSizeOfFlushableStores += s.getFlushableSize();
storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
}
// write the snapshot start to WAL
if (wal != null) {
if (wal != null && !writestate.readOnly) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
// no sync. Sync is below where we do not hold the updates lock
@ -2025,7 +2135,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// Prepare flush (take a snapshot)
for (StoreFlushContext flush : storeFlushCtxs) {
for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.prepare();
}
} catch (IOException ex) {
@ -2073,15 +2183,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
mvcc.waitForPreviousTransactionsComplete(w);
// set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
w = null;
s = "Flushing stores of " + this;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
} finally {
if (w != null) {
// in case of failure just mark current w as complete
mvcc.advanceMemstore(w);
}
}
return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
flushedSeqId, totalFlushableSizeOfFlushableStores);
}
protected FlushResult internalFlushCacheAndCommit(
final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
final Collection<Store> storesToFlush)
throws IOException {
// prepare flush context is carried via PrepareFlushResult
TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
long startTime = prepareResult.startTime;
long flushOpSeqId = prepareResult.flushOpSeqId;
long flushedSeqId = prepareResult.flushedSeqId;
long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
String s = "Flushing stores of " + this;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
// Any failure from here on out will be catastrophic requiring server
// restart so wal content can be replayed and put back into the memstore.
@ -2094,7 +2221,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// just-made new flush store file. The new flushed file is still in the
// tmp directory.
for (StoreFlushContext flush : storeFlushCtxs) {
for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.flushCache(status);
}
@ -2102,7 +2229,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// all the store scanners to reset/reseek).
Iterator<Store> it = storesToFlush.iterator();
// stores.values() and storeFlushCtxs have same order
for (StoreFlushContext flush : storeFlushCtxs) {
for (StoreFlushContext flush : storeFlushCtxs.values()) {
boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
compactionRequested = true;
@ -2591,6 +2718,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
*/
public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
throws IOException {
if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
&& replaySeqId < lastReplayedOpenRegionSeqId) {
// if it is a secondary replica we should ignore these entries silently
// since they are coming out of order
if (LOG.isTraceEnabled()) {
LOG.trace(getRegionInfo().getEncodedName() + " : "
+ "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
+ " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
for (MutationReplay mut : mutations) {
LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
}
}
OperationStatus[] statuses = new OperationStatus[mutations.length];
for (int i = 0; i < statuses.length; i++) {
statuses[i] = OperationStatus.SUCCESS;
}
return statuses;
}
return batchMutate(new ReplayBatch(mutations, replaySeqId));
}
@ -2895,7 +3041,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
// txid should always increase, so having the one from the last call is ok.
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, m.getClusterIds(),
currentNonceGroup, currentNonce);
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
@ -2921,14 +3067,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// STEP 5. Append the final edit to WAL. Do not sync wal.
// -------------------------
Mutation mutation = batchOp.getMutation(firstIndex);
if (isInReplay) {
// use wal key from the original
walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
long replaySeqId = batchOp.getReplaySequenceId();
walKey.setOrigLogSeqNum(replaySeqId);
// ensure that the sequence id of the region is at least as big as orig log seq id
while (true) {
long seqId = getSequenceId().get();
if (seqId >= replaySeqId) break;
if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
}
}
if (walEdit.size() > 0) {
if (!isInReplay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce);
if(isInReplay) {
walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
}
txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
getSequenceId(), true, memstoreCells);
}
@ -3804,7 +3965,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null) {
//replay the compaction
completeCompactionMarker(compaction);
replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
}
skippedEdits++;
continue;
@ -3887,15 +4048,506 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* that was not finished. We could find one recovering a WAL after a regionserver crash.
* See HBASE-2331.
*/
void completeCompactionMarker(CompactionDescriptor compaction)
void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
boolean removeFiles, long replaySeqId)
throws IOException {
Store store = this.getStore(compaction.getFamilyName().toByteArray());
if (store == null) {
LOG.warn("Found Compaction WAL edit for deleted family:" +
Bytes.toString(compaction.getFamilyName().toByteArray()));
checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
"Compaction marker from WAL ", compaction);
if (replaySeqId < lastReplayedOpenRegionSeqId) {
LOG.warn("Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
return;
}
store.completeCompactionMarker(compaction);
startRegionOperation(Operation.REPLAY_EVENT);
try {
Store store = this.getStore(compaction.getFamilyName().toByteArray());
if (store == null) {
LOG.warn("Found Compaction WAL edit for deleted family:" +
Bytes.toString(compaction.getFamilyName().toByteArray()));
return;
}
store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
} finally {
closeRegionOperation(Operation.REPLAY_EVENT);
}
}
void replayWALFlushMarker(FlushDescriptor flush) throws IOException {
checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
"Flush marker from WAL ", flush);
if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return; // if primary nothing to do
}
if (LOG.isDebugEnabled()) {
LOG.debug("Replaying flush marker " + TextFormat.shortDebugString(flush));
}
startRegionOperation(Operation.REPLAY_EVENT); // use region close lock to guard against close
try {
FlushAction action = flush.getAction();
switch (action) {
case START_FLUSH:
replayWALFlushStartMarker(flush);
break;
case COMMIT_FLUSH:
replayWALFlushCommitMarker(flush);
break;
case ABORT_FLUSH:
replayWALFlushAbortMarker(flush);
break;
default:
LOG.warn("Received a flush event with unknown action, ignoring. "
+ TextFormat.shortDebugString(flush));
break;
}
} finally {
closeRegionOperation(Operation.REPLAY_EVENT);
}
}
/** Replay the flush marker from primary region by creating a corresponding snapshot of
* the store memstores, only if the memstores do not have a higher seqId from an earlier wal
* edit (because the events may be coming out of order).
*/
@VisibleForTesting
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
long flushSeqId = flush.getFlushSequenceNumber();
HashSet<Store> storesToFlush = new HashSet<Store>();
for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
byte[] family = storeFlush.getFamilyName().toByteArray();
Store store = getStore(family);
if (store == null) {
LOG.info("Received a flush start marker from primary, but the family is not found. Ignoring"
+ " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
continue;
}
storesToFlush.add(store);
}
MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);
// we will use writestate as a coarse-grain lock for all the replay events
// (flush, compaction, region open etc)
synchronized (writestate) {
try {
if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
return null;
}
if (numMutationsWithoutWAL.get() > 0) {
numMutationsWithoutWAL.set(0);
dataInMemoryWithoutWAL.set(0);
}
if (!writestate.flushing) {
// we do not have an active snapshot and a corresponding this.prepareFlushResult. This means
// we can just snapshot our memstores and continue as normal.
// invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal
PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
flushSeqId, storesToFlush, status, true);
if (prepareResult.result == null) {
// save the PrepareFlushResult so that we can use it later from commit flush
this.writestate.flushing = true;
this.prepareFlushResult = prepareResult;
status.markComplete("Flush prepare successful");
if (LOG.isDebugEnabled()) {
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
}
} else {
status.abort("Flush prepare failed with " + prepareResult.result);
// nothing much to do. prepare flush failed because of some reason.
}
return prepareResult;
} else {
// we already have an active snapshot.
if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
// They define the same flush. Log and continue.
LOG.warn("Received a flush prepare marker with the same seqId: " +
+ flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ prepareFlushResult.flushOpSeqId + ". Ignoring");
// ignore
} else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
// We received a flush with a smaller seqNum than what we have prepared. We can only
// ignore this prepare flush request.
LOG.warn("Received a flush prepare marker with a smaller seqId: " +
+ flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ prepareFlushResult.flushOpSeqId + ". Ignoring");
// ignore
} else {
// We received a flush with a larger seqNum than what we have prepared
LOG.warn("Received a flush prepare marker with a larger seqId: " +
+ flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
+ prepareFlushResult.flushOpSeqId + ". Ignoring");
// We do not have multiple active snapshots in the memstore or a way to merge current
// memstore snapshot with the contents and resnapshot for now. We cannot take
// another snapshot and drop the previous one because that will cause temporary
// data loss in the secondary. So we ignore this for now, deferring the resolution
// to happen when we see the corresponding flush commit marker. If we have a memstore
// snapshot with x, and later received another prepare snapshot with y (where x < y),
// when we see flush commit for y, we will drop snapshot for x, and can also drop all
// the memstore edits if everything in memstore is < y. This is the usual case for
// RS crash + recovery where we might see consecutive prepare flush wal markers.
// Otherwise, this will cause more memory to be used in secondary replica until a
// further prepare + commit flush is seen and replayed.
}
}
} finally {
status.cleanup();
writestate.notifyAll();
}
}
return null;
}
@VisibleForTesting
void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);
// check whether we have the memstore snapshot with the corresponding seqId. Replays to
// secondary region replicas are in order, except when the region moves or the
// region server crashes. In those cases, we may receive replay requests out of order from
// the original seqIds.
synchronized (writestate) {
try {
if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
LOG.warn("Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
return;
}
if (writestate.flushing) {
PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
if (LOG.isDebugEnabled()) {
LOG.debug(getRegionInfo().getEncodedName() + " : "
+ "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
+ " and a previous prepared snapshot was found");
}
// This is the regular case where we received commit flush after prepare flush
// corresponding to the same seqId.
replayFlushInStores(flush, prepareFlushResult, true);
// Set down the memstore size by amount of flush.
this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
this.prepareFlushResult = null;
writestate.flushing = false;
} else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
// This should not happen normally. However, lets be safe and guard against these cases
// we received a flush commit with a smaller seqId than what we have prepared
// we will pick the flush file up from this commit (if we have not seen it), but we
// will not drop the memstore
LOG.warn("Received a flush commit marker with smaller seqId: "
+ flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
+ prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
+" prepared memstore snapshot");
replayFlushInStores(flush, prepareFlushResult, false);
// snapshot is not dropped, so memstore sizes should not be decremented
// we still have the prepared snapshot, flushing should still be true
} else {
// This should not happen normally. However, lets be safe and guard against these cases
// we received a flush commit with a larger seqId than what we have prepared
// we will pick the flush file for this. We will also obtain the updates lock and
// look for contents of the memstore to see whether we have edits after this seqId.
// If not, we will drop all the memstore edits and the snapshot as well.
LOG.warn("Received a flush commit marker with larger seqId: "
+ flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
+" memstore snapshot");
replayFlushInStores(flush, prepareFlushResult, true);
// Set down the memstore size by amount of flush.
this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
// Inspect the memstore contents to see whether the memstore contains only edits
// with seqId smaller than the flush seqId. If so, we can discard those edits.
dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
this.prepareFlushResult = null;
writestate.flushing = false;
}
} else {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
+ ", but no previous prepared snapshot was found");
// There is no corresponding prepare snapshot from before.
// We will pick up the new flushed file
replayFlushInStores(flush, null, false);
// Inspect the memstore contents to see whether the memstore contains only edits
// with seqId smaller than the flush seqId. If so, we can discard those edits.
dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
}
status.markComplete("Flush commit successful");
// Update the last flushed sequence id for region.
this.maxFlushedSeqId = flush.getFlushSequenceNumber();
// advance the mvcc read point so that the new flushed file is visible.
// there may be some in-flight transactions, but they won't be made visible since they are
// either greater than flush seq number or they were already dropped via flush.
// TODO: If we are using FlushAllStoresPolicy, then this can make edits visible from other
// stores while they are still in flight because the flush commit marker will not contain
// flushes from ALL stores.
getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
synchronized (this) {
notifyAll(); // FindBugs NN_NAKED_NOTIFY
}
} finally {
status.cleanup();
writestate.notifyAll();
}
}
}
/**
* Replays the given flush descriptor by opening the flush files in stores and dropping the
* memstore snapshots if requested.
* @param flush
* @param prepareFlushResult
* @param dropMemstoreSnapshot
* @throws IOException
*/
private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
boolean dropMemstoreSnapshot)
throws IOException {
for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
byte[] family = storeFlush.getFamilyName().toByteArray();
Store store = getStore(family);
if (store == null) {
LOG.warn("Received a flush commit marker from primary, but the family is not found." +
"Ignoring StoreFlushDescriptor:" + storeFlush);
continue;
}
List<String> flushFiles = storeFlush.getFlushOutputList();
StoreFlushContext ctx = null;
long startTime = EnvironmentEdgeManager.currentTime();
if (prepareFlushResult == null) {
ctx = store.createFlushContext(flush.getFlushSequenceNumber());
} else {
ctx = prepareFlushResult.storeFlushCtxs.get(family);
startTime = prepareFlushResult.startTime;
}
if (ctx == null) {
LOG.warn("Unexpected: flush commit marker received from store "
+ Bytes.toString(family) + " but no associated flush context. Ignoring");
continue;
}
ctx.replayFlush(flushFiles, dropMemstoreSnapshot); // replay the flush
// Record latest flush time
this.lastStoreFlushTimeMap.put(store, startTime);
}
}
/**
* Drops the memstore contents if the memstore edits have seqNums smaller than the given seq id,
* after replaying a flush descriptor or a region open event.
* @param seqId the sequence id up to which memstore contents can be dropped
* @param store the store whose memstore should be dropped, or null for all stores in the region
* @throws IOException
*/
private void dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
this.updatesLock.writeLock().lock();
try {
mvcc.waitForPreviousTransactionsComplete();
long currentSeqId = getSequenceId().get();
if (seqId >= currentSeqId) {
// then we can drop the memstore contents since everything is below this seqId
LOG.info("Dropping memstore contents as well since replayed flush seqId: "
+ seqId + " is greater than current seqId:" + currentSeqId);
// Prepare flush (take a snapshot) and then abort (drop the snapshot)
if (store == null ) {
for (Store s : stores.values()) {
dropStoreMemstoreContentsForSeqId(s, currentSeqId);
}
} else {
dropStoreMemstoreContentsForSeqId(store, currentSeqId);
}
} else {
LOG.info("Not dropping memstore contents since replayed flush seqId: "
+ seqId + " is smaller than current seqId:" + currentSeqId);
}
} finally {
this.updatesLock.writeLock().unlock();
}
}
private void dropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
this.addAndGetGlobalMemstoreSize(-s.getFlushableSize());
StoreFlushContext ctx = s.createFlushContext(currentSeqId);
ctx.prepare();
ctx.abort();
}
private void replayWALFlushAbortMarker(FlushDescriptor flush) {
// nothing to do for now. A flush abort will cause a RS abort which means that the region
// will be opened somewhere else later. We will see the region open event soon, and replaying
// that will drop the snapshot
}
@VisibleForTesting
PrepareFlushResult getPrepareFlushResult() {
return prepareFlushResult;
}
void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
"RegionEvent marker from WAL ", regionEvent);
startRegionOperation(Operation.REPLAY_EVENT);
try {
if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return; // if primary nothing to do
}
if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
// nothing to do on REGION_CLOSE for now.
return;
}
if (regionEvent.getEventType() != EventType.REGION_OPEN) {
LOG.warn("Unknown region event received, ignoring :"
+ TextFormat.shortDebugString(regionEvent));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
}
// we will use writestate as a coarse-grain lock for all the replay events
synchronized (writestate) {
// Replication can deliver events out of order when primary region moves or the region
// server crashes, since there is no coordination between replication of different wal files
// belonging to different region servers. We have to safeguard against this case by using
// region open event's seqid. Since this is the first event that the region puts (after
// possibly flushing recovered.edits), after seeing this event, we can ignore every edit
// smaller than this seqId
if (this.lastReplayedOpenRegionSeqId < regionEvent.getLogSequenceNumber()) {
this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
} else {
LOG.warn("Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
+ " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
+ " of " + lastReplayedOpenRegionSeqId);
return;
}
// region open lists all the files that the region has at the time of the opening. Just pick
// all the files and drop prepared flushes and empty memstores
for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
// stores of primary may be different now
byte[] family = storeDescriptor.getFamilyName().toByteArray();
Store store = getStore(family);
if (store == null) {
LOG.warn("Received a region open marker from primary, but the family is not found. "
+ "Ignoring. StoreDescriptor:" + storeDescriptor);
continue;
}
long storeSeqId = store.getMaxSequenceId();
List<String> storeFiles = storeDescriptor.getStoreFileList();
store.refreshStoreFiles(storeFiles); // replace the files with the new ones
if (store.getMaxSequenceId() != storeSeqId) {
// Record latest flush time if we picked up new files
lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
}
if (writestate.flushing) {
// only drop memstore snapshots if they are smaller than last flush for the store
if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs.get(family);
if (ctx != null) {
long snapshotSize = store.getFlushableSize();
ctx.abort();
this.addAndGetGlobalMemstoreSize(-snapshotSize);
this.prepareFlushResult.storeFlushCtxs.remove(family);
}
}
}
// Drop the memstore contents if they are now smaller than the latest seen flushed file
dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
if (storeSeqId > this.maxFlushedSeqId) {
this.maxFlushedSeqId = storeSeqId;
}
}
// if all stores ended up dropping their snapshots, we can safely drop the
// prepareFlushResult
if (writestate.flushing) {
boolean canDrop = true;
for (Entry<byte[], StoreFlushContext> entry
: prepareFlushResult.storeFlushCtxs.entrySet()) {
Store store = getStore(entry.getKey());
if (store == null) {
continue;
}
if (store.getSnapshotSize() > 0) {
canDrop = false;
}
}
// this means that all the stores in the region have finished flushing, but the WAL marker
// may not have been written or we did not receive it yet.
if (canDrop) {
writestate.flushing = false;
this.prepareFlushResult = null;
}
}
// advance the mvcc read point so that the new flushed file is visible.
// there may be some in-flight transactions, but they won't be made visible since they are
// either greater than flush seq number or they were already dropped via flush.
getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
synchronized (this) {
notifyAll(); // FindBugs NN_NAKED_NOTIFY
}
}
} finally {
closeRegionOperation(Operation.REPLAY_EVENT);
}
}
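To make the ordering guard above concrete, here is a minimal sketch (illustration only, not part of this patch; the values are hypothetical): a marker is replayed only if its sequence id advances past the last replayed region open event.

long lastReplayedOpenRegionSeqId = 150L;  // region open event already replayed
long incomingMarkerSeqId = 120L;          // delivered late by replication
if (incomingMarkerSeqId > lastReplayedOpenRegionSeqId) {
  lastReplayedOpenRegionSeqId = incomingMarkerSeqId;  // accept and replay the event
} else {
  // stale marker: skipped, exactly as the check above does
}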
/** Checks whether the given regionName is either equal to our region's encoded name, or,
* for a secondary replica, is the encoded name of the primary region covering the same range.
*/
private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
throws WrongRegionException {
if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
return;
}
if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
Bytes.equals(encodedRegionName,
this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
return;
}
throw new WrongRegionException(exceptionMsg + payload
+ " targetted for region " + Bytes.toStringBinary(encodedRegionName)
+ " does not match this region: " + this.getRegionInfo());
}
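A short sketch of why the second check above is needed (illustrative only; region is a hypothetical HRegion handle for a secondary replica, and RegionReplicaUtil.getRegionInfoForDefaultReplica is assumed to be available): markers replicated from the primary carry the primary's encoded region name, not the replica's.

HRegionInfo secondaryHri = region.getRegionInfo();   // replicaId > 0
HRegionInfo primaryHri = RegionReplicaUtil.getRegionInfoForDefaultReplica(secondaryHri);
byte[] markerEncodedName = primaryHri.getEncodedNameAsBytes();  // what the WAL marker carries
// the first check fails for a secondary; the second (via getRegionInfoForFS) succeeds
boolean matchesReplica = Bytes.equals(secondaryHri.getEncodedNameAsBytes(), markerEncodedName);
boolean matchesPrimary = Bytes.equals(primaryHri.getEncodedNameAsBytes(), markerEncodedName);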
/**
@ -5093,7 +5745,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
checkClassLoading();
this.openSeqNum = initialize(reporter);
this.setSequenceId(openSeqNum);
if (wal != null && getRegionServerServices() != null) {
if (wal != null && getRegionServerServices() != null && !writestate.readOnly) {
writeRegionOpenMarker(wal, openSeqNum);
}
return this;
@ -6185,8 +6837,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(11 * Bytes.SIZEOF_LONG) +
45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(12 * Bytes.SIZEOF_LONG) +
4 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:
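The bump from 44 to 45 references and from 11 to 12 longs presumably accounts for the fields this patch adds to HRegion (the prepareFlushResult reference and the lastReplayedOpenRegionSeqId long). A rough sketch of the arithmetic, using the same ClassSize and Bytes constants:

long refs  = 45 * ClassSize.REFERENCE;   // one extra reference vs. the old 44
long longs = 12 * Bytes.SIZEOF_LONG;     // one extra long vs. the old 11
long fixed = ClassSize.align(ClassSize.OBJECT + ClassSize.ARRAY
    + refs + 2 * Bytes.SIZEOF_INT + longs + 4 * Bytes.SIZEOF_BOOLEAN);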

View File

@ -119,6 +119,10 @@ public class HRegionFileSystem {
return this.regionInfo;
}
public HRegionInfo getRegionInfoForFS() {
return this.regionInfoForFs;
}
/** @return {@link Path} to the region's root directory. */
public Path getTableDir() {
return this.tableDir;
@ -205,7 +209,7 @@ public class HRegionFileSystem {
continue;
}
StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
regionInfoForFs, familyName, status);
regionInfoForFs, familyName, status.getPath());
storeFiles.add(info);
}
@ -234,8 +238,8 @@ public class HRegionFileSystem {
StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
throws IOException {
Path familyDir = getStoreDir(familyName);
FileStatus status = fs.getFileStatus(new Path(familyDir, fileName));
return new StoreFileInfo(this.conf, this.fs, status);
return ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
regionInfoForFs, familyName, new Path(familyDir, fileName));
}
/**

View File

@ -398,6 +398,11 @@ public class HStore implements Store {
return this.memstore.getFlushableSize();
}
@Override
public long getSnapshotSize() {
return this.memstore.getSnapshotSize();
}
@Override
public long getCompactionCheckMultiplier() {
return this.compactionCheckMultiplier;
@ -448,7 +453,8 @@ public class HStore implements Store {
/**
* @return The maximum sequence id in all store files. Used for log replay.
*/
long getMaxSequenceId() {
@Override
public long getMaxSequenceId() {
return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
}
@ -576,11 +582,31 @@ public class HStore implements Store {
*/
@Override
public void refreshStoreFiles() throws IOException {
Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
refreshStoreFilesInternal(newFiles);
}
@Override
public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
for (String file : newFiles) {
storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
}
refreshStoreFilesInternal(storeFiles);
}
/**
* Checks the underlying store files, opens the ones that have not yet been
* opened, and removes the store file readers for store files that are no longer
* available. Mainly used by secondary region replicas to keep up to date with
* the primary region's files.
* @throws IOException
*/
private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
StoreFileManager sfm = storeEngine.getStoreFileManager();
Collection<StoreFile> currentFiles = sfm.getStorefiles();
if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
@ -1011,7 +1037,9 @@ public class HStore implements Store {
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
this.memstore.clearSnapshot(snapshotId);
if (snapshotId > 0) {
this.memstore.clearSnapshot(snapshotId);
}
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@ -1311,10 +1339,12 @@ public class HStore implements Store {
* @param compaction
*/
@Override
public void completeCompactionMarker(CompactionDescriptor compaction)
public void replayCompactionMarker(CompactionDescriptor compaction,
boolean pickCompactionFiles, boolean removeFiles)
throws IOException {
LOG.debug("Completing compaction from the WAL marker");
List<String> compactionInputs = compaction.getCompactionInputList();
List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
// The Compaction Marker is written after the compaction is completed,
// and the files moved into the region/family folder.
@ -1331,22 +1361,40 @@ public class HStore implements Store {
// being in the store's folder) or they may be missing due to a compaction.
String familyName = this.getColumnFamilyName();
List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
for (String compactionInput : compactionInputs) {
Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
inputPaths.add(inputPath);
inputFiles.add(inputPath.getName());
}
// some of the input files might already be deleted
List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
for (StoreFile sf : this.getStorefiles()) {
if (inputPaths.contains(sf.getQualifiedPath())) {
if (inputFiles.contains(sf.getPath().getName())) {
inputStoreFiles.add(sf);
}
}
this.replaceStoreFiles(inputStoreFiles, Collections.<StoreFile>emptyList());
this.completeCompaction(inputStoreFiles);
// check whether we need to pick up the new files
List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
if (pickCompactionFiles) {
for (StoreFile sf : this.getStorefiles()) {
compactionOutputs.remove(sf.getPath().getName());
}
for (String compactionOutput : compactionOutputs) {
StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
outputStoreFiles.add(storeFile);
}
}
if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
LOG.info("Replaying compaction marker, replacing input files: " +
inputStoreFiles + " with output files : " + outputStoreFiles);
this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
this.completeCompaction(inputStoreFiles, removeFiles);
}
}
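A sketch of how the two new flags are meant to be driven (mirroring the RSRpcServices change later in this commit; store and compactionDesc are hypothetical handles): the primary replica owns the files and may remove/archive them, while a secondary only picks up the compaction outputs.

boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(store.getRegionInfo());
store.replayCompactionMarker(compactionDesc,
    !isDefaultReplica,   // pickCompactionFiles: secondaries pick up the outputs
    isDefaultReplica);   // removeFiles: only the primary archives the inputs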
/**
@ -2174,6 +2222,47 @@ public class HStore implements Store {
public List<Path> getCommittedFiles() {
return committedFiles;
}
/**
* Similar to commit, but called in secondary region replicas for replaying the
* flush cache from primary region. Adds the new files to the store, and drops the
* snapshot depending on dropMemstoreSnapshot argument.
* @param fileNames names of the flushed files
* @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
* @throws IOException
*/
@Override
public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
throws IOException {
List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
for (String file : fileNames) {
// open the file as a store file (hfile link, etc)
StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
storeFiles.add(storeFile);
if (LOG.isInfoEnabled()) {
LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
" added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
", sequenceid=" + + storeFile.getReader().getSequenceID() +
", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
}
}
long snapshotId = dropMemstoreSnapshot ? snapshot.getId() : -1; // -1 means do not drop
HStore.this.updateStorefiles(storeFiles, snapshotId);
}
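A sketch of the secondary-replica sequence this method supports (illustrative only; store, flushDesc and flushedFileNames are hypothetical): prepare a snapshot when the flush START marker is replayed, then pick up the primary's flushed files and drop that snapshot when the COMMIT marker arrives.

StoreFlushContext ctx = store.createFlushContext(flushDesc.getFlushSequenceNumber());
ctx.prepare();                              // on the flush START marker
// ... later, when the matching flush COMMIT marker is replayed:
ctx.replayFlush(flushedFileNames, true);    // add primary's files, drop the snapshot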
/**
* Abort the snapshot preparation. Drops the snapshot if any.
* @throws IOException
*/
@Override
public void abort() throws IOException {
if (snapshot == null) {
return;
}
HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
}
}
@Override

View File

@ -59,6 +59,12 @@ public interface MemStore extends HeapSize {
*/
long getFlushableSize();
/**
* Return the size of the snapshot(s) if any
* @return size of the memstore snapshot
*/
long getSnapshotSize();
/**
* Write an update
* @param cell

View File

@ -147,6 +147,8 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
@ -710,8 +712,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (metaCells != null && !metaCells.isEmpty()) {
for (Cell metaCell : metaCells) {
CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
if (compactionDesc != null) {
region.completeCompactionMarker(compactionDesc);
// replay the compaction. Remove the files from stores only if we are the primary
// region replica (thus own the files)
region.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
replaySeqId);
continue;
}
FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
if (flushDesc != null && !isDefaultReplica) {
region.replayWALFlushMarker(flushDesc);
continue;
}
RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
if (regionEvent != null && !isDefaultReplica) {
region.replayWALRegionEventMarker(regionEvent);
continue;
}
}
it.remove();

View File

@ -213,9 +213,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* Call to complete a compaction. It's for the case where we find in the WAL a compaction
* that was not finished. We could find one recovering a WAL after a regionserver crash.
* See HBASE-2331.
* @param compaction
* @param compaction the descriptor for compaction
* @param pickCompactionFiles whether or not to pick up the new compaction output files and
* add them to the store
* @param removeFiles whether to remove/archive files from the filesystem
*/
void completeCompactionMarker(CompactionDescriptor compaction)
void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
boolean removeFiles)
throws IOException;
// Split oriented methods
@ -265,8 +269,19 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/
long getFlushableSize();
/**
* Returns the memstore snapshot size
* @return size of the memstore snapshot
*/
long getSnapshotSize();
HColumnDescriptor getFamily();
/**
* @return The maximum sequence id in all store files.
*/
long getMaxSequenceId();
/**
* @return The maximum memstoreTS in all store files.
*/
@ -416,4 +431,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* linear formula.
*/
double getCompactionPressure();
/**
* Replaces the store files that the store currently has with the given files. Mainly used
* by secondary region replicas to keep up to date with the primary region's files.
* @throws IOException
*/
void refreshStoreFiles(Collection<String> newFiles) throws IOException;
}
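A usage sketch for the new overload (illustrative only; assumes java.util imports and a Store handle named store; the file names are made up): a secondary replica that learned the primary's current file list, e.g. from a region open marker, hands the bare file names to the store.

List<String> primaryFiles = Arrays.asList("1a2b3c4d5e6f", "0f9e8d7c6b5a");
store.refreshStoreFiles(primaryFiles);  // open readers for new files, drop readers for gone ones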

View File

@ -64,6 +64,22 @@ interface StoreFlushContext {
*/
boolean commit(MonitoredTask status) throws IOException;
/**
* Similar to commit, but called in secondary region replicas for replaying the
* flush cache from primary region. Adds the new files to the store, and drops the
* snapshot depending on dropMemstoreSnapshot argument.
* @param fileNames names of the flushed files
* @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot
* @throws IOException
*/
void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) throws IOException;
/**
* Abort the snapshot preparation. Drops the snapshot if any.
* @throws IOException
*/
void abort() throws IOException;
/**
* Returns the newly committed files from the flush. Called only if commit returns true
* @return a list of Paths for new files

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.TableName;
/**
* An HLogKey specific to WalEdits coming from replay.
*/
@InterfaceAudience.Private
public class ReplayHLogKey extends HLogKey {
public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
super(encodedRegionName, tablename, now, clusterIds, nonceGroup, nonce);
}
public ReplayHLogKey(final byte [] encodedRegionName, final TableName tablename,
long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
}
/**
* Returns the original sequence id of the edit rather than a newly assigned one.
* @return the original sequence id
* @throws IOException
*/
@Override
public long getSequenceId() throws IOException {
return this.getOrigLogSeqNum();
}
}
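A sketch of the intent (illustrative only; all arguments are hypothetical): edits appended during replay get a ReplayHLogKey so that getSequenceId() reports the sequence number recorded on the key instead of blocking for a newly assigned one.

ReplayHLogKey key = new ReplayHLogKey(encodedRegionName, tableName,
    seqIdFromPrimary, EnvironmentEdgeManager.currentTime(), clusterIds, nonceGroup, nonce);
long replaySeqId = key.getSequenceId();  // the key's original sequence id, no new assignment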

View File

@ -288,7 +288,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
@Override
public List<Path> finishWritingAndClose() throws IOException {
finishWriting();
finishWriting(true);
return null;
}

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
@ -96,23 +96,24 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
* @throws IOException
*/
public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status)
HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, Path path)
throws IOException {
// if this is a primary region, just return the StoreFileInfo constructed from path
if (regionInfo.equals(regionInfoForFs)) {
return new StoreFileInfo(conf, fs, status);
}
if (StoreFileInfo.isReference(status.getPath())) {
Reference reference = Reference.read(fs, status.getPath());
return new StoreFileInfo(conf, fs, status, reference);
return new StoreFileInfo(conf, fs, path);
}
// else create a store file link. The link file does not exists on filesystem though.
HFileLink link = HFileLink.build(conf, regionInfoForFs.getTable(),
regionInfoForFs.getEncodedName(), familyName, status.getPath().getName());
return new StoreFileInfo(conf, fs, status, link);
regionInfoForFs.getEncodedName(), familyName, path.getName());
if (StoreFileInfo.isReference(path)) {
Reference reference = Reference.read(fs, path);
return new StoreFileInfo(conf, fs, link.getFileStatus(fs), reference);
}
return new StoreFileInfo(conf, fs, link.getFileStatus(fs), link);
}
/**

View File

@ -1201,12 +1201,18 @@ public class WALSplitter {
* @return true when there is no error
* @throws IOException
*/
protected boolean finishWriting() throws IOException {
protected boolean finishWriting(boolean interrupt) throws IOException {
LOG.debug("Waiting for split writer threads to finish");
boolean progress_failed = false;
for (WriterThread t : writerThreads) {
t.finish();
}
if (interrupt) {
for (WriterThread t : writerThreads) {
t.interrupt(); // interrupt the writer threads. We are stopping now.
}
}
for (WriterThread t : writerThreads) {
if (!progress_failed && reporter != null && !reporter.progress()) {
progress_failed = true;
@ -1275,7 +1281,7 @@ public class WALSplitter {
boolean isSuccessful = false;
List<Path> result = null;
try {
isSuccessful = finishWriting();
isSuccessful = finishWriting(false);
} finally {
result = close();
List<IOException> thrown = closeLogWriters(null);
@ -1973,7 +1979,7 @@ public class WALSplitter {
@Override
public List<Path> finishWritingAndClose() throws IOException {
try {
if (!finishWriting()) {
if (!finishWriting(false)) {
return null;
}
if (hasEditsInDisablingOrDisabledTables) {

View File

@ -50,6 +50,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -60,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -129,6 +131,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.RowLock;
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -4729,6 +4733,14 @@ public class TestHRegion {
}
}
static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
Configuration confForWAL = new Configuration(conf);
confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
return new WALFactory(confForWAL,
Collections.<WALActionsListener>singletonList(new MetricsWAL()),
"hregion-" + RandomStringUtils.randomNumeric(8));
}
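A sketch of how a test might use this helper (illustrative only; rootDir is a hypothetical Path): create the factory for a test-local root directory and close it when the test is done.

WALFactory wals = createWALFactory(TEST_UTIL.getConfiguration(), rootDir);
try {
  // obtain WALs for the regions under test and replay edits against them
} finally {
  wals.close();  // releases the WAL files created under rootDir
}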
@Test
public void testCompactionFromPrimary() throws IOException {
Path rootDir = new Path(dir + "testRegionReplicaSecondary");
@ -4789,9 +4801,14 @@ public class TestHRegion {
private void putData(HRegion region,
int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
}
static void putData(HRegion region, Durability durability,
int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
for (int i = startRow; i < startRow + numRows; i++) {
Put put = new Put(Bytes.toBytes("" + i));
put.setDurability(Durability.SKIP_WAL);
put.setDurability(durability);
for (byte[] family : families) {
put.add(family, qf, null);
}
@ -4799,7 +4816,7 @@ public class TestHRegion {
}
}
private void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
throws IOException {
for (int i = startRow; i < startRow + numRows; i++) {
byte[] row = Bytes.toBytes("" + i);
@ -4818,7 +4835,7 @@ public class TestHRegion {
}
}
private void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
// Now I have k, get values out and assert they are as expected.
Get get = new Get(k).addFamily(family).setMaxVersions();
Cell[] results = r.get(get).rawCells();
@ -4965,7 +4982,7 @@ public class TestHRegion {
return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
}
private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
throws IOException {
return initHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly,
@ -4984,7 +5001,7 @@ public class TestHRegion {
* @return A region on which you must call
* {@link HRegion#closeHRegion(HRegion)} when done.
*/
private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
WAL wal, byte[]... families) throws IOException {
return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
@ -5999,7 +6016,7 @@ public class TestHRegion {
}
}
private static HRegion initHRegion(byte[] tableName, String callingMethod,
static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
families);

View File

@ -327,8 +327,7 @@ public class TestPerColumnFamilyFlush {
return null;
}
@Test (timeout=180000)
public void testLogReplay() throws Exception {
public void doTestLogReplay() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
// Carefully chosen limits so that the memstore just flushes when we're done
@ -418,7 +417,14 @@ public class TestPerColumnFamilyFlush {
@Test (timeout=180000)
public void testLogReplayWithDistributedReplay() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
testLogReplay();
doTestLogReplay();
}
// Test log replay with distributed log splitting (distributed log replay disabled).
@Test (timeout=180000)
public void testLogReplayWithDistributedLogSplit() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
doTestLogReplay();
}
/**