HBASE-12405 WAL accounting by Store

zhangduo 2015-03-10 12:56:48 +08:00
parent 0654d13403
commit 9b53a1c214
22 changed files with 2746 additions and 1619 deletions

View File

@ -20,9 +20,12 @@
package org.apache.hadoop.hbase;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
@ -153,6 +156,13 @@ public class RegionLoad {
return regionLoadPB.getCompleteSequenceId();
}
/**
* @return completed sequence id per store.
*/
public List<StoreSequenceId> getStoreCompleteSequenceId() {
return regionLoadPB.getStoreCompleteSequenceIdList();
}
/**
* @return the uncompressed size of the storefiles in MB.
*/
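
The new getStoreCompleteSequenceId() accessor exposes the per-store flushed sequence ids that region servers now report alongside the region-level complete sequence id. A minimal consumption sketch (not part of the patch; "load" is a hypothetical, populated RegionLoad):

// Sketch only: dump the per-store complete sequence ids in a RegionLoad report.
static void dumpStoreSequenceIds(RegionLoad load) {
  for (StoreSequenceId storeSeqId : load.getStoreCompleteSequenceId()) {
    System.out.println(Bytes.toString(storeSeqId.getFamilyName().toByteArray())
        + " flushed through sequence id " + storeSeqId.getSequenceId());
  }
}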

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
@ -300,8 +301,10 @@ public final class ResponseConverter {
* @return A GetLastFlushedSequenceIdResponse
*/
public static GetLastFlushedSequenceIdResponse buildGetLastFlushedSequenceIdResponse(
long seqId) {
return GetLastFlushedSequenceIdResponse.newBuilder().setLastFlushedSequenceId(seqId).build();
RegionStoreSequenceIds ids) {
return GetLastFlushedSequenceIdResponse.newBuilder()
.setLastFlushedSequenceId(ids.getLastFlushedSequenceId())
.addAllStoreLastFlushedSequenceId(ids.getStoreSequenceIdList()).build();
}
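
buildGetLastFlushedSequenceIdResponse now takes a whole RegionStoreSequenceIds instead of a bare long, copying both the region-level id and the per-store list into the RPC response. A hedged sketch of the mapping ("ids" is assumed to be a prebuilt RegionStoreSequenceIds):

GetLastFlushedSequenceIdResponse resp =
    ResponseConverter.buildGetLastFlushedSequenceIdResponse(ids);
assert resp.getLastFlushedSequenceId() == ids.getLastFlushedSequenceId();
assert resp.getStoreLastFlushedSequenceIdCount() == ids.getStoreSequenceIdCount();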
/**

View File

@ -45,8 +45,9 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@ -2102,10 +2103,10 @@ public class ZKUtil {
*/
public static byte[] regionSequenceIdsToByteArray(final Long regionLastFlushedSequenceId,
final Map<byte[], Long> storeSequenceIds) {
ZooKeeperProtos.RegionStoreSequenceIds.Builder regionSequenceIdsBuilder =
ZooKeeperProtos.RegionStoreSequenceIds.newBuilder();
ZooKeeperProtos.StoreSequenceId.Builder storeSequenceIdBuilder =
ZooKeeperProtos.StoreSequenceId.newBuilder();
ClusterStatusProtos.RegionStoreSequenceIds.Builder regionSequenceIdsBuilder =
ClusterStatusProtos.RegionStoreSequenceIds.newBuilder();
ClusterStatusProtos.StoreSequenceId.Builder storeSequenceIdBuilder =
ClusterStatusProtos.StoreSequenceId.newBuilder();
if (storeSequenceIds != null) {
for (Map.Entry<byte[], Long> e : storeSequenceIds.entrySet()){
byte[] columnFamilyName = e.getKey();
@ -2132,7 +2133,7 @@ public class ZKUtil {
throw new DeserializationException("Unable to parse RegionStoreSequenceIds.");
}
RegionStoreSequenceIds.Builder regionSequenceIdsBuilder =
ZooKeeperProtos.RegionStoreSequenceIds.newBuilder();
ClusterStatusProtos.RegionStoreSequenceIds.newBuilder();
int pblen = ProtobufUtil.lengthOfPBMagic();
RegionStoreSequenceIds storeIds = null;
try {
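
ZKUtil keeps the same wire format but now sources RegionStoreSequenceIds and StoreSequenceId from ClusterStatusProtos instead of ZooKeeperProtos. A round-trip sketch, assuming the deserializer in the hunk above is exposed as ZKUtil.parseRegionStoreSequenceIds(byte[]) (error handling elided):

Map<byte[], Long> storeSeqIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
storeSeqIds.put(Bytes.toBytes("f1"), 42L);
byte[] data = ZKUtil.regionSequenceIdsToByteArray(40L, storeSeqIds);
RegionStoreSequenceIds ids = ZKUtil.parseRegionStoreSequenceIds(data);
assert ids.getLastFlushedSequenceId() == 40L;
assert ids.getStoreSequenceId(0).getSequenceId() == 42L;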

View File

@ -4496,7 +4496,7 @@ public final class RegionServerStatusProtos {
* <code>required uint64 last_flushed_sequence_id = 1;</code>
*
* <pre>
* the last WAL sequence id flushed from MemStore to HFile for the region
** the last WAL sequence id flushed from MemStore to HFile for the region
* </pre>
*/
boolean hasLastFlushedSequenceId();
@ -4504,10 +4504,55 @@ public final class RegionServerStatusProtos {
* <code>required uint64 last_flushed_sequence_id = 1;</code>
*
* <pre>
* the last WAL sequence id flushed from MemStore to HFile for the region
** the last WAL sequence id flushed from MemStore to HFile for the region
* </pre>
*/
long getLastFlushedSequenceId();
// repeated .StoreSequenceId store_last_flushed_sequence_id = 2;
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId>
getStoreLastFlushedSequenceIdList();
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId getStoreLastFlushedSequenceId(int index);
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
int getStoreLastFlushedSequenceIdCount();
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder>
getStoreLastFlushedSequenceIdOrBuilderList();
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder getStoreLastFlushedSequenceIdOrBuilder(
int index);
}
/**
* Protobuf type {@code GetLastFlushedSequenceIdResponse}
@ -4565,6 +4610,14 @@ public final class RegionServerStatusProtos {
lastFlushedSequenceId_ = input.readUInt64();
break;
}
case 18: {
if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
storeLastFlushedSequenceId_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId>();
mutable_bitField0_ |= 0x00000002;
}
storeLastFlushedSequenceId_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.PARSER, extensionRegistry));
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -4573,6 +4626,9 @@ public final class RegionServerStatusProtos {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
storeLastFlushedSequenceId_ = java.util.Collections.unmodifiableList(storeLastFlushedSequenceId_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@ -4612,7 +4668,7 @@ public final class RegionServerStatusProtos {
* <code>required uint64 last_flushed_sequence_id = 1;</code>
*
* <pre>
* the last WAL sequence id flushed from MemStore to HFile for the region
** the last WAL sequence id flushed from MemStore to HFile for the region
* </pre>
*/
public boolean hasLastFlushedSequenceId() {
@ -4622,15 +4678,72 @@ public final class RegionServerStatusProtos {
* <code>required uint64 last_flushed_sequence_id = 1;</code>
*
* <pre>
* the last WAL sequence id flushed from MemStore to HFile for the region
** the last WAL sequence id flushed from MemStore to HFile for the region
* </pre>
*/
public long getLastFlushedSequenceId() {
return lastFlushedSequenceId_;
}
// repeated .StoreSequenceId store_last_flushed_sequence_id = 2;
public static final int STORE_LAST_FLUSHED_SEQUENCE_ID_FIELD_NUMBER = 2;
private java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId> storeLastFlushedSequenceId_;
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId> getStoreLastFlushedSequenceIdList() {
return storeLastFlushedSequenceId_;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder>
getStoreLastFlushedSequenceIdOrBuilderList() {
return storeLastFlushedSequenceId_;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public int getStoreLastFlushedSequenceIdCount() {
return storeLastFlushedSequenceId_.size();
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId getStoreLastFlushedSequenceId(int index) {
return storeLastFlushedSequenceId_.get(index);
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder getStoreLastFlushedSequenceIdOrBuilder(
int index) {
return storeLastFlushedSequenceId_.get(index);
}
private void initFields() {
lastFlushedSequenceId_ = 0L;
storeLastFlushedSequenceId_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -4641,6 +4754,12 @@ public final class RegionServerStatusProtos {
memoizedIsInitialized = 0;
return false;
}
for (int i = 0; i < getStoreLastFlushedSequenceIdCount(); i++) {
if (!getStoreLastFlushedSequenceId(i).isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
@ -4651,6 +4770,9 @@ public final class RegionServerStatusProtos {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeUInt64(1, lastFlushedSequenceId_);
}
for (int i = 0; i < storeLastFlushedSequenceId_.size(); i++) {
output.writeMessage(2, storeLastFlushedSequenceId_.get(i));
}
getUnknownFields().writeTo(output);
}
@ -4664,6 +4786,10 @@ public final class RegionServerStatusProtos {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(1, lastFlushedSequenceId_);
}
for (int i = 0; i < storeLastFlushedSequenceId_.size(); i++) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, storeLastFlushedSequenceId_.get(i));
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -4692,6 +4818,8 @@ public final class RegionServerStatusProtos {
result = result && (getLastFlushedSequenceId()
== other.getLastFlushedSequenceId());
}
result = result && getStoreLastFlushedSequenceIdList()
.equals(other.getStoreLastFlushedSequenceIdList());
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@ -4709,6 +4837,10 @@ public final class RegionServerStatusProtos {
hash = (37 * hash) + LAST_FLUSHED_SEQUENCE_ID_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getLastFlushedSequenceId());
}
if (getStoreLastFlushedSequenceIdCount() > 0) {
hash = (37 * hash) + STORE_LAST_FLUSHED_SEQUENCE_ID_FIELD_NUMBER;
hash = (53 * hash) + getStoreLastFlushedSequenceIdList().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@ -4810,6 +4942,7 @@ public final class RegionServerStatusProtos {
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getStoreLastFlushedSequenceIdFieldBuilder();
}
}
private static Builder create() {
@ -4820,6 +4953,12 @@ public final class RegionServerStatusProtos {
super.clear();
lastFlushedSequenceId_ = 0L;
bitField0_ = (bitField0_ & ~0x00000001);
if (storeLastFlushedSequenceIdBuilder_ == null) {
storeLastFlushedSequenceId_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000002);
} else {
storeLastFlushedSequenceIdBuilder_.clear();
}
return this;
}
@ -4852,6 +4991,15 @@ public final class RegionServerStatusProtos {
to_bitField0_ |= 0x00000001;
}
result.lastFlushedSequenceId_ = lastFlushedSequenceId_;
if (storeLastFlushedSequenceIdBuilder_ == null) {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
storeLastFlushedSequenceId_ = java.util.Collections.unmodifiableList(storeLastFlushedSequenceId_);
bitField0_ = (bitField0_ & ~0x00000002);
}
result.storeLastFlushedSequenceId_ = storeLastFlushedSequenceId_;
} else {
result.storeLastFlushedSequenceId_ = storeLastFlushedSequenceIdBuilder_.build();
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -4871,6 +5019,32 @@ public final class RegionServerStatusProtos {
if (other.hasLastFlushedSequenceId()) {
setLastFlushedSequenceId(other.getLastFlushedSequenceId());
}
if (storeLastFlushedSequenceIdBuilder_ == null) {
if (!other.storeLastFlushedSequenceId_.isEmpty()) {
if (storeLastFlushedSequenceId_.isEmpty()) {
storeLastFlushedSequenceId_ = other.storeLastFlushedSequenceId_;
bitField0_ = (bitField0_ & ~0x00000002);
} else {
ensureStoreLastFlushedSequenceIdIsMutable();
storeLastFlushedSequenceId_.addAll(other.storeLastFlushedSequenceId_);
}
onChanged();
}
} else {
if (!other.storeLastFlushedSequenceId_.isEmpty()) {
if (storeLastFlushedSequenceIdBuilder_.isEmpty()) {
storeLastFlushedSequenceIdBuilder_.dispose();
storeLastFlushedSequenceIdBuilder_ = null;
storeLastFlushedSequenceId_ = other.storeLastFlushedSequenceId_;
bitField0_ = (bitField0_ & ~0x00000002);
storeLastFlushedSequenceIdBuilder_ =
com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
getStoreLastFlushedSequenceIdFieldBuilder() : null;
} else {
storeLastFlushedSequenceIdBuilder_.addAllMessages(other.storeLastFlushedSequenceId_);
}
}
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -4880,6 +5054,12 @@ public final class RegionServerStatusProtos {
return false;
}
for (int i = 0; i < getStoreLastFlushedSequenceIdCount(); i++) {
if (!getStoreLastFlushedSequenceId(i).isInitialized()) {
return false;
}
}
return true;
}
@ -4908,7 +5088,7 @@ public final class RegionServerStatusProtos {
* <code>required uint64 last_flushed_sequence_id = 1;</code>
*
* <pre>
* the last WAL sequence id flushed from MemStore to HFile for the region
** the last WAL sequence id flushed from MemStore to HFile for the region
* </pre>
*/
public boolean hasLastFlushedSequenceId() {
@ -4918,7 +5098,7 @@ public final class RegionServerStatusProtos {
* <code>required uint64 last_flushed_sequence_id = 1;</code>
*
* <pre>
* the last WAL sequence id flushed from MemStore to HFile for the region
** the last WAL sequence id flushed from MemStore to HFile for the region
* </pre>
*/
public long getLastFlushedSequenceId() {
@ -4928,7 +5108,7 @@ public final class RegionServerStatusProtos {
* <code>required uint64 last_flushed_sequence_id = 1;</code>
*
* <pre>
* the last WAL sequence id flushed from MemStore to HFile for the region
** the last WAL sequence id flushed from MemStore to HFile for the region
* </pre>
*/
public Builder setLastFlushedSequenceId(long value) {
@ -4941,7 +5121,7 @@ public final class RegionServerStatusProtos {
* <code>required uint64 last_flushed_sequence_id = 1;</code>
*
* <pre>
* the last WAL sequence id flushed from MemStore to HFile for the region
** the last WAL sequence id flushed from MemStore to HFile for the region
* </pre>
*/
public Builder clearLastFlushedSequenceId() {
@ -4951,6 +5131,318 @@ public final class RegionServerStatusProtos {
return this;
}
// repeated .StoreSequenceId store_last_flushed_sequence_id = 2;
private java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId> storeLastFlushedSequenceId_ =
java.util.Collections.emptyList();
private void ensureStoreLastFlushedSequenceIdIsMutable() {
if (!((bitField0_ & 0x00000002) == 0x00000002)) {
storeLastFlushedSequenceId_ = new java.util.ArrayList<org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId>(storeLastFlushedSequenceId_);
bitField0_ |= 0x00000002;
}
}
private com.google.protobuf.RepeatedFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder> storeLastFlushedSequenceIdBuilder_;
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId> getStoreLastFlushedSequenceIdList() {
if (storeLastFlushedSequenceIdBuilder_ == null) {
return java.util.Collections.unmodifiableList(storeLastFlushedSequenceId_);
} else {
return storeLastFlushedSequenceIdBuilder_.getMessageList();
}
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public int getStoreLastFlushedSequenceIdCount() {
if (storeLastFlushedSequenceIdBuilder_ == null) {
return storeLastFlushedSequenceId_.size();
} else {
return storeLastFlushedSequenceIdBuilder_.getCount();
}
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId getStoreLastFlushedSequenceId(int index) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
return storeLastFlushedSequenceId_.get(index);
} else {
return storeLastFlushedSequenceIdBuilder_.getMessage(index);
}
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder setStoreLastFlushedSequenceId(
int index, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId value) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensureStoreLastFlushedSequenceIdIsMutable();
storeLastFlushedSequenceId_.set(index, value);
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.setMessage(index, value);
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder setStoreLastFlushedSequenceId(
int index, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder builderForValue) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
ensureStoreLastFlushedSequenceIdIsMutable();
storeLastFlushedSequenceId_.set(index, builderForValue.build());
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.setMessage(index, builderForValue.build());
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder addStoreLastFlushedSequenceId(org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId value) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensureStoreLastFlushedSequenceIdIsMutable();
storeLastFlushedSequenceId_.add(value);
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.addMessage(value);
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder addStoreLastFlushedSequenceId(
int index, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId value) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensureStoreLastFlushedSequenceIdIsMutable();
storeLastFlushedSequenceId_.add(index, value);
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.addMessage(index, value);
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder addStoreLastFlushedSequenceId(
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder builderForValue) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
ensureStoreLastFlushedSequenceIdIsMutable();
storeLastFlushedSequenceId_.add(builderForValue.build());
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.addMessage(builderForValue.build());
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder addStoreLastFlushedSequenceId(
int index, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder builderForValue) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
ensureStoreLastFlushedSequenceIdIsMutable();
storeLastFlushedSequenceId_.add(index, builderForValue.build());
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.addMessage(index, builderForValue.build());
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder addAllStoreLastFlushedSequenceId(
java.lang.Iterable<? extends org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId> values) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
ensureStoreLastFlushedSequenceIdIsMutable();
super.addAll(values, storeLastFlushedSequenceId_);
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.addAllMessages(values);
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder clearStoreLastFlushedSequenceId() {
if (storeLastFlushedSequenceIdBuilder_ == null) {
storeLastFlushedSequenceId_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000002);
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.clear();
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public Builder removeStoreLastFlushedSequenceId(int index) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
ensureStoreLastFlushedSequenceIdIsMutable();
storeLastFlushedSequenceId_.remove(index);
onChanged();
} else {
storeLastFlushedSequenceIdBuilder_.remove(index);
}
return this;
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder getStoreLastFlushedSequenceIdBuilder(
int index) {
return getStoreLastFlushedSequenceIdFieldBuilder().getBuilder(index);
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder getStoreLastFlushedSequenceIdOrBuilder(
int index) {
if (storeLastFlushedSequenceIdBuilder_ == null) {
return storeLastFlushedSequenceId_.get(index); } else {
return storeLastFlushedSequenceIdBuilder_.getMessageOrBuilder(index);
}
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public java.util.List<? extends org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder>
getStoreLastFlushedSequenceIdOrBuilderList() {
if (storeLastFlushedSequenceIdBuilder_ != null) {
return storeLastFlushedSequenceIdBuilder_.getMessageOrBuilderList();
} else {
return java.util.Collections.unmodifiableList(storeLastFlushedSequenceId_);
}
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder addStoreLastFlushedSequenceIdBuilder() {
return getStoreLastFlushedSequenceIdFieldBuilder().addBuilder(
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.getDefaultInstance());
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder addStoreLastFlushedSequenceIdBuilder(
int index) {
return getStoreLastFlushedSequenceIdFieldBuilder().addBuilder(
index, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.getDefaultInstance());
}
/**
* <code>repeated .StoreSequenceId store_last_flushed_sequence_id = 2;</code>
*
* <pre>
** the last WAL sequence id flushed from MemStore to HFile for stores of the region
* </pre>
*/
public java.util.List<org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder>
getStoreLastFlushedSequenceIdBuilderList() {
return getStoreLastFlushedSequenceIdFieldBuilder().getBuilderList();
}
private com.google.protobuf.RepeatedFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder>
getStoreLastFlushedSequenceIdFieldBuilder() {
if (storeLastFlushedSequenceIdBuilder_ == null) {
storeLastFlushedSequenceIdBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId.Builder, org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceIdOrBuilder>(
storeLastFlushedSequenceId_,
((bitField0_ & 0x00000002) == 0x00000002),
getParentForChildren(),
isClean());
storeLastFlushedSequenceId_ = null;
}
return storeLastFlushedSequenceIdBuilder_;
}
// @@protoc_insertion_point(builder_scope:GetLastFlushedSequenceIdResponse)
}
@ -8410,36 +8902,37 @@ public final class RegionServerStatusProtos {
"lErrorRequest\022\033\n\006server\030\001 \002(\0132\013.ServerNa",
"me\022\025\n\rerror_message\030\002 \002(\t\"\034\n\032ReportRSFat" +
"alErrorResponse\"6\n\037GetLastFlushedSequenc" +
"eIdRequest\022\023\n\013region_name\030\001 \002(\014\"D\n GetLa" +
"eIdRequest\022\023\n\013region_name\030\001 \002(\014\"~\n GetLa" +
"stFlushedSequenceIdResponse\022 \n\030last_flus" +
"hed_sequence_id\030\001 \002(\004\"\322\002\n\025RegionStateTra" +
"nsition\022>\n\017transition_code\030\001 \002(\0162%.Regio" +
"nStateTransition.TransitionCode\022 \n\013regio" +
"n_info\030\002 \003(\0132\013.RegionInfo\022\024\n\014open_seq_nu" +
"m\030\003 \001(\004\"\300\001\n\016TransitionCode\022\n\n\006OPENED\020\000\022\017" +
"\n\013FAILED_OPEN\020\001\022\n\n\006CLOSED\020\002\022\022\n\016READY_TO_",
"SPLIT\020\003\022\022\n\016READY_TO_MERGE\020\004\022\016\n\nSPLIT_PON" +
"R\020\005\022\016\n\nMERGE_PONR\020\006\022\t\n\005SPLIT\020\007\022\n\n\006MERGED" +
"\020\010\022\022\n\016SPLIT_REVERTED\020\t\022\022\n\016MERGE_REVERTED" +
"\020\n\"m\n\"ReportRegionStateTransitionRequest" +
"\022\033\n\006server\030\001 \002(\0132\013.ServerName\022*\n\ntransit" +
"ion\030\002 \003(\0132\026.RegionStateTransition\"<\n#Rep" +
"ortRegionStateTransitionResponse\022\025\n\rerro" +
"r_message\030\001 \001(\t2\326\003\n\031RegionServerStatusSe" +
"rvice\022P\n\023RegionServerStartup\022\033.RegionSer" +
"verStartupRequest\032\034.RegionServerStartupR",
"esponse\022M\n\022RegionServerReport\022\032.RegionSe" +
"rverReportRequest\032\033.RegionServerReportRe" +
"sponse\022M\n\022ReportRSFatalError\022\032.ReportRSF" +
"atalErrorRequest\032\033.ReportRSFatalErrorRes" +
"ponse\022_\n\030GetLastFlushedSequenceId\022 .GetL" +
"astFlushedSequenceIdRequest\032!.GetLastFlu" +
"shedSequenceIdResponse\022h\n\033ReportRegionSt" +
"ateTransition\022#.ReportRegionStateTransit" +
"ionRequest\032$.ReportRegionStateTransition" +
"ResponseBN\n*org.apache.hadoop.hbase.prot",
"obuf.generatedB\030RegionServerStatusProtos" +
"H\001\210\001\001\240\001\001"
"hed_sequence_id\030\001 \002(\004\0228\n\036store_last_flus" +
"hed_sequence_id\030\002 \003(\0132\020.StoreSequenceId\"" +
"\322\002\n\025RegionStateTransition\022>\n\017transition_" +
"code\030\001 \002(\0162%.RegionStateTransition.Trans" +
"itionCode\022 \n\013region_info\030\002 \003(\0132\013.RegionI" +
"nfo\022\024\n\014open_seq_num\030\003 \001(\004\"\300\001\n\016Transition",
"Code\022\n\n\006OPENED\020\000\022\017\n\013FAILED_OPEN\020\001\022\n\n\006CLO" +
"SED\020\002\022\022\n\016READY_TO_SPLIT\020\003\022\022\n\016READY_TO_ME" +
"RGE\020\004\022\016\n\nSPLIT_PONR\020\005\022\016\n\nMERGE_PONR\020\006\022\t\n" +
"\005SPLIT\020\007\022\n\n\006MERGED\020\010\022\022\n\016SPLIT_REVERTED\020\t" +
"\022\022\n\016MERGE_REVERTED\020\n\"m\n\"ReportRegionStat" +
"eTransitionRequest\022\033\n\006server\030\001 \002(\0132\013.Ser" +
"verName\022*\n\ntransition\030\002 \003(\0132\026.RegionStat" +
"eTransition\"<\n#ReportRegionStateTransiti" +
"onResponse\022\025\n\rerror_message\030\001 \001(\t2\326\003\n\031Re" +
"gionServerStatusService\022P\n\023RegionServerS",
"tartup\022\033.RegionServerStartupRequest\032\034.Re" +
"gionServerStartupResponse\022M\n\022RegionServe" +
"rReport\022\032.RegionServerReportRequest\032\033.Re" +
"gionServerReportResponse\022M\n\022ReportRSFata" +
"lError\022\032.ReportRSFatalErrorRequest\032\033.Rep" +
"ortRSFatalErrorResponse\022_\n\030GetLastFlushe" +
"dSequenceId\022 .GetLastFlushedSequenceIdRe" +
"quest\032!.GetLastFlushedSequenceIdResponse" +
"\022h\n\033ReportRegionStateTransition\022#.Report" +
"RegionStateTransitionRequest\032$.ReportReg",
"ionStateTransitionResponseBN\n*org.apache" +
".hadoop.hbase.protobuf.generatedB\030Region" +
"ServerStatusProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -8493,7 +8986,7 @@ public final class RegionServerStatusProtos {
internal_static_GetLastFlushedSequenceIdResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_GetLastFlushedSequenceIdResponse_descriptor,
new java.lang.String[] { "LastFlushedSequenceId", });
new java.lang.String[] { "LastFlushedSequenceId", "StoreLastFlushedSequenceId", });
internal_static_RegionStateTransition_descriptor =
getDescriptor().getMessageTypes().get(8);
internal_static_RegionStateTransition_fieldAccessorTable = new

View File

@ -59,6 +59,23 @@ message RegionInTransition {
required RegionState region_state = 2;
}
/**
* sequence Id of a store
*/
message StoreSequenceId {
required bytes family_name = 1;
required uint64 sequence_id = 2;
}
/**
* contains the sequence id of a region, which should be the minimum of its store sequence
* ids, and a list of the sequence ids of the region's stores
*/
message RegionStoreSequenceIds {
required uint64 last_flushed_sequence_id = 1;
repeated StoreSequenceId store_sequence_id = 2;
}
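
The region-level id is intended to be the minimum of the per-store ids: with family f1 flushed through sequence id 10 but f2 only through 7, last_flushed_sequence_id is 7. A hedged Java sketch of assembling such a message with the generated builders:

RegionStoreSequenceIds ids = RegionStoreSequenceIds.newBuilder()
    .setLastFlushedSequenceId(7) // min of the per-store ids below
    .addStoreSequenceId(StoreSequenceId.newBuilder()
        .setFamilyName(ByteString.copyFromUtf8("f1")).setSequenceId(10).build())
    .addStoreSequenceId(StoreSequenceId.newBuilder()
        .setFamilyName(ByteString.copyFromUtf8("f2")).setSequenceId(7).build())
    .build();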
message RegionLoad {
/** the region specifier */
required RegionSpecifier region_specifier = 1;
@ -115,6 +132,9 @@ message RegionLoad {
optional float data_locality = 16;
optional uint64 last_major_compaction_ts = 17 [default = 0];
/** the most recent sequence Id of store from cache flush */
repeated StoreSequenceId store_complete_sequence_id = 18;
}
/* Server-level protobufs */

View File

@ -74,8 +74,11 @@ message GetLastFlushedSequenceIdRequest {
}
message GetLastFlushedSequenceIdResponse {
/* the last WAL sequence id flushed from MemStore to HFile for the region */
/** the last WAL sequence id flushed from MemStore to HFile for the region */
required uint64 last_flushed_sequence_id = 1;
/** the last WAL sequence id flushed from MemStore to HFile for stores of the region */
repeated StoreSequenceId store_last_flushed_sequence_id = 2;
}
message RegionStateTransition {

View File

@ -152,20 +152,3 @@ message TableLock {
optional string purpose = 5;
optional int64 create_time = 6;
}
/**
* sequence Id of a store
*/
message StoreSequenceId {
required bytes family_name = 1;
required uint64 sequence_id = 2;
}
/**
* contains a sequence id of a region which should be the minimum of its store sequence ids and
* list sequence ids of the region's stores
*/
message RegionStoreSequenceIds {
required uint64 last_flushed_sequence_id = 1;
repeated StoreSequenceId store_sequence_id = 2;
}

View File

@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;

View File

@ -620,7 +620,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
try {
long lastSequenceId =
this.details.getMaster().getServerManager()
.getLastFlushedSequenceId(regionEncodeName.getBytes());
.getLastFlushedSequenceId(regionEncodeName.getBytes()).getLastFlushedSequenceId();
/*
* znode layout: .../region_id[last known flushed sequence id]/failed server[last known

View File

@ -30,7 +30,6 @@ import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -38,8 +37,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.SplitLogTask;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -47,9 +47,9 @@ import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState;
@ -48,6 +47,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.*;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@ -270,8 +270,8 @@ public class MasterRpcServices extends RSRpcServices
throw new ServiceException(ioe);
}
byte[] encodedRegionName = request.getRegionName().toByteArray();
long seqId = master.serverManager.getLastFlushedSequenceId(encodedRegionName);
return ResponseConverter.buildGetLastFlushedSequenceIdResponse(seqId);
RegionStoreSequenceIds ids = master.serverManager.getLastFlushedSequenceId(encodedRegionName);
return ResponseConverter.buildGetLastFlushedSequenceIdResponse(ids);
}
@Override
@ -978,8 +978,9 @@ public class MasterRpcServices extends RSRpcServices
ListTableDescriptorsByNamespaceRequest request) throws ServiceException {
try {
ListTableDescriptorsByNamespaceResponse.Builder b =
ListTableDescriptorsByNamespaceResponse.newBuilder();
for(HTableDescriptor htd: master.listTableDescriptorsByNamespace(request.getNamespaceName())) {
ListTableDescriptorsByNamespaceResponse.newBuilder();
for (HTableDescriptor htd : master
.listTableDescriptorsByNamespace(request.getNamespaceName())) {
b.addTableSchema(htd.convert());
}
return b.build();

View File

@ -29,8 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -38,6 +38,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLoad;
@ -45,9 +46,11 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
@ -59,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@ -71,6 +76,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
/**
@ -114,9 +120,13 @@ public class ServerManager {
// Set if we are to shutdown the cluster.
private volatile boolean clusterShutdown = false;
private final SortedMap<byte[], Long> flushedSequenceIdByRegion =
private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
storeFlushedSequenceIdsByRegion =
new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);
/** Map of registered servers to their current load */
private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
new ConcurrentHashMap<ServerName, ServerLoad>();
@ -256,6 +266,18 @@ public class ServerManager {
return sn;
}
private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
byte[] regionName) {
ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
storeFlushedSequenceIdsByRegion.get(regionName);
if (storeFlushedSequenceId != null) {
return storeFlushedSequenceId;
}
storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
ConcurrentNavigableMap<byte[], Long> alreadyPut =
storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
}
/**
* Updates last flushed sequence Ids for the regions on server sn
* @param sn
@ -267,18 +289,25 @@ public class ServerManager {
byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
long l = entry.getValue().getCompleteSequenceId();
if (existingValue != null) {
if (l != -1 && l < existingValue) {
LOG.warn("RegionServer " + sn +
" indicates a last flushed sequence id (" + entry.getValue() +
") that is less than the previous last flushed sequence id (" +
existingValue + ") for region " +
Bytes.toString(entry.getKey()) + " Ignoring.");
continue; // Don't let smaller sequence ids override greater sequence ids.
// Don't let smaller sequence ids override greater sequence ids.
if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
flushedSequenceIdByRegion.put(encodedRegionName, l);
} else if (l != HConstants.NO_SEQNUM && l < existingValue) {
LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
+ l + ") that is less than the previous last flushed sequence id ("
+ existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
}
ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
getOrCreateStoreFlushedSequenceId(encodedRegionName);
for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
byte[] family = storeSeqId.getFamilyName().toByteArray();
existingValue = storeFlushedSequenceId.get(family);
l = storeSeqId.getSequenceId();
// Don't let smaller sequence ids override greater sequence ids.
if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
storeFlushedSequenceId.put(family, l);
}
}
flushedSequenceIdByRegion.put(encodedRegionName, l);
}
}
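
Both maps only ever move forward: a report carrying sequence id 10 for a region (or store) already recorded at 15 is logged and dropped, and NO_SEQNUM (-1) never overwrites a known value. The same rule in isolation (hypothetical helper, not part of the patch):

static long mergeFlushedSeqId(Long existing, long reported) {
  // First report always lands; afterwards only a known (non-NO_SEQNUM),
  // strictly larger id may replace the current one.
  if (existing == null || (reported != HConstants.NO_SEQNUM && reported > existing.longValue())) {
    return reported;
  }
  return existing.longValue();
}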
@ -417,12 +446,20 @@ public class ServerManager {
this.rsAdmins.remove(serverName);
}
public long getLastFlushedSequenceId(byte[] encodedRegionName) {
long seqId = -1L;
if (flushedSequenceIdByRegion.containsKey(encodedRegionName)) {
seqId = flushedSequenceIdByRegion.get(encodedRegionName);
public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
Map<byte[], Long> storeFlushedSequenceId =
storeFlushedSequenceIdsByRegion.get(encodedRegionName);
if (storeFlushedSequenceId != null) {
for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
builder.addStoreSequenceId(StoreSequenceId.newBuilder()
.setFamilyName(ByteString.copyFrom(entry.getKey()))
.setSequenceId(entry.getValue().longValue()).build());
}
}
return seqId;
return builder.build();
}
/**

View File

@ -129,6 +129,8 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@ -179,6 +181,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
@ -247,8 +250,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
* The max sequence id of flushed data on this region. Used doing some rough calculations on
* whether time to flush or not.
*/
protected volatile long maxFlushedSeqId = -1L;
private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
/**
* Record the sequence id of last flush operation.
*/
private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
/**
* Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
* file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
@ -1646,6 +1653,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return result == Long.MAX_VALUE ? 0 : result;
}
RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) {
long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId;
byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
regionLoadBldr.clearStoreCompleteSequenceId();
for (byte[] familyName : this.stores.keySet()) {
long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
// no oldestUnflushedSeqId means no data has been written to the store since the last
// flush, so we use lastFlushOpSeqId as the complete sequence id for the store.
regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId
.newBuilder()
.setFamilyName(ByteString.copyFrom(familyName))
.setSequenceId(
oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
}
return regionLoadBldr.setCompleteSequenceId(this.maxFlushedSeqId);
}
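
The per-store rule above: while a store has unflushed edits, everything strictly below its oldest unflushed sequence id is already durable in HFiles, so its complete id is that id minus one; a store with an empty memstore reports the sequence id of the last flush operation instead. As a standalone sketch (hypothetical helper, not part of the patch):

static long storeCompleteSequenceId(long oldestUnflushedSeqId, long lastFlushOpSeqId) {
  // getEarliestMemstoreSeqNum returns a negative value when the store holds
  // no unflushed edits; fall back to the last flush operation's sequence id.
  return oldestUnflushedSeqId < 0 ? lastFlushOpSeqId : oldestUnflushedSeqId - 1;
}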
//////////////////////////////////////////////////////////////////////////////
// HRegion maintenance.
//
@ -2109,11 +2133,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// wal can be null replaying edits.
if (wal != null) {
w = mvcc.beginMemstoreInsert();
long flushSeqId = getNextSequenceId(wal);
long flushOpSeqId = getNextSequenceId(wal);
FlushResult flushResult = new FlushResult(
FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush",
FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush",
writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
w.setWriteNumber(flushSeqId);
w.setWriteNumber(flushOpSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
w = null;
return new PrepareFlushResult(flushResult, myseqid);
@ -2394,6 +2418,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Update the oldest unflushed sequence id for region.
this.maxFlushedSeqId = flushedSeqId;
// Record flush operation sequence id.
this.lastFlushOpSeqId = flushOpSeqId;
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
synchronized (this) {

View File

@ -111,12 +111,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
@ -1424,7 +1426,8 @@ public class HRegionServer extends HasThread implements
}
regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
regionSpecifier.setValue(ByteStringer.wrap(name));
regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
r.setCompleteSequenceId(regionLoadBldr)
.setRegionSpecifier(regionSpecifier.build())
.setStores(stores)
.setStorefiles(storefiles)
.setStoreUncompressedSizeMB(storeUncompressedSizeMB)
@ -1438,9 +1441,9 @@ public class HRegionServer extends HasThread implements
.setWriteRequestsCount(r.writeRequestsCount.get())
.setTotalCompactingKVs(totalCompactingKVs)
.setCurrentCompactedKVs(currentCompactedKVs)
.setCompleteSequenceId(r.maxFlushedSeqId)
.setDataLocality(dataLocality)
.setLastMajorCompactionTs(r.getOldestHfileTs(true));
return regionLoadBldr.build();
}
@ -2228,30 +2231,30 @@ public class HRegionServer extends HasThread implements
}
@Override
public long getLastSequenceId(byte[] encodedRegionName) {
long lastFlushedSequenceId = -1L;
public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
try {
GetLastFlushedSequenceIdRequest req = RequestConverter
.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
GetLastFlushedSequenceIdRequest req =
RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
RegionServerStatusService.BlockingInterface rss = rssStub;
if (rss == null) { // Try to connect one more time
createRegionServerStatusStub();
rss = rssStub;
if (rss == null) {
// Still no luck, we tried
LOG.warn("Unable to connect to the master to check "
+ "the last flushed sequence id");
return -1L;
LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
.build();
}
}
lastFlushedSequenceId = rss.getLastFlushedSequenceId(null, req)
.getLastFlushedSequenceId();
GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
return RegionStoreSequenceIds.newBuilder()
.setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
.addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
} catch (ServiceException e) {
lastFlushedSequenceId = -1l;
LOG.warn("Unable to connect to the master to check "
+ "the last flushed sequence id", e);
LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
.build();
}
return lastFlushedSequenceId;
}
/**

View File

@ -19,15 +19,19 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
/**
* Last flushed sequence Ids for the regions on region server
* Last flushed sequence Ids for the regions and their stores on region server
*/
@InterfaceAudience.Private
public interface LastSequenceId {
/**
* @param encodedRegionName Encoded region name
* @return Last flushed sequence Id for region or -1 if it can't be determined
* @return Last flushed sequence Id for region and its stores. Id will be -1 if it can't be
* determined
*/
long getLastSequenceId(byte[] encodedRegionName);
RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName);
}

View File

@ -56,6 +56,7 @@ class FSWALEntry extends Entry {
private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
private final transient List<Cell> memstoreCells;
private final Set<byte[]> familyNames;
FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
@ -67,6 +68,23 @@ class FSWALEntry extends Entry {
this.hri = hri;
this.sequence = sequence;
this.memstoreCells = memstoreCells;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
ArrayList<Cell> cells = this.getEdit().getCells();
if (CollectionUtils.isEmpty(cells)) {
this.familyNames = Collections.<byte[]> emptySet();
} else {
Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
for (Cell cell : cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
familySet.add(CellUtil.cloneFamily(cell));
}
}
this.familyNames = Collections.unmodifiableSet(familySet);
}
} else {
this.familyNames = Collections.<byte[]> emptySet();
}
}
public String toString() {
@ -118,16 +136,6 @@ class FSWALEntry extends Entry {
* @return the family names which are affected by this edit.
*/
Set<byte[]> getFamilyNames() {
ArrayList<Cell> cells = this.getEdit().getCells();
if (CollectionUtils.isEmpty(cells)) {
return Collections.<byte[]>emptySet();
}
Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
for (Cell cell : cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
familySet.add(CellUtil.cloneFamily(cell));
}
}
return familySet;
return familyNames;
}
}

View File

@ -92,11 +92,11 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
// imports for things that haven't moved from regionserver.wal yet.
@ -326,7 +326,14 @@ public class WALSplitter {
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
}
} else if (sequenceIdChecker != null) {
lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
storeSeqId.getSequenceId());
}
regionMaxSeqIdInStores.put(key, maxSeqIdInStores);
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
}
if (lastFlushedSequenceId == null) {
lastFlushedSequenceId = -1L;
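For reference, a hedged sketch of the response shape the sequence id checker hands back here, assuming the region-level id is the floor across stores. Family names and numbers are made up:

import com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative values: f1 has flushed through 105 while f2 only through 42, so
// the region-level floor is 42 but f1 cells at or below 105 can be skipped.
RegionStoreSequenceIds ids = RegionStoreSequenceIds.newBuilder()
    .setLastFlushedSequenceId(42L)
    .addStoreSequenceId(StoreSequenceId.newBuilder()
        .setFamilyName(ByteString.copyFrom(Bytes.toBytes("f1")))
        .setSequenceId(105L))
    .addStoreSequenceId(StoreSequenceId.newBuilder()
        .setFamilyName(ByteString.copyFrom(Bytes.toBytes("f2")))
        .setSequenceId(42L))
    .build();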
@ -1479,6 +1486,29 @@ public class WALSplitter {
return (new WriterAndPath(regionedits, w));
}
private void filterCellByStore(Entry logEntry) {
Map<byte[], Long> maxSeqIdInStores =
regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
return;
}
List<Cell> skippedCells = new ArrayList<Cell>();
for (Cell cell : logEntry.getEdit().getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
byte[] family = CellUtil.cloneFamily(cell);
Long maxSeqId = maxSeqIdInStores.get(family);
// Do not skip the cell even if maxSeqId is null: we may be in a rolling upgrade,
// or the master may have crashed before and we cannot get the information.
if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) {
skippedCells.add(cell);
}
}
}
if (!skippedCells.isEmpty()) {
logEntry.getEdit().getCells().removeAll(skippedCells);
}
}
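A hedged walk-through of filterCellByStore using the numbers sketched above: for a log entry at sequence 100, f1 cells are dropped (105 >= 100, already persisted) while f2 cells survive (42 < 100, may still need replay). The helpers below are hypothetical test scaffolding, not real WALSplitter code:

// Hypothetical scaffolding for illustration only.
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
maxSeqIdInStores.put(Bytes.toBytes("f1"), 105L);
maxSeqIdInStores.put(Bytes.toBytes("f2"), 42L);

Entry entry = createTestEntry(encodedRegionName, 100L,  // hypothetical helper
    createTestCell("f1"), createTestCell("f2"));         // hypothetical helper
filterCellByStore(entry);
// Only the f2 cell remains; its store may not have persisted sequence id 100 yet.
assert entry.getEdit().getCells().size() == 1;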
@Override
public void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
@ -1503,7 +1533,10 @@ public class WALSplitter {
return;
}
}
wap.w.append(logEntry);
filterCellByStore(logEntry);
if (!logEntry.getEdit().isEmpty()) {
wap.w.append(logEntry);
}
this.updateRegionMaximumEditLogSeqNum(logEntry);
editsCount++;
}
@ -1695,8 +1728,8 @@ public class WALSplitter {
HConnection hconn = this.getConnectionByTableName(table);
for (Cell cell : cells) {
byte[] row = cell.getRow();
byte[] family = cell.getFamily();
byte[] row = CellUtil.cloneRow(cell);
byte[] family = CellUtil.cloneFamily(cell);
boolean isCompactionEntry = false;
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
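Aside on the access-pattern change above: cell.getRow() and cell.getFamily() were deprecated because a Cell may be backed by a shared buffer, so the CellUtil clone methods copy the slice out explicitly. A minimal sketch:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;

// Copy each component out of the (possibly shared) backing buffer.
byte[] row = CellUtil.cloneRow(cell);
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);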


@ -26,12 +26,12 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.zookeeper.KeeperException;
/**


@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -86,14 +87,20 @@ public class TestGetLastFlushedSequenceId {
}
assertNotNull(region);
Thread.sleep(2000);
assertEquals(
HConstants.NO_SEQNUM,
testUtil.getHBaseCluster().getMaster()
.getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()));
RegionStoreSequenceIds ids =
testUtil.getHBaseCluster().getMaster()
.getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId());
long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId();
assertTrue(storeSequenceId > 0);
testUtil.getHBaseAdmin().flush(tableName);
Thread.sleep(2000);
assertTrue(testUtil.getHBaseCluster().getMaster()
.getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()) > 0);
ids =
testUtil.getHBaseCluster().getMaster()
.getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
assertTrue(ids.getLastFlushedSequenceId() + " > " + storeSequenceId,
ids.getLastFlushedSequenceId() > storeSequenceId);
assertEquals(ids.getLastFlushedSequenceId(), ids.getStoreSequenceId(0).getSequenceId());
table.close();
}
}


@ -71,18 +71,18 @@ public class TestPerColumnFamilyFlush {
public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
public static final byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
public static final byte[] FAMILY1 = families[0];
public static final byte[] FAMILY1 = FAMILIES[0];
public static final byte[] FAMILY2 = families[1];
public static final byte[] FAMILY2 = FAMILIES[1];
public static final byte[] FAMILY3 = families[2];
public static final byte[] FAMILY3 = FAMILIES[2];
private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TABLENAME);
for (byte[] family : families) {
for (byte[] family : FAMILIES) {
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
@ -96,7 +96,7 @@ public class TestPerColumnFamilyFlush {
byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
Put p = new Put(row);
p.add(families[familyNum - 1], qf, val);
p.addColumn(FAMILIES[familyNum - 1], qf, val);
return p;
}
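The test also moves from the deprecated Put.add(family, qualifier, value) to Put.addColumn. A minimal usage sketch, with made-up row and value bytes:

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

Put p = new Put(Bytes.toBytes("row1-1"));
// addColumn replaces the deprecated add(byte[], byte[], byte[]) overload.
p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("val1-1"));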
@ -109,7 +109,7 @@ public class TestPerColumnFamilyFlush {
// A helper function to verify edits.
void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
Result r = table.get(createGet(familyNum, putNum));
byte[] family = families[familyNum - 1];
byte[] family = FAMILIES[familyNum - 1];
byte[] qf = Bytes.toBytes("q" + familyNum);
byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
@ -327,7 +327,7 @@ public class TestPerColumnFamilyFlush {
return null;
}
public void doTestLogReplay() throws Exception {
private void doTestLogReplay() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
// Carefully chosen limits so that the memstore just flushes when we're done
@ -338,10 +338,10 @@ public class TestPerColumnFamilyFlush {
TEST_UTIL.startMiniCluster(numRegionServers);
TEST_UTIL.getHBaseAdmin().createNamespace(
NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
HTable table = TEST_UTIL.createTable(TABLENAME, families);
HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
HTableDescriptor htd = table.getTableDescriptor();
for (byte[] family : families) {
for (byte[] family : FAMILIES) {
if (!htd.hasFamily(family)) {
htd.addFamily(new HColumnDescriptor(family));
}
@ -455,7 +455,7 @@ public class TestPerColumnFamilyFlush {
try {
TEST_UTIL.startMiniCluster(numRegionServers);
HTable table = null;
table = TEST_UTIL.createTable(tableName, families);
table = TEST_UTIL.createTable(tableName, FAMILIES);
// Force flush the namespace table so its edits are not hanging around as the oldest
// edits. Otherwise, when we create the maximum number of WAL files below, it will be
// the namespace region that is flushed and not the 'desiredRegion'.
@ -521,9 +521,9 @@ public class TestPerColumnFamilyFlush {
rand.nextBytes(value1);
rand.nextBytes(value2);
rand.nextBytes(value3);
put.add(FAMILY1, qf, value1);
put.add(FAMILY2, qf, value2);
put.add(FAMILY3, qf, value3);
put.addColumn(FAMILY1, qf, value1);
put.addColumn(FAMILY2, qf, value2);
put.addColumn(FAMILY3, qf, value3);
table.put(put);
// slow down to let regionserver flush region.
while (region.getMemstoreSize().get() > memstoreFlushSize) {
@ -650,9 +650,9 @@ public class TestPerColumnFamilyFlush {
rand.nextBytes(value1);
rand.nextBytes(value2);
rand.nextBytes(value3);
put.add(FAMILY1, qf, value1);
put.add(FAMILY2, qf, value2);
put.add(FAMILY3, qf, value3);
put.addColumn(FAMILY1, qf, value1);
put.addColumn(FAMILY2, qf, value2);
put.addColumn(FAMILY3, qf, value3);
table.put(put);
if (i % 10000 == 0) {
LOG.info(i + " rows put");