[jira] [HBase-5021] Enforce upper bound on timestamp
Summary: We have been getting hit with performance problems on the ODS side due to invalid timestamps being inserted by the timestamp. ODS is working on adding proper checks to app server, but production performance could be severely impacted with significant recovery time if something slips past. Therefore, we should also allow the option to check the upper bound in HBase. This is the first draft. Probably should allow per-CF customization. Test Plan: - mvn test -Dtest=TestHRegion#testPutWithTsTooNew Reviewers: Kannan, Liyin, JIRA CC: stack, nspiegelberg, tedyu, Kannan, mbautin Differential Revision: 849 git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1221532 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a952738257
commit
82e376b3ea
|
@ -38,7 +38,7 @@ public final class HConstants {
|
||||||
public enum OperationStatusCode {
|
public enum OperationStatusCode {
|
||||||
NOT_RUN,
|
NOT_RUN,
|
||||||
SUCCESS,
|
SUCCESS,
|
||||||
BAD_FAMILY,
|
SANITY_CHECK_FAILURE,
|
||||||
FAILURE;
|
FAILURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -281,6 +281,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
final WriteState writestate = new WriteState();
|
final WriteState writestate = new WriteState();
|
||||||
|
|
||||||
long memstoreFlushSize;
|
long memstoreFlushSize;
|
||||||
|
final long timestampSlop;
|
||||||
private volatile long lastFlushTime;
|
private volatile long lastFlushTime;
|
||||||
final RegionServerServices rsServices;
|
final RegionServerServices rsServices;
|
||||||
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
|
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
|
||||||
|
@ -396,6 +397,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
|
this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
|
||||||
this.rsServices = null;
|
this.rsServices = null;
|
||||||
this.fs = null;
|
this.fs = null;
|
||||||
|
this.timestampSlop = HConstants.LATEST_TIMESTAMP;
|
||||||
this.memstoreFlushSize = 0L;
|
this.memstoreFlushSize = 0L;
|
||||||
this.log = null;
|
this.log = null;
|
||||||
this.regiondir = null;
|
this.regiondir = null;
|
||||||
|
@ -449,6 +451,16 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
|
this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
|
||||||
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
|
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* timestamp.slop provides a server-side constraint on the timestamp. This
|
||||||
|
* assumes that you base your TS around currentTimeMillis(). In this case,
|
||||||
|
* throw an error to the user if the user-specified TS is newer than now +
|
||||||
|
* slop. LATEST_TIMESTAMP == don't use this functionality
|
||||||
|
*/
|
||||||
|
this.timestampSlop = conf.getLong(
|
||||||
|
"hbase.hregion.keyvalue.timestamp.slop.millisecs",
|
||||||
|
HConstants.LATEST_TIMESTAMP);
|
||||||
|
|
||||||
// don't initialize coprocessors if not running within a regionserver
|
// don't initialize coprocessors if not running within a regionserver
|
||||||
// TODO: revisit if coprocessors should load in other cases
|
// TODO: revisit if coprocessors should load in other cases
|
||||||
if (rsServices != null) {
|
if (rsServices != null) {
|
||||||
|
@ -464,6 +476,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
if (this.htableDescriptor == null) return;
|
if (this.htableDescriptor == null) return;
|
||||||
LOG.info("Setting up tabledescriptor config now ...");
|
LOG.info("Setting up tabledescriptor config now ...");
|
||||||
long flushSize = this.htableDescriptor.getMemStoreFlushSize();
|
long flushSize = this.htableDescriptor.getMemStoreFlushSize();
|
||||||
|
|
||||||
if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
|
if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
|
||||||
flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
|
flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
|
||||||
HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
|
HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
|
||||||
|
@ -1876,6 +1889,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// we acquire at least one.
|
// we acquire at least one.
|
||||||
// ----------------------------------
|
// ----------------------------------
|
||||||
int numReadyToWrite = 0;
|
int numReadyToWrite = 0;
|
||||||
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
while (lastIndexExclusive < batchOp.operations.length) {
|
while (lastIndexExclusive < batchOp.operations.length) {
|
||||||
Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
|
Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
|
||||||
Put put = nextPair.getFirst();
|
Put put = nextPair.getFirst();
|
||||||
|
@ -1895,10 +1909,11 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// Check the families in the put. If bad, skip this one.
|
// Check the families in the put. If bad, skip this one.
|
||||||
try {
|
try {
|
||||||
checkFamilies(familyMap.keySet());
|
checkFamilies(familyMap.keySet());
|
||||||
} catch (NoSuchColumnFamilyException nscf) {
|
checkTimestamps(put, now);
|
||||||
LOG.warn("No such column family in batch put", nscf);
|
} catch (DoNotRetryIOException dnrioe) {
|
||||||
|
LOG.warn("No such column family in batch put", dnrioe);
|
||||||
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
|
batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
|
||||||
OperationStatusCode.BAD_FAMILY, nscf.getMessage());
|
OperationStatusCode.SANITY_CHECK_FAILURE, dnrioe.getMessage());
|
||||||
lastIndexExclusive++;
|
lastIndexExclusive++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1936,7 +1951,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|
|
||||||
// we should record the timestamp only after we have acquired the rowLock,
|
// we should record the timestamp only after we have acquired the rowLock,
|
||||||
// otherwise, newer puts are not guaranteed to have a newer timestamp
|
// otherwise, newer puts are not guaranteed to have a newer timestamp
|
||||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
byte[] byteNow = Bytes.toBytes(now);
|
byte[] byteNow = Bytes.toBytes(now);
|
||||||
|
|
||||||
// Nothing to put -- an exception in the above such as NoSuchColumnFamily?
|
// Nothing to put -- an exception in the above such as NoSuchColumnFamily?
|
||||||
|
@ -2307,6 +2322,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
this.updatesLock.readLock().lock();
|
this.updatesLock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
checkFamilies(familyMap.keySet());
|
checkFamilies(familyMap.keySet());
|
||||||
|
checkTimestamps(familyMap, now);
|
||||||
updateKVTimestamps(familyMap.values(), byteNow);
|
updateKVTimestamps(familyMap.values(), byteNow);
|
||||||
// write/sync to WAL should happen before we touch memstore.
|
// write/sync to WAL should happen before we touch memstore.
|
||||||
//
|
//
|
||||||
|
@ -2431,6 +2447,26 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
checkFamily(family);
|
checkFamily(family);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
private void checkTimestamps(Put p, long now) throws DoNotRetryIOException {
|
||||||
|
checkTimestamps(p.getFamilyMap(), now);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
|
||||||
|
long now) throws DoNotRetryIOException {
|
||||||
|
if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
long maxTs = now + timestampSlop;
|
||||||
|
for (List<KeyValue> kvs : familyMap.values()) {
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
// see if the user-side TS is out of range. latest = server-side
|
||||||
|
if (!kv.isLatestTimestamp() && kv.getTimestamp() > maxTs) {
|
||||||
|
throw new DoNotRetryIOException("Timestamp for KV out of range "
|
||||||
|
+ kv + " (too.new=" + timestampSlop + ")");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Append the given map of family->edits to a WALEdit data structure.
|
* Append the given map of family->edits to a WALEdit data structure.
|
||||||
|
|
|
@ -3285,8 +3285,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
|
if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
|
||||||
result = new Result();
|
result = new Result();
|
||||||
} else if (code.getOperationStatusCode()
|
} else if (code.getOperationStatusCode()
|
||||||
== OperationStatusCode.BAD_FAMILY) {
|
== OperationStatusCode.SANITY_CHECK_FAILURE) {
|
||||||
result = new NoSuchColumnFamilyException(code.getExceptionMsg());
|
result = new DoNotRetryIOException(code.getExceptionMsg());
|
||||||
}
|
}
|
||||||
// FAILURE && NOT_RUN becomes null, aka: need to run again.
|
// FAILURE && NOT_RUN becomes null, aka: need to run again.
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
@ -74,7 +75,7 @@ public class RowResultGenerator extends ResultGenerator {
|
||||||
if (result != null && !result.isEmpty()) {
|
if (result != null && !result.isEmpty()) {
|
||||||
valuesI = result.list().iterator();
|
valuesI = result.list().iterator();
|
||||||
}
|
}
|
||||||
} catch (NoSuchColumnFamilyException e) {
|
} catch (DoNotRetryIOException e) {
|
||||||
// Warn here because Stargate will return 404 in the case if multiple
|
// Warn here because Stargate will return 404 in the case if multiple
|
||||||
// column families were specified but one did not exist -- currently
|
// column families were specified but one did not exist -- currently
|
||||||
// HBase will fail the whole Get.
|
// HBase will fail the whole Get.
|
||||||
|
|
|
@ -473,7 +473,7 @@ public class TestHRegion extends HBaseTestCase {
|
||||||
boolean exception = false;
|
boolean exception = false;
|
||||||
try {
|
try {
|
||||||
this.region.put(p);
|
this.region.put(p);
|
||||||
} catch (NoSuchColumnFamilyException e) {
|
} catch (DoNotRetryIOException e) {
|
||||||
exception = true;
|
exception = true;
|
||||||
}
|
}
|
||||||
assertTrue(exception);
|
assertTrue(exception);
|
||||||
|
@ -510,7 +510,7 @@ public class TestHRegion extends HBaseTestCase {
|
||||||
codes = this.region.put(puts);
|
codes = this.region.put(puts);
|
||||||
assertEquals(10, codes.length);
|
assertEquals(10, codes.length);
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
|
assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
|
||||||
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
||||||
}
|
}
|
||||||
assertEquals(1, HLog.getSyncTime().count);
|
assertEquals(1, HLog.getSyncTime().count);
|
||||||
|
@ -548,7 +548,7 @@ public class TestHRegion extends HBaseTestCase {
|
||||||
assertEquals(1, HLog.getSyncTime().count);
|
assertEquals(1, HLog.getSyncTime().count);
|
||||||
codes = retFromThread.get();
|
codes = retFromThread.get();
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
|
assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
|
||||||
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -565,7 +565,7 @@ public class TestHRegion extends HBaseTestCase {
|
||||||
codes = region.put(putsAndLocks.toArray(new Pair[0]));
|
codes = region.put(putsAndLocks.toArray(new Pair[0]));
|
||||||
LOG.info("...performed put");
|
LOG.info("...performed put");
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY :
|
assertEquals((i == 5) ? OperationStatusCode.SANITY_CHECK_FAILURE :
|
||||||
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
|
||||||
}
|
}
|
||||||
// Make sure we didn't do an extra batch
|
// Make sure we didn't do an extra batch
|
||||||
|
@ -1056,6 +1056,35 @@ public class TestHRegion extends HBaseTestCase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that there is server-side filtering for invalid timestamp upper
|
||||||
|
* bound. Note that the timestamp lower bound is automatically handled for us
|
||||||
|
* by the TTL field.
|
||||||
|
*/
|
||||||
|
public void testPutWithTsSlop() throws IOException {
|
||||||
|
byte[] tableName = Bytes.toBytes("testtable");
|
||||||
|
byte[] fam = Bytes.toBytes("info");
|
||||||
|
byte[][] families = { fam };
|
||||||
|
String method = this.getName();
|
||||||
|
HBaseConfiguration conf = new HBaseConfiguration();
|
||||||
|
|
||||||
|
// add data with a timestamp that is too recent for range. Ensure assert
|
||||||
|
conf.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
|
||||||
|
initHRegion(tableName, method, conf, families);
|
||||||
|
try {
|
||||||
|
// no TS specified == use latest. should not error
|
||||||
|
region.put(new Put(row).add(fam, Bytes.toBytes("qual"), Bytes
|
||||||
|
.toBytes("value")), false);
|
||||||
|
// TS out of range. should error
|
||||||
|
region.put(new Put(row).add(fam, Bytes.toBytes("qual"),
|
||||||
|
System.currentTimeMillis() + 2000,
|
||||||
|
Bytes.toBytes("value")), false);
|
||||||
|
fail("Expected IOE for TS out of configured timerange");
|
||||||
|
} catch (DoNotRetryIOException ioe) {
|
||||||
|
LOG.debug("Received expected exception", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
|
public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
|
||||||
byte [] tableName = Bytes.toBytes("test_table");
|
byte [] tableName = Bytes.toBytes("test_table");
|
||||||
byte [] fam1 = Bytes.toBytes("columnA");
|
byte [] fam1 = Bytes.toBytes("columnA");
|
||||||
|
@ -1204,7 +1233,7 @@ public class TestHRegion extends HBaseTestCase {
|
||||||
//Test
|
//Test
|
||||||
try {
|
try {
|
||||||
region.get(get, null);
|
region.get(get, null);
|
||||||
} catch (NoSuchColumnFamilyException e){
|
} catch (DoNotRetryIOException e) {
|
||||||
assertFalse(false);
|
assertFalse(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue