HBASE-10560 Per cell TTLs

This commit is contained in:
Andrew Purtell 2014-12-05 11:00:19 -08:00
parent 1bd27bfa24
commit 004e977ba0
21 changed files with 792 additions and 102 deletions

View File

@ -183,4 +183,9 @@ public class Append extends Mutation {
public Append setACL(Map<String, Permission> perms) {
return (Append) super.setACL(perms);
}
@Override
public Append setTTL(long ttl) {
return (Append) super.setTTL(ttl);
}
}

View File

@ -473,4 +473,9 @@ public class Delete extends Mutation implements Comparable<Row> {
public Delete setACL(Map<String, Permission> perms) {
return (Delete) super.setACL(perms);
}
@Override
public Delete setTTL(long ttl) {
throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
}
}

View File

@ -329,4 +329,9 @@ public class Increment extends Mutation implements Comparable<Row> {
public Increment setACL(Map<String, Permission> perms) {
return (Increment) super.setACL(perms);
}
@Override
public Increment setTTL(long ttl) {
return (Increment) super.setTTL(ttl);
}
}

View File

@ -78,6 +78,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
/**
* The attribute for storing TTL for the result of the mutation.
*/
private static final String OP_ATTRIBUTE_TTL = "_ttl";
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected Durability durability = Durability.USE_DEFAULT;
@ -201,6 +206,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
if (getId() != null) {
map.put("id", getId());
}
// Add the TTL if set
// Long.MAX_VALUE is the default, and is interpreted to mean this attribute
// has not been set.
if (getTTL() != Long.MAX_VALUE) {
map.put("ttl", getTTL());
}
return map;
}
@ -473,6 +484,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
return this;
}
/**
* Return the TTL requested for the result of the mutation, in milliseconds.
* @return the TTL requested for the result of the mutation, in milliseconds,
* or Long.MAX_VALUE if unset
*/
public long getTTL() {
byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
if (ttlBytes != null) {
return Bytes.toLong(ttlBytes);
}
return Long.MAX_VALUE;
}
/**
* Set the TTL desired for the result of the mutation, in milliseconds.
* @param ttl the TTL desired for the result of the mutation, in milliseconds
* @return this
*/
public Mutation setTTL(long ttl) {
setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
return this;
}
/**
* Subclasses should override this method to add the heap size of their own fields.
* @return the heap size to add (will be aligned).

View File

@ -472,4 +472,9 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
public Put setACL(Map<String, Permission> perms) {
return (Put) super.setACL(perms);
}
@Override
public Put setTTL(long ttl) {
return (Put) super.setTTL(ttl);
}
}

View File

@ -30,4 +30,5 @@ public final class TagType {
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
// String based tag type used in replication
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
public static final byte TTL_TAG_TYPE = (byte)8;
}

View File

@ -69,7 +69,7 @@ public class CellCreator {
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength) throws IOException {
return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
timestamp, value, voffset, vlength, null);
timestamp, value, voffset, vlength, (List<Tag>)null);
}
/**
@ -90,6 +90,7 @@ public class CellCreator {
* @return created Cell
* @throws IOException
*/
@Deprecated
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength, String visExpression) throws IOException {
@ -100,4 +101,36 @@ public class CellCreator {
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
}
/**
* @param row row key
* @param roffset row offset
* @param rlength row length
* @param family family name
* @param foffset family offset
* @param flength family length
* @param qualifier column qualifier
* @param qoffset qualifier offset
* @param qlength qualifier length
* @param timestamp version timestamp
* @param value column value
* @param voffset value offset
* @param vlength value length
* @param tags
* @return created Cell
* @throws IOException
*/
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength, List<Tag> tags) throws IOException {
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
}
/**
* @return Visibility expression resolver
*/
public VisibilityExpressionResolver getVisibilityExpressionResolver() {
return this.visExpResolver;
}
}

View File

@ -125,13 +125,20 @@ public class ImportTsv extends Configured implements Tool {
public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
/**
* @param columnsSpecification the list of columns to parser out, comma separated.
* The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@ -162,14 +169,18 @@ public class ImportTsv extends Configured implements Tool {
timestampKeyColumnIndex = i;
continue;
}
if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
attrKeyColumnIndex = i;
continue;
}
if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
cellVisibilityColumnIndex = i;
continue;
}
if (CELL_TTL_COLUMN_SPEC.equals(str)) {
cellTTLColumnIndex = i;
continue;
}
String[] parts = str.split(":", 2);
if (parts.length == 1) {
families[i] = str.getBytes();
@ -197,6 +208,10 @@ public class ImportTsv extends Configured implements Tool {
return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
}
public boolean hasCellTTL() {
return cellTTLColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
}
public int getAttributesKeyColumnIndex() {
return attrKeyColumnIndex;
}
@ -204,9 +219,15 @@ public class ImportTsv extends Configured implements Tool {
public int getCellVisibilityColumnIndex() {
return cellVisibilityColumnIndex;
}
public int getCellTTLColumnIndex() {
return cellTTLColumnIndex;
}
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
public byte[] getFamily(int idx) {
return families[idx];
}
@ -238,8 +259,10 @@ public class ImportTsv extends Configured implements Tool {
throw new BadTsvLineException("No timestamp");
} else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
throw new BadTsvLineException("No attributes specified");
} else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
} else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
throw new BadTsvLineException("No cell visibility specified");
} else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
throw new BadTsvLineException("No cell TTL specified");
}
return new ParsedLine(tabOffsets, lineBytes);
}
@ -336,6 +359,31 @@ public class ImportTsv extends Configured implements Tool {
}
}
public int getCellTTLColumnOffset() {
if (hasCellTTL()) {
return getColumnOffset(cellTTLColumnIndex);
} else {
return DEFAULT_CELL_TTL_COLUMN_INDEX;
}
}
public int getCellTTLColumnLength() {
if (hasCellTTL()) {
return getColumnLength(cellTTLColumnIndex);
} else {
return DEFAULT_CELL_TTL_COLUMN_INDEX;
}
}
public long getCellTTL() {
if (!hasCellTTL()) {
return 0;
} else {
return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
getColumnLength(cellTTLColumnIndex));
}
}
public int getColumnOffset(int idx) {
if (idx > 0)
return tabOffsets.get(idx - 1) + 1;
@ -535,6 +583,7 @@ public class ImportTsv extends Configured implements Tool {
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
|| TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
continue;
// we are only concerned with the first one (in case this is a cf:cq)

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
@ -62,6 +67,9 @@ public class TextSortReducer extends
/** Cell visibility expr **/
private String cellVisibilityExpr;
/** Cell TTL */
private long ttl;
private CellCreator kvCreator;
public long getTs() {
@ -148,18 +156,30 @@ public class TextSortReducer extends
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
cellVisibilityExpr = parsed.getCellVisibility();
ttl = parsed.getCellTTL();
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
|| i == parser.getCellTTLColumnIndex()) {
continue;
}
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
List<Tag> tags = new ArrayList<Tag>();
if (cellVisibilityExpr != null) {
tags.addAll(kvCreator.getVisibilityExpressionResolver()
.createVisibilityExpTags(cellVisibilityExpr));
}
// Add TTL directly to the KV so we can vary them when packing more than one KV
// into puts
if (ttl > 0) {
tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
}
Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kvs.add(kv);
curSize += kv.heapSize();

View File

@ -18,17 +18,22 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
protected String cellVisibilityExpr;
protected long ttl;
protected CellCreator kvCreator;
private String hfileOutPath;
@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
cellVisibilityExpr = parsed.getCellVisibility();
ttl = parsed.getCellTTL();
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
|| i == parser.getCellTTLColumnIndex()) {
continue;
}
populatePut(lineBytes, parsed, put, i);
@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
// the validation
put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
}
if (ttl > 0) {
put.setTTL(ttl);
}
} else {
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
List<Tag> tags = new ArrayList<Tag>();
if (cellVisibilityExpr != null) {
tags.addAll(kvCreator.getVisibilityExpressionResolver()
.createVisibilityExpTags(cellVisibilityExpr));
}
// Add TTL directly to the KV so we can vary them when packing more than one KV
// into puts
if (ttl > 0) {
tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
}
cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
parsed.getColumnLength(i), cellVisibilityExpr);
parsed.getColumnLength(i), tags);
}
put.add(cell);
}

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
* Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
* Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
* implement the {@link DeleteTracker} interface since state spans rows (There
* is no update nor reset method).
*/
@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
class GetClosestRowBeforeTracker {
private final KeyValue targetkey;
// Any cell w/ a ts older than this is expired.
private final long oldestts;
private final long now;
private final long oldestUnexpiredTs;
private Cell candidate = null;
private final KVComparator kvcomparator;
// Flag for whether we're doing getclosest on a metaregion.
@ -75,20 +76,13 @@ class GetClosestRowBeforeTracker {
HConstants.DELIMITER) - this.rowoffset;
}
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
this.oldestts = System.currentTimeMillis() - ttl;
this.now = System.currentTimeMillis();
this.oldestUnexpiredTs = now - ttl;
this.kvcomparator = c;
KeyValue.RowOnlyComparator rc = new KeyValue.RowOnlyComparator(this.kvcomparator);
this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
}
/**
* @param kv
* @return True if this <code>kv</code> is expired.
*/
boolean isExpired(final Cell kv) {
return HStore.isExpired(kv, this.oldestts);
}
/*
* Add the specified KeyValue to the list of deletes.
* @param kv
@ -173,6 +167,15 @@ class GetClosestRowBeforeTracker {
return false;
}
/**
* @param cell
* @return true if the cell is expired
*/
public boolean isExpired(final Cell cell) {
return cell.getTimestamp() < this.oldestUnexpiredTs ||
HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
}
/*
* Handle keys whose values hold deletes.
* Add to the set of deletes and then if the candidate keys contain any that

View File

@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@ -2644,6 +2646,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
noOfDeletes++;
}
rewriteCellTags(familyMaps[i], mutation);
}
lock(this.updatesLock.readLock(), numReadyToWrite);
@ -3118,6 +3121,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
/**
* Possibly rewrite incoming cell tags.
*/
void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
// Check if we have any work to do and early out otherwise
// Update these checks as more logic is added here
if (m.getTTL() == Long.MAX_VALUE) {
return;
}
// From this point we know we have some work to do
for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
List<Cell> cells = e.getValue();
assert cells instanceof RandomAccess;
int listSize = cells.size();
for (int i = 0; i < listSize; i++) {
Cell cell = cells.get(i);
List<Tag> newTags = new ArrayList<Tag>();
Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
cell.getTagsOffset(), cell.getTagsLength());
// Carry forward existing tags
while (tagIterator.hasNext()) {
// Add any filters or tag specific rewrites here
newTags.add(tagIterator.next());
}
// Cell TTL handling
// Check again if we need to add a cell TTL because early out logic
// above may change when there are more tag based features in core.
if (m.getTTL() != Long.MAX_VALUE) {
// Add a cell TTL tag
newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
}
// Rewrite the cell with the updated set of tags
cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
newTags));
}
}
}
/*
* Check if resources to support an update.
*
@ -5215,6 +5271,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
processor.preBatchMutate(this, walEdit);
// 7. Apply to memstore
for (Mutation m : mutations) {
// Handle any tag based cell features
rewriteCellTags(m.getFamilyCellMap(), m);
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
Cell cell = cellScanner.current();
CellUtil.setSequenceId(cell, mvccNum);
@ -5353,8 +5412,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
// TODO: There's a lot of boiler plate code identical
// to increment... See how to better unify that.
// TODO: There's a lot of boiler plate code identical to increment.
// We should refactor append and increment as local get-mutate-put
// transactions, so all stores only go through one code path for puts.
/**
* Perform one or more append operations on a row.
*
@ -5425,8 +5485,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
// Avoid as much copying as possible. Every byte is copied at most
// once.
// Avoid as much copying as possible. We may need to rewrite and
// consolidate tags. Bytes are only copied once.
// Would be nice if KeyValue had scatter/gather logic
int idx = 0;
for (Cell cell : family.getValue()) {
@ -5436,40 +5496,87 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
&& CellUtil.matchingQualifier(results.get(idx), cell)) {
oldCell = results.get(idx);
long ts = Math.max(now, oldCell.getTimestamp());
// allocate an empty kv once
// Process cell tags
List<Tag> newTags = new ArrayList<Tag>();
// Make a union of the set of tags in the old and new KVs
if (oldCell.getTagsLength() > 0) {
Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
oldCell.getTagsOffset(), oldCell.getTagsLength());
while (i.hasNext()) {
newTags.add(i.next());
}
}
if (cell.getTagsLength() > 0) {
Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
while (i.hasNext()) {
newTags.add(i.next());
}
}
// Cell TTL handling
if (append.getTTL() != Long.MAX_VALUE) {
// Add the new TTL tag
newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
}
// Rebuild tags
byte[] tagBytes = Tag.fromList(newTags);
// allocate an empty cell once
newCell = new KeyValue(row.length, cell.getFamilyLength(),
cell.getQualifierLength(), ts, KeyValue.Type.Put,
oldCell.getValueLength() + cell.getValueLength(),
oldCell.getTagsLength() + cell.getTagsLength());
// copy in the value
System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
newCell.getValueArray(), newCell.getValueOffset(),
oldCell.getValueLength());
System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
newCell.getValueArray(),
newCell.getValueOffset() + oldCell.getValueLength(),
cell.getValueLength());
// copy in the tags
System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
tagBytes.length);
// copy in row, family, and qualifier
System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
newCell.getFamilyArray(), newCell.getFamilyOffset(),
cell.getFamilyLength());
newCell.getFamilyArray(), newCell.getFamilyOffset(),
cell.getFamilyLength());
System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
newCell.getQualifierArray(), newCell.getQualifierOffset(),
cell.getQualifierLength());
newCell.getQualifierArray(), newCell.getQualifierOffset(),
cell.getQualifierLength());
// copy in the value
System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
newCell.getValueArray(), newCell.getValueOffset(),
oldCell.getValueLength());
System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
newCell.getValueArray(),
newCell.getValueOffset() + oldCell.getValueLength(),
cell.getValueLength());
// Copy in tag data
System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
tagBytes.length);
idx++;
} else {
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
// so only need to update the timestamp to 'now'
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
CellUtil.updateLatestStamp(cell, now);
newCell = cell;
}
// Cell TTL handling
if (append.getTTL() != Long.MAX_VALUE) {
List<Tag> newTags = new ArrayList<Tag>(1);
newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
// Add the new TTL tag
newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength(),
cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength(),
cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength(),
cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
newTags);
} else {
newCell = cell;
}
}
CellUtil.setSequenceId(newCell, mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
@ -5573,6 +5680,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
// TODO: There's a lot of boiler plate code identical to append.
// We should refactor append and increment as local get-mutate-put
// transactions, so all stores only go through one code path for puts.
/**
* Perform one or more increment operations on a row.
* @return new keyvalues after increment
@ -5645,13 +5755,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
int idx = 0;
for (Cell kv: family.getValue()) {
long amount = Bytes.toLong(CellUtil.cloneValue(kv));
for (Cell cell: family.getValue()) {
long amount = Bytes.toLong(CellUtil.cloneValue(cell));
boolean noWriteBack = (amount == 0);
List<Tag> newTags = new ArrayList<Tag>();
// Carry forward any tags that might have been added by a coprocessor
if (cell.getTagsLength() > 0) {
Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
cell.getTagsOffset(), cell.getTagsLength());
while (i.hasNext()) {
newTags.add(i.next());
}
}
Cell c = null;
long ts = now;
if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
c = results.get(idx);
ts = Math.max(now, c.getTimestamp());
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
@ -5661,32 +5781,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide");
}
// Carry tags forward from previous version
if (c.getTagsLength() > 0) {
Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
c.getTagsOffset(), c.getTagsLength());
while (i.hasNext()) {
newTags.add(i.next());
}
}
idx++;
}
// Append new incremented KeyValue to list
byte[] q = CellUtil.cloneQualifier(kv);
byte[] q = CellUtil.cloneQualifier(cell);
byte[] val = Bytes.toBytes(amount);
int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
int incCellTagsLen = kv.getTagsLength();
Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
family.getKey().length);
System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
// copy in the value
System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
// copy tags
if (oldCellTagsLen > 0) {
System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
newKV.getTagsOffset(), oldCellTagsLen);
}
if (incCellTagsLen > 0) {
System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
// Add the TTL tag if the mutation carried one
if (increment.getTTL() != Long.MAX_VALUE) {
newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
}
Cell newKV = new KeyValue(row, 0, row.length,
family.getKey(), 0, family.getKey().length,
q, 0, q.length,
ts,
KeyValue.Type.Put,
val, 0, val.length,
newTags);
CellUtil.setSequenceId(newKV, mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = coprocessorHost.postMutationBeforeWAL(

View File

@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
@ -1673,8 +1675,38 @@ public class HStore implements Store {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
static boolean isExpired(final Cell key, final long oldestTimestamp) {
return key.getTimestamp() < oldestTimestamp;
/**
* @param cell
* @param oldestTimestamp
* @return true if the cell is expired
*/
static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
// Do not create an Iterator or Tag objects unless the cell actually has
// tags
if (cell.getTagsLength() > 0) {
// Look for a TTL tag first. Use it instead of the family setting if
// found. If a cell has multiple TTLs, resolve the conflict by using the
// first tag encountered.
Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
while (i.hasNext()) {
Tag t = i.next();
if (TagType.TTL_TAG_TYPE == t.getType()) {
// Unlike in schema cell TTLs are stored in milliseconds, no need
// to convert
long ts = cell.getTimestamp();
assert t.getTagLength() == Bytes.SIZEOF_LONG;
long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
if (ts + ttl < now) {
return true;
}
// Per cell TTLs cannot extend lifetime beyond family settings, so
// fall through to check that
break;
}
}
}
return false;
}
@Override

View File

@ -102,6 +102,10 @@ public class ScanQueryMatcher {
private final long earliestPutTs;
private final long ttl;
/** The oldest timestamp we are interested in, based on TTL */
private final long oldestUnexpiredTS;
private final long now;
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
@ -154,7 +158,7 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
RegionCoprocessorHost regionCoprocessorHost) throws IOException {
long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator();
this.regionCoprocessorHost = regionCoprocessorHost;
@ -164,6 +168,9 @@ public class ScanQueryMatcher {
scanInfo.getFamily());
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
this.oldestUnexpiredTS = oldestUnexpiredTS;
this.now = now;
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
this.ttl = oldestUnexpiredTS;
@ -218,18 +225,18 @@ public class ScanQueryMatcher {
* @param scanInfo The store's immutable scan info
* @param columns
* @param earliestPutTs Earliest put seen in any of the store files.
* @param oldestUnexpiredTS the oldest timestamp we are interested in,
* based on TTL
* @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
* @param now the current server time
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost
* @throws IOException
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
oldestUnexpiredTS, regionCoprocessorHost);
oldestUnexpiredTS, now, regionCoprocessorHost);
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow;
@ -239,10 +246,10 @@ public class ScanQueryMatcher {
* Constructor for tests
*/
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
}
/**
@ -300,12 +307,17 @@ public class ScanQueryMatcher {
int qualifierOffset = cell.getQualifierOffset();
int qualifierLength = cell.getQualifierLength();
long timestamp = cell.getTimestamp();
// check for early out based on timestamp alone
if (columns.isDone(timestamp)) {
return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
qualifierLength);
}
// check if the cell is expired by cell TTL
if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
return MatchCode.SKIP;
}
/*
* The delete logic is pretty complicated now.

View File

@ -77,6 +77,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
protected final long oldestUnexpiredTS;
protected final long now;
protected final int minVersions;
protected final long maxRowSize;
@ -123,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
this.now = EnvironmentEdgeManager.currentTime();
this.oldestUnexpiredTS = now - ttl;
this.minVersions = minVersions;
if (store != null && ((HStore)store).getHRegion() != null
@ -176,7 +178,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
oldestUnexpiredTS, store.getCoprocessorHost());
oldestUnexpiredTS, now, store.getCoprocessorHost());
this.store.addChangedReaderObserver(this);
@ -241,10 +243,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
} else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
}
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@ -284,7 +286,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
// In unit tests, the store could be null
if (this.store != null) {

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestImportTSVWithTTLs implements Configurable {
protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
protected static final String NAME = TestImportTsv.class.getSimpleName();
protected static HBaseTestingUtility util = new HBaseTestingUtility();
/**
* Delete the tmp directory after running doMROnTableTest. Boolean. Default is
* false.
*/
protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
/**
* Force use of combiner in doMROnTableTest. Boolean. Default is true.
*/
protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
private final String FAMILY = "FAM";
private static Configuration conf;
@Override
public Configuration getConf() {
return util.getConfiguration();
}
@Override
public void setConf(Configuration conf) {
throw new IllegalArgumentException("setConf not supported");
}
@BeforeClass
public static void provisionCluster() throws Exception {
conf = util.getConfiguration();
// We don't check persistence in HFiles in this test, but if we ever do we will
// need this where the default hfile version is not 3 (i.e. 0.98)
conf.setInt("hfile.format.version", 3);
conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
util.startMiniCluster();
util.startMiniMapReduceCluster();
}
@AfterClass
public static void releaseCluster() throws Exception {
util.shutdownMiniMapReduceCluster();
util.shutdownMiniCluster();
}
@Test
public void testMROnTable() throws Exception {
String tableName = "test-" + UUID.randomUUID();
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.MAPPER_CONF_KEY
+ "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
util.createTable(TableName.valueOf(tableName), FAMILY);
doMROnTableTest(util, FAMILY, data, args, 1);
util.deleteTable(tableName);
}
protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
String[] args, int valueMultiplier) throws Exception {
TableName table = TableName.valueOf(args[args.length - 1]);
Configuration conf = new Configuration(util.getConfiguration());
// populate input file
FileSystem fs = FileSystem.get(conf);
Path inputPath = fs.makeQualified(new Path(util
.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
FSDataOutputStream op = fs.create(inputPath, true);
op.write(Bytes.toBytes(data));
op.close();
LOG.debug(String.format("Wrote test data to file: %s", inputPath));
if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
LOG.debug("Forcing combiner.");
conf.setInt("mapreduce.map.combine.minspills", 1);
}
// run the import
List<String> argv = new ArrayList<String>(Arrays.asList(args));
argv.add(inputPath.toString());
Tool tool = new ImportTsv();
LOG.debug("Running ImportTsv with arguments: " + argv);
try {
// Job will fail if observer rejects entries without TTL
assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
} finally {
// Clean up
if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
LOG.debug("Deleting test subdirectory");
util.cleanupDataTestDirOnTestFS(table.getNameAsString());
}
}
return tool;
}
public static class TTLCheckingObserver extends BaseRegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
Durability durability) throws IOException {
HRegion region = e.getEnvironment().getRegion();
if (!region.getRegionInfo().isMetaTable()
&& !region.getRegionInfo().getTable().isSystemTable()) {
// The put carries the TTL attribute
if (put.getTTL() != Long.MAX_VALUE) {
return;
}
throw new IOException("Operation does not have TTL set");
}
}
}
}

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@ -91,6 +92,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@ -112,6 +114,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@ -139,6 +142,7 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@ -5766,6 +5770,133 @@ public class TestHRegion {
}
}
@Test
public void testCellTTLs() throws IOException {
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
final byte[] row = Bytes.toBytes("testRow");
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
HColumnDescriptor hcd = new HColumnDescriptor(fam1);
hcd.setTimeToLive(10); // 10 seconds
htd.addFamily(hcd);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
TEST_UTIL.getDataTestDir(), conf, htd);
assertNotNull(region);
try {
long now = EnvironmentEdgeManager.currentTime();
// Add a cell that will expire in 5 seconds via cell TTL
region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
// TTL tags specify ts in milliseconds
new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
// Add a cell that will expire after 10 seconds via family setting
region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
// Add a cell that will expire in 15 seconds via cell TTL
region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
// TTL tags specify ts in milliseconds
new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
// Add a cell that will expire in 20 seconds via family setting
region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
// Flush so we are sure store scanning gets this right
region.flushcache();
// A query at time T+0 should return all cells
Result r = region.get(new Get(row));
assertNotNull(r.getValue(fam1, q1));
assertNotNull(r.getValue(fam1, q2));
assertNotNull(r.getValue(fam1, q3));
assertNotNull(r.getValue(fam1, q4));
// Increment time to T+5 seconds
edge.incrementTime(5000);
r = region.get(new Get(row));
assertNull(r.getValue(fam1, q1));
assertNotNull(r.getValue(fam1, q2));
assertNotNull(r.getValue(fam1, q3));
assertNotNull(r.getValue(fam1, q4));
// Increment time to T+10 seconds
edge.incrementTime(5000);
r = region.get(new Get(row));
assertNull(r.getValue(fam1, q1));
assertNull(r.getValue(fam1, q2));
assertNotNull(r.getValue(fam1, q3));
assertNotNull(r.getValue(fam1, q4));
// Increment time to T+15 seconds
edge.incrementTime(5000);
r = region.get(new Get(row));
assertNull(r.getValue(fam1, q1));
assertNull(r.getValue(fam1, q2));
assertNull(r.getValue(fam1, q3));
assertNotNull(r.getValue(fam1, q4));
// Increment time to T+20 seconds
edge.incrementTime(10000);
r = region.get(new Get(row));
assertNull(r.getValue(fam1, q1));
assertNull(r.getValue(fam1, q2));
assertNull(r.getValue(fam1, q3));
assertNull(r.getValue(fam1, q4));
// Fun with disappearing increments
// Start at 1
region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
r = region.get(new Get(row));
byte[] val = r.getValue(fam1, q1);
assertNotNull(val);
assertEquals(Bytes.toLong(val), 1L);
// Increment with a TTL of 5 seconds
Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
incr.setTTL(5000);
region.increment(incr); // 2
// New value should be 2
r = region.get(new Get(row));
val = r.getValue(fam1, q1);
assertNotNull(val);
assertEquals(Bytes.toLong(val), 2L);
// Increment time to T+25 seconds
edge.incrementTime(5000);
// Value should be back to 1
r = region.get(new Get(row));
val = r.getValue(fam1, q1);
assertNotNull(val);
assertEquals(Bytes.toLong(val), 1L);
// Increment time to T+30 seconds
edge.incrementTime(5000);
// Original value written at T+20 should be gone now via family TTL
r = region.get(new Get(row));
assertNull(r.getValue(fam1, q1));
} finally {
HRegion.closeHRegion(region);
}
}
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),

View File

@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase {
}
private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
// 2,4,5
private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
// 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
EnvironmentEdgeManager.currentTime() - ttl);
now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@ -175,9 +176,10 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
EnvironmentEdgeManager.currentTime() - ttl);
now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@ -230,8 +232,8 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm =
new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
rowComparator), get.getFamilyMap().get(fam2), now - testTTL);
new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@ -285,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
now - testTTL);
now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@ -343,7 +345,7 @@ public class TestQueryMatcher extends HBaseTestCase {
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
byte[] prevRow = null;

View File

@ -418,8 +418,13 @@ public class TestTags {
tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
// We cannot assume the ordering of tags
List<String> tagValues = new ArrayList<String>();
for (Tag tag: tags) {
tagValues.add(Bytes.toString(tag.getValue()));
}
assertTrue(tagValues.contains("tag1"));
assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
@ -475,8 +480,13 @@ public class TestTags {
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
// We cannot assume the ordering of tags
tagValues.clear();
for (Tag tag: tags) {
tagValues.add(Bytes.toString(tag.getValue()));
}
assertTrue(tagValues.contains("tag1"));
assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;

View File

@ -141,17 +141,19 @@ EOF
set_attributes(p, attributes) if attributes
visibility = args[VISIBILITY]
set_cell_visibility(p, visibility) if visibility
ttl = args[TTL]
set_op_ttl(p, ttl) if ttl
end
#Case where attributes are specified without timestamp
if timestamp.kind_of?(Hash)
timestamp.each do |k, v|
if v.kind_of?(Hash)
set_attributes(p, v) if v
end
if v.kind_of?(String)
set_cell_visibility(p, v) if v
end
if k == 'ATTRIBUTES'
set_attributes(p, v)
elsif k == 'VISIBILITY'
set_cell_visibility(p, v)
elsif k == "TTL"
set_op_ttl(p, v)
end
end
timestamp = nil
end
@ -219,6 +221,8 @@ EOF
visibility = args[VISIBILITY]
set_attributes(incr, attributes) if attributes
set_cell_visibility(incr, visibility) if visibility
ttl = args[TTL]
set_op_ttl(incr, ttl) if ttl
end
incr.addColumn(family, qualifier, value)
@table.increment(incr)
@ -237,6 +241,8 @@ EOF
visibility = args[VISIBILITY]
set_attributes(append, attributes) if attributes
set_cell_visibility(append, visibility) if visibility
ttl = args[TTL]
set_op_ttl(append, ttl) if ttl
end
append.add(family, qualifier, value.to_s.to_java_bytes)
@table.append(append)
@ -545,6 +551,10 @@ EOF
auths.to_java(:string)))
end
def set_op_ttl(op, ttl)
op.setTTL(ttl.to_java(:long))
end
#----------------------------
# Add general administration utilities to the shell
# each of the names below adds this method name to the table

View File

@ -530,5 +530,18 @@ module Hbase
end
end
define_test "mutation with TTL should expire" do
@test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 } )
begin
res = @test_table._get_internal('ttlTest', 'x:a')
assert_not_nil(res)
sleep 2
res = @test_table._get_internal('ttlTest', 'x:a')
assert_nil(res)
ensure
@test_table.delete('ttlTest', 'x:a')
end
end
end
end