HBASE-2947 MultiIncrement/MultiAppend (JGray and Lars H)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1227382 13f79535-47bb-0310-9956-ffa450edef68
larsh 2012-01-04 23:16:54 +00:00
parent 5866a55cdd
commit 49f35a7756
4 changed files with 51 additions and 6 deletions
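For orientation: this change lets a client mix Increment and Append actions in a single HTable.batch() call, because both operation types now implement the client Row interface that batch() requires. Below is a minimal client-side sketch under that assumption; the table name, family, and qualifiers are made up for illustration, and the calls mirror the new test added at the bottom of this commit.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiIncrementAppendSketch {
  public static void main(String[] args) throws Exception {
    // "t1", "f", and the column qualifiers are hypothetical names.
    HTable table = new HTable(HBaseConfiguration.create(), "t1");
    byte[] row = Bytes.toBytes("row1");
    byte[] fam = Bytes.toBytes("f");

    // One Increment and one Append against the same row.
    Increment inc = new Increment(row);
    inc.addColumn(fam, Bytes.toBytes("counter"), 1);
    Append app = new Append(row);
    app.add(fam, Bytes.toBytes("log"), Bytes.toBytes("|event"));

    // Both operations now implement Row, so they can travel in one batch() call.
    List<Row> actions = new ArrayList<Row>();
    actions.add(inc);
    actions.add(app);

    // Results come back positionally, one entry per submitted action.
    Object[] results = table.batch(actions);
    System.out.println("increment result: " + results[0]);
    System.out.println("append result: " + results[1]);
    table.close();
  }
}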

View File

@@ -41,7 +41,7 @@ import org.apache.hadoop.io.Writable;
  * row to append to. At least one column to append must be specified using the
  * {@link #add(byte[], byte[], byte[])} method.
  */
-public class Append extends Mutation implements Writable {
+public class Append extends Mutation implements Row {
   // TODO: refactor to derive from Put?
   private static final String RETURN_RESULTS = "_rr_";
   private static final byte APPEND_VERSION = (byte)1;

View File

@@ -43,7 +43,7 @@ import org.apache.hadoop.io.Writable;
  * to increment. At least one column to increment must be specified using the
  * {@link #addColumn(byte[], byte[], long)} method.
  */
-public class Increment implements Writable {
+public class Increment implements Row {
   private static final byte INCREMENT_VERSION = (byte)2;
 
   private byte [] row = null;
@@ -328,4 +328,9 @@ public class Increment implements Writable {
     }
     out.writeBoolean(writeToWAL);
   }
+
+  @Override
+  public int compareTo(Row i) {
+    return Bytes.compareTo(this.getRow(), i.getRow());
+  }
 }
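The compareTo added above satisfies the ordering half of the client-side Row contract that Increment now implements. For reference, in this era of trunk the interface looks roughly like the sketch below (reproduced from memory, not part of this diff); because Row extends WritableComparable, the existing write/readFields methods continue to cover the serialization half.

// Approximate shape of org.apache.hadoop.hbase.client.Row at this point in trunk.
public interface Row extends WritableComparable<Row> {
  /** @return the row key this operation applies to */
  byte[] getRow();
}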

View File

@@ -3234,10 +3234,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
 
         try {
           if (action instanceof Delete) {
-            delete(regionName, (Delete) action);
+            delete(regionName, (Delete)action);
             response.add(regionName, originalIndex, new Result());
           } else if (action instanceof Get) {
-            response.add(regionName, originalIndex, get(regionName, (Get) action));
+            response.add(regionName, originalIndex,
+                get(regionName, (Get)action));
           } else if (action instanceof Put) {
             puts.add(a); // wont throw.
           } else if (action instanceof Exec) {
@@ -3245,11 +3246,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
             response.add(regionName, new Pair<Integer, Object>(
               a.getOriginalIndex(), result.getValue()
             ));
+          } else if (action instanceof Increment) {
+            response.add(regionName, originalIndex,
+                increment(regionName, (Increment)action));
+          } else if (action instanceof Append) {
+            response.add(regionName, originalIndex,
+                append(regionName, (Append)action));
           } else {
             LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
-              "Put or Exec.");
+              "Put, Exec, Increment, or Append.");
             throw new DoNotRetryIOException("Invalid Action, row must be a " +
-              "Get, Delete or Put.");
+              "Get, Delete, Put, Exec, Increment, or Append.");
           }
         } catch (IOException ex) {
           response.add(regionName, originalIndex, ex);
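For context on the two new branches above: they reuse the single-row handlers that HRegionServer already exposes through HRegionInterface, so multi() only dispatches and records the returned Result at each action's original index. Their signatures are approximately as follows (an excerpt shown for orientation, not part of this diff):

// Approximate 0.92-era HRegionInterface methods that the new multi() branches call.
public interface HRegionInterface {
  Result increment(byte[] regionName, Increment increment) throws IOException;
  Result append(byte[] regionName, Append append) throws IOException;
}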

View File

@@ -385,6 +385,39 @@ public class TestMultiParallel {
     table.close();
   }
 
+  @Test(timeout=300000)
+  public void testBatchWithIncrementAndAppend() throws Exception {
+    LOG.info("test=testBatchWithIncrementAndAppend");
+    final byte[] QUAL1 = Bytes.toBytes("qual1");
+    final byte[] QUAL2 = Bytes.toBytes("qual2");
+    final byte[] QUAL3 = Bytes.toBytes("qual3");
+    final byte[] QUAL4 = Bytes.toBytes("qual4");
+    HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
+    Delete d = new Delete(ONE_ROW);
+    table.delete(d);
+    Put put = new Put(ONE_ROW);
+    put.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
+    put.add(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
+    table.put(put);
+
+    Increment inc = new Increment(ONE_ROW);
+    inc.addColumn(BYTES_FAMILY, QUAL2, 1);
+    inc.addColumn(BYTES_FAMILY, QUAL3, 1);
+
+    Append a = new Append(ONE_ROW);
+    a.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
+    a.add(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
+
+    List<Row> actions = new ArrayList<Row>();
+    actions.add(inc);
+    actions.add(a);
+    Object[] multiRes = table.batch(actions);
+    validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
+    validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
+    validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
+    validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
+  }
+
   @Test(timeout=300000)
   public void testBatchWithMixedActions() throws Exception {
     LOG.info("test=testBatchWithMixedActions");