HBASE-742 Column length limit is not enforced

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@676748 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2008-07-14 21:38:45 +00:00
parent e545586c0d
commit 34f05ef4ae
3 changed files with 66 additions and 0 deletions

CHANGES.txt

@@ -184,6 +184,7 @@ Trunk (unreleased changes)
 HBASE-739 HBaseAdmin.createTable() using old HTableDescription doesn't work
 (Izaak Rubin via Stack)
 HBASE-744 BloomFilter serialization/deserialization broken
+HBASE-742 Column length limit is not enforced (Jean-Daniel Cryans via Stack)
 IMPROVEMENTS
 HBASE-559 MR example job to count table rows

HRegionServer.java

@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
@@ -1146,6 +1147,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
+    validateValuesLength(b, region);
     try {
       cacheFlusher.reclaimMemcacheMemory();
       region.batchUpdate(b);
@@ -1158,6 +1160,30 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
   }
 
+  /**
+   * Utility method to verify value lengths
+   * @param batchUpdate The update to verify
+   * @throws IOException Thrown if a value is too long
+   */
+  private void validateValuesLength(BatchUpdate batchUpdate,
+      HRegion region) throws IOException {
+    HTableDescriptor desc = region.getTableDesc();
+    for (Iterator<BatchOperation> iter =
+        batchUpdate.iterator(); iter.hasNext();) {
+      BatchOperation operation = iter.next();
+      int maxLength =
+        desc.getFamily(HStoreKey.getFamily(operation.getColumn())).
+          getMaxValueLength();
+      if (operation.getValue() != null &&
+          operation.getValue().length > maxLength) {
+        throw new IOException("Value in column " +
+          Bytes.toString(operation.getColumn()) + " is too long. " +
+          operation.getValue().length + " instead of " + maxLength);
+      }
+    }
+  }
+
   //
   // remote scanner interface
   //
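For readers skimming the diff: the new validateValuesLength() check can be read in isolation. Below is a hedged, standalone approximation of the same logic, with a plain map standing in for the per-family HColumnDescriptor.getMaxValueLength() lookup; the class and method names here are illustrative and are not part of the commit.

import java.io.IOException;
import java.util.Map;

public class ValueLengthCheckSketch {

  /**
   * Rejects any value longer than its column family's configured limit,
   * mirroring the server-side validateValuesLength() in the patch above.
   */
  static void validate(Map<String, byte[]> valuesByColumn,
      Map<String, Integer> maxLengthByFamily) throws IOException {
    for (Map.Entry<String, byte[]> entry : valuesByColumn.entrySet()) {
      // Column names are "family:qualifier"; families in the test below are
      // defined with a trailing colon, so the family key keeps it here too.
      String family = entry.getKey().substring(0, entry.getKey().indexOf(':') + 1);
      int maxLength = maxLengthByFamily.get(family);
      byte[] value = entry.getValue();
      if (value != null && value.length > maxLength) {
        throw new IOException("Value in column " + entry.getKey() +
            " is too long. " + value.length + " instead of " + maxLength);
      }
    }
  }
}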

TestBatchUpdate.java

@@ -38,7 +38,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class TestBatchUpdate extends HBaseClusterTestCase {
   private static final String CONTENTS_STR = "contents:";
   private static final byte [] CONTENTS = Bytes.toBytes(CONTENTS_STR);
+  private static final String SMALLFAM_STR = "smallfam:";
+  private static final byte [] SMALLFAM = Bytes.toBytes(SMALLFAM_STR);
+  private static final int SMALL_LENGTH = 1;
   private byte[] value;
+  private byte[] smallValue;
   private HTableDescriptor desc = null;
   private HTable table = null;
@@ -49,6 +53,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
   public TestBatchUpdate() throws UnsupportedEncodingException {
     super();
     value = "abcd".getBytes(HConstants.UTF8_ENCODING);
+    smallValue = "a".getBytes(HConstants.UTF8_ENCODING);
   }
 
   /**
@@ -59,6 +64,12 @@
     super.setUp();
     this.desc = new HTableDescriptor("test");
     desc.addFamily(new HColumnDescriptor(CONTENTS_STR));
+    desc.addFamily(new HColumnDescriptor(SMALLFAM,
+        HColumnDescriptor.DEFAULT_VERSIONS,
+        HColumnDescriptor.DEFAULT_COMPRESSION,
+        HColumnDescriptor.DEFAULT_IN_MEMORY,
+        HColumnDescriptor.DEFAULT_BLOCKCACHE, SMALL_LENGTH,
+        HColumnDescriptor.DEFAULT_TTL, HColumnDescriptor.DEFAULT_BLOOMFILTER));
     HBaseAdmin admin = new HBaseAdmin(conf);
     admin.createTable(desc);
     table = new HTable(conf, desc.getName());
@@ -86,4 +97,32 @@
       }
     }
   }
+
+  public void testBatchUpdateMaxLength() {
+    // Try to insert a value that is too long for the family's limit
+    BatchUpdate batchUpdate = new BatchUpdate("row1");
+    batchUpdate.put(SMALLFAM, value);
+    try {
+      table.commit(batchUpdate);
+      fail("Value is too long, should throw exception");
+    } catch (IOException e) {
+      // This is expected
+    }
+    // Verify the oversized value was not inserted
+    try {
+      Cell cell = table.get("row1", SMALLFAM_STR);
+      assertNull(cell);
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("Unexpected exception");
+    }
+    // Now put a value that fits within the limit
+    batchUpdate = new BatchUpdate("row1");
+    batchUpdate.put(SMALLFAM, smallValue);
+    try {
+      table.commit(batchUpdate);
+    } catch (IOException e) {
+      fail("Value is within the limit, should not throw exception");
+    }
+  }
 }