HBASE-20219 An error occurs when scanning with reversed=true and loadColumnFamiliesOnDemand=true

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Toshihiro Suzuki 2018-03-22 13:57:00 +09:00 committed by tedyu
parent 35b4adf3ed
commit 072c503ec7
2 changed files with 105 additions and 69 deletions

View File

@ -22,13 +22,13 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* ReversibleRegionScannerImpl extends from RegionScannerImpl, and is used to * ReversibleRegionScannerImpl extends from RegionScannerImpl, and is used to
@ -53,8 +53,7 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
List<KeyValueScanner> joinedScanners, HRegion region) throws IOException { List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
this.storeHeap = new ReversedKeyValueHeap(scanners, comparator); this.storeHeap = new ReversedKeyValueHeap(scanners, comparator);
if (!joinedScanners.isEmpty()) { if (!joinedScanners.isEmpty()) {
this.joinedHeap = new ReversedKeyValueHeap(joinedScanners, throw new DoNotRetryIOException("Reverse scan with loading CFs on demand is not supported");
comparator);
} }
} }

View File

@ -21,23 +21,32 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -66,13 +75,12 @@ public class TestJoinedScanners {
private static final Logger LOG = LoggerFactory.getLogger(TestJoinedScanners.class); private static final Logger LOG = LoggerFactory.getLogger(TestJoinedScanners.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final String DIR = TEST_UTIL.getDataTestDir("TestJoinedScanners").toString();
private static final byte[] cf_essential = Bytes.toBytes("essential"); private static final byte[] cf_essential = Bytes.toBytes("essential");
private static final byte[] cf_joined = Bytes.toBytes("joined"); private static final byte[] cf_joined = Bytes.toBytes("joined");
private static final byte[] col_name = Bytes.toBytes("a"); private static final byte[] col_name = Bytes.toBytes("a");
private static final byte[] flag_yes = Bytes.toBytes("Y"); private static final byte[] flag_yes = Bytes.toBytes("Y");
private static final byte[] flag_no = Bytes.toBytes("N"); private static final byte[] flag_no = Bytes.toBytes("N");
private static DataBlockEncoding blockEncoding = DataBlockEncoding.FAST_DIFF; private static DataBlockEncoding blockEncoding = DataBlockEncoding.FAST_DIFF;
private static int selectionRatio = 30; private static int selectionRatio = 30;
@ -81,79 +89,78 @@ public class TestJoinedScanners {
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
/**
 * Configures and starts the mini cluster that every test in this class shares.
 *
 * <p>The configuration is adjusted before start-up: a 1 MB DFS block size, a DFS replication
 * factor of 1, and a 300 GB maximum region file size (presumably so regions never split while
 * the tests run; confirm against the HBase configuration reference).
 *
 * @throws Exception if the mini cluster fails to start
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Cluster topology: three named data node hosts and three region servers.
  final String[] hosts = {"host1", "host2", "host3"};
  final int numRegionServers = 3;
  // 1 MB, expressed in bytes.
  final int dfsBlockSizeBytes = 1024 * 1024;

  TEST_UTIL.getConfiguration().setLong("dfs.blocksize", dfsBlockSizeBytes);
  TEST_UTIL.getConfiguration().setInt("dfs.replication", 1);
  // 322122547200 bytes == 300 * 1024^3.
  TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L);

  TEST_UTIL.startMiniCluster(1, numRegionServers, hosts);
}
/**
 * Stops the mini cluster started in {@code setUpBeforeClass()} once all tests have run.
 *
 * @throws Exception if the cluster cannot be shut down cleanly
 */
@AfterClass
public static void tearDownAfterClass() throws Exception {
  TEST_UTIL.shutdownMiniCluster();
}
@Test @Test
public void testJoinedScanners() throws Exception { public void testJoinedScanners() throws Exception {
String dataNodeHosts[] = new String[] { "host1", "host2", "host3" }; byte[][] families = {cf_essential, cf_joined};
int regionServersCount = 3;
HBaseTestingUtility htu = new HBaseTestingUtility(); final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName);
for (byte[] family : families) {
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setDataBlockEncoding(blockEncoding);
desc.addFamily(hcd);
}
TEST_UTIL.getAdmin().createTable(desc);
Table ht = TEST_UTIL.getConnection().getTable(tableName);
final int DEFAULT_BLOCK_SIZE = 1024*1024; long rows_to_insert = 1000;
htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); int insert_batch = 20;
htu.getConfiguration().setInt("dfs.replication", 1); long time = System.nanoTime();
htu.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L); Random rand = new Random(time);
MiniHBaseCluster cluster = null;
try { LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = " + Float
cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts); .toString(rows_to_insert * valueWidth / 1024 / 1024) + " MB");
byte [][] families = {cf_essential, cf_joined};
final TableName tableName = TableName.valueOf(name.getMethodName()); byte[] val_large = new byte[valueWidth];
HTableDescriptor desc = new HTableDescriptor(tableName);
for(byte[] family : families) { List<Put> puts = new ArrayList<>();
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setDataBlockEncoding(blockEncoding); for (long i = 0; i < rows_to_insert; i++) {
desc.addFamily(hcd); Put put = new Put(Bytes.toBytes(Long.toString(i)));
if (rand.nextInt(100) <= selectionRatio) {
put.addColumn(cf_essential, col_name, flag_yes);
} else {
put.addColumn(cf_essential, col_name, flag_no);
} }
htu.getAdmin().createTable(desc); put.addColumn(cf_joined, col_name, val_large);
Table ht = htu.getConnection().getTable(tableName); puts.add(put);
if (puts.size() >= insert_batch) {
long rows_to_insert = 1000;
int insert_batch = 20;
long time = System.nanoTime();
Random rand = new Random(time);
LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = "
+ Float.toString(rows_to_insert * valueWidth / 1024 / 1024) + " MB");
byte [] val_large = new byte[valueWidth];
List<Put> puts = new ArrayList<>();
for (long i = 0; i < rows_to_insert; i++) {
Put put = new Put(Bytes.toBytes(Long.toString (i)));
if (rand.nextInt(100) <= selectionRatio) {
put.addColumn(cf_essential, col_name, flag_yes);
} else {
put.addColumn(cf_essential, col_name, flag_no);
}
put.addColumn(cf_joined, col_name, val_large);
puts.add(put);
if (puts.size() >= insert_batch) {
ht.put(puts);
puts.clear();
}
}
if (!puts.isEmpty()) {
ht.put(puts); ht.put(puts);
puts.clear(); puts.clear();
} }
LOG.info("Data generated in "
+ Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds");
boolean slow = true;
for (int i = 0; i < 10; ++i) {
runScanner(ht, slow);
slow = !slow;
}
ht.close();
} finally {
if (cluster != null) {
htu.shutdownMiniCluster();
}
} }
if (!puts.isEmpty()) {
ht.put(puts);
puts.clear();
}
LOG.info("Data generated in "
+ Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds");
boolean slow = true;
for (int i = 0; i < 10; ++i) {
runScanner(ht, slow);
slow = !slow;
}
ht.close();
} }
private void runScanner(Table table, boolean slow) throws Exception { private void runScanner(Table table, boolean slow) throws Exception {
@ -224,4 +231,34 @@ public class TestJoinedScanners {
TestJoinedScanners test = new TestJoinedScanners(); TestJoinedScanners test = new TestJoinedScanners();
test.testJoinedScanners(); test.testJoinedScanners();
} }
/**
 * Verifies that a reversed scan with on-demand column family loading is rejected.
 *
 * <p>Creates a table with two column families, then opens a scan that is reversed, has
 * {@code loadColumnFamiliesOnDemand} enabled and carries a {@link SingleColumnValueFilter}
 * on {@code cf1} with {@code filterIfMissing} set. Fetching the first result is expected to
 * fail with a {@link DoNotRetryIOException}.
 *
 * @throws Exception the expected {@link DoNotRetryIOException}, or any unexpected failure
 */
@Test(expected = DoNotRetryIOException.class)
public void testWithReverseScan() throws Exception {
  // Deliberately NOT a try-with-resources resource: this is the connection shared through
  // TEST_UTIL (testJoinedScanners obtains it the same way). Closing it here would break any
  // test that runs afterwards; it is released when the mini cluster shuts down.
  Connection con = TEST_UTIL.getConnection();
  try (Admin admin = con.getAdmin()) {
    TableName tableName = TableName.valueOf(name.getMethodName());

    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf1"))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf2"))
        .build();
    admin.createTable(tableDescriptor);

    try (Table table = con.getTable(tableName)) {
      SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("cf1"),
          Bytes.toBytes("col"), CompareOperator.EQUAL, Bytes.toBytes("val"));
      filter.setFilterIfMissing(true);

      // Reverse scan with loading CFs on demand
      Scan scan = new Scan();
      scan.setFilter(filter);
      scan.setReversed(true);
      scan.setLoadColumnFamiliesOnDemand(true);

      try (ResultScanner scanner = table.getScanner(scan)) {
        // DoNotRetryIOException should occur
        scanner.next();
      }
    }
  }
}
} }