HBASE-21945 Maintain the original order when sending batch request
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
449ed0834b
commit
a47c1ddcd2
|
@ -255,8 +255,11 @@ class AsyncBatchRpcRetryingCaller<T> {
|
||||||
// multiRequestBuilder will be populated with region actions.
|
// multiRequestBuilder will be populated with region actions.
|
||||||
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
|
// rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
|
||||||
// action list.
|
// action list.
|
||||||
RequestConverter.buildNoDataRegionActions(entry.getKey(), entry.getValue().actions, cells,
|
RequestConverter.buildNoDataRegionActions(entry.getKey(),
|
||||||
multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
|
entry.getValue().actions.stream()
|
||||||
|
.sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
|
||||||
|
.collect(Collectors.toList()),
|
||||||
|
cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup,
|
||||||
rowMutationsIndexMap);
|
rowMutationsIndexMap);
|
||||||
}
|
}
|
||||||
return multiRequestBuilder.build();
|
return multiRequestBuilder.build();
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UncheckedIOException;
|
import java.io.UncheckedIOException;
|
||||||
|
@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -280,8 +282,30 @@ public class TestAsyncTableBatch {
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
futures.get(SPLIT_KEYS.length - 1).get();
|
futures.get(SPLIT_KEYS.length - 1).get();
|
||||||
|
fail();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
|
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartialSuccessOnSameRegion() throws InterruptedException, ExecutionException {
|
||||||
|
AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
|
||||||
|
List<CompletableFuture<Object>> futures = table.batch(Arrays.asList(
|
||||||
|
new Put(Bytes.toBytes("put")).addColumn(Bytes.toBytes("not-exists"), CQ,
|
||||||
|
Bytes.toBytes("bad")),
|
||||||
|
new Increment(Bytes.toBytes("inc")).addColumn(FAMILY, CQ, 1),
|
||||||
|
new Put(Bytes.toBytes("put")).addColumn(FAMILY, CQ, Bytes.toBytes("good"))));
|
||||||
|
try {
|
||||||
|
futures.get(0).get();
|
||||||
|
fail();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
|
||||||
|
assertThat(e.getCause().getCause(), instanceOf(NoSuchColumnFamilyException.class));
|
||||||
|
}
|
||||||
|
assertEquals(1, Bytes.toLong(((Result) futures.get(1).get()).getValue(FAMILY, CQ)));
|
||||||
|
assertTrue(((Result) futures.get(2).get()).isEmpty());
|
||||||
|
assertEquals("good",
|
||||||
|
Bytes.toString(table.get(new Get(Bytes.toBytes("put"))).get().getValue(FAMILY, CQ)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue