HBASE-5778 Fix HLog compression's incompatibilities
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1424174 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
@ -144,7 +144,12 @@ public class Compressor {
// the status byte also acts as the higher order byte of the dictionary
// entry
short dictIdx = toShort(status, in.readByte());
byte[] entry = dict.getEntry(dictIdx);
byte[] entry;
try {
entry = dict.getEntry(dictIdx);
} catch (Exception ex) {
throw new IOException("Unable to uncompress the log entry", ex);
if (entry == null) {
throw new IOException("Missing dictionary entry for index "
+ dictIdx);
@ -71,6 +71,7 @@ public interface HLog {
void seek(long pos) throws IOException;
long getPosition() throws IOException;
void reset() throws IOException;
public interface Writer {
@ -62,7 +62,9 @@ public class HLogFactory {
* Create a reader for the WAL.
* Create a reader for the WAL. If you are reading from a file that's being written to
* and need to reopen it multiple times, use {@link HLog.Reader#reset()} instead of this method
* then just seek back to the last known good position.
* @return A WAL reader. Close when done with it.
* @throws IOException
@ -140,15 +140,17 @@ public class SequenceFileLogReader implements HLog.Reader {
Configuration conf;
WALReader reader;
FileSystem fs;
// Needed logging exceptions
Path path;
int edit = 0;
long entryStart = 0;
boolean emptyCompressionContext = true;
* Compression context to use reading. Can be null if no compression.
private CompressionContext compressionContext = null;
protected CompressionContext compressionContext = null;
protected Class<? extends HLogKey> keyClass;
@ -174,6 +176,7 @@ public class SequenceFileLogReader implements HLog.Reader {
this.conf = conf;
this.path = path;
reader = new WALReader(fs, path, conf);
this.fs = fs;
// If compression is enabled, new dictionaries are created here.
boolean compression = reader.isWALCompressionEnabled();
@ -238,11 +241,22 @@ public class SequenceFileLogReader implements HLog.Reader {
throw addFileInfoToException(ioe);
if (compressionContext != null && emptyCompressionContext) {
emptyCompressionContext = false;
return b? e: null;
public void seek(long pos) throws IOException {
if (compressionContext != null && emptyCompressionContext) {
while (next() != null) {
if (getPosition() == pos) {
emptyCompressionContext = false;
try {
} catch (IOException ioe) {
@ -287,4 +301,11 @@ public class SequenceFileLogReader implements HLog.Reader {
return ioe;
public void reset() throws IOException {
// Resetting the reader lets us see newly added data if the file is being written to
// We also keep the same compressionContext which was previously populated for this file
reader = new WALReader(fs, path, conf);
@ -0,0 +1,137 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import java.io.IOException;
* Wrapper class around HLog to help manage the implementation details
* such as compression.
public class ReplicationHLogReaderManager {
private static final Log LOG = LogFactory.getLog(ReplicationHLogReaderManager.class);
private final FileSystem fs;
private final Configuration conf;
private long position = 0;
private HLog.Reader reader;
private Path lastPath;
* Creates the helper but doesn't open any file
* Use setInitialPosition after using the constructor if some content needs to be skipped
* @param fs
* @param conf
public ReplicationHLogReaderManager(FileSystem fs, Configuration conf) {
this.fs = fs;
this.conf = conf;
* Opens the file at the current position
* @param path
* @return
* @throws IOException
public HLog.Reader openReader(Path path) throws IOException {
// Detect if this is a new file, if so get a new reader else
// reset the current reader so that we see the new data
if (this.reader == null || !this.lastPath.equals(path)) {
this.reader = HLogFactory.createReader(this.fs, path, this.conf);
this.lastPath = path;
} else {
return this.reader;
* Get the next entry, returned and also added in the array
* @param entriesArray
* @param currentNbEntries
* @return a new entry or null
* @throws IOException
public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray,
int currentNbEntries) throws IOException {
HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
// Store the position so that in the future the reader can start
// reading from here. If the above call to next() throws an
// exception, the position won't be changed and retry will happen
// from the last known good position
this.position = this.reader.getPosition();
// We need to set the CC to null else it will be compressed when sent to the sink
if (entry != null) {
return entry;
* Advance the reader to the current position
* @throws IOException
public void seek() throws IOException {
if (this.position != 0) {
* Get the position that we stopped reading at
* @return current position, cannot be negative
public long getPosition() {
return this.position;
public void setPosition(long pos) {
this.position = pos;
* Close the current reader
* @throws IOException
public void closeReader() throws IOException {
if (this.reader != null) {
* Tell the helper to reset internal state
public void finishCurrentFile() {
this.position = 0;
this.reader = null;
@ -107,8 +107,6 @@ public class ReplicationSource extends Thread
private int replicationQueueNbCapacity;
// Our reader for the current log
private HLog.Reader reader;
// Current position in the log
private long position = 0;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
@ -134,10 +132,14 @@ public class ReplicationSource extends Thread
private int currentNbEntries = 0;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
// Current size of data we need to replicate
private int currentSize = 0;
// Indicates if this particular source is running
private volatile boolean running = true;
// Metrics for this source
private MetricsSource metrics;
// Handle on the log reader helper
private ReplicationHLogReaderManager repLogReader;
* Instantiation method used by region servers
@ -185,7 +187,7 @@ public class ReplicationSource extends Thread
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
this.metrics = new MetricsSource(peerClusterZnode);
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
try {
this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
} catch (KeeperException ke) {
@ -266,8 +268,8 @@ public class ReplicationSource extends Thread
// normally has a position (unless the RS failed between 2 logs)
if (this.queueRecovered) {
try {
this.position = this.zkHelper.getHLogRepPosition(
this.peerClusterZnode, this.queue.peek().getName());
this.peerClusterZnode, this.queue.peek().getName()));
} catch (KeeperException e) {
this.terminate("Couldn't get the position of this recovered queue " +
peerClusterZnode, e);
@ -325,6 +327,7 @@ public class ReplicationSource extends Thread
boolean gotIOE = false;
currentNbEntries = 0;
currentSize = 0;
try {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
@ -360,9 +363,7 @@ public class ReplicationSource extends Thread
} finally {
try {
if (this.reader != null) {
} catch (IOException e) {
gotIOE = true;
LOG.warn("Unable to finalize the tailing of a file", e);
@ -373,10 +374,11 @@ public class ReplicationSource extends Thread
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
if (this.lastLoggedPosition != this.position) {
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.position;
this.peerClusterZnode, this.repLogReader.getPosition(),
queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@ -409,11 +411,9 @@ public class ReplicationSource extends Thread
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
throws IOException{
long seenEntries = 0;
if (this.position != 0) {
long startPosition = this.position;
HLog.Entry entry = readNextAndSetPosition();
HLog.Entry entry =
this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
while (entry != null) {
WALEdit edit = entry.getEdit();
@ -437,18 +437,18 @@ public class ReplicationSource extends Thread
currentNbOperations += countDistinctRowKeys(edit);
currentSize += entry.getEdit().size();
} else {
// Stop if too many entries or too big
if ((this.reader.getPosition() - startPosition)
>= this.replicationQueueSizeCapacity ||
if (currentSize >= this.replicationQueueSizeCapacity ||
currentNbEntries >= this.replicationQueueNbCapacity) {
try {
entry = readNextAndSetPosition();
entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries);
} catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage());
@ -462,16 +462,6 @@ public class ReplicationSource extends Thread
return seenEntries == 0 && processEndOfFile();
private HLog.Entry readNextAndSetPosition() throws IOException {
HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
// Store the position so that in the future the reader can start
// reading from here. If the above call to next() throws an
// exception, the position won't be changed and retry will happen
// from the last known good position
this.position = this.reader.getPosition();
return entry;
private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop
while (this.isActive() && this.currentPeers.size() == 0) {
@ -510,9 +500,7 @@ public class ReplicationSource extends Thread
protected boolean openReader(int sleepMultiplier) {
try {
try {
this.reader = null;
this.reader = HLogFactory.createReader(this.fs,
this.currentPath, this.conf);
this.reader = repLogReader.openReader(this.currentPath);
} catch (FileNotFoundException fnfe) {
if (this.queueRecovered) {
// We didn't find the log in the archive directory, look if it still
@ -648,10 +636,11 @@ public class ReplicationSource extends Thread
AdminProtocol rrs = getRS();
Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.position) {
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.position;
this.peerClusterZnode, this.repLogReader.getPosition(),
queueRecovered, currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
this.totalReplicatedEdits += currentNbEntries;
@ -718,7 +707,8 @@ public class ReplicationSource extends Thread
protected boolean processEndOfFile() {
if (this.queue.size() != 0) {
this.currentPath = null;
this.position = 0;
this.reader = null;
return true;
} else if (this.queueRecovered) {
@ -842,8 +832,14 @@ public class ReplicationSource extends Thread
public String getStats() {
String position;
try {
position = this.reader.getPosition()+"";
} catch (IOException ioe) {
position = "N/A";
return "Total replicated edits: " + totalReplicatedEdits +
", currently replicating from: " + this.currentPath +
" at position: " + this.position;
" at position: " + position;
@ -49,6 +49,9 @@ public class FaultySequenceFileLogReader extends SequenceFileLogReader {
HLogKey key = HLogUtil.newKey(conf);
WALEdit val = new WALEdit();
HLog.Entry e = new HLog.Entry(key, val);
if (compressionContext != null) {
b = this.reader.next(e.getKey(), e.getEdit());
@ -477,15 +477,13 @@ public class TestHLog {
throw t.exception;
// Make sure you can read all the content
SequenceFile.Reader reader
= new SequenceFile.Reader(this.fs, walPath, this.conf);
HLog.Reader reader = HLogFactory.createReader(this.fs, walPath, this.conf);
int count = 0;
HLogKey key = HLogUtil.newKey(conf);
WALEdit val = new WALEdit();
while (reader.next(key, val)) {
HLog.Entry entry = new HLog.Entry();
while (reader.next(entry) != null) {
assertTrue("Should be one KeyValue per WALEdit",
val.getKeyValues().size() == 1);
entry.getEdit().getKeyValues().size() == 1);
assertEquals(total, count);
@ -100,7 +100,7 @@ public class TestHLogSplit {
private Configuration conf;
private FileSystem fs;
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final Path HBASEDIR = new Path("/hbase");
@ -0,0 +1,35 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
public class TestHLogSplitCompressed extends TestHLogSplit {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
@ -60,7 +60,7 @@ public class TestReplication {
private static final Log LOG = LogFactory.getLog(TestReplication.class);
private static Configuration conf1;
protected static Configuration conf1 = HBaseConfiguration.create();
private static Configuration conf2;
private static Configuration CONF_WITH_LOCALFS;
@ -90,7 +90,6 @@ public class TestReplication {
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
@ -520,7 +519,7 @@ public class TestReplication {
// disable and start the peer
utility2.startMiniHBaseCluster(1, 1);
utility2.startMiniHBaseCluster(1, 2);
Get get = new Get(rowkey);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
@ -760,7 +759,8 @@ public class TestReplication {
int lastCount = 0;
final long start = System.currentTimeMillis();
for (int i = 0; i < NB_RETRIES; i++) {
int i = 0;
while (true) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for queueFailover replication. " +
"Waited "+(System.currentTimeMillis() - start)+"ms.");
@ -772,6 +772,8 @@ public class TestReplication {
if (res2.length < initialCount) {
if (lastCount < res2.length) {
i--; // Don't increment timeout if we make progress
} else {
lastCount = res2.length;
LOG.info("Only got " + lastCount + " rows instead of " +
@ -791,7 +793,7 @@ public class TestReplication {
} catch (Exception e) {
LOG.error("Couldn't kill a region server", e);
@ -0,0 +1,40 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
* Run the same test as TestReplication but with HLog compression enabled
public class TestReplicationWithCompression extends TestReplication {
* @throws java.lang.Exception
public static void setUpBeforeClass() throws Exception {
conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
Reference in New Issue
Block a user