keyValues = edit.getKeyValues();
for (KeyValue value: keyValues) {
- editBuilder.addKeyValueBytes(ByteString.copyFrom(
+ entryBuilder.addKeyValueBytes(ByteString.copyFrom(
value.getBuffer(), value.getOffset(), value.getLength()));
}
builder.addEntry(entryBuilder.build());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index bd9c73b0c40..bc1ef09d2df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -115,7 +115,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -2890,7 +2890,7 @@ public class HRegion implements HeapSize { // , Writable{
skippedEdits++;
continue;
- }
+ }
// Figure which store the edit is meant for.
if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
store = this.stores.get(kv.getFamily());
@@ -2972,7 +2972,8 @@ public class HRegion implements HeapSize { // , Writable{
throws IOException {
Store store = this.getStore(compaction.getFamilyName().toByteArray());
if (store == null) {
- LOG.warn("Found Compaction WAL edit for deleted family:" + Bytes.toString(compaction.getFamilyName().toByteArray()));
+ LOG.warn("Found Compaction WAL edit for deleted family:" +
+ Bytes.toString(compaction.getFamilyName().toByteArray()));
return;
}
store.completeCompactionMarker(compaction);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 907534bf299..36db9226162 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -63,7 +63,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index f053398de5d..1f2017c5b6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index a7b301d3716..f83217a3da3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
@@ -32,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
/**
* A set of static functions for running our custom WAL compression/decompression.
@@ -63,26 +65,31 @@ public class Compressor {
private static void transformFile(Path input, Path output)
throws IOException {
- SequenceFileLogReader in = new SequenceFileLogReader();
- SequenceFileLogWriter out = new SequenceFileLogWriter();
+ Configuration conf = HBaseConfiguration.create();
+
+ FileSystem inFS = input.getFileSystem(conf);
+ FileSystem outFS = output.getFileSystem(conf);
+
+ HLog.Reader in = HLogFactory.createReader(inFS, input, conf, null, false);
+ HLog.Writer out = null;
try {
- Configuration conf = HBaseConfiguration.create();
-
- FileSystem inFS = input.getFileSystem(conf);
- FileSystem outFS = output.getFileSystem(conf);
-
- in.init(inFS, input, conf);
- boolean compress = in.reader.isWALCompressionEnabled();
-
+ if (!(in instanceof ReaderBase)) {
+ System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
+ return;
+ }
+ boolean compress = ((ReaderBase)in).hasCompression();
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
- out.init(outFS, output, conf);
+ out = HLogFactory.createWriter(outFS, output, conf);
HLog.Entry e = null;
while ((e = in.next()) != null) out.append(e);
} finally {
in.close();
- out.close();
+ if (out != null) {
+ out.close();
+ out = null;
+ }
}
}
@@ -93,6 +100,7 @@ public class Compressor {
* @param dict the dictionary we use for our read.
* @return the uncompressed array.
*/
+ @Deprecated
static byte[] readCompressed(DataInput in, Dictionary dict)
throws IOException {
byte status = in.readByte();
@@ -129,6 +137,7 @@ public class Compressor {
*
* @return the length of the uncompressed data
*/
+ @Deprecated
static int uncompressIntoArray(byte[] to, int offset, DataInput in,
Dictionary dict) throws IOException {
byte status = in.readByte();
@@ -167,6 +176,7 @@ public class Compressor {
* @param out the DataOutput to write into
* @param dict the dictionary to use for compression
*/
+ @Deprecated
static void writeCompressed(byte[] data, int offset, int length,
DataOutput out, Dictionary dict)
throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index ed36122cf3b..00e2590f624 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -527,12 +527,10 @@ class FSHLog implements HLog, Syncable {
}
}
FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
- // Can we get at the dfsclient outputstream? If an instance of
- // SFLW, it'll have done the necessary reflection to get at the
- // protected field name.
+ // Can we get at the dfsclient outputstream?
FSDataOutputStream nextHdfsOut = null;
- if (nextWriter instanceof SequenceFileLogWriter) {
- nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+ if (nextWriter instanceof ProtobufLogWriter) {
+ nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
}
Path oldFile = null;
@@ -853,43 +851,6 @@ class FSHLog implements HLog, Syncable {
return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
}
- @Override
- public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
- HTableDescriptor htd, boolean doSync)
- throws IOException {
- if (this.closed) {
- throw new IOException("Cannot append; log is closed");
- }
- long txid = 0;
- synchronized (updateLock) {
- long seqNum = obtainSeqNum();
- logKey.setLogSeqNum(seqNum);
- // The 'lastSeqWritten' map holds the sequence number of the oldest
- // write for each region (i.e. the first edit added to the particular
- // memstore). When the cache is flushed, the entry for the
- // region being flushed is removed if the sequence number of the flush
- // is greater than or equal to the value in lastSeqWritten.
- this.oldestUnflushedSeqNums.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
- Long.valueOf(seqNum));
- doWrite(regionInfo, logKey, logEdit, htd);
- txid = this.unflushedEntries.incrementAndGet();
- this.numEntries.incrementAndGet();
- if (htd.isDeferredLogFlush()) {
- lastDeferredTxid = txid;
- }
- }
-
- // Sync if catalog region, and if not then check if that table supports
- // deferred log flushing
- if (doSync &&
- (regionInfo.isMetaRegion() ||
- !htd.isDeferredLogFlush())) {
- // sync txn to file system
- this.sync(txid);
- }
- return txid;
- }
-
@Override
public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
final long now, HTableDescriptor htd)
@@ -1456,4 +1417,4 @@ class FSHLog implements HLog, Syncable {
System.exit(-1);
}
}
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index bba8b32c024..42ba3cb6948 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -51,7 +52,14 @@ public interface HLog {
public static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
public interface Reader {
- void init(FileSystem fs, Path path, Configuration c) throws IOException;
+
+ /**
+ * @param fs File system.
+ * @param path Path.
+ * @param c Config.
+ * @param s Input stream that may have been pre-opened by the caller; may be null.
+ */
+ void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
void close() throws IOException;
@@ -234,20 +242,6 @@ public interface HLog {
*/
public void closeAndDelete() throws IOException;
- /**
- * Append an entry to the log.
- *
- * @param regionInfo
- * @param logEdit
- * @param logKey
- * @param doSync
- * shall we sync after writing the transaction
- * @return The txid of this transaction
- * @throws IOException
- */
- public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
- HTableDescriptor htd, boolean doSync) throws IOException;
-
/**
* Only used in tests.
*
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
index a91284ab8d0..83db320d4c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
@@ -21,23 +21,26 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.util.Arrays;
import java.io.InterruptedIOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class HLogFactory {
private static final Log LOG = LogFactory.getLog(HLogFactory.class);
-
+
public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
final Configuration conf) throws IOException {
return new FSHLog(fs, root, logName, conf);
@@ -60,13 +63,12 @@ public class HLogFactory {
return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
conf, listeners, false, prefix, true);
}
-
+
/*
* WAL Reader
*/
-
private static Class<? extends Reader> logReaderClass;
-
+
static void resetLogReaderClass() {
logReaderClass = null;
}
@@ -85,10 +87,17 @@ public class HLogFactory {
*/
public static HLog.Reader createReader(final FileSystem fs, final Path path,
Configuration conf, CancelableProgressable reporter) throws IOException {
- if (logReaderClass == null) {
+ return createReader(fs, path, conf, reporter, true);
+ }
+
+ public static HLog.Reader createReader(final FileSystem fs, final Path path,
+ Configuration conf, CancelableProgressable reporter, boolean allowCustom)
+ throws IOException {
+ if (allowCustom && (logReaderClass == null)) {
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
- SequenceFileLogReader.class, Reader.class);
+ ProtobufLogReader.class, Reader.class);
}
+ Class<? extends Reader> lrClass = allowCustom ? logReaderClass : ProtobufLogReader.class;
try {
// A hlog file could be under recovery, so it may take several
@@ -99,9 +108,25 @@ public class HLogFactory {
int nbAttempt = 0;
while (true) {
try {
- HLog.Reader reader = logReaderClass.newInstance();
- reader.init(fs, path, conf);
- return reader;
+ if (lrClass != ProtobufLogReader.class) {
+ // User is overriding the WAL reader, let them.
+ HLog.Reader reader = lrClass.newInstance();
+ reader.init(fs, path, conf, null);
+ return reader;
+ } else {
+ FSDataInputStream stream = fs.open(path);
+ // Note that zero-length file will fail to read PB magic, and attempt to create
+ // a non-PB reader and fail the same way existing code expects it to. If we get
+ // rid of the old reader entirely, we need to handle 0-size files differently from
+ // merely non-PB files.
+ byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
+ boolean isPbWal = (stream.read(magic) == magic.length)
+ && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
+ HLog.Reader reader =
+ isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
+ reader.init(fs, path, conf, stream);
+ return reader;
+ }
} catch (IOException e) {
String msg = e.getMessage();
if (msg != null && msg.contains("Cannot obtain block length")) {
@@ -139,9 +164,8 @@ public class HLogFactory {
/*
* WAL writer
*/
-
private static Class<? extends Writer> logWriterClass;
-
+
/**
* Create a writer for the WAL.
* @return A WAL writer. Close when done with it.
@@ -153,9 +177,9 @@ public class HLogFactory {
try {
if (logWriterClass == null) {
logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
- SequenceFileLogWriter.class, Writer.class);
+ ProtobufLogWriter.class, Writer.class);
}
- HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
+ HLog.Writer writer = (HLog.Writer)logWriterClass.newInstance();
writer.init(fs, path, conf);
return writer;
} catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index ac90a539cb9..0028abebc3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -22,16 +22,27 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
+import com.google.protobuf.ByteString;
+
/**
* A Key for an entry in the change log.
*
@@ -42,8 +53,12 @@ import org.apache.hadoop.io.WritableUtils;
* Some Transactional edits (START, COMMIT, ABORT) will not have an
* associated row.
*/
+// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
+// purposes. They need to be merged into HLogEntry.
@InterfaceAudience.Private
public class HLogKey implements WritableComparable<HLogKey> {
+ public static final Log LOG = LogFactory.getLog(HLogKey.class);
+
// should be < 0 (@see #readFields(DataInput))
// version 2 supports HLog compression
enum Version {
@@ -89,16 +104,17 @@ public class HLogKey implements WritableComparable<HLogKey> {
private UUID clusterId;
+ private NavigableMap<byte[], Integer> scopes;
+
private CompressionContext compressionContext;
- /** Writable Constructor -- Do not use. */
public HLogKey() {
this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
HConstants.DEFAULT_CLUSTER_ID);
}
/**
- * Create the log key!
+ * Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
@@ -111,11 +127,19 @@ public class HLogKey implements WritableComparable<HLogKey> {
*/
public HLogKey(final byte [] encodedRegionName, final byte [] tablename,
long logSeqNum, final long now, UUID clusterId) {
- this.encodedRegionName = encodedRegionName;
- this.tablename = tablename;
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterId = clusterId;
+ this.encodedRegionName = encodedRegionName;
+ this.tablename = tablename;
+ }
+
+ /**
+ * Create HLogKey wrapper around protobuf WAL key; takes care of compression.
+ * @throws IOException Never, as the compression is not enabled.
+ */
+ public HLogKey(WALKey walKey) throws IOException {
+ readFieldsFromPb(walKey, null);
}
/**
@@ -137,11 +161,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
/** @return log sequence number */
public long getLogSeqNum() {
- return logSeqNum;
- }
-
- void setLogSeqNum(long logSeqNum) {
- this.logSeqNum = logSeqNum;
+ return this.logSeqNum;
}
/**
@@ -159,8 +179,16 @@ public class HLogKey implements WritableComparable<HLogKey> {
return clusterId;
}
+ public NavigableMap<byte[], Integer> getScopes() {
+ return scopes;
+ }
+
+ public void setScopes(NavigableMap<byte[], Integer> scopes) {
+ this.scopes = scopes;
+ }
+
/**
- * Set the cluster id of this key
+ * Set the cluster id of this key.
* @param clusterId
*/
public void setClusterId(UUID clusterId) {
@@ -213,7 +241,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
if (result == 0) {
if (this.logSeqNum < o.logSeqNum) {
result = -1;
- } else if (this.logSeqNum > o.logSeqNum) {
+ } else if (this.logSeqNum > o.logSeqNum ) {
result = 1;
}
if (result == 0) {
@@ -255,7 +283,9 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
@Override
+ @Deprecated
public void write(DataOutput out) throws IOException {
+ LOG.warn("HLogKey is being serialized to writable - only expected in test code");
WritableUtils.writeVInt(out, VERSION.code);
if (compressionContext == null) {
Bytes.writeByteArray(out, this.encodedRegionName);
@@ -290,6 +320,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
// encodes the length of encodedRegionName.
// If < 0 we just read the version and the next vint is the length.
// @see Bytes#readByteArray(DataInput)
+ this.scopes = null; // writable HLogKey does not contain scopes
int len = WritableUtils.readVInt(in);
if (len < 0) {
// what we just read was the version
@@ -308,7 +339,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
this.encodedRegionName = Compressor.readCompressed(in, compressionContext.regionDict);
this.tablename = Compressor.readCompressed(in, compressionContext.tableDict);
}
-
+
this.logSeqNum = in.readLong();
this.writeTime = in.readLong();
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
@@ -325,4 +356,62 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
}
}
+
+ public WALKey.Builder getBuilder(
+ WALCellCodec.ByteStringCompressor compressor) throws IOException {
+ WALKey.Builder builder = WALKey.newBuilder();
+ if (compressionContext == null) {
+ builder.setEncodedRegionName(ByteString.copyFrom(this.encodedRegionName));
+ builder.setTableName(ByteString.copyFrom(this.tablename));
+ } else {
+ builder.setEncodedRegionName(
+ compressor.compress(this.encodedRegionName, compressionContext.regionDict));
+ builder.setTableName(compressor.compress(this.tablename, compressionContext.tableDict));
+ }
+ builder.setLogSequenceNumber(this.logSeqNum);
+ builder.setWriteTime(writeTime);
+ if (this.clusterId != HConstants.DEFAULT_CLUSTER_ID) {
+ builder.setClusterId(HBaseProtos.UUID.newBuilder()
+ .setLeastSigBits(this.clusterId.getLeastSignificantBits())
+ .setMostSigBits(this.clusterId.getMostSignificantBits()));
+ }
+ if (scopes != null) {
+ for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
+ ByteString family = (compressionContext == null) ? ByteString.copyFrom(e.getKey())
+ : compressor.compress(e.getKey(), compressionContext.familyDict);
+ builder.addScopes(FamilyScope.newBuilder()
+ .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
+ }
+ }
+ return builder;
+ }
+
+ public void readFieldsFromPb(
+ WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
+ if (this.compressionContext != null) {
+ this.encodedRegionName = uncompressor.uncompress(
+ walKey.getEncodedRegionName(), compressionContext.regionDict);
+ this.tablename = uncompressor.uncompress(
+ walKey.getTableName(), compressionContext.tableDict);
+ } else {
+ this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
+ this.tablename = walKey.getTableName().toByteArray();
+ }
+ this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
+ if (walKey.hasClusterId()) {
+ this.clusterId = new UUID(
+ walKey.getClusterId().getMostSigBits(), walKey.getClusterId().getLeastSigBits());
+ }
+ this.scopes = null;
+ if (walKey.getScopesCount() > 0) {
+ this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ for (FamilyScope scope : walKey.getScopesList()) {
+ byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
+ uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
+ this.scopes.put(family, scope.getScopeType().getNumber());
+ }
+ }
+ this.logSeqNum = walKey.getLogSequenceNumber();
+ this.writeTime = walKey.getWriteTime();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 45ac26597e1..01e13c88813 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -36,30 +36,13 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);
- @SuppressWarnings("unchecked")
- public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
- return (Class<? extends HLogKey>) conf.getClass(
- "hbase.regionserver.hlog.keyclass", HLogKey.class);
- }
-
- public static HLogKey newKey(Configuration conf) throws IOException {
- Class<? extends HLogKey> keyClass = getKeyClass(conf);
- try {
- return keyClass.newInstance();
- } catch (InstantiationException e) {
- throw new IOException("cannot create hlog key");
- } catch (IllegalAccessException e) {
- throw new IOException("cannot create hlog key");
- }
- }
-
/**
* Pattern used to validate a HLog file name
*/
@@ -76,52 +59,6 @@ public class HLogUtil {
return pattern.matcher(filename).matches();
}
- /*
- * Get a reader for the WAL.
- *
- * @param fs
- *
- * @param path
- *
- * @param conf
- *
- * @return A WAL reader. Close when done with it.
- *
- * @throws IOException
- *
- * public static HLog.Reader getReader(final FileSystem fs, final Path path,
- * Configuration conf) throws IOException { try {
- *
- * if (logReaderClass == null) {
- *
- * logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
- * SequenceFileLogReader.class, Reader.class); }
- *
- *
- * HLog.Reader reader = logReaderClass.newInstance(); reader.init(fs, path,
- * conf); return reader; } catch (IOException e) { throw e; } catch (Exception
- * e) { throw new IOException("Cannot get log reader", e); } }
- *
- * * Get a writer for the WAL.
- *
- * @param path
- *
- * @param conf
- *
- * @return A WAL writer. Close when done with it.
- *
- * @throws IOException
- *
- * public static HLog.Writer createWriter(final FileSystem fs, final Path
- * path, Configuration conf) throws IOException { try { if (logWriterClass ==
- * null) { logWriterClass =
- * conf.getClass("hbase.regionserver.hlog.writer.impl",
- * SequenceFileLogWriter.class, Writer.class); } FSHLog.Writer writer =
- * (FSHLog.Writer) logWriterClass.newInstance(); writer.init(fs, path, conf);
- * return writer; } catch (Exception e) { throw new
- * IOException("cannot get log writer", e); } }
- */
-
/**
* Construct the HLog directory name
*
@@ -285,11 +222,11 @@ public class HLogUtil {
/**
* Write the marker that a compaction has succeeded and is about to be committed.
* This provides info to the HMaster to allow it to recover the compaction if
- * this regionserver dies in the middle (This part is not yet implemented). It also prevents the compaction from
- * finishing if this regionserver has already lost its lease on the log.
+ * this regionserver dies in the middle (This part is not yet implemented). It also prevents
+ * the compaction from finishing if this regionserver has already lost its lease on the log.
*/
- public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info, final CompactionDescriptor c)
- throws IOException {
+ public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
+ final CompactionDescriptor c) throws IOException {
WALEdit e = WALEdit.createCompaction(c);
log.append(info, c.getTableName().toByteArray(), e,
EnvironmentEdgeManager.currentTimeMillis(), htd);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
index 0f9743f70a0..d14610aec86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/KeyValueCompression.java
@@ -27,12 +27,15 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
/**
+ * DO NOT USE. This class is deprecated and should only be used in pre-PB WAL.
+ *
* Compression class for {@link KeyValue}s written to the WAL. This is not
* synchronized, so synchronization should be handled outside.
*
* Class only compresses and uncompresses row keys, family names, and the
* qualifier. More may be added depending on use patterns.
*/
+@Deprecated
class KeyValueCompression {
/**
* Uncompresses a KeyValue from a DataInput and returns it.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
new file mode 100644
index 00000000000..12dbb972dcc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -0,0 +1,173 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Reader for protobuf-based WAL.
+ */
+@InterfaceAudience.Private
+public class ProtobufLogReader extends ReaderBase {
+ private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
+ static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
+
+ private FSDataInputStream inputStream;
+ private Codec.Decoder cellDecoder;
+ private WALCellCodec.ByteStringUncompressor byteStringUncompressor;
+ private boolean hasCompression = false;
+
+ public ProtobufLogReader() {
+ super();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.inputStream != null) {
+ this.inputStream.close();
+ this.inputStream = null;
+ }
+ }
+
+ @Override
+ public long getPosition() throws IOException {
+ return inputStream.getPos();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ initInternal(null, false);
+ initAfterCompression(); // We need a new decoder (at least).
+ }
+
+ @Override
+ protected void initReader(FSDataInputStream stream) throws IOException {
+ initInternal(stream, true);
+ }
+
+ private void initInternal(FSDataInputStream stream, boolean isFirst) throws IOException {
+ close();
+ long expectedPos = PB_WAL_MAGIC.length;
+ if (stream == null) {
+ stream = fs.open(path);
+ stream.seek(expectedPos);
+ }
+ if (stream.getPos() != expectedPos) {
+ throw new IOException("The stream is at invalid position: " + stream.getPos());
+ }
+ // Initialize metadata or, when we reset, just skip the header.
+ WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
+ boolean hasHeader = builder.mergeDelimitedFrom(stream);
+ if (!hasHeader) {
+ throw new EOFException("Couldn't read WAL PB header");
+ }
+ if (isFirst) {
+ WALProtos.WALHeader header = builder.build();
+ this.hasCompression = header.hasHasCompression() && header.getHasCompression();
+ }
+ this.inputStream = stream;
+
+ }
+
+ @Override
+ protected void initAfterCompression() throws IOException {
+ WALCellCodec codec = new WALCellCodec(this.compressionContext);
+ this.cellDecoder = codec.getDecoder(this.inputStream);
+ if (this.hasCompression) {
+ this.byteStringUncompressor = codec.getByteStringUncompressor();
+ }
+ }
+
+ @Override
+ protected boolean hasCompression() {
+ return this.hasCompression;
+ }
+
+ @Override
+ protected boolean readNext(HLog.Entry entry) throws IOException {
+ WALKey.Builder builder = WALKey.newBuilder();
+ boolean hasNext = false;
+ try {
+ hasNext = builder.mergeDelimitedFrom(inputStream);
+ } catch (InvalidProtocolBufferException ipbe) {
+ LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
+ }
+ if (!hasNext) return false;
+ if (!builder.isInitialized()) {
+ // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
+ // If we can get the KV count, we could, theoretically, try to get next record.
+ LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
+ return false;
+ }
+ WALKey walKey = builder.build();
+ entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
+ try {
+ int expectedCells = walKey.getFollowingKvCount();
+ int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
+ if (expectedCells != actualCells) {
+ throw new EOFException("Unable to read " + expectedCells + " cells, got " + actualCells);
+ }
+ } catch (EOFException ex) {
+ LOG.error("EOF while reading KVs, ignoring", ex);
+ return false;
+ } catch (Exception ex) {
+ IOException realEofEx = extractHiddenEofOrRethrow(ex);
+ LOG.error("EOF while reading KVs, ignoring", realEofEx);
+ return false;
+ }
+ return true;
+ }
+
+ private IOException extractHiddenEofOrRethrow(Exception ex) throws IOException {
+ // There are two problems we are dealing with here. Hadoop stream throws generic exception
+ // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
+ IOException ioEx = null;
+ if (ex instanceof IOException) {
+ ioEx = (IOException)ex;
+ } else if (ex instanceof RuntimeException
+ && ex.getCause() != null && ex.getCause() instanceof IOException) {
+ ioEx = (IOException)ex.getCause();
+ }
+ if (ioEx != null) {
+ if (ioEx.getMessage().contains("EOF")) return ioEx;
+ throw ioEx;
+ }
+ throw new IOException(ex);
+ }
+
+ @Override
+ protected void seekOnFs(long pos) throws IOException {
+ this.inputStream.seek(pos);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
new file mode 100644
index 00000000000..3d81cc455d7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -0,0 +1,130 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
+
+/**
+ * Writer for protobuf-based WAL.
+ */
+@InterfaceAudience.Private
+public class ProtobufLogWriter implements HLog.Writer {
+ private final Log LOG = LogFactory.getLog(this.getClass());
+ private FSDataOutputStream output;
+ private Codec.Encoder cellEncoder;
+ private WALCellCodec.ByteStringCompressor compressor;
+
+
+ /** Context used by our wal dictionary compressor.
+ * Null if we're not to do our custom dictionary compression. */
+ private CompressionContext compressionContext;
+
+ public ProtobufLogWriter() {
+ super();
+ }
+
+ @Override
+ public void init(FileSystem fs, Path path, Configuration conf) throws IOException {
+ assert this.output == null;
+ boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
+ if (doCompress) {
+ try {
+ this.compressionContext = new CompressionContext(LRUDictionary.class);
+ } catch (Exception e) {
+ throw new IOException("Failed to initiate CompressionContext", e);
+ }
+ }
+ int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
+ short replication = (short)conf.getInt(
+ "hbase.regionserver.hlog.replication", fs.getDefaultReplication());
+ long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize", fs.getDefaultBlockSize());
+ output = fs.create(path, true, bufferSize, replication, blockSize);
+ output.write(ProtobufLogReader.PB_WAL_MAGIC);
+ WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output);
+
+ WALCellCodec codec = new WALCellCodec(this.compressionContext);
+ this.cellEncoder = codec.getEncoder(this.output);
+ if (doCompress) {
+ this.compressor = codec.getByteStringCompressor();
+ }
+ LOG.debug("Writing protobuf WAL; path=" + path + ", compression=" + doCompress);
+ }
+
+ @Override
+ public void append(HLog.Entry entry) throws IOException {
+ entry.setCompressionContext(compressionContext);
+ entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size())
+ .build().writeDelimitedTo(output);
+ for (KeyValue kv : entry.getEdit().getKeyValues()) {
+ // cellEncoder must assume little about the stream, since we write PB and cells in turn.
+ cellEncoder.write(kv);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.output != null) {
+ try {
+ this.output.close();
+ } catch (NullPointerException npe) {
+ // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
+ LOG.warn(npe);
+ }
+ this.output = null;
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ try {
+ this.output.flush();
+ this.output.sync();
+ } catch (NullPointerException npe) {
+ // Concurrent close...
+ throw new IOException(npe);
+ }
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ try {
+ return this.output.getPos();
+ } catch (NullPointerException npe) {
+ // Concurrent close...
+ throw new IOException(npe);
+ }
+ }
+
+ public FSDataOutputStream getStream() {
+ return this.output;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
new file mode 100644
index 00000000000..742a3ec23d5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+@InterfaceAudience.Private
+public abstract class ReaderBase implements HLog.Reader {
+ protected Configuration conf;
+ protected FileSystem fs;
+ protected Path path;
+ protected long edit = 0;
+ /**
+ * Compression context to use reading. Can be null if no compression.
+ */
+ protected CompressionContext compressionContext = null;
+ protected boolean emptyCompressionContext = true;
+
+ /**
+ * Default constructor.
+ */
+ public ReaderBase() {
+ }
+
+ @Override
+ public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
+ throws IOException {
+ this.conf = conf;
+ this.path = path;
+ this.fs = fs;
+
+ initReader(stream);
+
+ boolean compression = hasCompression();
+ if (compression) {
+ // If compression is enabled, new dictionaries are created here.
+ try {
+ if (compressionContext == null) {
+ compressionContext = new CompressionContext(LRUDictionary.class);
+ } else {
+ compressionContext.clear();
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize CompressionContext", e);
+ }
+ }
+ initAfterCompression();
+ }
+
+ @Override
+ public HLog.Entry next() throws IOException {
+ return next(null);
+ }
+
+ @Override
+ public HLog.Entry next(HLog.Entry reuse) throws IOException {
+ HLog.Entry e = reuse;
+ if (e == null) {
+ e = new HLog.Entry(new HLogKey(), new WALEdit());
+ }
+ if (compressionContext != null) {
+ e.setCompressionContext(compressionContext);
+ }
+
+ boolean hasEntry = readNext(e);
+ edit++;
+ if (compressionContext != null && emptyCompressionContext) {
+ emptyCompressionContext = false;
+ }
+ return hasEntry ? e : null;
+ }
+
+
+ @Override
+ public void seek(long pos) throws IOException {
+ if (compressionContext != null && emptyCompressionContext) {
+ while (next() != null) {
+ if (getPosition() == pos) {
+ emptyCompressionContext = false;
+ break;
+ }
+ }
+ }
+ seekOnFs(pos);
+ }
+
+ /**
+ * Initializes the log reader with a particular stream (may be null).
+ * Reader assumes ownership of the stream if not null and may use it. Called once.
+ */
+ protected abstract void initReader(FSDataInputStream stream) throws IOException;
+
+ /**
+ * Initializes the compression after the shared stuff has been initialized. Called once.
+ */
+ protected abstract void initAfterCompression() throws IOException;
+ /**
+ * @return Whether compression is enabled for this log.
+ */
+ protected abstract boolean hasCompression();
+
+ /**
+ * Read next entry.
+ * @param e The entry to read into.
+ * @return Whether there was anything to read.
+ */
+ protected abstract boolean readNext(HLog.Entry e) throws IOException;
+
+ /**
+ * Performs a filesystem-level seek to a certain position in an underlying file.
+ */
+ protected abstract void seekOnFs(long pos) throws IOException;
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index fea92a3555e..d2bfae587f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -23,6 +23,8 @@ import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.NavigableMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,12 +33,24 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.Metadata;
@InterfaceAudience.Private
-public class SequenceFileLogReader implements HLog.Reader {
+public class SequenceFileLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
+ // Legacy stuff from pre-PB WAL metadata.
+ private static final Text WAL_VERSION_KEY = new Text("version");
+ // Let the version be 1. Let absence of a version meta tag be old, version 0.
+ // Set this version '1' to be the version that introduces compression,
+ // the COMPRESSION_VERSION.
+ private static final int COMPRESSION_VERSION = 1;
+ private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
+ private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
+
/**
* Hack just to set the correct file length up in SequenceFile.Reader.
* See HADOOP-6307. The below is all about setting the right length on the
@@ -49,7 +63,7 @@ public class SequenceFileLogReader implements HLog.Reader {
* this.end = in.getPos() + length;
*
*/
- static class WALReader extends SequenceFile.Reader {
+ private static class WALReader extends SequenceFile.Reader {
WALReader(final FileSystem fs, final Path p, final Configuration c)
throws IOException {
@@ -64,15 +78,6 @@ public class SequenceFileLogReader implements HLog.Reader {
bufferSize, length), length);
}
- /**
- * Call this method after init() has been executed
- *
- * @return whether WAL compression is enabled
- */
- public boolean isWALCompressionEnabled() {
- return SequenceFileLogWriter.isWALCompressionEnabled(this.getMetadata());
- }
-
/**
* Override just so can intercept first call to getPos.
*/
@@ -138,59 +143,12 @@ public class SequenceFileLogReader implements HLog.Reader {
}
}
- Configuration conf;
- WALReader reader;
- FileSystem fs;
+ // Protected for tests.
+ protected SequenceFile.Reader reader;
+ long entryStart = 0; // needed for logging exceptions
- // Needed logging exceptions
- Path path;
- int edit = 0;
- long entryStart = 0;
- boolean emptyCompressionContext = true;
- /**
- * Compression context to use reading. Can be null if no compression.
- */
- protected CompressionContext compressionContext = null;
-
- protected Class<? extends HLogKey> keyClass;
-
- /**
- * Default constructor.
- */
public SequenceFileLogReader() {
- }
-
- /**
- * This constructor allows a specific HLogKey implementation to override that
- * which would otherwise be chosen via configuration property.
- *
- * @param keyClass
- */
- public SequenceFileLogReader(Class<? extends HLogKey> keyClass) {
- this.keyClass = keyClass;
- }
-
- @Override
- public void init(FileSystem fs, Path path, Configuration conf)
- throws IOException {
- this.conf = conf;
- this.path = path;
- reader = new WALReader(fs, path, conf);
- this.fs = fs;
-
- // If compression is enabled, new dictionaries are created here.
- boolean compression = reader.isWALCompressionEnabled();
- if (compression) {
- try {
- if (compressionContext == null) {
- compressionContext = new CompressionContext(LRUDictionary.class);
- } else {
- compressionContext.clear();
- }
- } catch (Exception e) {
- throw new IOException("Failed to initialize CompressionContext", e);
- }
- }
+ super();
}
@Override
@@ -206,57 +164,70 @@ public class SequenceFileLogReader implements HLog.Reader {
}
@Override
- public HLog.Entry next() throws IOException {
- return next(null);
+ public long getPosition() throws IOException {
+ return reader != null ? reader.getPosition() : 0;
}
@Override
- public HLog.Entry next(HLog.Entry reuse) throws IOException {
- this.entryStart = this.reader.getPosition();
- HLog.Entry e = reuse;
- if (e == null) {
- HLogKey key;
- if (keyClass == null) {
- key = HLogUtil.newKey(conf);
- } else {
- try {
- key = keyClass.newInstance();
- } catch (InstantiationException ie) {
- throw new IOException(ie);
- } catch (IllegalAccessException iae) {
- throw new IOException(iae);
- }
- }
+ public void reset() throws IOException {
+ // Resetting the reader lets us see newly added data if the file is being written to
+ // We also keep the same compressionContext which was previously populated for this file
+ reader = new WALReader(fs, path, conf);
+ }
- WALEdit val = new WALEdit();
- e = new HLog.Entry(key, val);
+ @Override
+ protected void initReader(FSDataInputStream stream) throws IOException {
+ // We don't use the stream because we have to have the magic stream above.
+ if (stream != null) {
+ stream.close();
}
- boolean b = false;
+ reset();
+ }
+
+ @Override
+ protected void initAfterCompression() throws IOException {
+ // Nothing to do here
+ }
+
+ @Override
+ protected boolean hasCompression() {
+ return isWALCompressionEnabled(reader.getMetadata());
+ }
+
+ /**
+ * Call this method after init() has been executed
+ * @return whether WAL compression is enabled
+ */
+ static boolean isWALCompressionEnabled(final Metadata metadata) {
+ // Check version is >= VERSION?
+ Text txt = metadata.get(WAL_VERSION_KEY);
+ if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
+ return false;
+ }
+ // Now check that compression type is present. Currently only one value.
+ txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
+ return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
+ }
+
+
+ @Override
+ protected boolean readNext(Entry e) throws IOException {
try {
- if (compressionContext != null) {
- e.setCompressionContext(compressionContext);
+ boolean hasNext = this.reader.next(e.getKey(), e.getEdit());
+ if (!hasNext) return false;
+ // Scopes are probably in WAL edit, move to key
+ NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
+ if (scopes != null) {
+ e.getKey().setScopes(scopes);
}
- b = this.reader.next(e.getKey(), e.getEdit());
+ return true;
} catch (IOException ioe) {
throw addFileInfoToException(ioe);
}
- edit++;
- if (compressionContext != null && emptyCompressionContext) {
- emptyCompressionContext = false;
- }
- return b? e: null;
}
@Override
- public void seek(long pos) throws IOException {
- if (compressionContext != null && emptyCompressionContext) {
- while (next() != null) {
- if (getPosition() == pos) {
- emptyCompressionContext = false;
- break;
- }
- }
- }
+ protected void seekOnFs(long pos) throws IOException {
try {
reader.seek(pos);
} catch (IOException ioe) {
@@ -264,11 +235,6 @@ public class SequenceFileLogReader implements HLog.Reader {
}
}
- @Override
- public long getPosition() throws IOException {
- return reader != null ? reader.getPosition() : 0;
- }
-
protected IOException addFileInfoToException(final IOException ioe)
throws IOException {
long pos = -1;
@@ -301,11 +267,4 @@ public class SequenceFileLogReader implements HLog.Reader {
return ioe;
}
-
- @Override
- public void reset() throws IOException {
- // Resetting the reader lets us see newly added data if the file is being written to
- // We also keep the same compressionContext which was previously populated for this file
- reader = new WALReader(fs, path, conf);
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
new file mode 100644
index 00000000000..245abba2ddc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.BaseDecoder;
+import org.apache.hadoop.hbase.codec.BaseEncoder;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+
+/**
+ * Compression in this class is lifted off Compressor/KeyValueCompression.
+ * This is a pure coincidence... they are independent and don't have to be compatible.
+ */
+public class WALCellCodec implements Codec {
+ private final CompressionContext compression;
+ private final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
+ @Override
+ public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
+ return WALCellCodec.uncompressByteString(data, dict);
+ }
+ };
+
+ public WALCellCodec(CompressionContext compression) {
+ this.compression = compression;
+ }
+
+ public interface ByteStringCompressor {
+ ByteString compress(byte[] data, Dictionary dict) throws IOException;
+ }
+
+ public interface ByteStringUncompressor {
+ byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
+ }
+
+ // TODO: it sucks that compression context is in HLog.Entry. It'd be nice if it was here.
+ // Dictionary could be gotten by enum; initially, based on enum, context would create
+ // an array of dictionaries.
+ static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
+ public ByteString toByteString() {
+ return ByteString.copyFrom(this.buf, 0, this.count);
+ }
+
+ @Override
+ public ByteString compress(byte[] data, Dictionary dict) throws IOException {
+ writeCompressed(data, dict);
+ ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
+ reset(); // Only resets the count - we reuse the byte array.
+ return result;
+ }
+
+ private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
+ assert dict != null;
+ short dictIdx = dict.findEntry(data, 0, data.length);
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ write(Dictionary.NOT_IN_DICTIONARY);
+ StreamUtils.writeRawVInt32(this, data.length);
+ write(data, 0, data.length);
+ } else {
+ StreamUtils.writeShort(this, dictIdx);
+ }
+ }
+ }
+
+ private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
+ InputStream in = bs.newInput();
+ byte status = (byte)in.read();
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
+ int bytesRead = in.read(arr);
+ if (bytesRead != arr.length) {
+ throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
+ }
+ if (dict != null) dict.addEntry(arr, 0, arr.length);
+ return arr;
+ } else {
+ // Status here is the higher-order byte of index of the dictionary entry.
+ short dictIdx = StreamUtils.toShort(status, (byte)in.read());
+ byte[] entry = dict.getEntry(dictIdx);
+ if (entry == null) {
+ throw new IOException("Missing dictionary entry for index " + dictIdx);
+ }
+ return entry;
+ }
+ }
+
+ static class CompressedKvEncoder extends BaseEncoder {
+ private final CompressionContext compression;
+ public CompressedKvEncoder(OutputStream out, CompressionContext compression) {
+ super(out);
+ this.compression = compression;
+ }
+
+ @Override
+ public void write(Cell cell) throws IOException {
+ if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
+ KeyValue kv = (KeyValue)cell;
+ byte[] kvBuffer = kv.getBuffer();
+ int offset = kv.getOffset();
+
+ // We first write the KeyValue infrastructure as VInts.
+ StreamUtils.writeRawVInt32(out, kv.getKeyLength());
+ StreamUtils.writeRawVInt32(out, kv.getValueLength());
+
+ // Write row, qualifier, and family; use dictionary
+ // compression as they're likely to have duplicates.
+ write(kvBuffer, kv.getRowOffset(), kv.getRowLength(), compression.rowDict);
+ write(kvBuffer, kv.getFamilyOffset(), kv.getFamilyLength(), compression.familyDict);
+ write(kvBuffer, kv.getQualifierOffset(), kv.getQualifierLength(), compression.qualifierDict);
+
+ // Write the rest uncompressed.
+ int pos = kv.getTimestampOffset();
+ int remainingLength = kv.getLength() + offset - pos;
+ out.write(kvBuffer, pos, remainingLength);
+
+ }
+
+ private void write(byte[] data, int offset, int length, Dictionary dict) throws IOException {
+ short dictIdx = Dictionary.NOT_IN_DICTIONARY;
+ if (dict != null) {
+ dictIdx = dict.findEntry(data, offset, length);
+ }
+ if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
+ out.write(Dictionary.NOT_IN_DICTIONARY);
+ StreamUtils.writeRawVInt32(out, length);
+ out.write(data, offset, length);
+ } else {
+ StreamUtils.writeShort(out, dictIdx);
+ }
+ }
+ }
+
+ static class CompressedKvDecoder extends BaseDecoder {
+ private final CompressionContext compression;
+ public CompressedKvDecoder(InputStream in, CompressionContext compression) {
+ super(in);
+ this.compression = compression;
+ }
+
+ @Override
+ protected Cell parseCell() throws IOException {
+ int keylength = StreamUtils.readRawVarint32(in);
+ int vlength = StreamUtils.readRawVarint32(in);
+ int length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
+
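+ // Rebuild the standard KeyValue byte layout in a fresh array: total key and value lengths
+ // first, then row length and row, family length and family, qualifier, and finally the
+ // uncompressed tail (timestamp, type, value).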
+ byte[] backingArray = new byte[length];
+ int pos = 0;
+ pos = Bytes.putInt(backingArray, pos, keylength);
+ pos = Bytes.putInt(backingArray, pos, vlength);
+
+ // the row
+ int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
+ checkLength(elemLen, Short.MAX_VALUE);
+ pos = Bytes.putShort(backingArray, pos, (short)elemLen);
+ pos += elemLen;
+
+ // family
+ elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
+ checkLength(elemLen, Byte.MAX_VALUE);
+ pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
+ pos += elemLen;
+
+ // qualifier
+ elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
+ pos += elemLen;
+
+ // the rest
+ in.read(backingArray, pos, length - pos);
+ return new KeyValue(backingArray);
+ }
+
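+ // Reads one component into 'to' at 'offset' and returns its length, either copying an
+ // existing dictionary entry or reading an inline literal and adding it to the dictionary.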
+ private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
+ byte status = (byte)in.read();
+ if (status == Dictionary.NOT_IN_DICTIONARY) {
+ // The status byte says the data is not in the dictionary: read it inline and add it
+ // to the dictionary for later entries.
+ int length = StreamUtils.readRawVarint32(in);
+ in.read(to, offset, length);
+ dict.addEntry(to, offset, length);
+ return length;
+ } else {
+ // The status byte is the high-order byte of the dictionary entry index.
+ short dictIdx = StreamUtils.toShort(status, (byte)in.read());
+ byte[] entry = dict.getEntry(dictIdx);
+ if (entry == null) {
+ throw new IOException("Missing dictionary entry for index " + dictIdx);
+ }
+ // Copy the dictionary entry into the destination array.
+ Bytes.putBytes(to, offset, entry, 0, entry.length);
+ return entry.length;
+ }
+ }
+
+ private static void checkLength(int len, int max) throws IOException {
+ if (len < 0 || len > max) {
+ throw new IOException("Invalid length for compresesed portion of keyvalue: " + len);
+ }
+ }
+ }
+
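+ // Uncompressed path: delegates to the plain KeyValueCodec encoder but rejects any Cell
+ // that is not a KeyValue, since the WAL only deals in KeyValues here.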
+ public class EnsureKvEncoder extends KeyValueCodec.KeyValueEncoder {
+ public EnsureKvEncoder(OutputStream out) {
+ super(out);
+ }
+ @Override
+ public void write(Cell cell) throws IOException {
+ if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
+ super.write(cell);
+ }
+ }
+
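+ // Without a compression context the codec falls back to the plain KeyValue encoder/decoder;
+ // otherwise the dictionary-compressing variants above are used.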
+ @Override
+ public Decoder getDecoder(InputStream is) {
+ return (compression == null)
+ ? new KeyValueCodec.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
+ }
+
+ @Override
+ public Encoder getEncoder(OutputStream os) {
+ return (compression == null)
+ ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression);
+ }
+
+ public ByteStringCompressor getByteStringCompressor() {
+ // TODO: ideally this should also encapsulate compressionContext
+ return new BaosAndCompressor();
+ }
+
+ public ByteStringUncompressor getByteStringUncompressor() {
+ // TODO: ideally this should also encapsulate compressionContext
+ return this.statelessUncompressor;
+ }
+
+ /**
+ * Existing VInt encoding utilities tend to hide the encoding behind heavyweight interfaces:
+ * custom streams that allocate memory, or DataOutput used only to write single bytes.
+ * We operate on plain streams, so this is a simple implementation copied from the
+ * protobuf Coded*Stream classes.
+ */
+ private static class StreamUtils {
+ public static int computeRawVarint32Size(final int value) {
+ if ((value & (0xffffffff << 7)) == 0) return 1;
+ if ((value & (0xffffffff << 14)) == 0) return 2;
+ if ((value & (0xffffffff << 21)) == 0) return 3;
+ if ((value & (0xffffffff << 28)) == 0) return 4;
+ return 5;
+ }
+
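+ // Standard protobuf-style varint: seven payload bits per byte, continuation bit set on
+ // every byte except the last.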
+ static void writeRawVInt32(OutputStream output, int value) throws IOException {
+ assert value >= 0;
+ while (true) {
+ if ((value & ~0x7F) == 0) {
+ output.write(value);
+ return;
+ } else {
+ output.write((value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ }
+ }
+
+ static int readRawVarint32(InputStream input) throws IOException {
+ byte tmp = (byte)input.read();
+ if (tmp >= 0) {
+ return tmp;
+ }
+ int result = tmp & 0x7f;
+ if ((tmp = (byte)input.read()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 0x7f) << 7;
+ if ((tmp = (byte)input.read()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 0x7f) << 14;
+ if ((tmp = (byte)input.read()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 0x7f) << 21;
+ result |= (tmp = (byte)input.read()) << 28;
+ if (tmp < 0) {
+ // Discard upper 32 bits.
+ for (int i = 0; i < 5; i++) {
+ if (input.read() >= 0) {
+ return result;
+ }
+ }
+ throw new IOException("Malformed varint");
+ }
+ }
+ }
+ }
+ return result;
+ }
+
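+ // Dictionary indices are written as two big-endian bytes; they are required to be
+ // non-negative so the high byte cannot be confused with the NOT_IN_DICTIONARY status byte.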
+ static short toShort(byte hi, byte lo) {
+ short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
+ Preconditions.checkArgument(s >= 0);
+ return s;
+ }
+
+ static void writeShort(OutputStream out, short v) throws IOException {
+ Preconditions.checkArgument(v >= 0);
+ out.write((byte)(0xff & (v >> 8)));
+ out.write((byte)(0xff & v));
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index f9b72f2316f..f5a47af914f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -26,14 +26,19 @@ import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.Writable;
+
/**
* WALEdit: Used in HBase's transaction log (WAL) to represent
* the collection of edits (KeyValue objects) corresponding to a
@@ -70,15 +75,19 @@ import org.apache.hadoop.io.Writable;
*/
@InterfaceAudience.Private
public class WALEdit implements Writable, HeapSize {
- // TODO: Make it so user cannot make a cf w/ this name. Make the illegal cf names. Ditto for row.
+ public static final Log LOG = LogFactory.getLog(WALEdit.class);
+
+ // TODO: Get rid of this; see HBASE-8457
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
static final byte [] METAROW = Bytes.toBytes("METAROW");
static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
-
private final int VERSION_2 = -1;
private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
+
+ // Only here for legacy writable deserialization
+ @Deprecated
private NavigableMap<byte[], Integer> scopes;
private CompressionContext compressionContext;
@@ -115,15 +124,10 @@ public class WALEdit implements Writable, HeapSize {
return kvs;
}
- public NavigableMap<byte[], Integer> getScopes() {
- return scopes;
- }
-
-
- public void setScopes (NavigableMap<byte[], Integer> scopes) {
- // We currently process the map outside of WALEdit,
- // TODO revisit when replication is part of core
- this.scopes = scopes;
+ public NavigableMap<byte[], Integer> getAndRemoveScopes() {
+ NavigableMap<byte[], Integer> result = scopes;
+ scopes = null;
+ return result;
}
public void readFields(DataInput in) throws IOException {
@@ -141,7 +145,7 @@ public class WALEdit implements Writable, HeapSize {
this.add(KeyValueCompression.readKV(in, compressionContext));
} else {
this.add(KeyValue.create(in));
- }
+ }
}
int numFamilies = in.readInt();
if (numFamilies > 0) {
@@ -159,10 +163,10 @@ public class WALEdit implements Writable, HeapSize {
// read is actually the length of a single KeyValue
this.add(KeyValue.create(versionOrLength, in));
}
-
}
public void write(DataOutput out) throws IOException {
+ LOG.warn("WALEdit is being serialized to writable - only expected in test code");
out.writeInt(VERSION_2);
out.writeInt(kvs.size());
// We interleave the two lists for code simplicity
@@ -184,6 +188,24 @@ public class WALEdit implements Writable, HeapSize {
}
}
+ /**
+ * Reads WALEdit from cells, replacing any KeyValues this edit currently holds.
+ * @param cellDecoder Cell decoder.
+ * @param expectedCount Expected cell count.
+ * @return Number of KVs read.
+ */
+ public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException {
+ kvs.clear();
+ while (kvs.size() < expectedCount && cellDecoder.advance()) {
+ Cell cell = cellDecoder.current();
+ if (!(cell instanceof KeyValue)) {
+ throw new IOException("WAL edit only supports KVs as cells");
+ }
+ kvs.add((KeyValue)cell);
+ }
+ return kvs.size();
+ }
+
public long heapSize() {
long ret = 0;
for (KeyValue kv : kvs) {
@@ -234,4 +256,5 @@ public class WALEdit implements Writable, HeapSize {
}
return null;
}
-}
\ No newline at end of file
+}
+
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 71aa2c76ea8..0d26e241c1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -215,7 +215,7 @@ public class Replication implements WALActionsListener,
}
}
if (!scopes.isEmpty()) {
- logEdit.setScopes(scopes);
+ logKey.setScopes(scopes);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 398501a0143..cb605711809 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -491,7 +491,7 @@ public class ReplicationSource extends Thread
HLogKey logKey = entry.getKey();
// don't replicate if the log entries originated in the peer
if (!logKey.getClusterId().equals(peerClusterId)) {
- removeNonReplicableEdits(edit);
+ removeNonReplicableEdits(entry);
// Don't replicate catalog entries, if the WALEdit wasn't
// containing anything to replicate and if we're currently not set to replicate
if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
@@ -666,12 +666,12 @@ public class ReplicationSource extends Thread
/**
* We only want KVs that are scoped other than local
- * @param edit The KV to check for replication
+ * @param entry The entry to check for replication
*/
- protected void removeNonReplicableEdits(WALEdit edit) {
- NavigableMap<byte[], Integer> scopes = edit.getScopes();
- List<KeyValue> kvs = edit.getKeyValues();
- for (int i = edit.size()-1; i >= 0; i--) {
+ protected void removeNonReplicableEdits(HLog.Entry entry) {
+ NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+ List<KeyValue> kvs = entry.getEdit().getKeyValues();
+ for (int i = kvs.size()-1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index 57e04f28a14..485b22c82a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -110,34 +110,25 @@ public class TestHLogRecordReader {
*/
@Test
public void testPartialRead() throws Exception {
- HLog log = HLogFactory.createHLog(fs, hbaseDir,
- logName, conf);
+ HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
- ts, value));
- log.append(info, tableName, edit,
- ts, htd);
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
+ log.append(info, tableName, edit, ts, htd);
edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
- ts+1, value));
- log.append(info, tableName, edit,
- ts+1, htd);
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
+ log.append(info, tableName, edit, ts+1, htd);
log.rollWriter();
Thread.sleep(1);
long ts1 = System.currentTimeMillis();
edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"),
- ts1+1, value));
- log.append(info, tableName, edit,
- ts1+1, htd);
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
+ log.append(info, tableName, edit, ts1+1, htd);
edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"),
- ts1+2, value));
- log.append(info, tableName, edit,
- ts1+2, htd);
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
+ log.append(info, tableName, edit, ts1+2, htd);
log.close();
HLogInputFormat input = new HLogInputFormat();
@@ -229,8 +220,11 @@ public class TestHLogRecordReader {
for (byte[] column : columns) {
assertTrue(reader.nextKeyValue());
- assertTrue(Bytes
- .equals(column, reader.getCurrentValue().getKeyValues().get(0).getQualifier()));
+ KeyValue kv = reader.getCurrentValue().getKeyValues().get(0);
+ if (!Bytes.equals(column, kv.getQualifier())) {
+ assertTrue("expected [" + Bytes.toString(column) + "], actual ["
+ + Bytes.toString(kv.getQualifier()) + "]", false);
+ }
}
assertFalse(reader.nextKeyValue());
reader.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a40b71f369d..57f49060906 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -84,7 +84,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.WAL.CompactionDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
index b79e4385a02..2164a435818 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultySequenceFileLogReader.java
@@ -41,14 +41,12 @@ public class FaultySequenceFileLogReader extends SequenceFileLogReader {
@Override
public HLog.Entry next(HLog.Entry reuse) throws IOException {
- this.entryStart = this.reader.getPosition();
+ this.entryStart = this.getPosition();
boolean b = true;
if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading
while (b == true) {
- HLogKey key = HLogUtil.newKey(conf);
- WALEdit val = new WALEdit();
- HLog.Entry e = new HLog.Entry(key, val);
+ HLog.Entry e = new HLog.Entry(new HLogKey(), new WALEdit());
if (compressionContext != null) {
e.setCompressionContext(compressionContext);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
index 1e669a4cf35..d240e66afbc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import org.apache.hadoop.hbase.util.Bytes;
-public class InstrumentedSequenceFileLogWriter extends SequenceFileLogWriter {
+public class InstrumentedSequenceFileLogWriter extends ProtobufLogWriter {
public InstrumentedSequenceFileLogWriter() {
- super(HLogKey.class);
+ super();
}
-
+
public static boolean activateFailure = false;
@Override
public void append(HLog.Entry entry) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
similarity index 81%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 88f0ebf393f..721b87c11f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -41,20 +41,10 @@ import org.apache.hadoop.io.compress.DefaultCodec;
/**
* Implementation of {@link HLog.Writer} that delegates to
- * SequenceFile.Writer.
+ * SequenceFile.Writer. Legacy implementation only used for compat tests.
*/
@InterfaceAudience.Private
public class SequenceFileLogWriter implements HLog.Writer {
- static final Text WAL_VERSION_KEY = new Text("version");
- // Let the version be 1. Let absence of a version meta tag be old, version 0.
- // Set this version '1' to be the version that introduces compression,
- // the COMPRESSION_VERSION.
- private static final int COMPRESSION_VERSION = 1;
- static final int VERSION = COMPRESSION_VERSION;
- static final Text WAL_VERSION = new Text("" + VERSION);
- static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
- static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
-
private final Log LOG = LogFactory.getLog(this.getClass());
// The sequence file we delegate to.
private SequenceFile.Writer writer;
@@ -62,8 +52,11 @@ public class SequenceFileLogWriter implements HLog.Writer {
// in the SequenceFile.Writer 'writer' instance above.
private FSDataOutputStream writer_out;
- private Class<? extends HLogKey> keyClass;
-
+ // Legacy stuff from pre-PB WAL metadata.
+ private static final Text WAL_VERSION_KEY = new Text("version");
+ private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
+ private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
+
/**
* Context used by our wal dictionary compressor. Null if we're not to do
* our custom dictionary compression. This custom WAL compression is distinct
@@ -78,16 +71,6 @@ public class SequenceFileLogWriter implements HLog.Writer {
super();
}
- /**
- * This constructor allows a specific HLogKey implementation to override that
- * which would otherwise be chosen via configuration property.
- *
- * @param keyClass
- */
- public SequenceFileLogWriter(Class<? extends HLogKey> keyClass) {
- this.keyClass = keyClass;
- }
-
/**
* Create sequence file Metadata for our WAL file with version and compression
* type (if any).
@@ -98,7 +81,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
private static Metadata createMetadata(final Configuration conf,
final boolean compress) {
TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
- metaMap.put(WAL_VERSION_KEY, WAL_VERSION);
+ metaMap.put(WAL_VERSION_KEY, new Text("1"));
if (compress) {
// Currently we only do one compression type.
metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
@@ -106,22 +89,6 @@ public class SequenceFileLogWriter implements HLog.Writer {
return new Metadata(metaMap);
}
- /**
- * Call this method after init() has been executed
- *
- * @return whether WAL compression is enabled
- */
- static boolean isWALCompressionEnabled(final Metadata metadata) {
- // Check version is >= VERSION?
- Text txt = metadata.get(WAL_VERSION_KEY);
- if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
- return false;
- }
- // Now check that compression type is present. Currently only one value.
- txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
- return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
- }
-
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
@@ -139,10 +106,6 @@ public class SequenceFileLogWriter implements HLog.Writer {
}
}
- if (null == keyClass) {
- keyClass = HLogUtil.getKeyClass(conf);
- }
-
// Create a SF.Writer instance.
try {
// reflection for a version of SequenceFile.createWriter that doesn't
@@ -152,8 +115,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
Configuration.class, Path.class, Class.class, Class.class,
Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
CompressionType.class, CompressionCodec.class, Metadata.class})
- .invoke(null, new Object[] {fs, conf, path, HLogUtil.getKeyClass(conf),
- WALEdit.class,
+ .invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class,
Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
Short.valueOf((short)
conf.getInt("hbase.regionserver.hlog.replication",
@@ -175,7 +137,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
if (this.writer == null) {
LOG.debug("new createWriter -- HADOOP-6840 -- not available");
this.writer = SequenceFile.createWriter(fs, conf, path,
- HLogUtil.getKeyClass(conf), WALEdit.class,
+ HLogKey.class, WALEdit.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
fs.getDefaultReplication()),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 3c24a22aad2..e025d7d2689 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -754,6 +754,73 @@ public class TestHLog {
log.append(hri, tableName, cols, timestamp, htd);
}
}
+
+
+ /**
+ * Writes a WAL in the legacy SequenceFile format and verifies that the standard reader
+ * factory falls back to SequenceFileLogReader and returns the expected entries.
+ * @throws IOException
+ */
+ @Test
+ public void testReadLegacyLog() throws IOException {
+ final int columnCount = 5;
+ final int recordCount = 5;
+ final byte[] tableName = Bytes.toBytes("tablename");
+ final byte[] row = Bytes.toBytes("row");
+ long timestamp = System.currentTimeMillis();
+ Path path = new Path(dir, "temphlog");
+ SequenceFileLogWriter sflw = null;
+ HLog.Reader reader = null;
+ try {
+ HRegionInfo hri = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ fs.mkdirs(dir);
+ // Write log in pre-PB format.
+ sflw = new SequenceFileLogWriter();
+ sflw.init(fs, path, conf);
+ for (int i = 0; i < recordCount; ++i) {
+ HLogKey key = new HLogKey(
+ hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
+ WALEdit edit = new WALEdit();
+ for (int j = 0; j < columnCount; ++j) {
+ if (i == 0) {
+ htd.addFamily(new HColumnDescriptor("column" + j));
+ }
+ String value = i + "" + j;
+ edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
+ }
+ sflw.append(new HLog.Entry(key, edit));
+ }
+ sflw.sync();
+ sflw.close();
+
+ // Now read the log using standard means.
+ reader = HLogFactory.createReader(fs, path, conf);
+ assertTrue(reader instanceof SequenceFileLogReader);
+ for (int i = 0; i < recordCount; ++i) {
+ HLog.Entry entry = reader.next();
+ assertNotNull(entry);
+ assertEquals(columnCount, entry.getEdit().size());
+ assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
+ assertArrayEquals(tableName, entry.getKey().getTablename());
+ int idx = 0;
+ for (KeyValue val : entry.getEdit().getKeyValues()) {
+ assertTrue(Bytes.equals(row, val.getRow()));
+ String value = i + "" + idx;
+ assertArrayEquals(Bytes.toBytes(value), val.getValue());
+ idx++;
+ }
+ }
+ HLog.Entry entry = reader.next();
+ assertNull(entry);
+ } finally {
+ if (sflw != null) {
+ sflw.close();
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
static class DumbWALActionsListener implements WALActionsListener {
int increments = 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index 28586791e5f..9438fb929db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -652,7 +652,7 @@ public class TestHLogSplit {
int actualCount = 0;
HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
@SuppressWarnings("unused")
- HLog.Entry entry;
+ HLog.Entry entry;
while ((entry = in.next()) != null) ++actualCount;
assertEquals(entryCount-1, actualCount);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index 819721c83be..86047add863 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -100,8 +100,7 @@ public class TestWALActionsListener {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(b));
- HLogKey key = new HLogKey(b,b, 0, 0, HConstants.DEFAULT_CLUSTER_ID);
- hlog.append(hri, key, edit, htd, true);
+ hlog.append(hri, b, edit, 0, htd);
if (i == 10) {
hlog.registerWALActionsListener(laterobserver);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index b79cf91d047..0ffd132413c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -827,7 +827,7 @@ public class TestWALReplay {
"The sequence number of the recoverd.edits and the current edit seq should be same",
lastestSeqNumber, editCount);
}
-
+
static class MockHLog extends FSHLog {
boolean doCompleteCacheFlush = false;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index eef6d9252e4..e6a514e2d93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -191,7 +191,7 @@ public class TestReplicationSourceManager {
LOG.info(i);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
- hlog.append(hri, key, edit, htd, true);
+ hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
}
// Simulate a rapid insert that's followed
@@ -202,9 +202,7 @@ public class TestReplicationSourceManager {
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
- HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
- System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
- hlog.append(hri, key, edit, htd, true);
+ hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
}
assertEquals(6, manager.getHLogs().get(slaveId).size());
@@ -214,9 +212,7 @@ public class TestReplicationSourceManager {
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
- HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
- System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
- hlog.append(hri, key, edit, htd, true);
+ hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
assertEquals(1, manager.getHLogs().size());