HBASE-8462 Custom timestamps should not be allowed to be negative
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518322 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
102f84d272
commit
1e71b56731
@ -128,7 +128,7 @@ public class Delete extends Mutation implements Comparable<Row> {
|
||||
public Delete(final byte [] rowArray, final int rowOffset, final int rowLength, long ts) {
|
||||
checkRow(rowArray, rowOffset, rowLength);
|
||||
this.row = Bytes.copy(rowArray, rowOffset, rowLength);
|
||||
this.ts = ts;
|
||||
setTimestamp(ts);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -195,6 +195,9 @@ public class Delete extends Mutation implements Comparable<Row> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Delete deleteFamily(byte [] family, long timestamp) {
|
||||
if (timestamp < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + timestamp);
|
||||
}
|
||||
List<Cell> list = familyMap.get(family);
|
||||
if(list == null) {
|
||||
list = new ArrayList<Cell>();
|
||||
@ -247,6 +250,9 @@ public class Delete extends Mutation implements Comparable<Row> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Delete deleteColumns(byte [] family, byte [] qualifier, long timestamp) {
|
||||
if (timestamp < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + timestamp);
|
||||
}
|
||||
List<Cell> list = familyMap.get(family);
|
||||
if (list == null) {
|
||||
list = new ArrayList<Cell>();
|
||||
@ -280,6 +286,9 @@ public class Delete extends Mutation implements Comparable<Row> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Delete deleteColumn(byte [] family, byte [] qualifier, long timestamp) {
|
||||
if (timestamp < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + timestamp);
|
||||
}
|
||||
List<Cell> list = familyMap.get(family);
|
||||
if(list == null) {
|
||||
list = new ArrayList<Cell>();
|
||||
@ -292,10 +301,13 @@ public class Delete extends Mutation implements Comparable<Row> {
|
||||
|
||||
/**
|
||||
* Set the timestamp of the delete.
|
||||
*
|
||||
*
|
||||
* @param timestamp
|
||||
*/
|
||||
public void setTimestamp(long timestamp) {
|
||||
if (timestamp < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + timestamp);
|
||||
}
|
||||
this.ts = timestamp;
|
||||
}
|
||||
|
||||
|
@ -84,6 +84,9 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
|
||||
checkRow(rowArray, rowOffset, rowLength);
|
||||
this.row = Bytes.copy(rowArray, rowOffset, rowLength);
|
||||
this.ts = ts;
|
||||
if (ts < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -121,6 +124,9 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Put add(byte [] family, byte [] qualifier, long ts, byte [] value) {
|
||||
if (ts < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
|
||||
}
|
||||
List<Cell> list = getCellList(family);
|
||||
KeyValue kv = createPutKeyValue(family, qualifier, ts, value);
|
||||
list.add(kv);
|
||||
|
@ -124,7 +124,7 @@ public interface Cell {
|
||||
|
||||
/**
|
||||
* @return Long value representing time at which this cell was "Put" into the row. Typically
|
||||
* represents the time of insertion, but can be any value from Long.MIN_VALUE to Long.MAX_VALUE.
|
||||
* represents the time of insertion, but can be any value from 0 to Long.MAX_VALUE.
|
||||
*/
|
||||
long getTimestamp();
|
||||
|
||||
|
@ -72,6 +72,10 @@ public class TimeRange {
|
||||
*/
|
||||
public TimeRange(long minStamp, long maxStamp)
|
||||
throws IOException {
|
||||
if (minStamp < 0 || maxStamp < 0) {
|
||||
throw new IllegalArgumentException("Timestamp cannot be negative. minStamp:" + minStamp
|
||||
+ ", maxStamp" + maxStamp);
|
||||
}
|
||||
if(maxStamp < minStamp) {
|
||||
throw new IOException("maxStamp is smaller than minStamp");
|
||||
}
|
||||
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.ParseFilter;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
@ -299,7 +299,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
|
||||
implType == ImplType.THREADED_SELECTOR) {
|
||||
|
||||
InetAddress listenAddress = getBindAddress(conf);
|
||||
InetAddress listenAddress = getBindAddress(conf);
|
||||
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
|
||||
new InetSocketAddress(listenAddress, listenPort));
|
||||
|
||||
@ -366,7 +366,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
tserver.getClass().getName());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
registerFilters(conf);
|
||||
}
|
||||
@ -737,7 +737,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
Get get = new Get(getBytes(row));
|
||||
addAttributes(get, attributes);
|
||||
get.addColumn(family, qualifier);
|
||||
get.setTimeRange(Long.MIN_VALUE, timestamp);
|
||||
get.setTimeRange(0, timestamp);
|
||||
get.setMaxVersions(numVersions);
|
||||
Result result = table.get(get);
|
||||
return ThriftUtilities.cellFromHBase(result.raw());
|
||||
@ -781,7 +781,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
if (columns == null) {
|
||||
Get get = new Get(getBytes(row));
|
||||
addAttributes(get, attributes);
|
||||
get.setTimeRange(Long.MIN_VALUE, timestamp);
|
||||
get.setTimeRange(0, timestamp);
|
||||
Result result = table.get(get);
|
||||
return ThriftUtilities.rowResultFromHBase(result);
|
||||
}
|
||||
@ -795,7 +795,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
get.addColumn(famAndQf[0], famAndQf[1]);
|
||||
}
|
||||
}
|
||||
get.setTimeRange(Long.MIN_VALUE, timestamp);
|
||||
get.setTimeRange(0, timestamp);
|
||||
Result result = table.get(get);
|
||||
return ThriftUtilities.rowResultFromHBase(result);
|
||||
} catch (IOException e) {
|
||||
@ -858,7 +858,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
}
|
||||
}
|
||||
}
|
||||
get.setTimeRange(Long.MIN_VALUE, timestamp);
|
||||
get.setTimeRange(0, timestamp);
|
||||
gets.add(get);
|
||||
}
|
||||
Result[] result = table.get(gets);
|
||||
@ -1085,7 +1085,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
table.put(puts);
|
||||
if (!deletes.isEmpty())
|
||||
table.delete(deletes);
|
||||
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.warn(e.getMessage(), e);
|
||||
throw new IOError(e.getMessage());
|
||||
@ -1122,6 +1122,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void scannerClose(int id) throws IOError, IllegalArgument {
|
||||
LOG.debug("scannerClose: id=" + id);
|
||||
ResultScannerWrapper resultScannerWrapper = getScanner(id);
|
||||
@ -1163,6 +1164,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
return scannerGetList(id,1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
|
||||
Map<ByteBuffer, ByteBuffer> attributes)
|
||||
throws IOError {
|
||||
@ -1177,7 +1179,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
scan.setStopRow(tScan.getStopRow());
|
||||
}
|
||||
if (tScan.isSetTimestamp()) {
|
||||
scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
|
||||
scan.setTimeRange(0, tScan.getTimestamp());
|
||||
}
|
||||
if (tScan.isSetCaching()) {
|
||||
scan.setCaching(tScan.getCaching());
|
||||
@ -1296,7 +1298,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
HTable table = getTable(tableName);
|
||||
Scan scan = new Scan(getBytes(startRow));
|
||||
addAttributes(scan, attributes);
|
||||
scan.setTimeRange(Long.MIN_VALUE, timestamp);
|
||||
scan.setTimeRange(0, timestamp);
|
||||
if (columns != null && columns.size() != 0) {
|
||||
for (ByteBuffer column : columns) {
|
||||
byte [][] famQf = KeyValue.parseColumn(getBytes(column));
|
||||
@ -1323,7 +1325,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
HTable table = getTable(tableName);
|
||||
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
|
||||
addAttributes(scan, attributes);
|
||||
scan.setTimeRange(Long.MIN_VALUE, timestamp);
|
||||
scan.setTimeRange(0, timestamp);
|
||||
if (columns != null && columns.size() != 0) {
|
||||
for (ByteBuffer column : columns) {
|
||||
byte [][] famQf = KeyValue.parseColumn(getBytes(column));
|
||||
@ -1334,7 +1336,7 @@ public class ThriftServerRunner implements Runnable {
|
||||
}
|
||||
}
|
||||
}
|
||||
scan.setTimeRange(Long.MIN_VALUE, timestamp);
|
||||
scan.setTimeRange(0, timestamp);
|
||||
return addScanner(table.getScanner(scan), false);
|
||||
} catch (IOException e) {
|
||||
LOG.warn(e.getMessage(), e);
|
||||
|
@ -5103,6 +5103,65 @@ public class TestFromClientSide {
|
||||
assertEquals(2, bar[0].size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNegativeTimestamp() throws IOException {
|
||||
HTable table = TEST_UTIL.createTable(Bytes.toBytes("testNegativeTimestamp"), FAMILY);
|
||||
|
||||
try {
|
||||
Put put = new Put(ROW, -1);
|
||||
put.add(FAMILY, QUALIFIER, VALUE);
|
||||
table.put(put);
|
||||
fail("Negative timestamps should not have been allowed");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertTrue(ex.getMessage().contains("negative"));
|
||||
}
|
||||
|
||||
try {
|
||||
Put put = new Put(ROW);
|
||||
put.add(FAMILY, QUALIFIER, -1, VALUE);
|
||||
table.put(put);
|
||||
fail("Negative timestamps should not have been allowed");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertTrue(ex.getMessage().contains("negative"));
|
||||
}
|
||||
|
||||
try {
|
||||
Delete delete = new Delete(ROW, -1);
|
||||
table.delete(delete);
|
||||
fail("Negative timestamps should not have been allowed");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertTrue(ex.getMessage().contains("negative"));
|
||||
}
|
||||
|
||||
try {
|
||||
Delete delete = new Delete(ROW);
|
||||
delete.deleteFamily(FAMILY, -1);
|
||||
table.delete(delete);
|
||||
fail("Negative timestamps should not have been allowed");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertTrue(ex.getMessage().contains("negative"));
|
||||
}
|
||||
|
||||
try {
|
||||
Scan scan = new Scan();
|
||||
scan.setTimeRange(-1, 1);
|
||||
table.getScanner(scan);
|
||||
fail("Negative timestamps should not have been allowed");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertTrue(ex.getMessage().contains("negative"));
|
||||
}
|
||||
|
||||
// KeyValue should allow negative timestamps for backwards compat. Otherwise, if the user
|
||||
// already has negative timestamps in cluster data, HBase won't be able to handle that
|
||||
try {
|
||||
new KeyValue(Bytes.toBytes(42), Bytes.toBytes(42), Bytes.toBytes(42), -1, Bytes.toBytes(42));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
fail("KeyValue SHOULD allow negative timestamps");
|
||||
}
|
||||
|
||||
table.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRawScanRespectsVersions() throws Exception {
|
||||
byte[] TABLE = Bytes.toBytes("testRawScan");
|
||||
|
@ -86,7 +86,7 @@ public class TestHLogFiltering {
|
||||
Delete del = new Delete(row);
|
||||
for (int iCol = 0; iCol < 10; ++iCol) {
|
||||
final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
|
||||
final long ts = rand.nextInt();
|
||||
final long ts = Math.abs(rand.nextInt());
|
||||
final byte[] qual = Bytes.toBytes("col" + iCol);
|
||||
if (rand.nextBoolean()) {
|
||||
final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
|
||||
|
Loading…
x
Reference in New Issue
Block a user