From 80e0cbd96e5aba8c8b55423f95a34b7086531b0e Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Thu, 23 May 2013 21:59:10 +0000 Subject: [PATCH] HBASE-8497 Protobuf WAL also needs a trailer (Himanshu) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1485866 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/protobuf/generated/WALProtos.java | 323 +++++++++++++++++- hbase-protocol/src/main/protobuf/WAL.proto | 7 + .../hadoop/hbase/regionserver/wal/HLog.java | 21 ++ .../regionserver/wal/ProtobufLogReader.java | 93 ++++- .../regionserver/wal/ProtobufLogWriter.java | 39 ++- .../hbase/regionserver/wal/ReaderBase.java | 14 +- .../wal/SequenceFileLogReader.java | 3 +- .../wal/SequenceFileLogWriter.java | 8 + .../hbase/regionserver/wal/TestHLog.java | 87 +++++ .../hbase/regionserver/wal/TestHLogSplit.java | 42 ++- 10 files changed, 623 insertions(+), 14 deletions(-) diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index d5e9d844168..f72c90e8684 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -2922,6 +2922,306 @@ public final class WALProtos { // @@protoc_insertion_point(class_scope:CompactionDescriptor) } + public interface WALTrailerOrBuilder + extends com.google.protobuf.MessageOrBuilder { + } + public static final class WALTrailer extends + com.google.protobuf.GeneratedMessage + implements WALTrailerOrBuilder { + // Use WALTrailer.newBuilder() to construct. + private WALTrailer(Builder builder) { + super(builder); + } + private WALTrailer(boolean noInit) {} + + private static final WALTrailer defaultInstance; + public static WALTrailer getDefaultInstance() { + return defaultInstance; + } + + public WALTrailer getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_fieldAccessorTable; + } + + private void initFields() { + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer)) { + return super.equals(obj); + } + 
org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer other = (org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer) obj; + + boolean result = true; + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer prototype) { + return 
newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailerOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.WALProtos.internal_static_WALTrailer_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer build() { + org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer result = new org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer(this); + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.getDefaultInstance()) return this; + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + 
com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + } + } + } + + + // @@protoc_insertion_point(builder_scope:WALTrailer) + } + + static { + defaultInstance = new WALTrailer(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:WALTrailer) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_WALHeader_descriptor; private static @@ -2942,6 +3242,11 @@ public final class WALProtos { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_CompactionDescriptor_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_WALTrailer_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_WALTrailer_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -2962,11 +3267,11 @@ public final class WALProtos { "ionDescriptor\022\021\n\ttableName\030\001 \002(\014\022\031\n\021enco" + "dedRegionName\030\002 \002(\014\022\022\n\nfamilyName\030\003 \002(\014\022", "\027\n\017compactionInput\030\004 \003(\t\022\030\n\020compactionOu" + - "tput\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\t*F\n\tScop" + - "eType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030RE" + - "PLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache.h" + - "adoop.hbase.protobuf.generatedB\tWALProto" + - "sH\001\210\001\000\240\001\001" + "tput\030\005 \003(\t\022\024\n\014storeHomeDir\030\006 \002(\t\"\014\n\nWALT" + + "railer*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE" + + "_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?" 
+ + "\n*org.apache.hadoop.hbase.protobuf.gener" + + "atedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3005,6 +3310,14 @@ public final class WALProtos { new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", }, org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor.class, org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor.Builder.class); + internal_static_WALTrailer_descriptor = + getDescriptor().getMessageTypes().get(4); + internal_static_WALTrailer_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_WALTrailer_descriptor, + new java.lang.String[] { }, + org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.class, + org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer.Builder.class); return null; } }; diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index 43cff5943f1..5635c60de4e 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -74,3 +74,10 @@ message CompactionDescriptor { repeated string compactionOutput = 5; required string storeHomeDir = 6; } + +/** + * A trailer that is appended to the end of a properly closed HLog WAL file. + * If missing, this is either a legacy or a corrupted WAL file. + */ +message WALTrailer { +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 42ba3cb6948..3d814557dd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.exceptions.FailedLogCloseException; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.io.Writable; @@ -48,6 +49,14 @@ public interface HLog { /** The META region's HLog filename extension */ public static final String META_HLOG_FILE_EXTN = ".meta"; + /** + * Configuration name of HLog Trailer's warning size. If a waltrailer's size is greater than the + * configured size, a warning is logged. This is used with Protobuf reader/writer. + */ + public static final String WAL_TRAILER_WARN_SIZE = + "hbase.regionserver.waltrailer.warn.size"; + public static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024*1024; // 1MB + static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); public static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; @@ -71,6 +80,12 @@ public interface HLog { long getPosition() throws IOException; void reset() throws IOException; + + /** + * @return the WALTrailer of the current HLog. It may be null in case of legacy or corrupt WAL + * files. + */ + WALTrailer getWALTrailer(); } public interface Writer { @@ -83,6 +98,12 @@ public interface HLog { void append(Entry entry) throws IOException; long getLength() throws IOException; + + /** + * Sets HLog's WALTrailer. This trailer is appended at the end of WAL on closing. + * @param walTrailer trailer to append to WAL. 
+ */ + void setWALTrailer(WALTrailer walTrailer); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 213ef7ab041..b2d3dcc5770 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -21,32 +21,45 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.Bytes; import com.google.protobuf.InvalidProtocolBufferException; /** - * Reader for protobuf-based WAL. + * A Protobuf based WAL has the following structure: + *
+ * <PB_WAL_MAGIC><WALHeader><WALEdits>...<WALEdits><Trailer> + * <TrailerSize> <PB_WAL_COMPLETE_MAGIC> + *
+ * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in + * {@link ProtobufLogReader#initReader(FSDataInputStream)}. A WALTrailer is an extensible structure + * which is appended at the end of the WAL. This is empty for now; it can contain some meta + * information such as Region level stats, etc in future. */ @InterfaceAudience.Private public class ProtobufLogReader extends ReaderBase { private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class); static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL"); - + static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP"); private FSDataInputStream inputStream; private Codec.Decoder cellDecoder; private WALCellCodec.ByteStringUncompressor byteStringUncompressor; private boolean hasCompression = false; + // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry + // in the hlog, the inputstream's position is equal to walEditsStopOffset. + private long walEditsStopOffset; + private boolean trailerPresent; public ProtobufLogReader() { super(); @@ -97,7 +110,67 @@ public class ProtobufLogReader extends ReaderBase { this.hasCompression = header.hasHasCompression() && header.getHasCompression(); } this.inputStream = stream; + this.walEditsStopOffset = this.fileLength; + long currentPosition = stream.getPos(); + trailerPresent = setTrailerIfPresent(); + this.seekOnFs(currentPosition); + if (LOG.isDebugEnabled()) { + LOG.debug("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset + + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent); + } + } + /** + * To check whether a trailer is present in a WAL, it seeks to position (fileLength - + * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of + * the trailer, and checks whether the trailer is present at the end or not by comparing the last + * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false; + * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just + * before the trailer. + * + *
+ * In case the trailer size > this.trailerMaxSize, it is read after a WARN message. + * @return true if a valid trailer is present + * @throws IOException + */ + private boolean setTrailerIfPresent() { + try { + long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT); + if (trailerSizeOffset <= 0) return false;// no trailer possible. + this.seekOnFs(trailerSizeOffset); + // read the int as trailer size. + int trailerSize = this.inputStream.readInt(); + ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length); + this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity()); + if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) { + LOG.warn("No trailer found."); + return false; + } + if (trailerSize < 0) { + LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer"); + return false; + } else if (trailerSize > this.trailerWarnSize) { + // continue reading after warning the user. + LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : " + + trailerSize + " > " + this.trailerWarnSize); + } + // seek to the position where trailer starts. + long positionOfTrailer = trailerSizeOffset - trailerSize; + this.seekOnFs(positionOfTrailer); + // read the trailer. + buf = ByteBuffer.allocate(trailerSize);// for trailer. + this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity()); + trailer = WALTrailer.parseFrom(buf.array()); + this.walEditsStopOffset = positionOfTrailer; + return true; + } catch (IOException ioe) { + LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe); + } + return false; } @Override @@ -117,6 +190,7 @@ public class ProtobufLogReader extends ReaderBase { @Override protected boolean readNext(HLog.Entry entry) throws IOException { while (true) { + if (trailerPresent && this.inputStream.getPos() == this.walEditsStopOffset) return false; WALKey.Builder builder = WALKey.newBuilder(); boolean hasNext = false; try { @@ -162,6 +236,12 @@ public class ProtobufLogReader extends ReaderBase { LOG.error(message); throw new IOException(message, ex); } + if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) { + LOG.error("Read WALTrailer while reading WALEdits. hlog: " + this.path + + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: " + + this.walEditsStopOffset); + throw new IOException("Read WALTrailer while reading WALEdits"); + } return true; } } @@ -185,6 +265,11 @@ public class ProtobufLogReader extends ReaderBase { return null; } + @Override + public WALTrailer getWALTrailer() { + return trailer; + } + @Override protected void seekOnFs(long pos) throws IOException { this.inputStream.seek(pos); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index aba09f7f947..396a536a818 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; /** * Writer for protobuf-based WAL. 
@@ -43,7 +44,11 @@ public class ProtobufLogWriter implements HLog.Writer { private FSDataOutputStream output; private Codec.Encoder cellEncoder; private WALCellCodec.ByteStringCompressor compressor; - + private boolean trailerWritten; + private WALTrailer trailer; + // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger + // than this size, it is written/read respectively, with a WARN message in the log. + private int trailerWarnSize; /** Context used by our wal dictionary compressor. * Null if we're not to do our custom dictionary compression. */ @@ -64,6 +69,8 @@ public class ProtobufLogWriter implements HLog.Writer { throw new IOException("Failed to initiate CompressionContext", e); } } + this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE, + HLog.DEFAULT_WAL_TRAILER_WARN_SIZE); int bufferSize = FSUtils.getDefaultBufferSize(fs); short replication = (short)conf.getInt( "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path)); @@ -78,6 +85,8 @@ public class ProtobufLogWriter implements HLog.Writer { if (doCompress) { this.compressor = codec.getByteStringCompressor(); } + // instantiate trailer to default value. + trailer = WALTrailer.newBuilder().build(); LOG.debug("Writing protobuf WAL; path=" + path + ", compression=" + doCompress); } @@ -96,6 +105,7 @@ public class ProtobufLogWriter implements HLog.Writer { public void close() throws IOException { if (this.output != null) { try { + if (!trailerWritten) writeWALTrailer(); this.output.close(); } catch (NullPointerException npe) { // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close @@ -105,6 +115,28 @@ public class ProtobufLogWriter implements HLog.Writer { } } + private void writeWALTrailer() { + try { + int trailerSize = 0; + if (this.trailer == null) { + // use default trailer. + LOG.warn("WALTrailer is null. Continuing with default."); + this.trailer = WALTrailer.newBuilder().build(); + trailerSize = this.trailer.getSerializedSize(); + } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) { + // continue writing after warning the user. + LOG.warn("Please investigate WALTrailer usage. 
Trailer size > maximum size : " + + trailerSize + " > " + this.trailerWarnSize); + } + this.trailer.writeTo(output); + this.output.writeInt(trailerSize); + this.output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC); + this.trailerWritten = true; + } catch (IOException ioe) { + LOG.error("Got IOException while writing trailer", ioe); + } + } + @Override public void sync() throws IOException { try { @@ -129,4 +161,9 @@ public class ProtobufLogWriter implements HLog.Writer { public FSDataOutputStream getStream() { return this.output; } + + @Override + public void setWALTrailer(WALTrailer walTrailer) { + this.trailer = walTrailer; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 742a3ec23d5..dab80fea28f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; @InterfaceAudience.Private public abstract class ReaderBase implements HLog.Reader { @@ -33,6 +34,11 @@ public abstract class ReaderBase implements HLog.Reader { protected FileSystem fs; protected Path path; protected long edit = 0; + protected long fileLength; + protected WALTrailer trailer; + // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger + // than this size, it is written/read respectively, with a WARN message in the log. + protected int trailerWarnSize; /** * Compression context to use reading. Can be null if no compression. 
*/ @@ -51,7 +57,9 @@ public abstract class ReaderBase implements HLog.Reader { this.conf = conf; this.path = path; this.fs = fs; - + this.fileLength = this.fs.getFileStatus(path).getLen(); + this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE, + HLog.DEFAULT_WAL_TRAILER_WARN_SIZE); initReader(stream); boolean compression = hasCompression(); @@ -134,4 +142,8 @@ public abstract class ReaderBase implements HLog.Reader { */ protected abstract void seekOnFs(long pos) throws IOException; + @Override + public WALTrailer getWALTrailer() { + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index d2bfae587f1..b654798a825 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -25,7 +25,6 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.NavigableMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -35,8 +34,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.Metadata; +import org.apache.hadoop.io.Text; @InterfaceAudience.Private public class SequenceFileLogReader extends ReaderBase { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 4bf67f6d245..672446e6eac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; @@ -232,4 +233,11 @@ public class SequenceFileLogWriter implements HLog.Writer { public FSDataOutputStream getWriterFSDataOutputStream() { return this.writer_out; } + + /** + * This method is empty as trailer is added only in Protobuf based hlog readers/writers. + */ + @Override + public void setWALTrailer(WALTrailer walTrailer) { + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index e025d7d2689..c24dfc7f4b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -822,6 +822,93 @@ public class TestHLog { } } + /** + * Reads the WAL with and without WALTrailer. + * @throws IOException + */ + @Test + public void testWALTrailer() throws IOException { + // read With trailer. 
+ doRead(true); + // read without trailer + doRead(false); + } + + /** + * Appends entries in the WAL and reads it. + * @param withTrailer If 'withTrailer' is true, it calls a close on the WALwriter before reading + * so that a trailer is appended to the WAL. Otherwise, it starts reading after the sync + * call. This means that reader is not aware of the trailer. In this scenario, if the + * reader tries to read the trailer in its next() call, it returns false from + * ProtoBufLogReader. + * @throws IOException + */ + private void doRead(boolean withTrailer) throws IOException { + final int columnCount = 5; + final int recordCount = 5; + final byte[] tableName = Bytes.toBytes("tablename"); + final byte[] row = Bytes.toBytes("row"); + long timestamp = System.currentTimeMillis(); + Path path = new Path(dir, "temphlog"); + HLog.Writer writer = null; + HLog.Reader reader = null; + try { + HRegionInfo hri = new HRegionInfo(tableName, + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HTableDescriptor htd = new HTableDescriptor(tableName); + fs.mkdirs(dir); + // Write log in pb format. + writer = HLogFactory.createWriter(fs, path, conf); + for (int i = 0; i < recordCount; ++i) { + HLogKey key = new HLogKey( + hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID); + WALEdit edit = new WALEdit(); + for (int j = 0; j < columnCount; ++j) { + if (i == 0) { + htd.addFamily(new HColumnDescriptor("column" + j)); + } + String value = i + "" + j; + edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value))); + } + writer.append(new HLog.Entry(key, edit)); + } + writer.sync(); + if (withTrailer) writer.close(); + + // Now read the log using standard means. + reader = HLogFactory.createReader(fs, path, conf); + assertTrue(reader instanceof ProtobufLogReader); + if (withTrailer) { + assertNotNull(reader.getWALTrailer()); + } else { + assertNull(reader.getWALTrailer()); + } + for (int i = 0; i < recordCount; ++i) { + HLog.Entry entry = reader.next(); + assertNotNull(entry); + assertEquals(columnCount, entry.getEdit().size()); + assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()); + assertArrayEquals(tableName, entry.getKey().getTablename()); + int idx = 0; + for (KeyValue val : entry.getEdit().getKeyValues()) { + assertTrue(Bytes.equals(row, val.getRow())); + String value = i + "" + idx; + assertArrayEquals(Bytes.toBytes(value), val.getValue()); + idx++; + } + } + HLog.Entry entry = reader.next(); + assertNull(entry); + } finally { + if (writer != null) { + writer.close(); + } + if (reader != null) { + reader.close(); + } + } + } + static class DumbWALActionsListener implements WALActionsListener { int increments = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index 40bd7a138a7..3d1aca9cd57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -133,6 +133,7 @@ public class TestHLogSplit { INSERT_GARBAGE_IN_THE_MIDDLE, APPEND_GARBAGE, TRUNCATE, + TRUNCATE_TRAILER } @BeforeClass @@ -661,6 +662,38 @@ public class TestHLogSplit { assertEquals(archivedLogs.length, 0); } + @Test + public void testCorruptWALTrailer() throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, false); + + final String REGION = "region__1"; + REGIONS.removeAll(REGIONS); + 
REGIONS.add(REGION); + + int entryCount = 10; + Path c1 = new Path(HLOGDIR, HLOG_FILE_PREFIX + "0"); + generateHLogs(1, entryCount, -1); + corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs); + + fs.initialize(fs.getUri(), conf); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, + HBASEDIR, HLOGDIR, OLDLOGDIR, fs); + logSplitter.splitLog(); + + Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION); + + int actualCount = 0; + HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf); + @SuppressWarnings("unused") + HLog.Entry entry; + while ((entry = in.next()) != null) ++actualCount; + assertEquals(entryCount, actualCount); + + // should not have stored the EOF files as corrupt + FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR); + assertEquals(archivedLogs.length, 0); + } + @Test public void testLogsGetArchivedAfterSplit() throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, false); @@ -1462,9 +1495,16 @@ public class TestHLogSplit { case TRUNCATE: fs.delete(path, false); out = fs.create(path); - out.write(corrupted_bytes, 0, fileSize-32); + out.write(corrupted_bytes, 0, fileSize + - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT)); closeOrFlush(close, out); + break; + case TRUNCATE_TRAILER: + fs.delete(path, false); + out = fs.create(path); + out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated. + closeOrFlush(close, out); break; } }
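
Illustrative sketch (not part of the patch): the tail layout this change introduces is <WALEdits...><trailer bytes><4-byte trailer size><PB_WAL_COMPLETE_MAGIC>, and ProtobufLogReader.setTrailerIfPresent() probes it by seeking to fileLength - (magic length + int size), reading the size, and verifying the magic before seeking back to the trailer start. The standalone Java below mirrors that write/probe sequence under assumptions: the file name "demo.wal", the stand-in payload, and the zero-byte trailer are hypothetical, and it does not parse a real WALTrailer protobuf. Putting the size and magic at the very end is what lets a reader locate the trailer without scanning the edits first; a missing magic simply means a legacy or unclosed WAL.

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

public class WalTrailerLayoutSketch {
  // Same byte sequence as ProtobufLogReader.PB_WAL_COMPLETE_MAGIC ("LAWP").
  static final byte[] COMPLETE_MAGIC = {'L', 'A', 'W', 'P'};

  public static void main(String[] args) throws IOException {
    byte[] edits = "stand-in for serialized WALEdits".getBytes("UTF-8");
    byte[] trailer = new byte[0]; // an empty WALTrailer message serializes to zero bytes

    // Writer side (mirrors the order in ProtobufLogWriter.writeWALTrailer):
    // payload, then trailer bytes, then the trailer size, then the closing magic.
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream("demo.wal"))) {
      out.write(edits);
      out.write(trailer);
      out.writeInt(trailer.length);
      out.write(COMPLETE_MAGIC);
    }

    // Reader side (mirrors ProtobufLogReader.setTrailerIfPresent):
    // seek to fileLength - (magic + int), read the size, then verify the magic.
    try (RandomAccessFile in = new RandomAccessFile("demo.wal", "r")) {
      long trailerSizeOffset = in.length() - (COMPLETE_MAGIC.length + 4);
      if (trailerSizeOffset <= 0) {
        System.out.println("file too small to carry a trailer");
        return;
      }
      in.seek(trailerSizeOffset);
      int trailerSize = in.readInt();
      byte[] magic = new byte[COMPLETE_MAGIC.length];
      in.readFully(magic);
      if (!Arrays.equals(magic, COMPLETE_MAGIC) || trailerSize < 0) {
        System.out.println("no trailer: legacy WAL or writer did not close cleanly");
        return;
      }
      long trailerStart = trailerSizeOffset - trailerSize;
      in.seek(trailerStart);
      byte[] trailerBytes = new byte[trailerSize];
      in.readFully(trailerBytes);
      // trailerStart corresponds to walEditsStopOffset in the patch: WALEdits end here.
      System.out.println("trailer present, size=" + trailerSize + ", edits end at " + trailerStart);
    }
  }
}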