HBASE-14054 Acknowledged writes may get lost if regionserver clock is set backwards
This commit is contained in:
parent
a399ac9c4f
commit
d1262331eb
|
@ -3288,13 +3288,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
boolean valueIsNull = comparator.getValue() == null ||
|
boolean valueIsNull = comparator.getValue() == null ||
|
||||||
comparator.getValue().length == 0;
|
comparator.getValue().length == 0;
|
||||||
boolean matches = false;
|
boolean matches = false;
|
||||||
|
long cellTs = 0;
|
||||||
if (result.size() == 0 && valueIsNull) {
|
if (result.size() == 0 && valueIsNull) {
|
||||||
matches = true;
|
matches = true;
|
||||||
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
|
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
|
||||||
valueIsNull) {
|
valueIsNull) {
|
||||||
matches = true;
|
matches = true;
|
||||||
|
cellTs = result.get(0).getTimestamp();
|
||||||
} else if (result.size() == 1 && !valueIsNull) {
|
} else if (result.size() == 1 && !valueIsNull) {
|
||||||
Cell kv = result.get(0);
|
Cell kv = result.get(0);
|
||||||
|
cellTs = kv.getTimestamp();
|
||||||
int compareResult = CellComparator.compareValue(kv, comparator);
|
int compareResult = CellComparator.compareValue(kv, comparator);
|
||||||
switch (compareOp) {
|
switch (compareOp) {
|
||||||
case LESS:
|
case LESS:
|
||||||
|
@ -3321,6 +3324,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
//If matches put the new put or delete the new delete
|
//If matches put the new put or delete the new delete
|
||||||
if (matches) {
|
if (matches) {
|
||||||
|
// We have acquired the row lock already. If the system clock is NOT monotonically
|
||||||
|
// non-decreasing (see HBASE-14070) we should make sure that the mutation has a
|
||||||
|
// timestamp at least as large as what was observed via Get. doBatchMutate already does this, but
|
||||||
|
// there is no way to pass the cellTs. See HBASE-14054.
|
||||||
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
long ts = Math.max(now, cellTs); // ensure write is not eclipsed
|
||||||
|
byte[] byteTs = Bytes.toBytes(ts);
|
||||||
|
|
||||||
|
if (w instanceof Put) {
|
||||||
|
updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
|
||||||
|
}
|
||||||
|
// else delete is not needed since it already does a second get, and sets the timestamp
|
||||||
|
// from get (see prepareDeleteTimestamps).
|
||||||
|
|
||||||
// All edits for the given row (across all column families) must
|
// All edits for the given row (across all column families) must
|
||||||
// happen atomically.
|
// happen atomically.
|
||||||
doBatchMutate(w);
|
doBatchMutate(w);
|
||||||
|
@ -3367,13 +3384,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
boolean valueIsNull = comparator.getValue() == null ||
|
boolean valueIsNull = comparator.getValue() == null ||
|
||||||
comparator.getValue().length == 0;
|
comparator.getValue().length == 0;
|
||||||
boolean matches = false;
|
boolean matches = false;
|
||||||
|
long cellTs = 0;
|
||||||
if (result.size() == 0 && valueIsNull) {
|
if (result.size() == 0 && valueIsNull) {
|
||||||
matches = true;
|
matches = true;
|
||||||
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
|
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
|
||||||
valueIsNull) {
|
valueIsNull) {
|
||||||
matches = true;
|
matches = true;
|
||||||
|
cellTs = result.get(0).getTimestamp();
|
||||||
} else if (result.size() == 1 && !valueIsNull) {
|
} else if (result.size() == 1 && !valueIsNull) {
|
||||||
Cell kv = result.get(0);
|
Cell kv = result.get(0);
|
||||||
|
cellTs = kv.getTimestamp();
|
||||||
int compareResult = CellComparator.compareValue(kv, comparator);
|
int compareResult = CellComparator.compareValue(kv, comparator);
|
||||||
switch (compareOp) {
|
switch (compareOp) {
|
||||||
case LESS:
|
case LESS:
|
||||||
|
@ -3400,6 +3420,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
//If matches put the new put or delete the new delete
|
//If matches put the new put or delete the new delete
|
||||||
if (matches) {
|
if (matches) {
|
||||||
|
// We have acquired the row lock already. If the system clock is NOT monotonically
|
||||||
|
// non-decreasing (see HBASE-14070) we should make sure that the mutation has a
|
||||||
|
// timestamp at least as large as what was observed via Get. doBatchMutate already does this, but
|
||||||
|
// there is no way to pass the cellTs. See HBASE-14054.
|
||||||
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
long ts = Math.max(now, cellTs); // ensure write is not eclipsed
|
||||||
|
byte[] byteTs = Bytes.toBytes(ts);
|
||||||
|
|
||||||
|
for (Mutation w : rm.getMutations()) {
|
||||||
|
if (w instanceof Put) {
|
||||||
|
updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
|
||||||
|
}
|
||||||
|
// else delete is not needed since it already does a second get, and sets the timestamp
|
||||||
|
// from get (see prepareDeleteTimestamps).
|
||||||
|
}
|
||||||
|
|
||||||
// All edits for the given row (across all column families) must
|
// All edits for the given row (across all column families) must
|
||||||
// happen atomically.
|
// happen atomically.
|
||||||
mutateRow(rm);
|
mutateRow(rm);
|
||||||
|
|
|
@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||||
|
@ -145,6 +146,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||||
|
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||||
import org.apache.hadoop.hbase.util.PairOfSameType;
|
import org.apache.hadoop.hbase.util.PairOfSameType;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||||
|
@ -6240,6 +6242,125 @@ public class TestHRegion {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIncrementTimestampsAreMonotonic() throws IOException {
|
||||||
|
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
|
||||||
|
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
|
||||||
|
edge.setValue(10);
|
||||||
|
Increment inc = new Increment(row);
|
||||||
|
inc.setDurability(Durability.SKIP_WAL);
|
||||||
|
inc.addColumn(fam1, qual1, 1L);
|
||||||
|
region.increment(inc);
|
||||||
|
|
||||||
|
Result result = region.get(new Get(row));
|
||||||
|
Cell c = result.getColumnLatestCell(fam1, qual1);
|
||||||
|
assertNotNull(c);
|
||||||
|
assertEquals(c.getTimestamp(), 10L);
|
||||||
|
|
||||||
|
edge.setValue(1); // clock goes back
|
||||||
|
region.increment(inc);
|
||||||
|
result = region.get(new Get(row));
|
||||||
|
c = result.getColumnLatestCell(fam1, qual1);
|
||||||
|
assertEquals(c.getTimestamp(), 10L);
|
||||||
|
assertEquals(Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()), 2L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppendTimestampsAreMonotonic() throws IOException {
|
||||||
|
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
|
||||||
|
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
|
||||||
|
edge.setValue(10);
|
||||||
|
Append a = new Append(row);
|
||||||
|
a.setDurability(Durability.SKIP_WAL);
|
||||||
|
a.add(fam1, qual1, qual1);
|
||||||
|
region.append(a);
|
||||||
|
|
||||||
|
Result result = region.get(new Get(row));
|
||||||
|
Cell c = result.getColumnLatestCell(fam1, qual1);
|
||||||
|
assertNotNull(c);
|
||||||
|
assertEquals(c.getTimestamp(), 10L);
|
||||||
|
|
||||||
|
edge.setValue(1); // clock goes back
|
||||||
|
region.append(a);
|
||||||
|
result = region.get(new Get(row));
|
||||||
|
c = result.getColumnLatestCell(fam1, qual1);
|
||||||
|
assertEquals(c.getTimestamp(), 10L);
|
||||||
|
|
||||||
|
byte[] expected = new byte[qual1.length*2];
|
||||||
|
System.arraycopy(qual1, 0, expected, 0, qual1.length);
|
||||||
|
System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
|
||||||
|
|
||||||
|
assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
|
||||||
|
expected, 0, expected.length));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
|
||||||
|
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
|
||||||
|
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
|
||||||
|
edge.setValue(10);
|
||||||
|
Put p = new Put(row);
|
||||||
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
|
p.addColumn(fam1, qual1, qual1);
|
||||||
|
region.put(p);
|
||||||
|
|
||||||
|
Result result = region.get(new Get(row));
|
||||||
|
Cell c = result.getColumnLatestCell(fam1, qual1);
|
||||||
|
assertNotNull(c);
|
||||||
|
assertEquals(c.getTimestamp(), 10L);
|
||||||
|
|
||||||
|
edge.setValue(1); // clock goes back
|
||||||
|
p = new Put(row);
|
||||||
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
|
p.addColumn(fam1, qual1, qual2);
|
||||||
|
region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1), p, false);
|
||||||
|
result = region.get(new Get(row));
|
||||||
|
c = result.getColumnLatestCell(fam1, qual1);
|
||||||
|
assertEquals(c.getTimestamp(), 10L);
|
||||||
|
|
||||||
|
assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
|
||||||
|
qual2, 0, qual2.length));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
|
||||||
|
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
|
||||||
|
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
|
||||||
|
edge.setValue(10);
|
||||||
|
Put p = new Put(row);
|
||||||
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
|
p.addColumn(fam1, qual1, qual1);
|
||||||
|
region.put(p);
|
||||||
|
|
||||||
|
Result result = region.get(new Get(row));
|
||||||
|
Cell c = result.getColumnLatestCell(fam1, qual1);
|
||||||
|
assertNotNull(c);
|
||||||
|
assertEquals(c.getTimestamp(), 10L);
|
||||||
|
|
||||||
|
edge.setValue(1); // clock goes back
|
||||||
|
p = new Put(row);
|
||||||
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
|
p.addColumn(fam1, qual1, qual2);
|
||||||
|
RowMutations rm = new RowMutations(row);
|
||||||
|
rm.add(p);
|
||||||
|
region.checkAndRowMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1),
|
||||||
|
rm, false);
|
||||||
|
result = region.get(new Get(row));
|
||||||
|
c = result.getColumnLatestCell(fam1, qual1);
|
||||||
|
assertEquals(c.getTimestamp(), 10L);
|
||||||
|
|
||||||
|
assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
|
||||||
|
qual2, 0, qual2.length));
|
||||||
|
}
|
||||||
|
|
||||||
static HRegion initHRegion(TableName tableName, String callingMethod,
|
static HRegion initHRegion(TableName tableName, String callingMethod,
|
||||||
byte[]... families) throws IOException {
|
byte[]... families) throws IOException {
|
||||||
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
|
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
|
||||||
|
|
Loading…
Reference in New Issue