HBASE-6291 Don't retry increments on an invalid cell

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1379058 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-08-30 18:49:58 +00:00
parent 3bbca25d38
commit cf76929db6
2 changed files with 38 additions and 7 deletions

View File

@ -4827,7 +4827,13 @@ public class HRegion implements HeapSize { // , Writable{
if (idx < results.size() && if (idx < results.size() &&
results.get(idx).matchingQualifier(column.getKey())) { results.get(idx).matchingQualifier(column.getKey())) {
KeyValue kv = results.get(idx); KeyValue kv = results.get(idx);
amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); if(kv.getValueLength() == Bytes.SIZEOF_LONG) {
amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), Bytes.SIZEOF_LONG);
} else {
// throw DoNotRetryIOException instead of IllegalArgumentException
throw new DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide");
}
idx++; idx++;
} }
@ -4876,10 +4882,9 @@ public class HRegion implements HeapSize { // , Writable{
} }
} finally { } finally {
closeRegionOperation(); closeRegionOperation();
}
long after = EnvironmentEdgeManager.currentTimeMillis(); long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before); this.opMetrics.updateIncrementMetrics(increment.getFamilyMap().keySet(), after - before);
}
if (flush) { if (flush) {
// Request a cache flush. Do it outside update lock. // Request a cache flush. Do it outside update lock.
@ -4930,7 +4935,7 @@ public class HRegion implements HeapSize { // , Writable{
if (!results.isEmpty()) { if (!results.isEmpty()) {
KeyValue kv = results.get(0); KeyValue kv = results.get(0);
if(kv.getValueLength() == 8){ if(kv.getValueLength() == Bytes.SIZEOF_LONG){
byte [] buffer = kv.getBuffer(); byte [] buffer = kv.getBuffer();
int valueOffset = kv.getValueOffset(); int valueOffset = kv.getValueOffset();
result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
@ -4986,7 +4991,7 @@ public class HRegion implements HeapSize { // , Writable{
requestFlush(); requestFlush();
} }
if (wrongLength) { if (wrongLength) {
throw new IOException( throw new DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide"); "Attempted to increment field that isn't 64 bits wide");
} }
return result; return result;

View File

@ -4230,7 +4230,7 @@ public class TestFromClientSide {
@Test @Test
public void testIncrementWithDeletes() throws Exception { public void testIncrementWithDeletes() throws Exception {
LOG.info("Starting testIncrement"); LOG.info("Starting testIncrementWithDeletes");
final byte [] TABLENAME = Bytes.toBytes("testIncrementWithDeletes"); final byte [] TABLENAME = Bytes.toBytes("testIncrementWithDeletes");
HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY); HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
final byte[] COLUMN = Bytes.toBytes("column"); final byte[] COLUMN = Bytes.toBytes("column");
@ -4249,6 +4249,32 @@ public class TestFromClientSide {
assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN))); assertEquals(5, Bytes.toLong(r.getValue(FAMILY, COLUMN)));
} }
@Test
public void testIncrementingInvalidValue() throws Exception {
LOG.info("Starting testIncrementingInvalidValue");
final byte [] TABLENAME = Bytes.toBytes("testIncrementingInvalidValue");
HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
final byte[] COLUMN = Bytes.toBytes("column");
Put p = new Put(ROW);
// write an integer here (not a Long)
p.add(FAMILY, COLUMN, Bytes.toBytes(5));
ht.put(p);
try {
ht.incrementColumnValue(ROW, FAMILY, COLUMN, 5);
fail("Should have thrown DoNotRetryIOException");
} catch (DoNotRetryIOException iox) {
// success
}
Increment inc = new Increment(ROW);
inc.addColumn(FAMILY, COLUMN, 5);
try {
ht.increment(inc);
fail("Should have thrown DoNotRetryIOException");
} catch (DoNotRetryIOException iox) {
// success
}
}
@Test @Test
public void testIncrement() throws Exception { public void testIncrement() throws Exception {
LOG.info("Starting testIncrement"); LOG.info("Starting testIncrement");