diff --git a/CHANGES.txt b/CHANGES.txt
index 41a5d74ede7..46c8caa8496 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -212,6 +212,8 @@ Release 0.20.0 - Unreleased
    HBASE-1547  atomicIncrement doesnt increase hregion.memcacheSize
    HBASE-1553  ClassSize missing in trunk
    HBASE-1561  HTable Mismatch between javadoc and what it actually does
+   HBASE-1558  deletes use 'HConstants.LATEST_TIMESTAMP' but no one translates
+               that into 'now'
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index 790a2cf7506..28b0eb4c63e 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -43,6 +43,9 @@ public class TableInputFormat extends TableInputFormatBase implements
    * space delimited list of columns
    */
   public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+
+  public static final String TIME_RANGE_MAX = "hbase.mapred.timerange.max";
+  public static final String TIME_RANGE_MIN = "hbase.mapred.timerange.min";
 
   public void configure(JobConf job) {
     Path[] tableNames = FileInputFormat.getInputPaths(job);
@@ -53,6 +56,11 @@ public class TableInputFormat extends TableInputFormatBase implements
       m_cols[i] = Bytes.toBytes(colNames[i]);
     }
     setInputColumns(m_cols);
+
+    // the time range is optional; TableMapReduceUtil writes 0/0 when the
+    // caller did not ask for one
+    String minArg = job.get(TIME_RANGE_MIN);
+    String maxArg = job.get(TIME_RANGE_MAX);
+    if (minArg != null && maxArg != null) {
+      setTimeRange(Long.parseLong(minArg), Long.parseLong(maxArg));
+    }
+
     try {
       setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
     } catch (Exception e) {
@@ -78,5 +86,15 @@ public class TableInputFormat extends TableInputFormatBase implements
     if (colArg == null || colArg.length() == 0) {
       throw new IOException("expecting at least one column");
     }
+
+    // when a time range is given, maxStamp must be greater than minStamp;
+    // 0/0 is the "no time range" default written by TableMapReduceUtil
+    String minArg = job.get(TIME_RANGE_MIN);
+    String maxArg = job.get(TIME_RANGE_MAX);
+    if (minArg != null && minArg.length() > 0
+        && maxArg != null && maxArg.length() > 0) {
+      long minStamp = Long.parseLong(minArg);
+      long maxStamp = Long.parseLong(maxArg);
+      if ((minStamp != 0 || maxStamp != 0) && maxStamp <= minStamp) {
+        throw new IOException("invalid time range: " + TIME_RANGE_MAX +
+          " must be greater than " + TIME_RANGE_MIN);
+      }
+    }
   }
 }
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index dcd86028314..6449a731f90 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.filter.StopRowFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
@@ -77,6 +77,7 @@ public abstract class TableInputFormatBase implements InputFormat {
   final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
 
   private byte [][] inputColumns;
+  private long minStamp, maxStamp;
   private HTable table;
   private TableRecordReader tableRecordReader;
   private RowFilterInterface rowFilter;
@@ -93,7 +94,8 @@ implements InputFormat {
     private ResultScanner scanner;
     private HTable htable;
     private byte [][] trrInputColumns;
-
+    private long minStamp, maxStamp;
+
     /**
      * Restart from survivable exceptions by creating a new scanner.
      *
@@ -101,28 +103,33 @@ implements InputFormat {
      * @param firstRow
      * @throws IOException
      */
     public void restart(byte[] firstRow) throws IOException {
+      Scan scan = null;
       if ((endRow != null) && (endRow.length > 0)) {
         if (trrRowFilter != null) {
           final Set rowFiltersSet = new HashSet();
           rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow)));
           rowFiltersSet.add(trrRowFilter);
-          Scan scan = new Scan(startRow);
-          scan.addColumns(trrInputColumns);
+          scan = new Scan(startRow);
           // scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
           //   rowFiltersSet));
-          this.scanner = this.htable.getScanner(scan);
         } else {
-          Scan scan = new Scan(firstRow, endRow);
-          scan.addColumns(trrInputColumns);
-          this.scanner = this.htable.getScanner(scan);
+          scan = new Scan(firstRow, endRow);
         }
       } else {
-        Scan scan = new Scan(firstRow);
-        scan.addColumns(trrInputColumns);
+        scan = new Scan(firstRow);
         // scan.setFilter(trrRowFilter);
-        this.scanner = this.htable.getScanner(scan);
       }
+      scan.addColumns(trrInputColumns);
+
+      // only restrict the scan when a time range was given; 0/0 means unset
+      if (minStamp != 0 && maxStamp != 0) {
+        scan.setTimeRange(minStamp, maxStamp);
+        scan.setMaxVersions();
+      }
+
+      this.scanner = this.htable.getScanner(scan);
     }
 
     /**
@@ -147,6 +154,11 @@ implements InputFormat {
     public void setInputColumns(final byte [][] inputColumns) {
       this.trrInputColumns = inputColumns;
     }
+
+    public void setTimeRange(long minStamp, long maxStamp) {
+      this.minStamp = minStamp;
+      this.maxStamp = maxStamp;
+    }
 
     /**
      * @param startRow the first row in the split
@@ -251,6 +263,7 @@ implements InputFormat {
     trr.setEndRow(tSplit.getEndRow());
     trr.setHTable(this.table);
     trr.setInputColumns(this.inputColumns);
+    trr.setTimeRange(this.minStamp, this.maxStamp);
     trr.setRowFilter(this.rowFilter);
     trr.init();
     return trr;
@@ -310,6 +323,16 @@ implements InputFormat {
     this.inputColumns = inputColumns;
   }
 
+  /**
+   * @param minStamp the minimum timestamp, inclusive
+   * @param maxStamp the maximum timestamp, exclusive
+   */
+  protected void setTimeRange(long minStamp, long maxStamp) {
+    this.minStamp = minStamp;
+    this.maxStamp = maxStamp;
+  }
+
   /**
    * Allows subclasses to get the {@link HTable}.
    */
diff --git a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index f982a469834..1b76bd6f942 100644
--- a/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/src/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -51,13 +51,39 @@ public class TableMapReduceUtil {
       Class mapper,
       Class outputKeyClass,
       Class outputValueClass, JobConf job) {
-
+    initTableMapJob(table, columns, mapper, outputKeyClass,
+        outputValueClass, job, 0, 0);
+  }
+
+  /**
+   * Use this before submitting a TableMap job. It will
+   * appropriately set up the JobConf.
+   *
+   * @param table The table name to read from.
+   * @param columns The columns to scan.
+   * @param mapper The mapper class to use.
+   * @param outputKeyClass The class of the output key.
+   * @param outputValueClass The class of the output value.
+   * @param job The current job configuration to adjust.
+   * @param minStamp the minimum timestamp, inclusive
+   * @param maxStamp the maximum timestamp, exclusive; pass 0 for both
+   *   to scan without a time range
+   */
+  public static void initTableMapJob(String table, String columns,
+      Class mapper,
+      Class outputKeyClass,
+      Class outputValueClass, JobConf job,
+      long minStamp,
+      long maxStamp) {
     job.setInputFormat(TableInputFormat.class);
     job.setMapOutputValueClass(outputValueClass);
     job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
     FileInputFormat.addInputPaths(job, table);
     job.set(TableInputFormat.COLUMN_LIST, columns);
+    job.setLong(TableInputFormat.TIME_RANGE_MIN, minStamp);
+    job.setLong(TableInputFormat.TIME_RANGE_MAX, maxStamp);
   }
 
   /**
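For reference, here is a minimal sketch of how a job could use the new eight-argument overload. It is illustrative only: the mapper, table, and column names are made up, and the import locations assume this patch's tree, where the JobConf-based classes live under org.apache.hadoop.hbase.mapreduce.

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMap;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.NullOutputFormat;

    public class TimeRangeScanExample {

      // Hypothetical identity mapper: emits each row key with its Result.
      public static class RowMapper extends MapReduceBase
          implements TableMap<ImmutableBytesWritable, Result> {
        public void map(ImmutableBytesWritable row, Result value,
            OutputCollector<ImmutableBytesWritable, Result> output,
            Reporter reporter) throws IOException {
          output.collect(row, value);
        }
      }

      public static void main(String[] args) throws IOException {
        JobConf job = new JobConf(TimeRangeScanExample.class);
        long max = System.currentTimeMillis();  // maximum stamp, exclusive
        long min = max - 60 * 60 * 1000L;       // minimum stamp, inclusive

        // The two extra arguments are stored under TIME_RANGE_MIN and
        // TIME_RANGE_MAX; the record reader later applies them with
        // Scan.setTimeRange(min, max) when it scans each split.
        TableMapReduceUtil.initTableMapJob("mytable", "myfamily:",
            RowMapper.class, ImmutableBytesWritable.class, Result.class,
            job, min, max);

        job.setNumReduceTasks(0);
        job.setOutputFormat(NullOutputFormat.class);
        JobClient.runJob(job);
      }
    }

The old six-argument overload now delegates here with 0/0, which the record reader and validateInput treat as "no time range".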
Bytes.toBytes("value2"); + private final byte [] row = Bytes.toBytes("rowA"); + /** * @see org.apache.hadoop.hbase.HBaseTestCase#setUp() */ @@ -325,13 +333,68 @@ public class TestHRegion extends HBaseTestCase { assertTrue(Bytes.equals(rowB, results.get(0).getRow())); } + + public void testDeleteColumns_PostInsert() throws IOException, + InterruptedException { + Delete delete = new Delete(row); + delete.deleteColumns(fam1, qual1); + doTestDelete_AndPostInsert(delete); + } + + public void testDeleteFamily_PostInsert() throws IOException, InterruptedException { + Delete delete = new Delete(row); + delete.deleteFamily(fam1); + doTestDelete_AndPostInsert(delete); + } + + public void doTestDelete_AndPostInsert(Delete delete) + throws IOException, InterruptedException { + initHRegion(tableName, getName(), fam1); + Put put = new Put(row); + put.add(fam1, qual1, value1); + region.put(put); + + Thread.sleep(10); + + // now delete the value: + region.delete(delete, null, true); + + Thread.sleep(10); + + // ok put data: + put = new Put(row); + put.add(fam1, qual1, value2); + region.put(put); + + // ok get: + Get get = new Get(row); + get.addColumn(fam1, qual1); + + Result r = region.get(get, null); + assertEquals(1, r.size()); + assertByteEquals(value2, r.getValue(fam1, qual1)); + + // next: + Scan scan = new Scan(row); + scan.addColumn(fam1, qual1); + InternalScanner s = region.getScanner(scan); + + List results = new ArrayList(); + assertEquals(false, s.next(results)); + assertEquals(1, results.size()); + KeyValue kv = results.get(0); + + assertByteEquals(value2, kv.getValue()); + assertByteEquals(fam1, kv.getFamily()); + assertByteEquals(qual1, kv.getQualifier()); + assertByteEquals(row, kv.getRow()); + } + + - //Visual test, since the method doesn't return anything public void testDelete_CheckTimestampUpdated() throws IOException { - byte [] tableName = Bytes.toBytes("testtable"); byte [] row1 = Bytes.toBytes("row1"); - byte [] fam1 = Bytes.toBytes("fam1"); byte [] col1 = Bytes.toBytes("col1"); byte [] col2 = Bytes.toBytes("col2"); byte [] col3 = Bytes.toBytes("col3"); @@ -345,8 +408,19 @@ public class TestHRegion extends HBaseTestCase { kvs.add(new KeyValue(row1, fam1, col1, null)); kvs.add(new KeyValue(row1, fam1, col2, null)); kvs.add(new KeyValue(row1, fam1, col3, null)); - + region.delete(fam1, kvs, true); + + // extract the key values out the memcache: + // This is kinda hacky, but better than nothing... 
+    long now = System.currentTimeMillis();
+    KeyValue firstKv = region.getStore(fam1).memcache.memcache.first();
+    assertTrue(firstKv.getTimestamp() <= now);
+    now = firstKv.getTimestamp();
+    for (KeyValue kv : region.getStore(fam1).memcache.memcache) {
+      // every stamp must be no later than 'now' and non-increasing in
+      // the memcache's iteration order
+      assertTrue(kv.getTimestamp() <= now);
+      now = kv.getTimestamp();
+    }
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1054,15 +1128,14 @@ public class TestHRegion extends HBaseTestCase {
     byte [] qf1 = Bytes.toBytes("qualifier1");
     byte [] qf2 = Bytes.toBytes("qualifier2");
     byte [] fam1 = Bytes.toBytes("fam1");
-    byte [][] families = {fam1};
 
     long ts1 = 1; //System.currentTimeMillis();
     long ts2 = ts1 + 1;
     long ts3 = ts1 + 2;
 
     //Setting up region
     String method = this.getName();
-    initHRegion(tableName, method, families);
+    initHRegion(tableName, method, fam1);
 
     //Putting data in Region
     Put put = null;
@@ -1104,6 +1177,56 @@ public class TestHRegion extends HBaseTestCase {
     for(int i=0; i<expected.size(); i++) {
 [most of this hunk, a new ~50-line scanner test, was lost when the patch
 was flattened; only its closing assertions survive]
+    List results = new ArrayList();
+    assertEquals(false, s.next(results));
+    assertEquals(0, results.size());
+  }
 
   public void testScanner_Wildcard_FromMemcacheAndFiles_EnforceVersions()
@@ -1111,9 +1234,8 @@ public class TestHRegion extends HBaseTestCase {
   throws IOException {
     byte [] tableName = Bytes.toBytes("testtable");
     byte [] row1 = Bytes.toBytes("row1");
     byte [] fam1 = Bytes.toBytes("fam1");
-    byte [][] families = {fam1};
     byte [] qf1 = Bytes.toBytes("qualifier1");
     byte [] qf2 = Bytes.toBytes("qualifier2");
 
     long ts1 = 1;
     long ts2 = ts1 + 1;
@@ -1122,7 +1244,7 @@ public class TestHRegion extends HBaseTestCase {
 
     //Setting up region
     String method = this.getName();
-    initHRegion(tableName, method, families);
+    initHRegion(tableName, method, fam1);
 
     //Putting data in Region
     KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
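For completeness, here is a sketch of the client-visible behavior the HRegion change fixes, mirroring doTestDelete_AndPostInsert above. A Delete issued without an explicit timestamp reaches the region as HConstants.LATEST_TIMESTAMP; before this patch the marker was stored that way and masked every later Put, whereas now it is stamped with the server's current time. The table and column names are made up for illustration.

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeleteThenPutExample {
      public static void main(String[] args) throws IOException {
        HTable table = new HTable(new HBaseConfiguration(), "mytable");
        byte [] row = Bytes.toBytes("rowA");
        byte [] fam = Bytes.toBytes("myfamily");
        byte [] qual = Bytes.toBytes("myqualifier");

        // delete without an explicit timestamp; the server now stamps the
        // marker with its current time instead of LATEST_TIMESTAMP
        Delete delete = new Delete(row);
        delete.deleteColumns(fam, qual);
        table.delete(delete);

        // a later put carries a newer timestamp than the delete marker...
        Put put = new Put(row);
        put.add(fam, qual, Bytes.toBytes("value2"));
        table.put(put);

        // ...so this get returns "value2"; before the patch the
        // LATEST_TIMESTAMP marker shadowed it and nothing came back
        Result r = table.get(new Get(row));
        System.out.println(Bytes.toString(r.getValue(fam, qual)));
      }
    }

Timestamps have millisecond granularity, which is why the new test sleeps 10 ms between the put, the delete, and the re-put; a put landing in the same millisecond as the delete can still be shadowed.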