HBASE-25994 Active WAL tailing fails when WAL value compression is enabled (#3377)
Depending on which compression codec is used, a short read of the compressed bytes can cause catastrophic errors that confuse the WAL reader. This problem can manifest when the reader is actively tailing the WAL for replication. To avoid these issues when WAL value compression is enabled, BoundedDelegatingInputStream should assume enough bytes are available to supply a reader up to its bound. This behavior is valid per the contract of available(), which provides an _estimate_ of available bytes, and equivalent to IOUtils.readFully but without requiring an intermediate buffer. Added TestReplicationCompressedWAL and TestReplicationValueCompressedWAL. Without the WALCellCodec change TestReplicationValueCompressedWAL will fail. Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
a35ec994b9
commit
97f90e0be2
|
@ -95,8 +95,7 @@ public class BoundedDelegatingInputStream extends DelegatingInputStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Call the delegate's {@code available()} method.
|
* @return the remaining bytes within the bound if the current position is less than the
|
||||||
* @return the delegate's available bytes if the current position is less than the
|
|
||||||
* limit, or 0 otherwise.
|
* limit, or 0 otherwise.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -104,8 +103,13 @@ public class BoundedDelegatingInputStream extends DelegatingInputStream {
|
||||||
if (pos >= limit) {
|
if (pos >= limit) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int available = in.available();
|
// Do not call the delegate's available() method. Data in a bounded input stream is assumed
|
||||||
return (int) Math.min(available, limit - pos);
|
// available up to the limit and that is the contract we have with our callers. Regardless
|
||||||
|
// of what we do here, read() and skip() will behave as expected when EOF is encountered if
|
||||||
|
// the underlying stream is closed early or otherwise could not provide enough bytes.
|
||||||
|
// Note: This class is used to supply buffers to compression codecs during WAL tailing and
|
||||||
|
// successful decompression depends on this behavior.
|
||||||
|
return (int) (limit - pos);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.replication.TestReplicationBase;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestReplicationCompressedWAL extends TestReplicationBase {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class);
|
||||||
|
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(TestReplicationCompressedWAL.class);
|
||||||
|
static final int NUM_BATCHES = 20;
|
||||||
|
static final int NUM_ROWS_PER_BATCH = 100;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
|
||||||
|
TestReplicationBase.setUpBeforeClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TestReplicationBase.tearDownAfterClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiplePuts() throws Exception {
|
||||||
|
runMultiplePutTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void runMultiplePutTest() throws IOException, InterruptedException {
|
||||||
|
for (int i = 0; i < NUM_BATCHES; i++) {
|
||||||
|
putBatch(i);
|
||||||
|
getBatch(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void getBatch(int batch) throws IOException, InterruptedException {
|
||||||
|
for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
|
||||||
|
byte[] row = getRowKey(batch, i);
|
||||||
|
Get get = new Get(row);
|
||||||
|
for (int j = 0; j < NB_RETRIES; j++) {
|
||||||
|
if (j == NB_RETRIES - 1) {
|
||||||
|
fail("Waited too much time for replication");
|
||||||
|
}
|
||||||
|
Result res = htable2.get(get);
|
||||||
|
if (res.isEmpty()) {
|
||||||
|
LOG.info("Row not available");
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
} else {
|
||||||
|
assertArrayEquals(row, res.value());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static byte[] getRowKey(int batch, int count) {
|
||||||
|
return Bytes.toBytes("row" + ((batch * NUM_ROWS_PER_BATCH) + count));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static void putBatch(int batch) throws IOException {
|
||||||
|
for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
|
||||||
|
byte[] row = getRowKey(batch, i);
|
||||||
|
Put put = new Put(row);
|
||||||
|
put.addColumn(famName, row, row);
|
||||||
|
htable1.put(put);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
|
||||||
|
import org.apache.hadoop.hbase.replication.TestReplicationBase;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestReplicationValueCompressedWAL extends TestReplicationBase {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestReplicationValueCompressedWAL.class);
|
||||||
|
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(TestReplicationValueCompressedWAL.class);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
|
||||||
|
CONF1.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
|
||||||
|
TestReplicationBase.setUpBeforeClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TestReplicationBase.tearDownAfterClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiplePuts() throws Exception {
|
||||||
|
TestReplicationCompressedWAL.runMultiplePutTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue