HBASE-18066: Get with closest_row_before on hbase:meta can return empty Cell during region merge/split
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
ea3075e7fd
commit
9c1efc9f9d
|
@ -136,6 +136,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
|||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.FilterWrapper;
|
||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
|
@ -2726,15 +2727,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
startRegionOperation(Operation.GET);
|
||||
this.readRequestsCount.increment();
|
||||
try {
|
||||
Store store = getStore(family);
|
||||
// get the closest key. (HStore.getRowKeyAtOrBefore can return null)
|
||||
Cell key = store.getRowKeyAtOrBefore(row);
|
||||
Result result = null;
|
||||
if (key != null) {
|
||||
Get get = new Get(CellUtil.cloneRow(key));
|
||||
Get get = new Get(row);
|
||||
get.addFamily(family);
|
||||
get.setClosestRowBefore(true);
|
||||
result = get(get);
|
||||
}
|
||||
// for compatibility
|
||||
result = result.isEmpty() ? null : result;
|
||||
if (coprocessorHost != null) {
|
||||
coprocessorHost.postGetClosestRowBefore(row, family, result);
|
||||
}
|
||||
|
@ -7224,6 +7223,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
}
|
||||
|
||||
private Scan buildScanForGetWithClosestRowBefore(Get get) throws IOException {
|
||||
Scan scan = new Scan().withStartRow(get.getRow())
|
||||
.addFamily(get.getFamilyMap().keySet().iterator().next()).setReversed(true)
|
||||
.withStopRow(HConstants.EMPTY_END_ROW, false).setLimit(1);
|
||||
if (this.getRegionInfo().isMetaRegion()) {
|
||||
int delimiterIdx =
|
||||
KeyValue.getDelimiter(get.getRow(), 0, get.getRow().length, HConstants.DELIMITER);
|
||||
if (delimiterIdx >= 0) {
|
||||
scan.setFilter(new PrefixFilter(Bytes.copy(get.getRow(), 0, delimiterIdx + 1)));
|
||||
}
|
||||
}
|
||||
return scan;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
|
||||
throws IOException {
|
||||
|
@ -7236,7 +7249,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
long before = EnvironmentEdgeManager.currentTime();
|
||||
Scan scan = new Scan(get);
|
||||
Scan scan;
|
||||
if (get.isClosestRowBefore()) {
|
||||
scan = buildScanForGetWithClosestRowBefore(get);
|
||||
} else {
|
||||
scan = new Scan(get);
|
||||
}
|
||||
|
||||
RegionScanner scanner = null;
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
@Category({ MediumTests.class })
|
||||
public class TestFromClientGetWithClosestRowBefore {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(TestFromClientGetWithClosestRowBefore.class);
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static Configuration CONF;
|
||||
private static final TableName TEST_TABLE = TableName.valueOf("test_table");
|
||||
private static final byte[] COLUMN_FAMILY = Bytes.toBytes("f1");
|
||||
private static final Random RANDOM = new Random();
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
CONF = UTIL.getConfiguration();
|
||||
UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void teardown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
HTableDescriptor htd = new HTableDescriptor(TEST_TABLE);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(COLUMN_FAMILY);
|
||||
htd.addFamily(hcd);
|
||||
|
||||
UTIL.getHBaseAdmin().createTable(htd);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (HTableDescriptor htd : UTIL.getHBaseAdmin().listTables()) {
|
||||
UTIL.deleteTable(htd.getTableName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetWithClosestRowBeforeWhenSplitRegion() throws Exception {
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
UTIL.getHBaseAdmin().split(TEST_TABLE);
|
||||
} catch (Exception e) {
|
||||
LOG.error("split region failed: ", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
try (Connection conn = ConnectionFactory.createConnection(CONF)) {
|
||||
try (Table table = conn.getTable(TEST_TABLE)) {
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
byte[] data = Bytes.toBytes(String.format("%026d", i));
|
||||
Put put = new Put(data).addColumn(COLUMN_FAMILY, null, data);
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
try (Table table = conn.getTable(TableName.META_TABLE_NAME)) {
|
||||
t.start();
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
Get get = new Get(Bytes.toBytes(TEST_TABLE + ",,:")).addFamily(Bytes.toBytes("info"))
|
||||
.setClosestRowBefore(true);
|
||||
Result result = table.get(get);
|
||||
if (Result.getTotalSizeOfCells(result) == 0) {
|
||||
Assert.fail("Get with closestRowBefore return NONE result.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosestRowIsLatestPutRow() throws IOException {
|
||||
final int[] initialRowkeys = new int[] { 1, 1000 };
|
||||
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
// a huge value to slow down transaction committing.
|
||||
byte[] value = new byte[512 * 1024];
|
||||
for (int i = 0; i < value.length; i++) {
|
||||
value[i] = (byte) RANDOM.nextInt(256);
|
||||
}
|
||||
|
||||
// Put rowKey= 2,3,4,...,(initialRowkeys[1]-1) into table, let the rowkey returned by a
|
||||
// Get with closestRowBefore to be exactly the latest put rowkey.
|
||||
try (Connection conn = ConnectionFactory.createConnection(CONF)) {
|
||||
try (Table table = conn.getTable(TEST_TABLE)) {
|
||||
for (int i = initialRowkeys[0] + 1; i < initialRowkeys[1]; i++) {
|
||||
byte[] data = Bytes.toBytes(String.format("%026d", i));
|
||||
Put put = new Put(data).addColumn(COLUMN_FAMILY, null, value);
|
||||
table.put(put);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Put huge value into table failed: ", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
try (Connection conn = ConnectionFactory.createConnection(CONF)) {
|
||||
try (Table table = conn.getTable(TEST_TABLE)) {
|
||||
|
||||
// Put the boundary into table firstly.
|
||||
for (int i = 0; i < initialRowkeys.length; i++) {
|
||||
byte[] rowKey = Bytes.toBytes(String.format("%026d", initialRowkeys[i]));
|
||||
Put put = new Put(rowKey).addColumn(COLUMN_FAMILY, null, rowKey);
|
||||
table.put(put);
|
||||
}
|
||||
|
||||
t.start();
|
||||
byte[] rowKey = Bytes.toBytes(String.format("%026d", initialRowkeys[1] - 1));
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
Get get = new Get(rowKey).addFamily(COLUMN_FAMILY).setClosestRowBefore(true);
|
||||
Result result = table.get(get);
|
||||
if (Result.getTotalSizeOfCells(result) == 0) {
|
||||
Assert.fail("Get with closestRowBefore return NONE result.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue