HBASE-8497 Protobuf WAL also needs a trailer (Himanshu)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1485866 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-05-23 21:59:10 +00:00
parent 62e1137c94
commit 80e0cbd96e
10 changed files with 623 additions and 14 deletions

View File

@ -2922,6 +2922,306 @@ public final class WALProtos {
// @@protoc_insertion_point(class_scope:CompactionDescriptor)
}
public interface WALTrailerOrBuilder
extends com.google.protobuf.MessageOrBuilder {
}
public static final class WALTrailer extends
com.google.protobuf.GeneratedMessage
implements WALTrailerOrBuilder {
// Use WALTrailer.newBuilder() to construct.
private WALTrailer(Builder builder) {
super(builder);
}
private WALTrailer(boolean noInit) {}
private static final WALTrailer defaultInstance;
public static WALTrailer getDefaultInstance() {
return defaultInstance;
}
public WALTrailer getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_fieldAccessorTable;
}
private void initFields() {
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer)) {
return super.equals(obj);
}
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer other = (org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer) obj;
boolean result = true;
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
}
@java.lang.Override
public int hashCode() {
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailerOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_fieldAccessorTable;
}
// Construct using org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDescriptor();
}
public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer getDefaultInstanceForType() {
return org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDefaultInstance();
}
public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer build() {
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = new org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer(this);
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer) {
return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDefaultInstance()) return this;
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
}
}
}
// @@protoc_insertion_point(builder_scope:WALTrailer)
}
static {
defaultInstance = new WALTrailer(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:WALTrailer)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_WALHeader_descriptor;
private static
@ -2942,6 +3242,11 @@ public final class WALProtos {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_CompactionDescriptor_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_WALTrailer_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_WALTrailer_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@ -2962,11 +3267,11 @@ public final class WALProtos {
"ionDescriptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021enco" +
"dedRegionName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022",
"\027\n\017compactionInput\030\004 \003(\t\022\030\n\020compactionOu" +
"tput\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\t*F\n\tScop" +
"eType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030RE" +
"PLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache.h" +
"adoop.hbase.protobuf.generatedB\tWALProto" +
"sH\001\210\001\000\240\001\001"
"tput\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\t\"\014\n\nWALT" +
"railer*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE" +
"_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?" +
"\n*org.apache.hadoop.hbase.protobuf.gener" +
"atedB\tWALProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -3005,6 +3310,14 @@ public final class WALProtos {
new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", },
org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor.class,
org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor.Builder.class);
internal_static_WALTrailer_descriptor =
getDescriptor().getMessageTypes().get(4);
internal_static_WALTrailer_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_WALTrailer_descriptor,
new java.lang.String[] { },
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.class,
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.Builder.class);
return null;
}
};

View File

@ -74,3 +74,10 @@ message CompactionDescriptor {
repeated string compactionOutput = 5;
required string storeHomeDir = 6;
}
/**
* A trailer that is appended to the end of a properly closed HLog WAL file.
* If missing, this is either a legacy or a corrupted WAL file.
*/
message WALTrailer {
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.exceptions.FailedLogCloseException;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.io.Writable;
@ -48,6 +49,14 @@ public interface HLog {
/** The META region's HLog filename extension */
public static final String META_HLOG_FILE_EXTN = ".meta";
/**
* Configuration name of HLog Trailer's warning size. If a waltrailer's size is greater than the
* configured size, a warning is logged. This is used with Protobuf reader/writer.
*/
public static final String WAL_TRAILER_WARN_SIZE =
"hbase.regionserver.waltrailer.warn.size";
public static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024*1024; // 1MB
static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
public static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
@ -71,6 +80,12 @@ public interface HLog {
long getPosition() throws IOException;
void reset() throws IOException;
/**
* @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL
* files.
*/
WALTrailer getWALTrailer();
}
public interface Writer {
@ -83,6 +98,12 @@ public interface HLog {
void append(Entry entry) throws IOException;
long getLength() throws IOException;
/**
* Sets HLog's WALTrailer. This trailer is appended at the end of WAL on closing.
* @param walTrailer trailer to append to WAL.
*/
void setWALTrailer(WALTrailer walTrailer);
}
/**

View File

@ -21,32 +21,45 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Reader for protobuf-based WAL.
* A Protobuf based WAL has the following structure:
* <p>
* &lt;PB_WAL_MAGIC&gt;&lt;WALHeader&gt;&lt;WALEdits&gt;...&lt;WALEdits&gt;&lt;Trailer&gt;
* &lt;TrailerSize&gt; &lt;PB_WAL_COMPLETE_MAGIC&gt;
* </p>
* The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
* {@link ProtobufLogReader#initReader(FSDataInputStream)}. A WALTrailer is an extensible structure
* which is appended at the end of the WAL. This is empty for now; it can contain some meta
* information such as Region level stats, etc in future.
*/
@InterfaceAudience.Private
public class ProtobufLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
private FSDataInputStream inputStream;
private Codec.Decoder cellDecoder;
private WALCellCodec.ByteStringUncompressor byteStringUncompressor;
private boolean hasCompression = false;
// walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry
// in the hlog, the inputstream's position is equal to walEditsStopOffset.
private long walEditsStopOffset;
private boolean trailerPresent;
public ProtobufLogReader() {
super();
@ -97,7 +110,67 @@ public class ProtobufLogReader extends ReaderBase {
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
}
this.inputStream = stream;
this.walEditsStopOffset = this.fileLength;
long currentPosition = stream.getPos();
trailerPresent = setTrailerIfPresent();
this.seekOnFs(currentPosition);
if (LOG.isDebugEnabled()) {
LOG.debug("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
+ ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
}
}
/**
* To check whether a trailer is present in a WAL, it seeks to position (fileLength -
* PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
* the trailer, and checks whether the trailer is present at the end or not by comparing the last
* PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
* otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
* before the trailer.
* <ul>
* The trailer is ignored in case:
* <li>fileLength is 0 or not correct (when file is under recovery, etc).
* <li>the trailer size is negative.
* </ul>
* <p>
* In case the trailer size > this.trailerMaxSize, it is read after a WARN message.
* @return true if a valid trailer is present
* @throws IOException
*/
private boolean setTrailerIfPresent() {
try {
long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
if (trailerSizeOffset <= 0) return false;// no trailer possible.
this.seekOnFs(trailerSizeOffset);
// read the int as trailer size.
int trailerSize = this.inputStream.readInt();
ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
LOG.warn("No trailer found.");
return false;
}
if (trailerSize < 0) {
LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
return false;
} else if (trailerSize > this.trailerWarnSize) {
// continue reading after warning the user.
LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
+ trailerSize + " > " + this.trailerWarnSize);
}
// seek to the position where trailer starts.
long positionOfTrailer = trailerSizeOffset - trailerSize;
this.seekOnFs(positionOfTrailer);
// read the trailer.
buf = ByteBuffer.allocate(trailerSize);// for trailer.
this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
trailer = WALTrailer.parseFrom(buf.array());
this.walEditsStopOffset = positionOfTrailer;
return true;
} catch (IOException ioe) {
LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
}
return false;
}
@Override
@ -117,6 +190,7 @@ public class ProtobufLogReader extends ReaderBase {
@Override
protected boolean readNext(HLog.Entry entry) throws IOException {
while (true) {
if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false;
WALKey.Builder builder = WALKey.newBuilder();
boolean hasNext = false;
try {
@ -162,6 +236,12 @@ public class ProtobufLogReader extends ReaderBase {
LOG.error(message);
throw new IOException(message, ex);
}
if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path
+ ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
+ this.walEditsStopOffset);
throw new IOException("Read WALTrailer while reading WALEdits");
}
return true;
}
}
@ -185,6 +265,11 @@ public class ProtobufLogReader extends ReaderBase {
return null;
}
@Override
public WALTrailer getWALTrailer() {
return trailer;
}
@Override
protected void seekOnFs(long pos) throws IOException {
this.inputStream.seek(pos);

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
/**
* Writer for protobuf-based WAL.
@ -43,7 +44,11 @@ public class ProtobufLogWriter implements HLog.Writer {
private FSDataOutputStream output;
private Codec.Encoder cellEncoder;
private WALCellCodec.ByteStringCompressor compressor;
private boolean trailerWritten;
private WALTrailer trailer;
// maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
// than this size, it is written/read respectively, with a WARN message in the log.
private int trailerWarnSize;
/** Context used by our wal dictionary compressor.
* Null if we're not to do our custom dictionary compression. */
@ -64,6 +69,8 @@ public class ProtobufLogWriter implements HLog.Writer {
throw new IOException("Failed to initiate CompressionContext", e);
}
}
this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = FSUtils.getDefaultBufferSize(fs);
short replication = (short)conf.getInt(
"hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
@ -78,6 +85,8 @@ public class ProtobufLogWriter implements HLog.Writer {
if (doCompress) {
this.compressor = codec.getByteStringCompressor();
}
// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();
LOG.debug("Writing protobuf WAL; path=" + path + ", compression=" + doCompress);
}
@ -96,6 +105,7 @@ public class ProtobufLogWriter implements HLog.Writer {
public void close() throws IOException {
if (this.output != null) {
try {
if (!trailerWritten) writeWALTrailer();
this.output.close();
} catch (NullPointerException npe) {
// Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
@ -105,6 +115,28 @@ public class ProtobufLogWriter implements HLog.Writer {
}
}
private void writeWALTrailer() {
try {
int trailerSize = 0;
if (this.trailer == null) {
// use default trailer.
LOG.warn("WALTrailer is null. Continuing with default.");
this.trailer = WALTrailer.newBuilder().build();
trailerSize = this.trailer.getSerializedSize();
} else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) {
// continue writing after warning the user.
LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " +
trailerSize + " > " + this.trailerWarnSize);
}
this.trailer.writeTo(output);
this.output.writeInt(trailerSize);
this.output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC);
this.trailerWritten = true;
} catch (IOException ioe) {
LOG.error("Got IOException while writing trailer", ioe);
}
}
@Override
public void sync() throws IOException {
try {
@ -129,4 +161,9 @@ public class ProtobufLogWriter implements HLog.Writer {
public FSDataOutputStream getStream() {
return this.output;
}
@Override
public void setWALTrailer(WALTrailer walTrailer) {
this.trailer = walTrailer;
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
@InterfaceAudience.Private
public abstract class ReaderBase implements HLog.Reader {
@ -33,6 +34,11 @@ public abstract class ReaderBase implements HLog.Reader {
protected FileSystem fs;
protected Path path;
protected long edit = 0;
protected long fileLength;
protected WALTrailer trailer;
// maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
// than this size, it is written/read respectively, with a WARN message in the log.
protected int trailerWarnSize;
/**
* Compression context to use reading. Can be null if no compression.
*/
@ -51,7 +57,9 @@ public abstract class ReaderBase implements HLog.Reader {
this.conf = conf;
this.path = path;
this.fs = fs;
this.fileLength = this.fs.getFileStatus(path).getLen();
this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE,
HLog.DEFAULT_WAL_TRAILER_WARN_SIZE);
initReader(stream);
boolean compression = hasCompression();
@ -134,4 +142,8 @@ public abstract class ReaderBase implements HLog.Reader {
*/
protected abstract void seekOnFs(long pos) throws IOException;
@Override
public WALTrailer getWALTrailer() {
return null;
}
}

View File

@ -25,7 +25,6 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -35,8 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
@InterfaceAudience.Private
public class SequenceFileLogReader extends ReaderBase {

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
@ -232,4 +233,11 @@ public class SequenceFileLogWriter implements HLog.Writer {
public FSDataOutputStream getWriterFSDataOutputStream() {
return this.writer_out;
}
/**
* This method is empty as trailer is added only in Protobuf based hlog readers/writers.
*/
@Override
public void setWALTrailer(WALTrailer walTrailer) {
}
}

View File

@ -822,6 +822,93 @@ public class TestHLog {
}
}
/**
* Reads the WAL with and without WALTrailer.
* @throws IOException
*/
@Test
public void testWALTrailer() throws IOException {
// read With trailer.
doRead(true);
// read without trailer
doRead(false);
}
/**
* Appends entries in the WAL and reads it.
* @param withTrailer If 'withTrailer' is true, it calls a close on the WALwriter before reading
* so that a trailer is appended to the WAL. Otherwise, it starts reading after the sync
* call. This means that reader is not aware of the trailer. In this scenario, if the
* reader tries to read the trailer in its next() call, it returns false from
* ProtoBufLogReader.
* @throws IOException
*/
private void doRead(boolean withTrailer) throws IOException {
final int columnCount = 5;
final int recordCount = 5;
final byte[] tableName = Bytes.toBytes("tablename");
final byte[] row = Bytes.toBytes("row");
long timestamp = System.currentTimeMillis();
Path path = new Path(dir, "temphlog");
HLog.Writer writer = null;
HLog.Reader reader = null;
try {
HRegionInfo hri = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
HTableDescriptor htd = new HTableDescriptor(tableName);
fs.mkdirs(dir);
// Write log in pb format.
writer = HLogFactory.createWriter(fs, path, conf);
for (int i = 0; i < recordCount; ++i) {
HLogKey key = new HLogKey(
hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
WALEdit edit = new WALEdit();
for (int j = 0; j < columnCount; ++j) {
if (i == 0) {
htd.addFamily(new HColumnDescriptor("column" + j));
}
String value = i + "" + j;
edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
}
writer.append(new HLog.Entry(key, edit));
}
writer.sync();
if (withTrailer) writer.close();
// Now read the log using standard means.
reader = HLogFactory.createReader(fs, path, conf);
assertTrue(reader instanceof ProtobufLogReader);
if (withTrailer) {
assertNotNull(reader.getWALTrailer());
} else {
assertNull(reader.getWALTrailer());
}
for (int i = 0; i < recordCount; ++i) {
HLog.Entry entry = reader.next();
assertNotNull(entry);
assertEquals(columnCount, entry.getEdit().size());
assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
assertArrayEquals(tableName, entry.getKey().getTablename());
int idx = 0;
for (KeyValue val : entry.getEdit().getKeyValues()) {
assertTrue(Bytes.equals(row, val.getRow()));
String value = i + "" + idx;
assertArrayEquals(Bytes.toBytes(value), val.getValue());
idx++;
}
}
HLog.Entry entry = reader.next();
assertNull(entry);
} finally {
if (writer != null) {
writer.close();
}
if (reader != null) {
reader.close();
}
}
}
static class DumbWALActionsListener implements WALActionsListener {
int increments = 0;

View File

@ -133,6 +133,7 @@ public class TestHLogSplit {
INSERT_GARBAGE_IN_THE_MIDDLE,
APPEND_GARBAGE,
TRUNCATE,
TRUNCATE_TRAILER
}
@BeforeClass
@ -661,6 +662,38 @@ public class TestHLogSplit {
assertEquals(archivedLogs.length, 0);
}
@Test
public void testCorruptWALTrailer() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
final String REGION = "region__1";
REGIONS.removeAll(REGIONS);
REGIONS.add(REGION);
int entryCount = 10;
Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0");
generateHLogs(1, entryCount, -1);
corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs);
fs.initialize(fs.getUri(), conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
logSplitter.splitLog();
Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
int actualCount = 0;
HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
@SuppressWarnings("unused")
HLog.Entry entry;
while ((entry = in.next()) != null) ++actualCount;
assertEquals(entryCount, actualCount);
// should not have stored the EOF files as corrupt
FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
assertEquals(archivedLogs.length, 0);
}
@Test
public void testLogsGetArchivedAfterSplit() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
@ -1462,9 +1495,16 @@ public class TestHLogSplit {
case TRUNCATE:
fs.delete(path, false);
out = fs.create(path);
out.write(corrupted_bytes, 0, fileSize-32);
out.write(corrupted_bytes, 0, fileSize
- (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
closeOrFlush(close, out);
break;
case TRUNCATE_TRAILER:
fs.delete(path, false);
out = fs.create(path);
out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
closeOrFlush(close, out);
break;
}
}