HBASE-2231 Compaction events should be written to HLog (Stack & Enis)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1476414 13f79535-47bb-0310-9956-ffa450edef68
Enis Soztutar 2013-04-26 22:05:33 +00:00
parent 13297c9a07
commit e2868556df
17 changed files with 1779 additions and 203 deletions

View File

@ -35,19 +35,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
@ -63,13 +52,13 @@ import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
@ -117,6 +106,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.Re
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.access.TablePermission;
import org.apache.hadoop.hbase.security.access.UserPermission;
@ -128,6 +118,17 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
/**
* Protobufs utility.
*/
@ -1997,4 +1998,23 @@ public final class ProtobufUtil {
throw new IOException(e);
}
}
public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
// compaction descriptor contains relative paths.
// input / output paths are relative to the store dir
// store dir is relative to region dir
CompactionDescriptor.Builder builder = CompactionDescriptor.newBuilder()
.setTableName(ByteString.copyFrom(info.getTableName()))
.setEncodedRegionName(ByteString.copyFrom(info.getEncodedNameAsBytes()))
.setFamilyName(ByteString.copyFrom(family))
.setStoreHomeDir(storeDir.getName()); //make relative
for (Path inputPath : inputPaths) {
builder.addCompactionInput(inputPath.getName()); //relative path
}
for (Path outputPath : outputPaths) {
builder.addCompactionOutput(outputPath.getName());
}
return builder.build();
}
}
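For illustration, here is a minimal, hypothetical sketch of how this new helper might be called; the table, family and file names are made up, and the point is that only the last path components (relative names) end up in the descriptor:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.util.Bytes;

public class CompactionDescriptorSketch {
  // Builds a descriptor for a hypothetical compaction of two inputs into one output.
  // Only the file names and the store directory name are recorded, i.e. relative paths.
  static CompactionDescriptor describe(HRegionInfo info, Path storeDir) {
    List<Path> inputs = Arrays.asList(new Path(storeDir, "hfile-1"), new Path(storeDir, "hfile-2"));
    List<Path> outputs = Arrays.asList(new Path(storeDir, "hfile-3"));
    return ProtobufUtil.toCompactionDescriptor(info, Bytes.toBytes("f"), inputs, outputs, storeDir);
  }
}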

View File

@ -0,0 +1,938 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: WAL.proto
package org.apache.hadoop.hbase.protobuf.generated;
public final class WAL {
private WAL() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public interface CompactionDescriptorOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// required bytes tableName = 1;
boolean hasTableName();
com.google.protobuf.ByteString getTableName();
// required bytes encodedRegionName = 2;
boolean hasEncodedRegionName();
com.google.protobuf.ByteString getEncodedRegionName();
// required bytes familyName = 3;
boolean hasFamilyName();
com.google.protobuf.ByteString getFamilyName();
// repeated string compactionInput = 4;
java.util.List<String> getCompactionInputList();
int getCompactionInputCount();
String getCompactionInput(int index);
// repeated string compactionOutput = 5;
java.util.List<String> getCompactionOutputList();
int getCompactionOutputCount();
String getCompactionOutput(int index);
// required string storeHomeDir = 6;
boolean hasStoreHomeDir();
String getStoreHomeDir();
}
public static final class CompactionDescriptor extends
com.google.protobuf.GeneratedMessage
implements CompactionDescriptorOrBuilder {
// Use CompactionDescriptor.newBuilder() to construct.
private CompactionDescriptor(Builder builder) {
super(builder);
}
private CompactionDescriptor(boolean noInit) {}
private static final CompactionDescriptor defaultInstance;
public static CompactionDescriptor getDefaultInstance() {
return defaultInstance;
}
public CompactionDescriptor getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_fieldAccessorTable;
}
private int bitField0_;
// required bytes tableName = 1;
public static final int TABLENAME_FIELD_NUMBER = 1;
private com.google.protobuf.ByteString tableName_;
public boolean hasTableName() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public com.google.protobuf.ByteString getTableName() {
return tableName_;
}
// required bytes encodedRegionName = 2;
public static final int ENCODEDREGIONNAME_FIELD_NUMBER = 2;
private com.google.protobuf.ByteString encodedRegionName_;
public boolean hasEncodedRegionName() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public com.google.protobuf.ByteString getEncodedRegionName() {
return encodedRegionName_;
}
// required bytes familyName = 3;
public static final int FAMILYNAME_FIELD_NUMBER = 3;
private com.google.protobuf.ByteString familyName_;
public boolean hasFamilyName() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public com.google.protobuf.ByteString getFamilyName() {
return familyName_;
}
// repeated string compactionInput = 4;
public static final int COMPACTIONINPUT_FIELD_NUMBER = 4;
private com.google.protobuf.LazyStringList compactionInput_;
public java.util.List<String>
getCompactionInputList() {
return compactionInput_;
}
public int getCompactionInputCount() {
return compactionInput_.size();
}
public String getCompactionInput(int index) {
return compactionInput_.get(index);
}
// repeated string compactionOutput = 5;
public static final int COMPACTIONOUTPUT_FIELD_NUMBER = 5;
private com.google.protobuf.LazyStringList compactionOutput_;
public java.util.List<String>
getCompactionOutputList() {
return compactionOutput_;
}
public int getCompactionOutputCount() {
return compactionOutput_.size();
}
public String getCompactionOutput(int index) {
return compactionOutput_.get(index);
}
// required string storeHomeDir = 6;
public static final int STOREHOMEDIR_FIELD_NUMBER = 6;
private java.lang.Object storeHomeDir_;
public boolean hasStoreHomeDir() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public String getStoreHomeDir() {
java.lang.Object ref = storeHomeDir_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (com.google.protobuf.Internal.isValidUtf8(bs)) {
storeHomeDir_ = s;
}
return s;
}
}
private com.google.protobuf.ByteString getStoreHomeDirBytes() {
java.lang.Object ref = storeHomeDir_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
storeHomeDir_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private void initFields() {
tableName_ = com.google.protobuf.ByteString.EMPTY;
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
familyName_ = com.google.protobuf.ByteString.EMPTY;
compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
storeHomeDir_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasTableName()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasEncodedRegionName()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasFamilyName()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasStoreHomeDir()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(1, tableName_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, encodedRegionName_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, familyName_);
}
for (int i = 0; i < compactionInput_.size(); i++) {
output.writeBytes(4, compactionInput_.getByteString(i));
}
for (int i = 0; i < compactionOutput_.size(); i++) {
output.writeBytes(5, compactionOutput_.getByteString(i));
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(6, getStoreHomeDirBytes());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(1, tableName_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, encodedRegionName_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, familyName_);
}
{
int dataSize = 0;
for (int i = 0; i < compactionInput_.size(); i++) {
dataSize += com.google.protobuf.CodedOutputStream
.computeBytesSizeNoTag(compactionInput_.getByteString(i));
}
size += dataSize;
size += 1 * getCompactionInputList().size();
}
{
int dataSize = 0;
for (int i = 0; i < compactionOutput_.size(); i++) {
dataSize += com.google.protobuf.CodedOutputStream
.computeBytesSizeNoTag(compactionOutput_.getByteString(i));
}
size += dataSize;
size += 1 * getCompactionOutputList().size();
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(6, getStoreHomeDirBytes());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor)) {
return super.equals(obj);
}
org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor other = (org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor) obj;
boolean result = true;
result = result && (hasTableName() == other.hasTableName());
if (hasTableName()) {
result = result && getTableName()
.equals(other.getTableName());
}
result = result && (hasEncodedRegionName() == other.hasEncodedRegionName());
if (hasEncodedRegionName()) {
result = result && getEncodedRegionName()
.equals(other.getEncodedRegionName());
}
result = result && (hasFamilyName() == other.hasFamilyName());
if (hasFamilyName()) {
result = result && getFamilyName()
.equals(other.getFamilyName());
}
result = result && getCompactionInputList()
.equals(other.getCompactionInputList());
result = result && getCompactionOutputList()
.equals(other.getCompactionOutputList());
result = result && (hasStoreHomeDir() == other.hasStoreHomeDir());
if (hasStoreHomeDir()) {
result = result && getStoreHomeDir()
.equals(other.getStoreHomeDir());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
}
@java.lang.Override
public int hashCode() {
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
if (hasTableName()) {
hash = (37 * hash) + TABLENAME_FIELD_NUMBER;
hash = (53 * hash) + getTableName().hashCode();
}
if (hasEncodedRegionName()) {
hash = (37 * hash) + ENCODEDREGIONNAME_FIELD_NUMBER;
hash = (53 * hash) + getEncodedRegionName().hashCode();
}
if (hasFamilyName()) {
hash = (37 * hash) + FAMILYNAME_FIELD_NUMBER;
hash = (53 * hash) + getFamilyName().hashCode();
}
if (getCompactionInputCount() > 0) {
hash = (37 * hash) + COMPACTIONINPUT_FIELD_NUMBER;
hash = (53 * hash) + getCompactionInputList().hashCode();
}
if (getCompactionOutputCount() > 0) {
hash = (37 * hash) + COMPACTIONOUTPUT_FIELD_NUMBER;
hash = (53 * hash) + getCompactionOutputList().hashCode();
}
if (hasStoreHomeDir()) {
hash = (37 * hash) + STOREHOMEDIR_FIELD_NUMBER;
hash = (53 * hash) + getStoreHomeDir().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptorOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.WAL.internal_static_CompactionDescriptor_fieldAccessorTable;
}
// Construct using org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
tableName_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000001);
encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000002);
familyName_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000008);
compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000010);
storeHomeDir_ = "";
bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDescriptor();
}
public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor getDefaultInstanceForType() {
return org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDefaultInstance();
}
public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor build() {
org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor result = new org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.tableName_ = tableName_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.encodedRegionName_ = encodedRegionName_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.familyName_ = familyName_;
if (((bitField0_ & 0x00000008) == 0x00000008)) {
compactionInput_ = new com.google.protobuf.UnmodifiableLazyStringList(
compactionInput_);
bitField0_ = (bitField0_ & ~0x00000008);
}
result.compactionInput_ = compactionInput_;
if (((bitField0_ & 0x00000010) == 0x00000010)) {
compactionOutput_ = new com.google.protobuf.UnmodifiableLazyStringList(
compactionOutput_);
bitField0_ = (bitField0_ & ~0x00000010);
}
result.compactionOutput_ = compactionOutput_;
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
to_bitField0_ |= 0x00000008;
}
result.storeHomeDir_ = storeHomeDir_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor) {
return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.getDefaultInstance()) return this;
if (other.hasTableName()) {
setTableName(other.getTableName());
}
if (other.hasEncodedRegionName()) {
setEncodedRegionName(other.getEncodedRegionName());
}
if (other.hasFamilyName()) {
setFamilyName(other.getFamilyName());
}
if (!other.compactionInput_.isEmpty()) {
if (compactionInput_.isEmpty()) {
compactionInput_ = other.compactionInput_;
bitField0_ = (bitField0_ & ~0x00000008);
} else {
ensureCompactionInputIsMutable();
compactionInput_.addAll(other.compactionInput_);
}
onChanged();
}
if (!other.compactionOutput_.isEmpty()) {
if (compactionOutput_.isEmpty()) {
compactionOutput_ = other.compactionOutput_;
bitField0_ = (bitField0_ & ~0x00000010);
} else {
ensureCompactionOutputIsMutable();
compactionOutput_.addAll(other.compactionOutput_);
}
onChanged();
}
if (other.hasStoreHomeDir()) {
setStoreHomeDir(other.getStoreHomeDir());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasTableName()) {
return false;
}
if (!hasEncodedRegionName()) {
return false;
}
if (!hasFamilyName()) {
return false;
}
if (!hasStoreHomeDir()) {
return false;
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
case 10: {
bitField0_ |= 0x00000001;
tableName_ = input.readBytes();
break;
}
case 18: {
bitField0_ |= 0x00000002;
encodedRegionName_ = input.readBytes();
break;
}
case 26: {
bitField0_ |= 0x00000004;
familyName_ = input.readBytes();
break;
}
case 34: {
ensureCompactionInputIsMutable();
compactionInput_.add(input.readBytes());
break;
}
case 42: {
ensureCompactionOutputIsMutable();
compactionOutput_.add(input.readBytes());
break;
}
case 50: {
bitField0_ |= 0x00000020;
storeHomeDir_ = input.readBytes();
break;
}
}
}
}
private int bitField0_;
// required bytes tableName = 1;
private com.google.protobuf.ByteString tableName_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasTableName() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public com.google.protobuf.ByteString getTableName() {
return tableName_;
}
public Builder setTableName(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
tableName_ = value;
onChanged();
return this;
}
public Builder clearTableName() {
bitField0_ = (bitField0_ & ~0x00000001);
tableName_ = getDefaultInstance().getTableName();
onChanged();
return this;
}
// required bytes encodedRegionName = 2;
private com.google.protobuf.ByteString encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasEncodedRegionName() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public com.google.protobuf.ByteString getEncodedRegionName() {
return encodedRegionName_;
}
public Builder setEncodedRegionName(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
encodedRegionName_ = value;
onChanged();
return this;
}
public Builder clearEncodedRegionName() {
bitField0_ = (bitField0_ & ~0x00000002);
encodedRegionName_ = getDefaultInstance().getEncodedRegionName();
onChanged();
return this;
}
// required bytes familyName = 3;
private com.google.protobuf.ByteString familyName_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasFamilyName() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public com.google.protobuf.ByteString getFamilyName() {
return familyName_;
}
public Builder setFamilyName(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000004;
familyName_ = value;
onChanged();
return this;
}
public Builder clearFamilyName() {
bitField0_ = (bitField0_ & ~0x00000004);
familyName_ = getDefaultInstance().getFamilyName();
onChanged();
return this;
}
// repeated string compactionInput = 4;
private com.google.protobuf.LazyStringList compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
private void ensureCompactionInputIsMutable() {
if (!((bitField0_ & 0x00000008) == 0x00000008)) {
compactionInput_ = new com.google.protobuf.LazyStringArrayList(compactionInput_);
bitField0_ |= 0x00000008;
}
}
public java.util.List<String>
getCompactionInputList() {
return java.util.Collections.unmodifiableList(compactionInput_);
}
public int getCompactionInputCount() {
return compactionInput_.size();
}
public String getCompactionInput(int index) {
return compactionInput_.get(index);
}
public Builder setCompactionInput(
int index, String value) {
if (value == null) {
throw new NullPointerException();
}
ensureCompactionInputIsMutable();
compactionInput_.set(index, value);
onChanged();
return this;
}
public Builder addCompactionInput(String value) {
if (value == null) {
throw new NullPointerException();
}
ensureCompactionInputIsMutable();
compactionInput_.add(value);
onChanged();
return this;
}
public Builder addAllCompactionInput(
java.lang.Iterable<String> values) {
ensureCompactionInputIsMutable();
super.addAll(values, compactionInput_);
onChanged();
return this;
}
public Builder clearCompactionInput() {
compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000008);
onChanged();
return this;
}
void addCompactionInput(com.google.protobuf.ByteString value) {
ensureCompactionInputIsMutable();
compactionInput_.add(value);
onChanged();
}
// repeated string compactionOutput = 5;
private com.google.protobuf.LazyStringList compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
private void ensureCompactionOutputIsMutable() {
if (!((bitField0_ & 0x00000010) == 0x00000010)) {
compactionOutput_ = new com.google.protobuf.LazyStringArrayList(compactionOutput_);
bitField0_ |= 0x00000010;
}
}
public java.util.List<String>
getCompactionOutputList() {
return java.util.Collections.unmodifiableList(compactionOutput_);
}
public int getCompactionOutputCount() {
return compactionOutput_.size();
}
public String getCompactionOutput(int index) {
return compactionOutput_.get(index);
}
public Builder setCompactionOutput(
int index, String value) {
if (value == null) {
throw new NullPointerException();
}
ensureCompactionOutputIsMutable();
compactionOutput_.set(index, value);
onChanged();
return this;
}
public Builder addCompactionOutput(String value) {
if (value == null) {
throw new NullPointerException();
}
ensureCompactionOutputIsMutable();
compactionOutput_.add(value);
onChanged();
return this;
}
public Builder addAllCompactionOutput(
java.lang.Iterable<String> values) {
ensureCompactionOutputIsMutable();
super.addAll(values, compactionOutput_);
onChanged();
return this;
}
public Builder clearCompactionOutput() {
compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000010);
onChanged();
return this;
}
void addCompactionOutput(com.google.protobuf.ByteString value) {
ensureCompactionOutputIsMutable();
compactionOutput_.add(value);
onChanged();
}
// required string storeHomeDir = 6;
private java.lang.Object storeHomeDir_ = "";
public boolean hasStoreHomeDir() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
public String getStoreHomeDir() {
java.lang.Object ref = storeHomeDir_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
storeHomeDir_ = s;
return s;
} else {
return (String) ref;
}
}
public Builder setStoreHomeDir(String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000020;
storeHomeDir_ = value;
onChanged();
return this;
}
public Builder clearStoreHomeDir() {
bitField0_ = (bitField0_ & ~0x00000020);
storeHomeDir_ = getDefaultInstance().getStoreHomeDir();
onChanged();
return this;
}
void setStoreHomeDir(com.google.protobuf.ByteString value) {
bitField0_ |= 0x00000020;
storeHomeDir_ = value;
onChanged();
}
// @@protoc_insertion_point(builder_scope:CompactionDescriptor)
}
static {
defaultInstance = new CompactionDescriptor(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:CompactionDescriptor)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_CompactionDescriptor_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_CompactionDescriptor_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\tWAL.proto\032\013hbase.proto\"\241\001\n\024CompactionD" +
"escriptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021encodedR" +
"egionName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022\027\n\017c" +
"ompactionInput\030\004 \003(\t\022\030\n\020compactionOutput" +
"\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\tB6\n*org.apac" +
"he.hadoop.hbase.protobuf.generatedB\003WALH" +
"\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_CompactionDescriptor_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_CompactionDescriptor_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CompactionDescriptor_descriptor,
new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", },
org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.class,
org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor.Builder.class);
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.getDescriptor(),
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "WAL";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "hbase.proto";
/**
* WAL entries
*/
/**
* Special WAL entry to hold all information related to a compaction.
* Written to the WAL before completing the compaction. There is
* sufficient info in the message below to complete the compaction
* later, should we fail the WAL write.
*/
message CompactionDescriptor {
required bytes tableName = 1;
required bytes encodedRegionName = 2;
required bytes familyName = 3;
repeated string compactionInput = 4;
repeated string compactionOutput = 5;
required string storeHomeDir = 6;
}
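As a rough sketch of how a consumer of this marker might decode it (the byte[] argument stands in for the value carried by the special WAL edit; the printing is purely illustrative), the generated class can parse the record back:

import com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;

public class CompactionMarkerDump {
  // Parses a serialized CompactionDescriptor and prints what was recorded.
  static void dump(byte[] serialized) throws InvalidProtocolBufferException {
    CompactionDescriptor cd = CompactionDescriptor.parseFrom(serialized);
    System.out.println("store dir (relative to region dir): " + cd.getStoreHomeDir());
    System.out.println("compaction inputs (relative to store dir): " + cd.getCompactionInputList());
    System.out.println("compaction outputs (relative to store dir): " + cd.getCompactionOutputList());
  }
}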

View File

@ -35,9 +35,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
@ -83,7 +81,7 @@ public class WALPlayer extends Configured implements Tool {
// skip all other tables
if (Bytes.equals(table, key.getTablename())) {
for (KeyValue kv : value.getKeyValues()) {
if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
context.write(new ImmutableBytesWritable(kv.getRow()), kv);
}
}
@ -127,7 +125,7 @@ public class WALPlayer extends Configured implements Tool {
KeyValue lastKV = null;
for (KeyValue kv : value.getKeyValues()) {
// filtering HLog meta entries
if (HLogUtil.isMetaFamily(kv.getFamily())) continue;
if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
// multiple rows (HBASE-5229).

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.IsolationLevel;
@ -89,7 +90,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.DroppedSnapshotException;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -235,7 +236,7 @@ public class HRegion implements HeapSize { // , Writable{
private final HLog log;
private final HRegionFileSystem fs;
private final Configuration conf;
protected final Configuration conf;
private final Configuration baseConf;
private final KeyValue.KVComparator comparator;
private final int rowLockWaitDuration;
@ -481,7 +482,7 @@ public class HRegion implements HeapSize { // , Writable{
// When hbase.regionserver.optionallogflushinterval <= 0 , deferred log sync is disabled.
this.deferredLogSyncDisabled = conf.getLong("hbase.regionserver.optionallogflushinterval",
1 * 1000) <= 0;
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver
@ -1127,7 +1128,7 @@ public class HRegion implements HeapSize { // , Writable{
* Do preparation for pending compaction.
* @throws IOException
*/
void doRegionCompactionPrep() throws IOException {
protected void doRegionCompactionPrep() throws IOException {
}
void triggerMajorCompaction() {
@ -2078,8 +2079,8 @@ public class HRegion implements HeapSize { // , Writable{
// calling the pre CP hook for batch mutation
if (coprocessorHost != null) {
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
}
@ -2115,7 +2116,7 @@ public class HRegion implements HeapSize { // , Writable{
batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
Mutation m = batchOp.operations[i].getFirst();
Durability tmpDur = m.getDurability();
Durability tmpDur = m.getDurability();
if (tmpDur.ordinal() > durability.ordinal()) {
durability = tmpDur;
}
@ -2165,8 +2166,8 @@ public class HRegion implements HeapSize { // , Writable{
walSyncSuccessful = true;
// calling the post CP hook for batch mutation
if (coprocessorHost != null) {
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp =
new MiniBatchOperationInProgress<Pair<Mutation, Integer>>(batchOp.operations,
batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
coprocessorHost.postBatchMutate(miniBatchOp);
}
@ -2877,9 +2878,16 @@ public class HRegion implements HeapSize { // , Writable{
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
if (kv.matchingFamily(WALEdit.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {
//this is a special edit, we should handle it
CompactionDescriptor compaction = WALEdit.getCompaction(kv);
if (compaction != null) {
//replay the compaction
completeCompactionMarker(compaction);
}
skippedEdits++;
continue;
}
@ -2953,6 +2961,23 @@ public class HRegion implements HeapSize { // , Writable{
}
}
/**
* Call to complete a compaction. It is for the case where we find an unfinished compaction
* in the WAL; we could find one while recovering a WAL after a regionserver crash.
* See HBASE-2231.
* @param compaction
*/
void completeCompactionMarker(CompactionDescriptor compaction)
throws IOException {
Store store = this.getStore(compaction.getFamilyName().toByteArray());
if (store == null) {
LOG.warn("Found Compaction WAL edit for deleted family:" + Bytes.toString(compaction.getFamilyName().toByteArray()));
return;
}
store.completeCompactionMarker(compaction);
}
/**
* Used by tests
* @param s Store to add edit too.
@ -3425,10 +3450,10 @@ public class HRegion implements HeapSize { // , Writable{
public long getMvccReadPoint() {
return this.readPt;
}
/**
* Reset both the filter and the old filter.
*
*
* @throws IOException in case a filter raises an I/O exception.
*/
protected void resetFilters() throws IOException {
@ -3637,7 +3662,7 @@ public class HRegion implements HeapSize { // , Writable{
// If joinedHeap is pointing to some other row, try to seek to a correct one.
boolean mayHaveData =
(nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
|| (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
|| (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
true, true)
&& joinedHeap.peek() != null
&& joinedHeap.peek().matchingRow(currentRow, offset, length));
@ -4245,7 +4270,7 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Files for region: " + b);
b.getRegionFileSystem().logFileSystemState(LOG);
}
RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
if (!rmt.prepare(null)) {
throw new IOException("Unable to merge regions " + a + " and " + b);
@ -4271,7 +4296,7 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Files for new region");
dstRegion.getRegionFileSystem().logFileSystemState(LOG);
}
if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
throw new IOException("Merged region " + dstRegion
+ " still has references after the compaction, is compaction canceled?");

View File

@ -52,23 +52,23 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
import org.apache.hadoop.hbase.exceptions.WrongRegionException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@ -124,6 +124,16 @@ public class HStore implements Store {
static int closeCheckInterval = 0;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
/**
* RWLock for store operations.
* Locked in shared mode when the list of component stores is looked at:
* - all reads/writes to table data
* - checking for split
* Locked in exclusive mode when the list of component stores is modified:
* - closing
* - completing a compaction
*/
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final boolean verifyBulkLoads;
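A generic sketch of the locking discipline described in the new comment above; the method names are illustrative only and are not HStore's actual methods:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StoreLockSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // Shared mode: the list of component store files is only read (reads/writes to data, split checks).
  void readStoreFiles() {
    lock.readLock().lock();
    try {
      // ... look at the current list of store files ...
    } finally {
      lock.readLock().unlock();
    }
  }

  // Exclusive mode: the list of component store files is modified (closing, completing a compaction).
  void swapStoreFiles() {
    lock.writeLock().lock();
    try {
      // ... replace compacted inputs with the compaction output ...
    } finally {
      lock.writeLock().unlock();
    }
  }
}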
@ -389,14 +399,11 @@ public class HStore implements Store {
new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
int totalValidStoreFile = 0;
final FileSystem fs = this.getFileSystem();
for (final StoreFileInfo storeFileInfo: files) {
// open each store file in parallel
completionService.submit(new Callable<StoreFile>() {
public StoreFile call() throws IOException {
StoreFile storeFile = new StoreFile(fs, storeFileInfo.getPath(), conf, cacheConf,
family.getBloomFilterType(), dataBlockEncoder);
storeFile.createReader();
StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
return storeFile;
}
});
@ -440,6 +447,17 @@ public class HStore implements Store {
return results;
}
private StoreFile createStoreFileAndReader(final Path p) throws IOException {
return createStoreFileAndReader(p, this.dataBlockEncoder);
}
private StoreFile createStoreFileAndReader(final Path p, final HFileDataBlockEncoder encoder) throws IOException {
StoreFile storeFile = new StoreFile(this.getFileSystem(), p, this.conf, this.cacheConf,
this.family.getBloomFilterType(), encoder);
storeFile.createReader();
return storeFile;
}
@Override
public long add(final KeyValue kv) {
lock.readLock().lock();
@ -553,10 +571,9 @@ public class HStore implements Store {
Path srcPath = new Path(srcPathStr);
Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
StoreFile sf = new StoreFile(this.getFileSystem(), dstPath, this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
StoreFile sf = createStoreFileAndReader(dstPath);
StoreFile.Reader r = sf.createReader();
StoreFile.Reader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@ -716,10 +733,9 @@ public class HStore implements Store {
Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
status.setStatus("Flushing " + this + ": reopening flushed file");
StoreFile sf = new StoreFile(this.getFileSystem(), dstPath, this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
StoreFile sf = createStoreFileAndReader(dstPath);
StoreFile.Reader r = sf.createReader();
StoreFile.Reader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
@ -878,6 +894,29 @@ public class HStore implements Store {
* <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
* <p>The compaction event should be idempotent, since there is no I/O fencing for
* the region directory in HDFS. A region server might still try to complete the
* compaction after it lost the region. That is why the following events are carefully
* ordered for a compaction:
* 1. Compaction writes new files under the region/.tmp directory (compaction output).
* 2. Compaction atomically moves the temporary files under the region directory.
* 3. Compaction appends a WAL edit containing the compaction input and output files,
* and forces a sync on the WAL.
* 4. Compaction deletes the input files from the region directory.
*
* Failure conditions are handled like this:
* - If the RS fails before 2, the compaction won't complete. Even if the RS lives on and
* finishes the compaction later, it will only write the new data file to the region
* directory. Since we already have this data, this is idempotent, but we end up with a
* redundant copy of the data.
* - If the RS fails between 2 and 3, the region will have a redundant copy of the data. The
* RS that failed won't be able to finish sync() on the WAL because of lease recovery on the WAL.
* - If the RS fails after 3, the region server that opens the region will pick up the
* compaction marker from the WAL and replay it by removing the compaction input files.
* The failed RS can also attempt to delete those files, but the operation will be idempotent.
*
* See HBASE-2231 for details.
*
* @param compaction compaction details obtained from requestCompaction()
* @throws IOException
* @return Storefile we compacted into or null if we failed or opted out early.
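To make the ordering above concrete, here is a compressed sketch of the four steps; all helper names are hypothetical placeholders, while the real flow lives in HStore.compact(), HLogUtil.writeCompactionMarker() and completeCompaction():

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.fs.Path;

public class CompactionCommitSketch {

  // Outline of the WAL-marker ordering described in the javadoc above.
  void commitCompaction(List<Path> inputs) throws IOException {
    List<Path> tmpOutputs = writeTmpFiles(inputs);    // 1. write outputs under region/.tmp
    List<Path> outputs = moveIntoPlace(tmpOutputs);   // 2. atomically move them under the region dir
    appendAndSyncCompactionMarker(inputs, outputs);   // 3. append the compaction marker to the WAL and sync
    deleteInputs(inputs);                             // 4. only now remove/archive the input files
  }

  // Placeholder stubs so the sketch compiles; they intentionally do nothing real.
  List<Path> writeTmpFiles(List<Path> inputs) throws IOException { return Collections.<Path>emptyList(); }
  List<Path> moveIntoPlace(List<Path> tmpOutputs) throws IOException { return tmpOutputs; }
  void appendAndSyncCompactionMarker(List<Path> in, List<Path> out) throws IOException {}
  void deleteInputs(List<Path> inputs) throws IOException {}
}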
@ -906,6 +945,13 @@ public class HStore implements Store {
List<Path> newFiles = compaction.compact();
// Move the compaction into place.
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
//Write compaction to WAL
List<Path> inputPaths = new ArrayList<Path>();
for (StoreFile f : filesToCompact) {
inputPaths.add(f.getPath());
}
ArrayList<Path> outputPaths = new ArrayList<Path>(newFiles.size());
for (Path newFile: newFiles) {
assert newFile != null;
StoreFile sf = moveFileIntoPlace(newFile);
@ -914,14 +960,21 @@ public class HStore implements Store {
}
assert sf != null;
sfs.add(sf);
outputPaths.add(sf.getPath());
}
if (region.getLog() != null) {
HRegionInfo info = this.region.getRegionInfo();
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
this.region.getRegionInfo(), compactionDescriptor);
}
completeCompaction(filesToCompact, sfs);
} else {
for (Path newFile: newFiles) {
// Create storefile around what we wrote with a reader on it.
StoreFile sf = new StoreFile(this.getFileSystem(), newFile, this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
sf.createReader();
StoreFile sf = createStoreFileAndReader(newFile);
sfs.add(sf);
}
}
@ -970,10 +1023,58 @@ public class HStore implements Store {
validateStoreFile(newFile);
// Move the file into the right spot
Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
StoreFile result = new StoreFile(this.getFileSystem(), destPath, this.conf, this.cacheConf,
this.family.getBloomFilterType(), this.dataBlockEncoder);
result.createReader();
return result;
StoreFile sf = createStoreFileAndReader(destPath);
return sf;
}
/**
* Call to complete a compaction. It is for the case where we find an unfinished compaction
* in the WAL; we could find one while recovering a WAL after a regionserver crash.
* See HBASE-2231.
* @param compaction
*/
public void completeCompactionMarker(CompactionDescriptor compaction)
throws IOException {
LOG.debug("Completing compaction from the WAL marker");
List<String> compactionInputs = compaction.getCompactionInputList();
List<String> compactionOutputs = compaction.getCompactionOutputList();
List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
for (String compactionOutput : compactionOutputs) {
//we should have this store file already
boolean found = false;
Path outputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionOutput);
outputPath = outputPath.makeQualified(fs.getFileSystem());
for (StoreFile sf : this.getStorefiles()) {
if (sf.getPath().makeQualified(sf.getPath().getFileSystem(conf)).equals(outputPath)) {
found = true;
break;
}
}
if (!found) {
if (getFileSystem().exists(outputPath)) {
outputStoreFiles.add(createStoreFileAndReader(outputPath));
}
}
}
List<Path> inputPaths = new ArrayList<Path>(compactionInputs.size());
for (String compactionInput : compactionInputs) {
Path inputPath = new Path(fs.getStoreDir(family.getNameAsString()), compactionInput);
inputPath = inputPath.makeQualified(fs.getFileSystem());
inputPaths.add(inputPath);
}
//some of the input files might already be deleted
List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
for (StoreFile sf : this.getStorefiles()) {
if (inputPaths.contains(sf.getPath().makeQualified(fs.getFileSystem()))) {
inputStoreFiles.add(sf);
}
}
this.completeCompaction(inputStoreFiles, outputStoreFiles);
}
/**
@ -1180,10 +1281,7 @@ public class HStore implements Store {
throws IOException {
StoreFile storeFile = null;
try {
storeFile = new StoreFile(this.getFileSystem(), path, this.conf,
this.cacheConf, this.family.getBloomFilterType(),
NoOpDataBlockEncoder.INSTANCE);
storeFile.createReader();
createStoreFileAndReader(path, NoOpDataBlockEncoder.INSTANCE);
} catch (IOException e) {
LOG.error("Failed to open store file : " + path
+ ", keeping it in tmp location", e);
@ -1214,7 +1312,7 @@ public class HStore implements Store {
* @return StoreFile created. May be null.
* @throws IOException
*/
private void completeCompaction(final Collection<StoreFile> compactedFiles,
protected void completeCompaction(final Collection<StoreFile> compactedFiles,
final Collection<StoreFile> result) throws IOException {
try {
this.lock.writeLock().lock();
@ -1239,6 +1337,9 @@ public class HStore implements Store {
// let the archive util decide if we should archive or delete the files
LOG.debug("Removing store files after compaction...");
for (StoreFile compactedFile : compactedFiles) {
compactedFile.closeReader(true);
}
this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
} catch (IOException e) {

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -103,7 +104,7 @@ public interface Store extends HeapSize, StoreConfigInformation {
* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
* across all of them.
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @return memstore size delta
* @throws IOException
*/
@ -190,6 +191,15 @@ public interface Store extends HeapSize, StoreConfigInformation {
public StoreFlushContext createFlushContext(long cacheFlushId);
/**
* Call to complete a compaction. It is for the case where we find an unfinished compaction
* in the WAL; we could find one while recovering a WAL after a regionserver crash.
* See HBASE-2231.
* @param compaction
*/
public void completeCompactionMarker(CompactionDescriptor compaction)
throws IOException;
// Split oriented methods
public boolean canSplit();
@ -211,7 +221,7 @@ public interface Store extends HeapSize, StoreConfigInformation {
/**
* This method should only be called from HRegion. It is assumed that the ranges of values in the
* HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
*
*
* @param srcPathStr
* @param sequenceId sequence Id associated with the HFile
*/

View File

@ -27,7 +27,6 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.TreeMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -107,7 +106,7 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Private
class FSHLog implements HLog, Syncable {
static final Log LOG = LogFactory.getLog(FSHLog.class);
private final FileSystem fs;
private final Path rootDir;
private final Path dir;
@ -123,12 +122,11 @@ class FSHLog implements HLog, Syncable {
private long lastDeferredTxid;
private final Path oldLogDir;
private volatile boolean logRollRunning;
private boolean failIfLogDirExists;
private WALCoprocessorHost coprocessorHost;
private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
// Minimum tolerable replicas, if the actual value is lower than it,
// Minimum tolerable replicas, if the actual value is lower than it,
// rollWriter will be triggered
private int minTolerableReplication;
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
@ -241,10 +239,10 @@ class FSHLog implements HLog, Syncable {
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final Configuration conf)
throws IOException {
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
conf, null, true, null, false);
}
/**
* Constructor.
*
@ -258,7 +256,7 @@ class FSHLog implements HLog, Syncable {
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final String oldLogDir, final Configuration conf)
throws IOException {
this(fs, root, logDir, oldLogDir,
this(fs, root, logDir, oldLogDir,
conf, null, true, null, false);
}
@ -284,7 +282,7 @@ class FSHLog implements HLog, Syncable {
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final Configuration conf, final List<WALActionsListener> listeners,
final String prefix) throws IOException {
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
conf, listeners, true, prefix, false);
}
@ -311,7 +309,7 @@ class FSHLog implements HLog, Syncable {
* @throws IOException
*/
public FSHLog(final FileSystem fs, final Path root, final String logDir,
final String oldLogDir, final Configuration conf,
final String oldLogDir, final Configuration conf,
final List<WALActionsListener> listeners,
final boolean failIfLogDirExists, final String prefix, boolean forMeta)
throws IOException {
@ -322,15 +320,13 @@ class FSHLog implements HLog, Syncable {
this.oldLogDir = new Path(this.rootDir, oldLogDir);
this.forMeta = forMeta;
this.conf = conf;
if (listeners != null) {
for (WALActionsListener i: listeners) {
registerWALActionsListener(i);
}
}
this.failIfLogDirExists = failIfLogDirExists;
this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
getDefaultBlockSize());
// Roll at 95% of block size.
@ -338,7 +334,7 @@ class FSHLog implements HLog, Syncable {
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
this.minTolerableReplication = conf.getInt(
"hbase.regionserver.hlog.tolerable.lowreplication",
@ -348,9 +344,9 @@ class FSHLog implements HLog, Syncable {
this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
this.closeErrorsTolerated = conf.getInt(
"hbase.regionserver.logroll.errors.tolerated", 0);
this.logSyncer = new LogSyncer(this.optionalFlushInterval);
LOG.info("HLog configuration: blocksize=" +
StringUtils.byteDesc(this.blocksize) +
", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
@ -375,7 +371,7 @@ class FSHLog implements HLog, Syncable {
}
// rollWriter sets this.hdfs_out if it can.
rollWriter();
// handle the reflection necessary to call getNumCurrentReplicas()
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
@ -392,7 +388,7 @@ class FSHLog implements HLog, Syncable {
this.metrics = new MetricsWAL();
}
// use reflection to search for getDefaultBlockSize(Path f)
// if the method doesn't exist, fall back to using getDefaultBlockSize()
private long getDefaultBlockSize() throws IOException {
@ -485,7 +481,7 @@ class FSHLog implements HLog, Syncable {
* @return The wrapped stream our writer is using; it's not the
* writer's 'out' FSDataOutputStream but the stream that this 'out' wraps
* (in HDFS it's an instance of DFSDataOutputStream).
*
*
* usage: see TestLogRolling.java
*/
OutputStream getOutputStream() {
@ -576,7 +572,7 @@ class FSHLog implements HLog, Syncable {
/**
* This method allows subclasses to inject different writers without having to
* extend other methods like rollWriter().
*
*
* @param fs
* @param path
* @param conf
@ -773,28 +769,30 @@ class FSHLog implements HLog, Syncable {
close();
if (!fs.exists(this.dir)) return;
FileStatus[] files = fs.listStatus(this.dir);
for(FileStatus file : files) {
if (files != null) {
for(FileStatus file : files) {
Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.preLogArchive(file.getPath(), p);
}
}
if (!fs.rename(file.getPath(),p)) {
throw new IOException("Unable to rename " + file.getPath() + " to " + p);
}
// Tell our listeners that a log was archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.postLogArchive(file.getPath(), p);
Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.preLogArchive(file.getPath(), p);
}
}
if (!fs.rename(file.getPath(),p)) {
throw new IOException("Unable to rename " + file.getPath() + " to " + p);
}
// Tell our listeners that a log was archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.postLogArchive(file.getPath(), p);
}
}
}
LOG.debug("Moved " + files.length + " log files to " +
FSUtils.getPath(this.oldLogDir));
}
LOG.debug("Moved " + files.length + " log files to " +
FSUtils.getPath(this.oldLogDir));
if (!fs.delete(dir, true)) {
LOG.info("Unable to delete " + dir);
}
@ -844,14 +842,15 @@ class FSHLog implements HLog, Syncable {
/**
* @param now
* @param regionName
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tableName
* @param clusterId
* @return New log key.
*/
protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum,
protected HLogKey makeKey(byte[] encodedRegionName, byte[] tableName, long seqnum,
long now, UUID clusterId) {
return new HLogKey(regionName, tableName, seqnum, now, clusterId);
return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
}
@Override
@ -953,7 +952,7 @@ class FSHLog implements HLog, Syncable {
}
// Sync if catalog region, and if not then check if that table supports
// deferred log flushing
if (doSync &&
if (doSync &&
(info.isMetaRegion() ||
!htd.isDeferredLogFlush())) {
// sync txn to file system
@ -963,14 +962,14 @@ class FSHLog implements HLog, Syncable {
}
@Override
public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
UUID clusterId, final long now, HTableDescriptor htd)
throws IOException {
return append(info, tableName, edits, clusterId, now, htd, false);
}
@Override
public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
UUID clusterId, final long now, HTableDescriptor htd)
throws IOException {
return append(info, tableName, edits, clusterId, now, htd, true);
@ -992,8 +991,8 @@ class FSHLog implements HLog, Syncable {
// List of pending writes to the HLog. These correspond to transactions
// that have not yet returned to the client. We keep them cached here
// instead of writing them to HDFS piecemeal, because the HDFS write
// method is pretty heavyweight as far as locking is concerned. The
// instead of writing them to HDFS piecemeal, because the HDFS write
// method is pretty heavyweight as far as locking is concerned. The
// goal is to increase the batchsize for writing-to-hdfs as well as
// sync-to-hdfs, so that we can get better system throughput.
private List<Entry> pendingWrites = new LinkedList<Entry>();
@ -1088,7 +1087,7 @@ class FSHLog implements HLog, Syncable {
try {
long doneUpto;
long now = EnvironmentEdgeManager.currentTimeMillis();
// First flush all the pending writes to HDFS. Then
// First flush all the pending writes to HDFS. Then
// issue the sync to HDFS. If sync is successful, then update
// syncedTillHere to indicate that transactions up to this
// number have been successfully synced.
@ -1114,7 +1113,7 @@ class FSHLog implements HLog, Syncable {
tempWriter = this.writer;
logSyncer.hlogFlush(tempWriter, pending);
}
}
}
}
// another thread might have sync'ed avoid double-sync'ing
if (txid <= this.syncedTillHere) {
@ -1251,6 +1250,7 @@ class FSHLog implements HLog, Syncable {
}
}
// TODO: Remove info. Unused.
protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit,
HTableDescriptor htd)
throws IOException {
@ -1363,13 +1363,13 @@ class FSHLog implements HLog, Syncable {
/**
* Get the directory we are making logs in.
*
*
* @return dir
*/
protected Path getDir() {
return dir;
}
static Path getHLogArchivePath(Path oldLogDir, Path p) {
return new Path(oldLogDir, p.getName());
}
@ -1407,7 +1407,7 @@ class FSHLog implements HLog, Syncable {
conf, baseDir, p, oldLogDir, fs);
logSplitter.splitLog();
}
@Override
public WALCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
@ -1456,4 +1456,4 @@ class FSHLog implements HLog, Syncable {
System.exit(-1);
}
}
}
}

View File

@ -27,24 +27,20 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
import org.apache.hadoop.io.Writable;
@InterfaceAudience.Private
public interface HLog {
public static final Log LOG = LogFactory.getLog(HLog.class);
public static final byte[] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte[] METAROW = Bytes.toBytes("METAROW");
/** File Extension used while splitting an HLog into regions (HBASE-2312) */
public static final String SPLITTING_EXT = "-splitting";
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
@ -96,7 +92,7 @@ public interface HLog {
/**
* Constructor for both params
*
*
* @param edit
* log's edit
* @param key
@ -110,7 +106,7 @@ public interface HLog {
/**
* Gets the edit
*
*
* @return edit
*/
public WALEdit getEdit() {
@ -119,7 +115,7 @@ public interface HLog {
/**
* Gets the key
*
*
* @return key
*/
public HLogKey getKey() {
@ -128,7 +124,7 @@ public interface HLog {
/**
* Set compression context for this entry.
*
*
* @param compressionContext
* Compression context
*/
@ -157,14 +153,14 @@ public interface HLog {
/**
* registers WALActionsListener
*
*
* @param listener
*/
public void registerWALActionsListener(final WALActionsListener listener);
/**
* unregisters WALActionsListener
*
*
* @param listener
*/
public boolean unregisterWALActionsListener(final WALActionsListener listener);
@ -178,7 +174,7 @@ public interface HLog {
* Called by HRegionServer when it opens a new region to ensure that log
* sequence numbers are always greater than the latest sequence number of the
* region being brought on-line.
*
*
* @param newvalue
* We'll set log edit/sequence number to this value if it is greater
* than the current value.
@ -192,7 +188,7 @@ public interface HLog {
/**
* Roll the log writer. That is, start writing log messages to a new file.
*
*
* <p>
* The implementation is synchronized in order to make sure there's one rollWriter
* running at any given time.
@ -207,11 +203,11 @@ public interface HLog {
/**
* Roll the log writer. That is, start writing log messages to a new file.
*
*
* <p>
* The implementation is synchronized in order to make sure there's one rollWriter
* running at any given time.
*
*
* @param force
* If true, force creation of a new writer even if no entries have
* been written to the current writer
@ -226,21 +222,21 @@ public interface HLog {
/**
* Shut down the log.
*
*
* @throws IOException
*/
public void close() throws IOException;
/**
* Shut down the log and delete the log directory
*
*
* @throws IOException
*/
public void closeAndDelete() throws IOException;
/**
* Append an entry to the log.
*
*
* @param regionInfo
* @param logEdit
* @param logKey
@ -254,7 +250,7 @@ public interface HLog {
/**
* Only used in tests.
*
*
* @param info
* @param tableName
* @param edits
@ -269,7 +265,7 @@ public interface HLog {
* Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id. The HLog is not flushed after
* this transaction is written to the log.
*
*
* @param info
* @param tableName
* @param edits
@ -286,7 +282,7 @@ public interface HLog {
* Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id. The HLog is flushed after this
* transaction is written to the log.
*
*
* @param info
* @param tableName
* @param edits
@ -351,7 +347,7 @@ public interface HLog {
/**
* Get LowReplication-Roller status
*
*
* @return lowReplicationRollEnabled
*/
public boolean isLowReplicationRollEnabled();
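The append/appendNoSync contract described in the hunks above lets edits be queued and then made durable with one batched sync. A hedged sketch of that pattern follows; sync(txid) is assumed from FSHLog, and the wrapper class name is the editor's, not part of the patch.

import java.io.IOException;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Editor's sketch of the deferred-sync pattern: queue several edits without a
// per-edit HDFS sync, then sync once on the last transaction id returned.
public final class DeferredWalAppendSketch {
  static long appendBatch(HLog log, HRegionInfo info, byte[] tableName,
      List<WALEdit> edits, UUID clusterId, HTableDescriptor htd) throws IOException {
    long lastTxid = -1;
    for (WALEdit edit : edits) {
      // Queues the edit in the log's pending writes; no sync happens here.
      lastTxid = log.appendNoSync(info, tableName, edit, clusterId,
          System.currentTimeMillis(), htd);
    }
    if (lastTxid >= 0) {
      log.sync(lastTxid); // one sync makes the whole batch durable (assumed API)
    }
    return lastTxid;
  }
}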

View File

@ -28,27 +28,21 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);
/**
* @param family
* @return true if the column is a meta column
*/
public static boolean isMetaFamily(byte[] family) {
return Bytes.equals(HLog.METAFAMILY, family);
}
@SuppressWarnings("unchecked")
public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
return (Class<? extends HLogKey>) conf.getClass(
@ -69,7 +63,7 @@ public class HLogUtil {
/**
* Pattern used to validate a HLog file name
*/
private static final Pattern pattern =
private static final Pattern pattern =
Pattern.compile(".*\\.\\d*("+HLog.META_HLOG_FILE_EXTN+")*");
/**
@ -84,40 +78,40 @@ public class HLogUtil {
/*
* Get a reader for the WAL.
*
*
* @param fs
*
*
* @param path
*
*
* @param conf
*
*
* @return A WAL reader. Close when done with it.
*
*
* @throws IOException
*
*
* public static HLog.Reader getReader(final FileSystem fs, final Path path,
* Configuration conf) throws IOException { try {
*
*
* if (logReaderClass == null) {
*
*
* logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
* SequenceFileLogReader.class, Reader.class); }
*
*
*
*
* HLog.Reader reader = logReaderClass.newInstance(); reader.init(fs, path,
* conf); return reader; } catch (IOException e) { throw e; } catch (Exception
* e) { throw new IOException("Cannot get log reader", e); } }
*
*
* * Get a writer for the WAL.
*
*
* @param path
*
*
* @param conf
*
*
* @return A WAL writer. Close when done with it.
*
*
* @throws IOException
*
*
* public static HLog.Writer createWriter(final FileSystem fs, final Path
* path, Configuration conf) throws IOException { try { if (logWriterClass ==
* null) { logWriterClass =
@ -130,7 +124,7 @@ public class HLogUtil {
/**
* Construct the HLog directory name
*
*
* @param serverName
* Server name formatted as described in {@link ServerName}
* @return the relative HLog directory name, e.g.
@ -157,7 +151,7 @@ public class HLogUtil {
/**
* Move aside a bad edits file.
*
*
* @param fs
* @param edits
* Edits file to move aside.
@ -239,7 +233,7 @@ public class HLogUtil {
/**
* Returns sorted set of edit files made by wal-log splitter, excluding files
* with '.temp' suffix.
*
*
* @param fs
* @param regiondir
* @return Files in passed <code>regiondir</code> as a sorted set.
@ -287,4 +281,18 @@ public class HLogUtil {
}
return false;
}
/**
* Write the marker that a compaction has succeeded and is about to be committed.
* This provides info to the HMaster to allow it to recover the compaction if
* this regionserver dies in the middle (This part is not yet implemented). It also prevents the compaction from
* finishing if this regionserver has already lost its lease on the log.
*/
public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, final CompactionDescriptor c)
throws IOException {
WALEdit e = WALEdit.createCompaction(c);
log.append(info, c.getTableName().toByteArray(), e,
EnvironmentEdgeManager.currentTimeMillis(), htd);
LOG.info("Appended compaction marker " + c);
}
}
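A sketch of how a compaction commit path might use this utility: the marker is appended and synced before any store files are swapped, so a regionserver that has lost its log lease fails here and never commits the compaction. Only writeCompactionMarker and toCompactionDescriptor come from the patch; the wrapper class and method names are the editor's.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;

public final class CompactionCommitSketch {
  // Editor's sketch: write the marker before swapping any store files.
  static CompactionDescriptor writeMarkerFirst(HLog log, HTableDescriptor htd,
      HRegionInfo info, byte[] family, List<Path> inputs, List<Path> outputs,
      Path storeDir) throws IOException {
    CompactionDescriptor desc =
        ProtobufUtil.toCompactionDescriptor(info, family, inputs, outputs, storeDir);
    // Appends the marker edit and syncs it; throws if this server's WAL lease is gone.
    HLogUtil.writeCompactionMarker(log, htd, info, desc);
    return desc; // only after this returns should the caller swap the store files
  }
}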

View File

@ -27,8 +27,9 @@ import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.Writable;
@ -69,6 +70,11 @@ import org.apache.hadoop.io.Writable;
*/
@InterfaceAudience.Private
public class WALEdit implements Writable, HeapSize {
// TODO: Make it so user cannot make a cf w/ this name. Make these illegal cf names. Ditto for row.
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte [] METAROW = Bytes.toBytes("METAROW");
static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
private final int VERSION_2 = -1;
@ -80,12 +86,21 @@ public class WALEdit implements Writable, HeapSize {
public WALEdit() {
}
/**
* @param f
* @return True if <code>f</code> is {@link #METAFAMILY}
*/
public static boolean isMetaEditFamily(final byte [] f) {
return Bytes.equals(METAFAMILY, f);
}
public void setCompressionContext(final CompressionContext compressionContext) {
this.compressionContext = compressionContext;
}
public void add(KeyValue kv) {
public WALEdit add(KeyValue kv) {
this.kvs.add(kv);
return this;
}
public boolean isEmpty() {
@ -197,4 +212,26 @@ public class WALEdit implements Writable, HeapSize {
return sb.toString();
}
}
/**
* Create a compaction WALEdit
* @param c
* @return A WALEdit that has <code>c</code> serialized as its value
*/
public static WALEdit createCompaction(final CompactionDescriptor c) {
byte [] pbbytes = c.toByteArray();
KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
return new WALEdit().add(kv); //replication scope null so that this won't be replicated
}
/**
* Deserializes and returns a CompactionDescriptor if the KeyValue contains one.
* @param kv the key value
* @return deserialized CompactionDescriptor or null.
*/
public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException {
if (kv.matchingRow(METAROW) && kv.matchingColumn(METAFAMILY, COMPACTION)) {
return CompactionDescriptor.parseFrom(kv.getValue());
}
return null;
}
}
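A short sketch of how a replay path can use this pair: createCompaction() wraps the descriptor in a METAROW/METAFAMILY KeyValue, and getCompaction() recognizes and decodes it. The wrapper class is the editor's, and getKeyValues() is assumed to be the edit's accessor for its KeyValues.

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

public final class CompactionEditSketch {
  // Returns the descriptor if this edit carries a compaction marker, else null.
  static CompactionDescriptor tryDecode(WALEdit edit) throws IOException {
    for (KeyValue kv : edit.getKeyValues()) { // getKeyValues() assumed accessor
      CompactionDescriptor desc = WALEdit.getCompaction(kv);
      if (desc != null) {
        return desc; // a meta edit written by createCompaction()
      }
    }
    return null; // an ordinary data edit; replay as usual
  }
}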

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
@ -65,7 +66,6 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.TableExistsException;
import org.apache.hadoop.hbase.exceptions.TableNotEnabledException;
@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@ -1322,6 +1321,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return rowCount;
}
public void loadNumericRows(final HTable t, final byte[] f, int startRow, int endRow) throws IOException {
for (int i = startRow; i < endRow; i++) {
byte[] data = Bytes.toBytes(String.valueOf(i));
Put put = new Put(data);
put.add(f, null, data);
t.put(put);
}
}
/**
* Return the number of rows in the given table.
*/
@ -1937,7 +1945,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
/*
* Retrieves a splittable region randomly from tableName
*
*
* @param tableName name of table
* @param maxAttempts maximum number of attempts, unlimited for value of -1
* @return the HRegion chosen, null if none was found within limit of maxAttempts
@ -1956,7 +1964,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
regCount = regions.size();
// There are chances that before we get the region for the table from an RS the region may
// be going for CLOSE. This may be because online schema change is enabled
// be going for CLOSE. This may be because online schema change is enabled
if (regCount > 0) {
idx = random.nextInt(regCount);
// if we have just tried this region, there is no need to try again
@ -1974,7 +1982,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
} while (maxAttempts == -1 || attempts < maxAttempts);
return null;
}
public MiniZooKeeperCluster getZkCluster() {
return zkCluster;
}
@ -2252,10 +2260,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
scanner.close();
return result;
}
/**
* Create region split keys between startkey and endKey
*
*
* @param startKey
* @param endKey
* @param numRegions the number of regions to be created. it has to be greater than 3.

View File

@ -676,6 +676,20 @@ public class MiniHBaseCluster extends HBaseCluster {
this.hbaseCluster.join();
}
public List<HRegion> findRegionsForTable(byte[] tableName) {
ArrayList<HRegion> ret = new ArrayList<HRegion>();
for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
HRegionServer hrs = rst.getRegionServer();
for (HRegion region : hrs.getOnlineRegions(tableName)) {
if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
ret.add(region);
}
}
}
return ret;
}
protected int getRegionServerIndex(ServerName serverName) {
//we have a small number of region servers, this should be fine for now.
List<RegionServerThread> servers = getRegionServerThreads();

View File

@ -23,9 +23,6 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
@ -99,13 +96,7 @@ public class TestFullLogReconstruction {
// Load up the table with simple rows and count them
int initialCount = TEST_UTIL.loadTable(table, FAMILY);
Scan scan = new Scan();
ResultScanner results = table.getScanner(scan);
int count = 0;
for (Result res : results) {
count++;
}
results.close();
int count = TEST_UTIL.countRows(table);
assertEquals(initialCount, count);
@ -114,15 +105,8 @@ public class TestFullLogReconstruction {
}
TEST_UTIL.expireRegionServerSession(0);
scan = new Scan();
results = table.getScanner(scan);
int newCount = 0;
for (Result res : results) {
newCount++;
}
int newCount = TEST_UTIL.countRows(table);
assertEquals(count, newCount);
results.close();
table.close();
}
}
}

View File

@ -0,0 +1,303 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test for the case where a regionserver going down has enough cycles to do damage to regions
* that have actually been assigned elsewhere.
*
* <p>If we happen to assign a region before its old location is fully done with it -- i.e. it is on two servers at the
* same time -- all can work fine until the region on the dying server decides to compact or otherwise
* change the region file set. The region in its new location will then get a surprise when it tries to do something
* w/ a file removed by the region in its old location on the dying server.
*
* <p>Making a test for this case is a little tough in that even if a file is deleted up on the namenode,
* if the file was opened before the delete, reads will keep working until something changes the
* state of cached blocks in the dfsclient that is already open (a block from the deleted file is cleaned
* from the datanode by the NN).
*
* <p>What we do below is an explicit check for existence of the files listed in the region that
* has had some files removed because of a compaction. This hurries along and makes certain what is
* otherwise a chance occurrence.
*/
@Category(MediumTests.class)
public class TestIOFencing {
static final Log LOG = LogFactory.getLog(TestIOFencing.class);
static {
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")).getLogger().setLevel(Level.ALL);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
}
public abstract static class CompactionBlockerRegion extends HRegion {
volatile int compactCount = 0;
volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
public CompactionBlockerRegion(Path tableDir, HLog log,
FileSystem fs, Configuration confParam, HRegionInfo info,
HTableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
}
public void stopCompactions() {
compactionsBlocked = new CountDownLatch(1);
compactionsWaiting = new CountDownLatch(1);
}
public void allowCompactions() {
LOG.debug("allowing compactions");
compactionsBlocked.countDown();
}
public void waitForCompactionToBlock() throws IOException {
try {
LOG.debug("waiting for compaction to block");
compactionsWaiting.await();
LOG.debug("compaction block reached");
} catch (InterruptedException ex) {
throw new IOException(ex);
}
}
@Override
public boolean compact(CompactionContext compaction, Store store) throws IOException {
try {
return super.compact(compaction, store);
} finally {
compactCount++;
}
}
public int countStoreFiles() {
int count = 0;
for (Store store : stores.values()) {
count += store.getStorefilesCount();
}
return count;
}
}
/**
* An override of HRegion that allows us to park compactions in a holding pattern and
* then, when appropriate for the test, allow them to proceed again.
*/
public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
public BlockCompactionsInPrepRegion(Path tableDir, HLog log,
FileSystem fs, Configuration confParam, HRegionInfo info,
HTableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
}
@Override
protected void doRegionCompactionPrep() throws IOException {
compactionsWaiting.countDown();
try {
compactionsBlocked.await();
} catch (InterruptedException ex) {
throw new IOException(ex);
}
super.doRegionCompactionPrep();
}
}
/**
* An override of HRegion that allows us to park compactions in a holding pattern and
* then, when appropriate for the test, allow them to proceed again. This allows the compaction
* entry to go to the WAL before blocking, but blocks afterwards.
*/
public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
public BlockCompactionsInCompletionRegion(Path tableDir, HLog log,
FileSystem fs, Configuration confParam, HRegionInfo info,
HTableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
}
protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
return new BlockCompactionsInCompletionHStore(this, family, this.conf);
}
}
public static class BlockCompactionsInCompletionHStore extends HStore {
CompactionBlockerRegion r;
protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
Configuration confParam) throws IOException {
super(region, family, confParam);
r = (CompactionBlockerRegion) region;
}
@Override
protected void completeCompaction(Collection<StoreFile> compactedFiles,
Collection<StoreFile> result) throws IOException {
try {
r.compactionsWaiting.countDown();
r.compactionsBlocked.await();
} catch (InterruptedException ex) {
throw new IOException(ex);
}
super.completeCompaction(compactedFiles, result);
}
}
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte[] TABLE_NAME = Bytes.toBytes("tabletest");
private final static byte[] FAMILY = Bytes.toBytes("family");
private static final int FIRST_BATCH_COUNT = 4000;
private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
/**
* Test that puts up a regionserver, starts a compaction on a loaded region but holds the
* compaction until after we have killed the server and the region has come up on
* a new regionserver altogether. This fakes the double assignment case where the region in one
* location changes the files out from underneath the same region being served elsewhere.
*/
@Test
public void testFencingAroundCompaction() throws Exception {
doTest(BlockCompactionsInPrepRegion.class);
}
/**
* Test that puts up a regionserver, starts a compaction on a loaded region but holds the
* compaction completion until after we have killed the server and the region has come up on
* a new regionserver altogether. This fakes the double assignment case where the region in one
* location changes the files out from underneath the same region being served elsewhere.
*/
@Test
public void testFencingAroundCompactionAfterWALSync() throws Exception {
doTest(BlockCompactionsInCompletionRegion.class);
}
public void doTest(Class<?> regionClass) throws Exception {
Configuration c = TEST_UTIL.getConfiguration();
// Insert our custom region
c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
c.setBoolean("dfs.support.append", true);
// Encourage plenty of flushes
c.setLong("hbase.hregion.memstore.flush.size", 200000);
c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
// Only run compaction when we tell it to
c.setInt("hbase.hstore.compactionThreshold", 1000);
c.setLong("hbase.hstore.blockingStoreFiles", 1000);
// Compact quickly after we tell it to!
c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
LOG.info("Starting mini cluster");
TEST_UTIL.startMiniCluster(1);
CompactionBlockerRegion compactingRegion = null;
HBaseAdmin admin = null;
try {
LOG.info("Creating admin");
admin = new HBaseAdmin(c);
LOG.info("Creating table");
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
HTable table = new HTable(c, TABLE_NAME);
LOG.info("Loading test table");
// Load some rows
TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
// Find the region
List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
assertEquals(1, testRegions.size());
compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
assertTrue(compactingRegion.countStoreFiles() > 1);
final byte REGION_NAME[] = compactingRegion.getRegionName();
LOG.info("Blocking compactions");
compactingRegion.stopCompactions();
LOG.info("Asking for compaction");
admin.majorCompact(TABLE_NAME);
LOG.info("Waiting for compaction to be about to start");
compactingRegion.waitForCompactionToBlock();
LOG.info("Starting a new server");
RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
HRegionServer newServer = newServerThread.getRegionServer();
LOG.info("Killing region server ZK lease");
TEST_UTIL.expireRegionServerSession(0);
CompactionBlockerRegion newRegion = null;
long startWaitTime = System.currentTimeMillis();
while (newRegion == null) {
LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
Thread.sleep(100);
newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
assertTrue("Timed out waiting for new server to open region",
System.currentTimeMillis() - startWaitTime < 60000);
}
LOG.info("Allowing compaction to proceed");
compactingRegion.allowCompactions();
while (compactingRegion.compactCount == 0) {
Thread.sleep(1000);
}
// The server we killed stays up until the compaction that was started before it was killed completes. In logs
// you should see the old regionserver now going down.
LOG.info("Compaction finished");
// After compaction of old region finishes on the server that was going down, make sure that
// all the files we expect are still working when region is up in new location.
FileSystem fs = newRegion.getFilesystem();
for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) {
assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
}
// If we survive the split keep going...
// Now we make sure that the region isn't totally confused. Load up more rows.
TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
admin.majorCompact(TABLE_NAME);
startWaitTime = System.currentTimeMillis();
while (newRegion.compactCount == 0) {
Thread.sleep(1000);
assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 30000);
}
assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
} finally {
if (compactingRegion != null) {
compactingRegion.allowCompactions();
}
admin.close();
TEST_UTIL.shutdownMiniCluster();
}
}
}

View File

@ -350,7 +350,7 @@ public class TestRowProcessorEndpoint {
// We can also inject some meta data to the walEdit
KeyValue metaKv = new KeyValue(
row, HLog.METAFAMILY,
row, WALEdit.METAFAMILY,
Bytes.toBytes("I just increment counter"),
Bytes.toBytes(counter));
walEdit.add(metaKv);

View File

@ -35,18 +35,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
@ -65,7 +66,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.WrongRegionException;
@ -82,6 +83,8 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@ -382,6 +385,95 @@ public class TestHRegion extends HBaseTestCase {
}
}
@Test
public void testRecoveredEditsReplayCompaction() throws Exception {
String method = "testRecoveredEditsReplayCompaction";
byte[] tableName = Bytes.toBytes(method);
byte[] family = Bytes.toBytes("family");
this.region = initHRegion(tableName, method, conf, family);
try {
Path regiondir = region.getRegionFileSystem().getRegionDir();
FileSystem fs = region.getRegionFileSystem().getFileSystem();
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
long maxSeqId = 3;
long minSeqId = 0;
for (long i = minSeqId; i < maxSeqId; i++) {
Put put = new Put(Bytes.toBytes(i));
put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
region.put(put);
region.flushcache();
}
//this will create a region with 3 files
assertEquals(3, region.getStore(family).getStorefilesCount());
List<Path> storeFiles = new ArrayList<Path>(3);
for (StoreFile sf : region.getStore(family).getStorefiles()) {
storeFiles.add(sf.getPath());
}
//disable compaction completion
conf.setBoolean("hbase.hstore.compaction.complete",false);
region.compactStores();
//ensure that nothing changed
assertEquals(3, region.getStore(family).getStorefilesCount());
//now find the compacted file, and manually add it to the recovered edits
Path tmpDir = region.getRegionFileSystem().getTempDir();
FileStatus[] files = region.getRegionFileSystem().getFileSystem().listStatus(tmpDir);
assertEquals(1, files.length);
//move the file inside region dir
Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family), files[0].getPath());
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(
this.region.getRegionInfo(), family,
storeFiles, Lists.newArrayList(newFile),
region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
this.region.getRegionInfo(), compactionDescriptor);
Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
fs.create(recoveredEdits);
HLog.Writer writer = HLogFactory.createWriter(fs, recoveredEdits, conf);
long time = System.nanoTime();
writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time, HConstants.DEFAULT_CLUSTER_ID),
WALEdit.createCompaction(compactionDescriptor)));
writer.close();
//close the region now, and reopen again
HTableDescriptor htd = region.getTableDesc();
HRegionInfo info = region.getRegionInfo();
region.close();
region = HRegion.openHRegion(conf, fs, regiondir.getParent().getParent(), info, htd, null);
//now check whether we have only one store file, the compacted one
Collection<StoreFile> sfs = region.getStore(family).getStorefiles();
for (StoreFile sf : sfs) {
LOG.info(sf.getPath());
}
assertEquals(1, region.getStore(family).getStorefilesCount());
files = region.getRegionFileSystem().getFileSystem().listStatus(tmpDir);
assertEquals(0, files.length);
for (long i = minSeqId; i < maxSeqId; i++) {
Get get = new Get(Bytes.toBytes(i));
Result result = region.get(get);
byte[] value = result.getValue(family, Bytes.toBytes(i));
assertTrue(Bytes.equals(Bytes.toBytes(i), value));
}
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
public void testGetWhileRegionClose() throws IOException {
Configuration hc = initSplit();
int numRows = 100;