HBASE-18522 Add RowMutations support to Batch

This commit is contained in:
Jerry He 2017-08-14 09:28:49 -07:00
parent add9974515
commit cf050de917
6 changed files with 168 additions and 23 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -93,22 +94,54 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
RegionAction.Builder regionActionBuilder = RegionAction.newBuilder(); RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
List<CellScannable> cells = null;
// The multi object is a list of Actions by region. Iterate by region. // Pre-size. Presume at least a KV per Action. There are likely more.
List<CellScannable> cells =
(this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
long nonceGroup = multiAction.getNonceGroup(); long nonceGroup = multiAction.getNonceGroup();
if (nonceGroup != HConstants.NO_NONCE) { if (nonceGroup != HConstants.NO_NONCE) {
multiRequestBuilder.setNonceGroup(nonceGroup); multiRequestBuilder.setNonceGroup(nonceGroup);
} }
// Index to track RegionAction within the MultiRequest
int regionActionIndex = -1;
// Map from a created RegionAction to the original index for a RowMutations within
// its original list of actions
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
// The multi object is a list of Actions by region. Iterate by region.
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) { for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey(); final byte [] regionName = e.getKey();
final List<Action> actions = e.getValue(); final List<Action> actions = e.getValue();
regionActionBuilder.clear(); regionActionBuilder.clear();
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName)); HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
int rowMutations = 0;
for (Action action : actions) {
Row row = action.getAction();
// Row Mutations are a set of Puts and/or Deletes all to be applied atomically
// on the one row. We do separate RegionAction for each RowMutations.
// We maintain a map to keep track of this RegionAction and the original Action index.
if (row instanceof RowMutations) {
RowMutations rms = (RowMutations)row;
if (this.cellBlock) { if (this.cellBlock) {
// Pre-size. Presume at least a KV per Action. There are likely more. // Build a multi request absent its Cell payload. Send data in cellblocks.
if (cells == null) cells = new ArrayList<>(countOfActions); regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells,
// Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations. regionActionBuilder, actionBuilder, mutationBuilder);
} else {
regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms);
}
regionActionBuilder.setAtomic(true);
multiRequestBuilder.addRegionAction(regionActionBuilder.build());
regionActionIndex++;
rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex());
rowMutations++;
}
}
if (actions.size() > rowMutations) {
if (this.cellBlock) {
// Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations.
// They have already been handled above. Guess at count of cells // They have already been handled above. Guess at count of cells
regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells, regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
regionActionBuilder, actionBuilder, mutationBuilder); regionActionBuilder, actionBuilder, mutationBuilder);
@ -117,6 +150,8 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
regionActionBuilder, actionBuilder, mutationBuilder); regionActionBuilder, actionBuilder, mutationBuilder);
} }
multiRequestBuilder.addRegionAction(regionActionBuilder.build()); multiRequestBuilder.addRegionAction(regionActionBuilder.build());
regionActionIndex++;
}
} }
if (cells != null) { if (cells != null) {
@ -125,7 +160,8 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto); ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto);
if (responseProto == null) return null; // Occurs on cancel if (responseProto == null) return null; // Occurs on cancel
return ResponseConverter.getResults(requestProto, responseProto, getRpcControllerCellScanner()); return ResponseConverter.getResults(requestProto, rowMutationsIndexMap, responseProto,
getRpcControllerCellScanner());
} }
/** /**

View File

@ -111,12 +111,12 @@ public interface Table extends Closeable {
boolean[] existsAll(List<Get> gets) throws IOException; boolean[] existsAll(List<Get> gets) throws IOException;
/** /**
* Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends, RowMutations.
* The ordering of execution of the actions is not defined. Meaning if you do a Put and a * The ordering of execution of the actions is not defined. Meaning if you do a Put and a
* Get in the same {@link #batch} call, you will not necessarily be * Get in the same {@link #batch} call, you will not necessarily be
* guaranteed that the Get returns what the Put had put. * guaranteed that the Get returns what the Put had put.
* *
* @param actions list of Get, Put, Delete, Increment, Append objects * @param actions list of Get, Put, Delete, Increment, Append, RowMutations.
* @param results Empty Object[], same size as actions. Provides access to partial * @param results Empty Object[], same size as actions. Provides access to partial
* results, in case an exception is thrown. A null in the result array means that * results, in case an exception is thrown. A null in the result array means that
* the call for that action failed, even after retries. The order of the objects * the call for that action failed, even after retries. The order of the objects

View File

@ -668,7 +668,8 @@ public final class RequestConverter {
.setMethodName(exec.getMethod().getName()) .setMethodName(exec.getMethod().getName())
.setRequest(value))); .setRequest(value)));
} else if (row instanceof RowMutations) { } else if (row instanceof RowMutations) {
throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow"); // Skip RowMutations, which has been separately converted to RegionAction
continue;
} else { } else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
} }
@ -756,7 +757,8 @@ public final class RequestConverter {
.setMethodName(exec.getMethod().getName()) .setMethodName(exec.getMethod().getName())
.setRequest(value))); .setRequest(value)));
} else if (row instanceof RowMutations) { } else if (row instanceof RowMutations) {
throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow"); // Skip RowMutations, which has been separately converted to RegionAction
continue;
} else { } else {
throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName()); throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
} }

View File

@ -89,7 +89,8 @@ public final class ResponseConverter {
/** /**
* Get the results from a protocol buffer MultiResponse * Get the results from a protocol buffer MultiResponse
* *
* @param request the protocol buffer MultiResponse to convert * @param request the original protocol buffer MultiRequest
* @param response the protocol buffer MultiResponse to convert
* @param cells Cells to go with the passed in <code>proto</code>. Can be null. * @param cells Cells to go with the passed in <code>proto</code>. Can be null.
* @return the results that were in the MultiResponse (a Result or an Exception). * @return the results that were in the MultiResponse (a Result or an Exception).
* @throws IOException * @throws IOException
@ -97,6 +98,22 @@ public final class ResponseConverter {
public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request, public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
final MultiResponse response, final CellScanner cells) final MultiResponse response, final CellScanner cells)
throws IOException { throws IOException {
return getResults(request, null, response, cells);
}
/**
* Get the results from a protocol buffer MultiResponse
*
* @param request the original protocol buffer MultiRequest
* @param rowMutationsIndexMap Used to support RowMutations in batch
* @param response the protocol buffer MultiResponse to convert
* @param cells Cells to go with the passed in <code>proto</code>. Can be null.
* @return the results that were in the MultiResponse (a Result or an Exception).
* @throws IOException
*/
public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
final Map<Integer, Integer> rowMutationsIndexMap, final MultiResponse response,
final CellScanner cells) throws IOException {
int requestRegionActionCount = request.getRegionActionCount(); int requestRegionActionCount = request.getRegionActionCount();
int responseRegionActionResultCount = response.getRegionActionResultCount(); int responseRegionActionResultCount = response.getRegionActionResultCount();
if (requestRegionActionCount != responseRegionActionResultCount) { if (requestRegionActionCount != responseRegionActionResultCount) {
@ -130,8 +147,24 @@ public final class ResponseConverter {
actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion()); actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion());
} }
for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
Object responseValue; Object responseValue;
// For RowMutations action, if there is an exception, the exception is set
// at the RegionActionResult level and the ResultOrException is null at the original index
Integer rowMutationsIndex =
(rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i));
if (rowMutationsIndex != null) {
// This RegionAction is from a RowMutations in a batch.
// If there is an exception from the server, the exception is set at
// the RegionActionResult level, which has been handled above.
responseValue = response.getProcessed() ?
ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
results.add(regionName, rowMutationsIndex, responseValue);
continue;
}
for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
if (roe.hasException()) { if (roe.hasException()) {
responseValue = ProtobufUtil.toException(roe.getException()); responseValue = ProtobufUtil.toException(roe.getException());
} else if (roe.hasResult()) { } else if (roe.hasResult()) {

View File

@ -309,6 +309,52 @@ public class TestFromClientSide3 {
} }
} }
@Test
public void testBatchWithRowMutation() throws Exception {
LOG.info("Starting testBatchWithRowMutation");
final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation");
try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) {
byte [][] QUALIFIERS = new byte [][] {
Bytes.toBytes("a"), Bytes.toBytes("b")
};
RowMutations arm = new RowMutations(ROW);
Put p = new Put(ROW);
p.addColumn(FAMILY, QUALIFIERS[0], VALUE);
arm.add(p);
Object[] batchResult = new Object[1];
t.batch(Arrays.asList(arm), batchResult);
Get g = new Get(ROW);
Result r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
arm = new RowMutations(ROW);
p = new Put(ROW);
p.addColumn(FAMILY, QUALIFIERS[1], VALUE);
arm.add(p);
Delete d = new Delete(ROW);
d.addColumns(FAMILY, QUALIFIERS[0]);
arm.add(d);
t.batch(Arrays.asList(arm), batchResult);
r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
// Test that we get the correct remote exception for RowMutations from batch()
try {
arm = new RowMutations(ROW);
p = new Put(ROW);
p.addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE);
arm.add(p);
t.batch(Arrays.asList(arm), batchResult);
fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
} catch(RetriesExhaustedWithDetailsException e) {
String msg = e.getMessage();
assertTrue(msg.contains("NoSuchColumnFamilyException"));
}
}
}
@Test @Test
public void testHTableExistsMethodSingleRegionSingleGet() throws Exception { public void testHTableExistsMethodSingleRegionSingleGet() throws Exception {
// Test with a single region table. // Test with a single region table.

View File

@ -642,6 +642,23 @@ public class TestMultiParallel {
put.addColumn(BYTES_FAMILY, qual2, val2); put.addColumn(BYTES_FAMILY, qual2, val2);
actions.add(put); actions.add(put);
// 6 RowMutations
RowMutations rm = new RowMutations(KEYS[50]);
put = new Put(KEYS[50]);
put.addColumn(BYTES_FAMILY, qual2, val2);
rm.add(put);
byte[] qual3 = Bytes.toBytes("qual3");
byte[] val3 = Bytes.toBytes("putvalue3");
put = new Put(KEYS[50]);
put.addColumn(BYTES_FAMILY, qual3, val3);
rm.add(put);
actions.add(rm);
// 7 Add another Get to the mixed sequence after RowMutations
get = new Get(KEYS[10]);
get.addColumn(BYTES_FAMILY, QUALIFIER);
actions.add(get);
results = new Object[actions.size()]; results = new Object[actions.size()];
table.batch(actions, results); table.batch(actions, results);
@ -649,10 +666,11 @@ public class TestMultiParallel {
validateResult(results[0]); validateResult(results[0]);
validateResult(results[1]); validateResult(results[1]);
validateEmpty(results[2]);
validateEmpty(results[3]); validateEmpty(results[3]);
validateResult(results[4]); validateResult(results[4]);
validateEmpty(results[5]); validateEmpty(results[5]);
validateEmpty(results[6]);
validateResult(results[7]);
// validate last put, externally from the batch // validate last put, externally from the batch
get = new Get(KEYS[40]); get = new Get(KEYS[40]);
@ -660,6 +678,17 @@ public class TestMultiParallel {
Result r = table.get(get); Result r = table.get(get);
validateResult(r, qual2, val2); validateResult(r, qual2, val2);
// validate last RowMutations, externally from the batch
get = new Get(KEYS[50]);
get.addColumn(BYTES_FAMILY, qual2);
r = table.get(get);
validateResult(r, qual2, val2);
get = new Get(KEYS[50]);
get.addColumn(BYTES_FAMILY, qual3);
r = table.get(get);
validateResult(r, qual3, val3);
table.close(); table.close();
} }
@ -736,8 +765,7 @@ public class TestMultiParallel {
private void validateEmpty(Object r1) { private void validateEmpty(Object r1) {
Result result = (Result)r1; Result result = (Result)r1;
Assert.assertTrue(result != null); Assert.assertTrue(result != null);
Assert.assertTrue(result.getRow() == null); Assert.assertTrue(result.isEmpty());
Assert.assertEquals(0, result.rawCells().length);
} }
private void validateSizeAndEmpty(Object[] results, int expectedSize) { private void validateSizeAndEmpty(Object[] results, int expectedSize) {