HBASE-24515 batch Increment/Append fails when retrying the RPC (#1864)
Signed-off-by: Viraj Jasani <virajjasani007@gmail.com>
This commit is contained in:
parent
f2fde77fc3
commit
022dd9687f
|
@ -970,60 +970,6 @@ public final class ProtobufUtil {
|
|||
throw new IOException("Unknown mutation type " + type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a protocol buffer Mutate to a Get.
|
||||
* @param proto the protocol buffer Mutate to convert.
|
||||
* @param cellScanner
|
||||
* @return the converted client get.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Get toGet(final MutationProto proto, final CellScanner cellScanner)
|
||||
throws IOException {
|
||||
MutationType type = proto.getMutateType();
|
||||
assert type == MutationType.INCREMENT || type == MutationType.APPEND : type.name();
|
||||
byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
|
||||
Get get = null;
|
||||
int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0;
|
||||
if (cellCount > 0) {
|
||||
// The proto has metadata only and the data is separate to be found in the cellScanner.
|
||||
if (cellScanner == null) {
|
||||
throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: "
|
||||
+ TextFormat.shortDebugString(proto));
|
||||
}
|
||||
for (int i = 0; i < cellCount; i++) {
|
||||
if (!cellScanner.advance()) {
|
||||
throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i
|
||||
+ " no cell returned: " + TextFormat.shortDebugString(proto));
|
||||
}
|
||||
Cell cell = cellScanner.current();
|
||||
if (get == null) {
|
||||
get = new Get(CellUtil.cloneRow(cell));
|
||||
}
|
||||
get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
|
||||
}
|
||||
} else {
|
||||
get = new Get(row);
|
||||
for (ColumnValue column : proto.getColumnValueList()) {
|
||||
byte[] family = column.getFamily().toByteArray();
|
||||
for (QualifierValue qv : column.getQualifierValueList()) {
|
||||
byte[] qualifier = qv.getQualifier().toByteArray();
|
||||
if (!qv.hasValue()) {
|
||||
throw new DoNotRetryIOException("Missing required field: qualifier value");
|
||||
}
|
||||
get.addColumn(family, qualifier);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (proto.hasTimeRange()) {
|
||||
TimeRange timeRange = toTimeRange(proto.getTimeRange());
|
||||
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
|
||||
}
|
||||
for (NameBytesPair attribute : proto.getAttributeList()) {
|
||||
get.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
|
||||
}
|
||||
return get;
|
||||
}
|
||||
|
||||
public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
|
||||
switch (readType) {
|
||||
case DEFAULT:
|
||||
|
|
|
@ -724,8 +724,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
r = region.append(append, nonceGroup, nonce);
|
||||
} else {
|
||||
// convert duplicate append to get
|
||||
List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
|
||||
nonceGroup, nonce);
|
||||
List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce);
|
||||
r = Result.create(results);
|
||||
}
|
||||
success = true;
|
||||
|
@ -771,8 +770,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
r = region.increment(increment, nonceGroup, nonce);
|
||||
} else {
|
||||
// convert duplicate increment to get
|
||||
List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
|
||||
nonce);
|
||||
List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce);
|
||||
r = Result.create(results);
|
||||
}
|
||||
success = true;
|
||||
|
@ -794,6 +792,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
return r == null ? Result.EMPTY_RESULT : r;
|
||||
}
|
||||
|
||||
private static Get toGet(final Mutation mutation) throws IOException {
|
||||
if(!(mutation instanceof Increment) && !(mutation instanceof Append)) {
|
||||
throw new AssertionError("mutation must be a instance of Increment or Append");
|
||||
}
|
||||
Get get = new Get(mutation.getRow());
|
||||
CellScanner cellScanner = mutation.cellScanner();
|
||||
while (!cellScanner.advance()) {
|
||||
Cell cell = cellScanner.current();
|
||||
get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
|
||||
}
|
||||
if (mutation instanceof Increment) {
|
||||
// Increment
|
||||
Increment increment = (Increment) mutation;
|
||||
get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
|
||||
} else {
|
||||
// Append
|
||||
Append append = (Append) mutation;
|
||||
get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
|
||||
}
|
||||
for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
|
||||
get.setAttribute(entry.getKey(), entry.getValue());
|
||||
}
|
||||
return get;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
|
||||
* done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -38,10 +39,8 @@ import org.apache.hadoop.hbase.CellUtil;
|
|||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -114,13 +113,18 @@ public class TestFromClientSide extends FromClientSideBase {
|
|||
*/
|
||||
@Test
|
||||
public void testDuplicateAppend() throws Exception {
|
||||
HTableDescriptor hdt = TEST_UTIL
|
||||
.createTableDescriptor(name.getTableName(), HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3,
|
||||
HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
|
||||
TableDescriptorBuilder.ModifyableTableDescriptor mtd = TEST_UTIL
|
||||
.createModifyableTableDescriptor(name.getTableName(),
|
||||
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
|
||||
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
|
||||
Map<String, String> kvs = new HashMap<>();
|
||||
kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
|
||||
hdt.addCoprocessor(SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
|
||||
TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
|
||||
mtd.setCoprocessor(CoprocessorDescriptorBuilder
|
||||
.newBuilder(SleepAtFirstRpcCall.class.getName())
|
||||
.setPriority(1)
|
||||
.setProperties(kvs)
|
||||
.build());
|
||||
TEST_UTIL.createTable(mtd, new byte[][] { ROW }).close();
|
||||
|
||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
|
||||
|
@ -147,6 +151,52 @@ public class TestFromClientSide extends FromClientSideBase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test batch append result when there are duplicate rpc request.
|
||||
*/
|
||||
@Test
|
||||
public void testDuplicateBatchAppend() throws Exception {
|
||||
TableDescriptorBuilder.ModifyableTableDescriptor mtd = TEST_UTIL
|
||||
.createModifyableTableDescriptor(name.getTableName(),
|
||||
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
|
||||
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
|
||||
Map<String, String> kvs = new HashMap<>();
|
||||
kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
|
||||
mtd.setCoprocessor(CoprocessorDescriptorBuilder
|
||||
.newBuilder(SleepAtFirstRpcCall.class.getName())
|
||||
.setPriority(1)
|
||||
.setProperties(kvs)
|
||||
.build());
|
||||
TEST_UTIL.createTable(mtd, new byte[][] { ROW }).close();
|
||||
|
||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
|
||||
// Client will retry because rpc timeout is small than the sleep time of first rpc call
|
||||
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
|
||||
|
||||
try (Connection connection = ConnectionFactory.createConnection(c);
|
||||
Table table = connection.getTableBuilder(name.getTableName(), null).
|
||||
setOperationTimeout(3 * 1000).build()) {
|
||||
Append append = new Append(ROW);
|
||||
append.addColumn(HBaseTestingUtility.fam1, QUALIFIER, VALUE);
|
||||
|
||||
// Batch append
|
||||
Object[] results = new Object[1];
|
||||
table.batch(Collections.singletonList(append), results);
|
||||
|
||||
// Verify expected result
|
||||
Cell[] cells = ((Result) results[0]).rawCells();
|
||||
assertEquals(1, cells.length);
|
||||
assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE);
|
||||
|
||||
// Verify expected result again
|
||||
Result readResult = table.get(new Get(ROW));
|
||||
cells = readResult.rawCells();
|
||||
assertEquals(1, cells.length);
|
||||
assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Basic client side validation of HBASE-4536
|
||||
*/
|
||||
|
|
|
@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -36,7 +37,6 @@ import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
|||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
|
@ -101,11 +101,16 @@ public class TestIncrementsFromClientSide {
|
|||
*/
|
||||
@Test
|
||||
public void testDuplicateIncrement() throws Exception {
|
||||
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()));
|
||||
TableDescriptorBuilder.ModifyableTableDescriptor mtd =
|
||||
TEST_UTIL.createModifyableTableDescriptor(name.getMethodName());
|
||||
Map<String, String> kvs = new HashMap<>();
|
||||
kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
|
||||
hdt.addCoprocessor(SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
|
||||
TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
|
||||
mtd.setCoprocessor(CoprocessorDescriptorBuilder
|
||||
.newBuilder(SleepAtFirstRpcCall.class.getName())
|
||||
.setPriority(1)
|
||||
.setProperties(kvs)
|
||||
.build());
|
||||
TEST_UTIL.createTable(mtd, new byte[][] { ROW }).close();
|
||||
|
||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
|
||||
|
@ -131,6 +136,49 @@ public class TestIncrementsFromClientSide {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test batch increment result when there are duplicate rpc request.
|
||||
*/
|
||||
@Test
|
||||
public void testDuplicateBatchIncrement() throws Exception {
|
||||
TableDescriptorBuilder.ModifyableTableDescriptor mtd =
|
||||
TEST_UTIL.createModifyableTableDescriptor(name.getMethodName());
|
||||
Map<String, String> kvs = new HashMap<>();
|
||||
kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
|
||||
mtd.setCoprocessor(CoprocessorDescriptorBuilder
|
||||
.newBuilder(SleepAtFirstRpcCall.class.getName())
|
||||
.setPriority(1)
|
||||
.setProperties(kvs)
|
||||
.build());
|
||||
TEST_UTIL.createTable(mtd, new byte[][] { ROW }).close();
|
||||
|
||||
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
|
||||
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
|
||||
// Client will retry beacuse rpc timeout is small than the sleep time of first rpc call
|
||||
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
|
||||
|
||||
try (Connection connection = ConnectionFactory.createConnection(c);
|
||||
Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null)
|
||||
.setOperationTimeout(3 * 1000).build()) {
|
||||
Increment inc = new Increment(ROW);
|
||||
inc.addColumn(HBaseTestingUtility.fam1, QUALIFIER, 1);
|
||||
|
||||
// Batch increment
|
||||
Object[] results = new Object[1];
|
||||
table.batch(Collections.singletonList(inc), results);
|
||||
|
||||
Cell[] cells = ((Result) results[0]).rawCells();
|
||||
assertEquals(1, cells.length);
|
||||
assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1);
|
||||
|
||||
// Verify expected result
|
||||
Result readResult = table.get(new Get(ROW));
|
||||
cells = readResult.rawCells();
|
||||
assertEquals(1, cells.length);
|
||||
assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementWithDeletes() throws Exception {
|
||||
LOG.info("Starting " + this.name.getMethodName());
|
||||
|
|
Loading…
Reference in New Issue