HBASE-14054 Acknowledged writes may get lost if regionserver clock is set backwards

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
This commit is contained in:
Enis Soztutar 2015-08-13 11:03:37 +03:00
parent 0dbaf0ea51
commit a0086e97ad
2 changed files with 158 additions and 1 deletions

View File

@ -201,7 +201,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
"hbase.hregion.scan.loadColumnFamiliesOnDemand";
// in milliseconds
private static final String MAX_WAIT_FOR_SEQ_ID_KEY =
"hbase.hregion.max.wait.for.seq.id";
@ -3317,13 +3317,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean valueIsNull = comparator.getValue() == null ||
comparator.getValue().length == 0;
boolean matches = false;
long cellTs = 0;
if (result.size() == 0 && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
valueIsNull) {
matches = true;
cellTs = result.get(0).getTimestamp();
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
cellTs = kv.getTimestamp();
int compareResult = comparator.compareTo(kv.getValueArray(),
kv.getValueOffset(), kv.getValueLength());
switch (compareOp) {
@ -3351,6 +3354,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
//If matches put the new put or delete the new delete
if (matches) {
// We have acquired the row lock already. If the system clock is NOT monotonically
// non-decreasing (see HBASE-14070) we should make sure that the mutation has a
// larger timestamp than what was observed via Get. doBatchMutate already does this, but
// there is no way to pass the cellTs. See HBASE-14054.
long now = EnvironmentEdgeManager.currentTime();
long ts = Math.max(now, cellTs); // ensure write is not eclipsed
byte[] byteTs = Bytes.toBytes(ts);
if (w instanceof Put) {
updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
}
// else delete is not needed since it already does a second get, and sets the timestamp
// from get (see prepareDeleteTimestamps).
// All edits for the given row (across all column families) must
// happen atomically.
doBatchMutate(w);
@ -3397,13 +3414,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean valueIsNull = comparator.getValue() == null ||
comparator.getValue().length == 0;
boolean matches = false;
long cellTs = 0;
if (result.size() == 0 && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
valueIsNull) {
matches = true;
cellTs = result.get(0).getTimestamp();
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
cellTs = kv.getTimestamp();
int compareResult = comparator.compareTo(kv.getValueArray(),
kv.getValueOffset(), kv.getValueLength());
switch (compareOp) {
@ -3431,6 +3451,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
//If matches put the new put or delete the new delete
if (matches) {
// We have acquired the row lock already. If the system clock is NOT monotonically
// non-decreasing (see HBASE-14070) we should make sure that the mutation has a
// larger timestamp than what was observed via Get. doBatchMutate already does this, but
// there is no way to pass the cellTs. See HBASE-14054.
long now = EnvironmentEdgeManager.currentTime();
long ts = Math.max(now, cellTs); // ensure write is not eclipsed
byte[] byteTs = Bytes.toBytes(ts);
for (Mutation w : rm.getMutations()) {
if (w instanceof Put) {
updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
}
// else delete is not needed since it already does a second get, and sets the timestamp
// from get (see prepareDeleteTimestamps).
}
// All edits for the given row (across all column families) must
// happen atomically.
mutateRow(rm);

View File

@ -103,6 +103,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
@ -145,6 +146,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
@ -6194,6 +6196,125 @@ public class TestHRegion {
}
}
@Test
public void testIncrementTimestampsAreMonotonic() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
edge.setValue(10);
Increment inc = new Increment(row);
inc.setDurability(Durability.SKIP_WAL);
inc.addColumn(fam1, qual1, 1L);
region.increment(inc);
Result result = region.get(new Get(row));
Cell c = result.getColumnLatestCell(fam1, qual1);
assertNotNull(c);
assertEquals(c.getTimestamp(), 10L);
edge.setValue(1); // clock goes back
region.increment(inc);
result = region.get(new Get(row));
c = result.getColumnLatestCell(fam1, qual1);
assertEquals(c.getTimestamp(), 10L);
assertEquals(Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()), 2L);
}
@Test
public void testAppendTimestampsAreMonotonic() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
edge.setValue(10);
Append a = new Append(row);
a.setDurability(Durability.SKIP_WAL);
a.add(fam1, qual1, qual1);
region.append(a);
Result result = region.get(new Get(row));
Cell c = result.getColumnLatestCell(fam1, qual1);
assertNotNull(c);
assertEquals(c.getTimestamp(), 10L);
edge.setValue(1); // clock goes back
region.append(a);
result = region.get(new Get(row));
c = result.getColumnLatestCell(fam1, qual1);
assertEquals(c.getTimestamp(), 10L);
byte[] expected = new byte[qual1.length*2];
System.arraycopy(qual1, 0, expected, 0, qual1.length);
System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
expected, 0, expected.length));
}
@Test
public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
edge.setValue(10);
Put p = new Put(row);
p.setDurability(Durability.SKIP_WAL);
p.addColumn(fam1, qual1, qual1);
region.put(p);
Result result = region.get(new Get(row));
Cell c = result.getColumnLatestCell(fam1, qual1);
assertNotNull(c);
assertEquals(c.getTimestamp(), 10L);
edge.setValue(1); // clock goes back
p = new Put(row);
p.setDurability(Durability.SKIP_WAL);
p.addColumn(fam1, qual1, qual2);
region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1), p, false);
result = region.get(new Get(row));
c = result.getColumnLatestCell(fam1, qual1);
assertEquals(c.getTimestamp(), 10L);
assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
qual2, 0, qual2.length));
}
@Test
public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
edge.setValue(10);
Put p = new Put(row);
p.setDurability(Durability.SKIP_WAL);
p.addColumn(fam1, qual1, qual1);
region.put(p);
Result result = region.get(new Get(row));
Cell c = result.getColumnLatestCell(fam1, qual1);
assertNotNull(c);
assertEquals(c.getTimestamp(), 10L);
edge.setValue(1); // clock goes back
p = new Put(row);
p.setDurability(Durability.SKIP_WAL);
p.addColumn(fam1, qual1, qual2);
RowMutations rm = new RowMutations(row);
rm.add(p);
region.checkAndRowMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1),
rm, false);
result = region.get(new Get(row));
c = result.getColumnLatestCell(fam1, qual1);
assertEquals(c.getTimestamp(), 10L);
assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
qual2, 0, qual2.length));
}
static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),