HBASE-1558 deletes use 'HConstants.LATEST_TIMESTAMP' but no one translates that into 'now'
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@787336 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2bf31afa70
commit
88de4001bb
|
@ -212,6 +212,8 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1547 atomicIncrement doesnt increase hregion.memcacheSize
|
||||
HBASE-1553 ClassSize missing in trunk
|
||||
HBASE-1561 HTable Mismatch between javadoc and what it actually does
|
||||
HBASE-1558 deletes use 'HConstants.LATEST_TIMESTAMP' but no one translates
|
||||
that into 'now'
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||
|
|
|
@ -43,6 +43,9 @@ public class TableInputFormat extends TableInputFormatBase implements
|
|||
* space delimited list of columns
|
||||
*/
|
||||
public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
|
||||
|
||||
public static final String TIME_RANGE_MAX = "hbase.mapred.timerange.max";
|
||||
public static final String TIME_RANGE_MIN = "hbase.mapred.timerange.min";
|
||||
|
||||
public void configure(JobConf job) {
|
||||
Path[] tableNames = FileInputFormat.getInputPaths(job);
|
||||
|
@ -53,6 +56,11 @@ public class TableInputFormat extends TableInputFormatBase implements
|
|||
m_cols[i] = Bytes.toBytes(colNames[i]);
|
||||
}
|
||||
setInputColumns(m_cols);
|
||||
|
||||
String minArg = job.get(TIME_RANGE_MIN);
|
||||
String maxArg = job.get(TIME_RANGE_MAX);
|
||||
setTimeRange(Long.parseLong(minArg), Long.parseLong(maxArg));
|
||||
|
||||
try {
|
||||
setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
|
||||
} catch (Exception e) {
|
||||
|
@ -78,5 +86,15 @@ public class TableInputFormat extends TableInputFormatBase implements
|
|||
if (colArg == null || colArg.length() == 0) {
|
||||
throw new IOException("expecting at least one column");
|
||||
}
|
||||
|
||||
// maxStamp must be higher then minStamp
|
||||
String minArg = job.get(TIME_RANGE_MIN);
|
||||
String maxArg = job.get(TIME_RANGE_MAX);
|
||||
if (minArg == null || minArg.length() == 0
|
||||
|| maxArg == null || maxArg.length() == 0
|
||||
|| Long.parseLong(maxArg) <= Long.parseLong(minArg)) {
|
||||
throw new IOException("invalid time stamp values");
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.filter.StopRowFilter;
|
||||
import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
|
||||
|
@ -77,6 +77,7 @@ public abstract class TableInputFormatBase
|
|||
implements InputFormat<ImmutableBytesWritable, Result> {
|
||||
final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
|
||||
private byte [][] inputColumns;
|
||||
private long minStamp, maxStamp;
|
||||
private HTable table;
|
||||
private TableRecordReader tableRecordReader;
|
||||
private RowFilterInterface rowFilter;
|
||||
|
@ -93,7 +94,8 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
private ResultScanner scanner;
|
||||
private HTable htable;
|
||||
private byte [][] trrInputColumns;
|
||||
|
||||
private long minStamp, maxStamp;
|
||||
|
||||
/**
|
||||
* Restart from survivable exceptions by creating a new scanner.
|
||||
*
|
||||
|
@ -101,28 +103,33 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void restart(byte[] firstRow) throws IOException {
|
||||
Scan scan = null;
|
||||
if ((endRow != null) && (endRow.length > 0)) {
|
||||
if (trrRowFilter != null) {
|
||||
final Set<RowFilterInterface> rowFiltersSet =
|
||||
new HashSet<RowFilterInterface>();
|
||||
rowFiltersSet.add(new WhileMatchRowFilter(new StopRowFilter(endRow)));
|
||||
rowFiltersSet.add(trrRowFilter);
|
||||
Scan scan = new Scan(startRow);
|
||||
scan.addColumns(trrInputColumns);
|
||||
scan = new Scan(startRow);
|
||||
// scan.setFilter(new RowFilterSet(RowFilterSet.Operator.MUST_PASS_ALL,
|
||||
// rowFiltersSet));
|
||||
this.scanner = this.htable.getScanner(scan);
|
||||
} else {
|
||||
Scan scan = new Scan(firstRow, endRow);
|
||||
scan.addColumns(trrInputColumns);
|
||||
this.scanner = this.htable.getScanner(scan);
|
||||
scan = new Scan(firstRow, endRow);
|
||||
|
||||
}
|
||||
} else {
|
||||
Scan scan = new Scan(firstRow);
|
||||
scan.addColumns(trrInputColumns);
|
||||
scan = new Scan(firstRow);
|
||||
// scan.setFilter(trrRowFilter);
|
||||
this.scanner = this.htable.getScanner(scan);
|
||||
}
|
||||
scan.addColumns(trrInputColumns);
|
||||
|
||||
if (minStamp !=0 && maxStamp !=0) { // if timestamps are ungiven.
|
||||
scan.setTimeRange(minStamp, maxStamp);
|
||||
scan.setMaxVersions();
|
||||
}
|
||||
|
||||
this.scanner = this.htable.getScanner(scan);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -147,6 +154,11 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
public void setInputColumns(final byte [][] inputColumns) {
|
||||
this.trrInputColumns = inputColumns;
|
||||
}
|
||||
|
||||
public void setTimeRange(long minStamp, long maxStamp) {
|
||||
this.minStamp = minStamp;
|
||||
this.maxStamp = maxStamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param startRow the first row in the split
|
||||
|
@ -251,6 +263,7 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
trr.setEndRow(tSplit.getEndRow());
|
||||
trr.setHTable(this.table);
|
||||
trr.setInputColumns(this.inputColumns);
|
||||
trr.setTimeRange(this.minStamp, this.maxStamp);
|
||||
trr.setRowFilter(this.rowFilter);
|
||||
trr.init();
|
||||
return trr;
|
||||
|
@ -310,6 +323,16 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
this.inputColumns = inputColumns;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param minStamp
|
||||
* @param maxStam
|
||||
*/
|
||||
protected void setTimeRange(long minStamp, long maxStamp) {
|
||||
this.minStamp = minStamp;
|
||||
this.maxStamp = maxStamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows subclasses to get the {@link HTable}.
|
||||
*/
|
||||
|
@ -344,4 +367,5 @@ implements InputFormat<ImmutableBytesWritable, Result> {
|
|||
protected void setRowFilter(RowFilterInterface rowFilter) {
|
||||
this.rowFilter = rowFilter;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -51,13 +51,39 @@ public class TableMapReduceUtil {
|
|||
Class<? extends TableMap> mapper,
|
||||
Class<? extends WritableComparable> outputKeyClass,
|
||||
Class<? extends Writable> outputValueClass, JobConf job) {
|
||||
|
||||
|
||||
initTableMapJob(table, columns, mapper, outputKeyClass,
|
||||
outputValueClass, job, 0, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this before submitting a TableMap job. It will
|
||||
* appropriately set up the JocConf.
|
||||
*
|
||||
* @param table The table name to read from.
|
||||
* @param columns The columns to scan.
|
||||
* @param mapper The mapper class to use.
|
||||
* @param outputKeyClass The class of the output key.
|
||||
* @param outputValueClass The class of the output value.
|
||||
* @param job The current job configuration to adjust.
|
||||
* @param minStamp the minimum timestamp, inclusive
|
||||
* @param maxStamp the maximum timestamp, exclusive
|
||||
*/
|
||||
public static void initTableMapJob(String table, String columns,
|
||||
Class<? extends TableMap> mapper,
|
||||
Class<? extends WritableComparable> outputKeyClass,
|
||||
Class<? extends Writable> outputValueClass, JobConf job,
|
||||
long minStamp,
|
||||
long maxStamp) {
|
||||
|
||||
job.setInputFormat(TableInputFormat.class);
|
||||
job.setMapOutputValueClass(outputValueClass);
|
||||
job.setMapOutputKeyClass(outputKeyClass);
|
||||
job.setMapperClass(mapper);
|
||||
FileInputFormat.addInputPaths(job, table);
|
||||
job.set(TableInputFormat.COLUMN_LIST, columns);
|
||||
job.setLong(TableInputFormat.TIME_RANGE_MIN, minStamp);
|
||||
job.setLong(TableInputFormat.TIME_RANGE_MAX, maxStamp);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1128,8 +1128,10 @@ public class HRegion implements HConstants { // , Writable{
|
|||
public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
|
||||
throws IOException {
|
||||
long now = System.currentTimeMillis();
|
||||
byte [] byteNow = Bytes.toBytes(now);
|
||||
boolean flush = false;
|
||||
this.updatesLock.readLock().lock();
|
||||
|
||||
try {
|
||||
if (writeToWAL) {
|
||||
this.log.append(regionInfo.getRegionName(),
|
||||
|
@ -1158,7 +1160,10 @@ public class HRegion implements HConstants { // , Writable{
|
|||
KeyValue getkv = result.get(0);
|
||||
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
|
||||
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
|
||||
} else {
|
||||
kv.updateLatestStamp(byteNow);
|
||||
}
|
||||
|
||||
size = this.memcacheSize.addAndGet(store.delete(kv));
|
||||
}
|
||||
flush = isFlushSize(size);
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.NavigableMap;
|
|||
import java.util.SortedMap;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import junit.framework.AssertionFailedError;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -658,4 +659,13 @@ public abstract class HBaseTestCase extends TestCase {
|
|||
root.getLog().closeAndDelete();
|
||||
}
|
||||
}
|
||||
|
||||
public void assertByteEquals(byte[] expected,
|
||||
byte[] actual) {
|
||||
if (Bytes.compareTo(expected, actual) != 0) {
|
||||
throw new AssertionFailedError("expected:<" +
|
||||
Bytes.toString(expected) + "> but was:<" +
|
||||
Bytes.toString(actual) + ">");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -57,7 +58,14 @@ public class TestHRegion extends HBaseTestCase {
|
|||
private final String DIR = "test/build/data/TestHRegion/";
|
||||
|
||||
private final int MAX_VERSIONS = 2;
|
||||
|
||||
|
||||
// Test names
|
||||
private final byte[] tableName = Bytes.toBytes("testtable");;
|
||||
private final byte[] qual1 = Bytes.toBytes("qual1");
|
||||
private final byte[] value1 = Bytes.toBytes("value1");
|
||||
private final byte[] value2 = Bytes.toBytes("value2");
|
||||
private final byte [] row = Bytes.toBytes("rowA");
|
||||
|
||||
/**
|
||||
* @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
|
||||
*/
|
||||
|
@ -325,13 +333,68 @@ public class TestHRegion extends HBaseTestCase {
|
|||
assertTrue(Bytes.equals(rowB, results.get(0).getRow()));
|
||||
|
||||
}
|
||||
|
||||
public void testDeleteColumns_PostInsert() throws IOException,
|
||||
InterruptedException {
|
||||
Delete delete = new Delete(row);
|
||||
delete.deleteColumns(fam1, qual1);
|
||||
doTestDelete_AndPostInsert(delete);
|
||||
}
|
||||
|
||||
public void testDeleteFamily_PostInsert() throws IOException, InterruptedException {
|
||||
Delete delete = new Delete(row);
|
||||
delete.deleteFamily(fam1);
|
||||
doTestDelete_AndPostInsert(delete);
|
||||
}
|
||||
|
||||
public void doTestDelete_AndPostInsert(Delete delete)
|
||||
throws IOException, InterruptedException {
|
||||
initHRegion(tableName, getName(), fam1);
|
||||
Put put = new Put(row);
|
||||
put.add(fam1, qual1, value1);
|
||||
region.put(put);
|
||||
|
||||
Thread.sleep(10);
|
||||
|
||||
// now delete the value:
|
||||
region.delete(delete, null, true);
|
||||
|
||||
Thread.sleep(10);
|
||||
|
||||
// ok put data:
|
||||
put = new Put(row);
|
||||
put.add(fam1, qual1, value2);
|
||||
region.put(put);
|
||||
|
||||
// ok get:
|
||||
Get get = new Get(row);
|
||||
get.addColumn(fam1, qual1);
|
||||
|
||||
Result r = region.get(get, null);
|
||||
assertEquals(1, r.size());
|
||||
assertByteEquals(value2, r.getValue(fam1, qual1));
|
||||
|
||||
// next:
|
||||
Scan scan = new Scan(row);
|
||||
scan.addColumn(fam1, qual1);
|
||||
InternalScanner s = region.getScanner(scan);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(false, s.next(results));
|
||||
assertEquals(1, results.size());
|
||||
KeyValue kv = results.get(0);
|
||||
|
||||
assertByteEquals(value2, kv.getValue());
|
||||
assertByteEquals(fam1, kv.getFamily());
|
||||
assertByteEquals(qual1, kv.getQualifier());
|
||||
assertByteEquals(row, kv.getRow());
|
||||
}
|
||||
|
||||
|
||||
|
||||
//Visual test, since the method doesn't return anything
|
||||
public void testDelete_CheckTimestampUpdated()
|
||||
throws IOException {
|
||||
byte [] tableName = Bytes.toBytes("testtable");
|
||||
byte [] row1 = Bytes.toBytes("row1");
|
||||
byte [] fam1 = Bytes.toBytes("fam1");
|
||||
byte [] col1 = Bytes.toBytes("col1");
|
||||
byte [] col2 = Bytes.toBytes("col2");
|
||||
byte [] col3 = Bytes.toBytes("col3");
|
||||
|
@ -345,8 +408,19 @@ public class TestHRegion extends HBaseTestCase {
|
|||
kvs.add(new KeyValue(row1, fam1, col1, null));
|
||||
kvs.add(new KeyValue(row1, fam1, col2, null));
|
||||
kvs.add(new KeyValue(row1, fam1, col3, null));
|
||||
|
||||
|
||||
region.delete(fam1, kvs, true);
|
||||
|
||||
// extract the key values out the memcache:
|
||||
// This is kinda hacky, but better than nothing...
|
||||
long now = System.currentTimeMillis();
|
||||
KeyValue firstKv = region.getStore(fam1).memcache.memcache.first();
|
||||
assertTrue(firstKv.getTimestamp() <= now);
|
||||
now = firstKv.getTimestamp();
|
||||
for (KeyValue kv : region.getStore(fam1).memcache.memcache) {
|
||||
assertTrue(kv.getTimestamp() <= now);
|
||||
now = kv.getTimestamp();
|
||||
}
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1054,15 +1128,14 @@ public class TestHRegion extends HBaseTestCase {
|
|||
byte [] qf1 = Bytes.toBytes("qualifier1");
|
||||
byte [] qf2 = Bytes.toBytes("qualifier2");
|
||||
byte [] fam1 = Bytes.toBytes("fam1");
|
||||
byte [][] families = {fam1};
|
||||
|
||||
|
||||
long ts1 = 1; //System.currentTimeMillis();
|
||||
long ts2 = ts1 + 1;
|
||||
long ts3 = ts1 + 2;
|
||||
|
||||
//Setting up region
|
||||
String method = this.getName();
|
||||
initHRegion(tableName, method, families);
|
||||
initHRegion(tableName, method, fam1);
|
||||
|
||||
//Putting data in Region
|
||||
Put put = null;
|
||||
|
@ -1104,6 +1177,56 @@ public class TestHRegion extends HBaseTestCase {
|
|||
for(int i=0; i<expected.size(); i++) {
|
||||
assertEquals(expected.get(i), actual.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
public void testScanner_StopRow1542() throws IOException {
|
||||
byte [] tableName = Bytes.toBytes("test_table");
|
||||
byte [] family = Bytes.toBytes("testFamily");
|
||||
initHRegion(tableName, getName(), family);
|
||||
|
||||
byte [] row1 = Bytes.toBytes("row111");
|
||||
byte [] row2 = Bytes.toBytes("row222");
|
||||
byte [] row3 = Bytes.toBytes("row333");
|
||||
byte [] row4 = Bytes.toBytes("row444");
|
||||
byte [] row5 = Bytes.toBytes("row555");
|
||||
|
||||
byte [] col1 = Bytes.toBytes("Pub111");
|
||||
byte [] col2 = Bytes.toBytes("Pub222");
|
||||
|
||||
|
||||
|
||||
Put put = new Put(row1);
|
||||
put.add(family, col1, Bytes.toBytes(10L));
|
||||
region.put(put);
|
||||
|
||||
put = new Put(row2);
|
||||
put.add(family, col1, Bytes.toBytes(15L));
|
||||
region.put(put);
|
||||
|
||||
put = new Put(row3);
|
||||
put.add(family, col2, Bytes.toBytes(20L));
|
||||
region.put(put);
|
||||
|
||||
put = new Put(row4);
|
||||
put.add(family, col2, Bytes.toBytes(30L));
|
||||
region.put(put);
|
||||
|
||||
put = new Put(row5);
|
||||
put.add(family, col1, Bytes.toBytes(40L));
|
||||
region.put(put);
|
||||
|
||||
Scan scan = new Scan(row3, row4);
|
||||
scan.setMaxVersions();
|
||||
scan.addColumn(family, col1);
|
||||
InternalScanner s = region.getScanner(scan);
|
||||
|
||||
List<KeyValue> results = new ArrayList<KeyValue>();
|
||||
assertEquals(false, s.next(results));
|
||||
assertEquals(0, results.size());
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
public void testScanner_Wildcard_FromMemcacheAndFiles_EnforceVersions()
|
||||
|
@ -1111,9 +1234,8 @@ public class TestHRegion extends HBaseTestCase {
|
|||
byte [] tableName = Bytes.toBytes("testtable");
|
||||
byte [] row1 = Bytes.toBytes("row1");
|
||||
byte [] fam1 = Bytes.toBytes("fam1");
|
||||
byte [][] families = {fam1};
|
||||
byte [] qf1 = Bytes.toBytes("qualifier1");
|
||||
byte [] qf2 = Bytes.toBytes("qualifier2");
|
||||
byte [] qf2 = Bytes.toBytes("quateslifier2");
|
||||
|
||||
long ts1 = 1;
|
||||
long ts2 = ts1 + 1;
|
||||
|
@ -1122,7 +1244,7 @@ public class TestHRegion extends HBaseTestCase {
|
|||
|
||||
//Setting up region
|
||||
String method = this.getName();
|
||||
initHRegion(tableName, method, families);
|
||||
initHRegion(tableName, method, fam1);
|
||||
|
||||
//Putting data in Region
|
||||
KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
|
||||
|
|
Loading…
Reference in New Issue