diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java index 0ae8fac43b7..3ca064f0510 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java @@ -22,13 +22,13 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; /** * ReversibleRegionScannerImpl extends from RegionScannerImpl, and is used to @@ -53,8 +53,7 @@ class ReversedRegionScannerImpl extends RegionScannerImpl { List joinedScanners, HRegion region) throws IOException { this.storeHeap = new ReversedKeyValueHeap(scanners, comparator); if (!joinedScanners.isEmpty()) { - this.joinedHeap = new ReversedKeyValueHeap(joinedScanners, - comparator); + throw new DoNotRetryIOException("Reverse scan with loading CFs on demand is not supported"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java index 3434d932e60..3e23eeaba75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestJoinedScanners.java @@ -21,23 +21,32 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; + +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -66,13 +75,12 @@ public class TestJoinedScanners { private static final Logger LOG = LoggerFactory.getLogger(TestJoinedScanners.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final String DIR = TEST_UTIL.getDataTestDir("TestJoinedScanners").toString(); private static final byte[] cf_essential = Bytes.toBytes("essential"); private static final byte[] cf_joined = Bytes.toBytes("joined"); private static final byte[] col_name = Bytes.toBytes("a"); private static final byte[] flag_yes = Bytes.toBytes("Y"); - private static final byte[] flag_no = Bytes.toBytes("N"); + private static final byte[] flag_no = Bytes.toBytes("N"); private static DataBlockEncoding blockEncoding = DataBlockEncoding.FAST_DIFF; private static int selectionRatio = 30; @@ -81,79 +89,78 @@ public class TestJoinedScanners { @Rule public TestName name = new TestName(); + @BeforeClass + public static void setUpBeforeClass() throws Exception { + final int DEFAULT_BLOCK_SIZE = 1024 * 1024; + TEST_UTIL.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); + TEST_UTIL.getConfiguration().setInt("dfs.replication", 1); + TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L); + + String[] dataNodeHosts = new String[] {"host1", "host2", "host3"}; + int regionServersCount = 3; + TEST_UTIL.startMiniCluster(1, regionServersCount, dataNodeHosts); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + @Test public void testJoinedScanners() throws Exception { - String dataNodeHosts[] = new String[] { "host1", "host2", "host3" }; - int regionServersCount = 3; + byte[][] families = {cf_essential, cf_joined}; - HBaseTestingUtility htu = new HBaseTestingUtility(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + HTableDescriptor desc = new HTableDescriptor(tableName); + for (byte[] family : families) { + HColumnDescriptor hcd = new HColumnDescriptor(family); + hcd.setDataBlockEncoding(blockEncoding); + desc.addFamily(hcd); + } + TEST_UTIL.getAdmin().createTable(desc); + Table ht = TEST_UTIL.getConnection().getTable(tableName); - final int DEFAULT_BLOCK_SIZE = 1024*1024; - htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE); - htu.getConfiguration().setInt("dfs.replication", 1); - htu.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L); - MiniHBaseCluster cluster = null; + long rows_to_insert = 1000; + int insert_batch = 20; + long time = System.nanoTime(); + Random rand = new Random(time); - try { - cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts); - byte [][] families = {cf_essential, cf_joined}; + LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = " + Float + .toString(rows_to_insert * valueWidth / 1024 / 1024) + " MB"); - final TableName tableName = TableName.valueOf(name.getMethodName()); - HTableDescriptor desc = new HTableDescriptor(tableName); - for(byte[] family : families) { - HColumnDescriptor hcd = new HColumnDescriptor(family); - hcd.setDataBlockEncoding(blockEncoding); - desc.addFamily(hcd); + byte[] val_large = new byte[valueWidth]; + + List puts = new ArrayList<>(); + + for (long i = 0; i < rows_to_insert; i++) { + Put put = new Put(Bytes.toBytes(Long.toString(i))); + if (rand.nextInt(100) <= selectionRatio) { + put.addColumn(cf_essential, col_name, flag_yes); + } else { + put.addColumn(cf_essential, col_name, flag_no); } - htu.getAdmin().createTable(desc); - Table ht = htu.getConnection().getTable(tableName); - - long rows_to_insert = 1000; - int insert_batch = 20; - long time = System.nanoTime(); - Random rand = new Random(time); - - LOG.info("Make " + Long.toString(rows_to_insert) + " rows, total size = " - + Float.toString(rows_to_insert * valueWidth / 1024 / 1024) + " MB"); - - byte [] val_large = new byte[valueWidth]; - - List puts = new ArrayList<>(); - - for (long i = 0; i < rows_to_insert; i++) { - Put put = new Put(Bytes.toBytes(Long.toString (i))); - if (rand.nextInt(100) <= selectionRatio) { - put.addColumn(cf_essential, col_name, flag_yes); - } else { - put.addColumn(cf_essential, col_name, flag_no); - } - put.addColumn(cf_joined, col_name, val_large); - puts.add(put); - if (puts.size() >= insert_batch) { - ht.put(puts); - puts.clear(); - } - } - if (!puts.isEmpty()) { + put.addColumn(cf_joined, col_name, val_large); + puts.add(put); + if (puts.size() >= insert_batch) { ht.put(puts); puts.clear(); } - - LOG.info("Data generated in " - + Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds"); - - boolean slow = true; - for (int i = 0; i < 10; ++i) { - runScanner(ht, slow); - slow = !slow; - } - - ht.close(); - } finally { - if (cluster != null) { - htu.shutdownMiniCluster(); - } } + if (!puts.isEmpty()) { + ht.put(puts); + puts.clear(); + } + + LOG.info("Data generated in " + + Double.toString((System.nanoTime() - time) / 1000000000.0) + " seconds"); + + boolean slow = true; + for (int i = 0; i < 10; ++i) { + runScanner(ht, slow); + slow = !slow; + } + + ht.close(); } private void runScanner(Table table, boolean slow) throws Exception { @@ -224,4 +231,34 @@ public class TestJoinedScanners { TestJoinedScanners test = new TestJoinedScanners(); test.testJoinedScanners(); } + + @Test(expected = DoNotRetryIOException.class) + public void testWithReverseScan() throws Exception { + try (Connection con = TEST_UTIL.getConnection(); Admin admin = con.getAdmin()) { + TableName tableName = TableName.valueOf(name.getMethodName()); + + TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf1")) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf2")) + .build(); + admin.createTable(tableDescriptor); + + try (Table table = con.getTable(tableName)) { + SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("cf1"), + Bytes.toBytes("col"), CompareOperator.EQUAL, Bytes.toBytes("val")); + filter.setFilterIfMissing(true); + + // Reverse scan with loading CFs on demand + Scan scan = new Scan(); + scan.setFilter(filter); + scan.setReversed(true); + scan.setLoadColumnFamiliesOnDemand(true); + + try (ResultScanner scanner = table.getScanner(scan)) { + // DoNotRetryIOException should occur + scanner.next(); + } + } + } + } }