HBASE-25994 Active WAL tailing fails when WAL value compression is enabled (#3377)

Depending on which compression codec is used, a short read of the
compressed bytes can cause catastrophic errors that confuse the WAL reader.
This problem can manifest when the reader is actively tailing the WAL for
replication. To avoid these issues when WAL value compression is enabled,
BoundedDelegatingInputStream should assume enough bytes are available to
supply a reader up to its bound. This behavior is valid per the contract
of available(), which provides an _estimate_ of available bytes, and is
equivalent to IOUtils.readFully but without requiring an intermediate
buffer.
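
To make the contract concrete, here is an illustrative, self-contained sketch (not HBase code; the class and variable names are made up) contrasting the old and new available() estimates when a tailing reader's delegate has buffered only part of the bounded region:

// Illustrative sketch only. "limit" and "pos" model the bound and read position of a
// bounded delegating stream; the delegate has received just 10 of the 100 bounded bytes.
import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class AvailableEstimateSketch {
  public static void main(String[] args) throws Exception {
    long limit = 100;   // bound: length of the compressed value section
    long pos = 0;       // nothing consumed yet
    InputStream delegate = new ByteArrayInputStream(new byte[10]); // only 10 bytes arrived so far

    int oldEstimate = (int) Math.min(delegate.available(), limit - pos); // 10: a short estimate
    int newEstimate = (int) (limit - pos);                               // 100: the full bound

    System.out.println(oldEstimate + " then " + newEstimate);            // prints "10 then 100"
  }
}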

Added TestReplicationCompressedWAL and TestReplicationValueCompressedWAL.
Without the WALCellCodec change, TestReplicationValueCompressedWAL will
fail.

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Andrew Purtell 2021-06-14 17:16:31 -07:00
parent ef9149bf78
commit a1d59bf1a5
3 changed files with 176 additions and 5 deletions

BoundedDelegatingInputStream.java

@@ -95,17 +95,21 @@ public class BoundedDelegatingInputStream extends DelegatingInputStream {
   }
 
   /**
-   * Call the delegate's {@code available()} method.
-   * @return the delegate's available bytes if the current position is less than the limit,
-   * or 0 otherwise
+   * @return the remaining bytes within the bound if the current position is less than the
+   *   limit, or 0 otherwise.
    */
   @Override
   public int available() throws IOException {
     if (pos >= limit) {
       return 0;
     }
-    int available = in.available();
-    return (int) Math.min(available, limit - pos);
+    // Do not call the delegate's available() method. Data in a bounded input stream is assumed
+    // available up to the limit and that is the contract we have with our callers. Regardless
+    // of what we do here, read() and skip() will behave as expected when EOF is encountered if
+    // the underlying stream is closed early or otherwise could not provide enough bytes.
+    // Note: This class is used to supply buffers to compression codecs during WAL tailing and
+    // successful decompression depends on this behavior.
+    return (int) (limit - pos);
   }
 
 }
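
The comment above describes the contract; as a standalone illustration (assuming a plain FilterInputStream wrapper, not the real HBase class), a minimal bounded stream honoring it could look like the sketch below, where available() reports the remaining bound and read() still hits EOF at the bound or when the delegate runs dry:

// Minimal hypothetical sketch of a bounded stream whose available() reports the remaining
// bound. The real BoundedDelegatingInputStream also bounds read(byte[],int,int) and skip(),
// as the comment in the diff above notes.
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class BoundedAvailableSketch extends FilterInputStream {
  private long pos;          // bytes consumed so far
  private final long limit;  // bound supplied by the caller

  BoundedAvailableSketch(InputStream in, long limit) {
    super(in);
    this.limit = limit;
  }

  @Override
  public int read() throws IOException {
    if (pos >= limit) {
      return -1;             // EOF at the bound, regardless of the delegate
    }
    int b = in.read();
    if (b >= 0) {
      pos++;
    }
    return b;
  }

  @Override
  public int available() {
    // An estimate, which the InputStream contract permits: assume the delegate
    // can supply everything up to the bound.
    return (int) (limit - pos);
  }

  public static void main(String[] args) throws IOException {
    try (InputStream in = new BoundedAvailableSketch(new ByteArrayInputStream(new byte[64]), 16)) {
      System.out.println(in.available());  // prints 16: the bound, not the delegate's 64
    }
  }
}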

TestReplicationCompressedWAL.java

@@ -0,0 +1,108 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(MediumTests.class)
public class TestReplicationCompressedWAL extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class);

  static final Logger LOG = LoggerFactory.getLogger(TestReplicationCompressedWAL.class);

  static final int NUM_BATCHES = 20;
  static final int NUM_ROWS_PER_BATCH = 100;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    TestReplicationBase.setUpBeforeClass();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  @Test
  public void testMultiplePuts() throws Exception {
    runMultiplePutTest();
  }

  protected static void runMultiplePutTest() throws IOException, InterruptedException {
    for (int i = 0; i < NUM_BATCHES; i++) {
      putBatch(i);
      getBatch(i);
    }
  }

  protected static void getBatch(int batch) throws IOException, InterruptedException {
    for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
      byte[] row = getRowKey(batch, i);
      Get get = new Get(row);
      for (int j = 0; j < NB_RETRIES; j++) {
        if (j == NB_RETRIES - 1) {
          fail("Waited too much time for replication");
        }
        Result res = htable2.get(get);
        if (res.isEmpty()) {
          LOG.info("Row not available");
          Thread.sleep(SLEEP_TIME);
        } else {
          assertArrayEquals(row, res.value());
          break;
        }
      }
    }
  }

  protected static byte[] getRowKey(int batch, int count) {
    return Bytes.toBytes("row" + ((batch * NUM_ROWS_PER_BATCH) + count));
  }

  protected static void putBatch(int batch) throws IOException {
    for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
      byte[] row = getRowKey(batch, i);
      Put put = new Put(row);
      put.addColumn(famName, row, row);
      htable1.put(put);
    }
  }
}

TestReplicationValueCompressedWAL.java

@@ -0,0 +1,59 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(MediumTests.class)
public class TestReplicationValueCompressedWAL extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationValueCompressedWAL.class);

  static final Logger LOG = LoggerFactory.getLogger(TestReplicationValueCompressedWAL.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    CONF1.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
    TestReplicationBase.setUpBeforeClass();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  @Test
  public void testMultiplePuts() throws Exception {
    TestReplicationCompressedWAL.runMultiplePutTest();
  }
}
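
For context, enabling the same behavior outside the tests means setting the same two flags shown above. A minimal sketch using the constants the tests reference (illustrative only, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;

public class EnableWalValueCompressionSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Same pair of flags the tests enable: WAL compression, plus value compression on top.
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
  }
}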