diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c0df4bf10f0..37d0f08c585 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -203,7 +203,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand"; - + // in milliseconds private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.seq.id"; @@ -3288,13 +3288,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; boolean matches = false; + long cellTs = 0; if (result.size() == 0 && valueIsNull) { matches = true; } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { matches = true; + cellTs = result.get(0).getTimestamp(); } else if (result.size() == 1 && !valueIsNull) { Cell kv = result.get(0); + cellTs = kv.getTimestamp(); int compareResult = CellComparator.compareValue(kv, comparator); switch (compareOp) { case LESS: @@ -3321,6 +3324,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } //If matches put the new put or delete the new delete if (matches) { + // We have acquired the row lock already. If the system clock is NOT monotonically + // non-decreasing (see HBASE-14070) we should make sure that the mutation has a + // larger timestamp than what was observed via Get. doBatchMutate already does this, but + // there is no way to pass the cellTs. See HBASE-14054. + long now = EnvironmentEdgeManager.currentTime(); + long ts = Math.max(now, cellTs); // ensure write is not eclipsed + byte[] byteTs = Bytes.toBytes(ts); + + if (w instanceof Put) { + updateCellTimestamps(w.getFamilyCellMap().values(), byteTs); + } + // else delete is not needed since it already does a second get, and sets the timestamp + // from get (see prepareDeleteTimestamps). + // All edits for the given row (across all column families) must // happen atomically. doBatchMutate(w); @@ -3367,13 +3384,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; boolean matches = false; + long cellTs = 0; if (result.size() == 0 && valueIsNull) { matches = true; } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { matches = true; + cellTs = result.get(0).getTimestamp(); } else if (result.size() == 1 && !valueIsNull) { Cell kv = result.get(0); + cellTs = kv.getTimestamp(); int compareResult = CellComparator.compareValue(kv, comparator); switch (compareOp) { case LESS: @@ -3400,6 +3420,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } //If matches put the new put or delete the new delete if (matches) { + // We have acquired the row lock already. If the system clock is NOT monotonically + // non-decreasing (see HBASE-14070) we should make sure that the mutation has a + // larger timestamp than what was observed via Get. doBatchMutate already does this, but + // there is no way to pass the cellTs. See HBASE-14054. + long now = EnvironmentEdgeManager.currentTime(); + long ts = Math.max(now, cellTs); // ensure write is not eclipsed + byte[] byteTs = Bytes.toBytes(ts); + + for (Mutation w : rm.getMutations()) { + if (w instanceof Put) { + updateCellTimestamps(w.getFamilyCellMap().values(), byteTs); + } + // else delete is not needed since it already does a second get, and sets the timestamp + // from get (see prepareDeleteTimestamps). + } + // All edits for the given row (across all column families) must // happen atomically. mutateRow(rm); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 56a9d4b6dea..826c9b3a208 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -102,6 +102,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; @@ -145,6 +146,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.DefaultWALProvider; @@ -6240,6 +6242,125 @@ public class TestHRegion { } } + @Test + public void testIncrementTimestampsAreMonotonic() throws IOException { + HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + edge.setValue(10); + Increment inc = new Increment(row); + inc.setDurability(Durability.SKIP_WAL); + inc.addColumn(fam1, qual1, 1L); + region.increment(inc); + + Result result = region.get(new Get(row)); + Cell c = result.getColumnLatestCell(fam1, qual1); + assertNotNull(c); + assertEquals(c.getTimestamp(), 10L); + + edge.setValue(1); // clock goes back + region.increment(inc); + result = region.get(new Get(row)); + c = result.getColumnLatestCell(fam1, qual1); + assertEquals(c.getTimestamp(), 10L); + assertEquals(Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()), 2L); + } + + @Test + public void testAppendTimestampsAreMonotonic() throws IOException { + HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + edge.setValue(10); + Append a = new Append(row); + a.setDurability(Durability.SKIP_WAL); + a.add(fam1, qual1, qual1); + region.append(a); + + Result result = region.get(new Get(row)); + Cell c = result.getColumnLatestCell(fam1, qual1); + assertNotNull(c); + assertEquals(c.getTimestamp(), 10L); + + edge.setValue(1); // clock goes back + region.append(a); + result = region.get(new Get(row)); + c = result.getColumnLatestCell(fam1, qual1); + assertEquals(c.getTimestamp(), 10L); + + byte[] expected = new byte[qual1.length*2]; + System.arraycopy(qual1, 0, expected, 0, qual1.length); + System.arraycopy(qual1, 0, expected, qual1.length, qual1.length); + + assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), + expected, 0, expected.length)); + } + + @Test + public void testCheckAndMutateTimestampsAreMonotonic() throws IOException { + HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + edge.setValue(10); + Put p = new Put(row); + p.setDurability(Durability.SKIP_WAL); + p.addColumn(fam1, qual1, qual1); + region.put(p); + + Result result = region.get(new Get(row)); + Cell c = result.getColumnLatestCell(fam1, qual1); + assertNotNull(c); + assertEquals(c.getTimestamp(), 10L); + + edge.setValue(1); // clock goes back + p = new Put(row); + p.setDurability(Durability.SKIP_WAL); + p.addColumn(fam1, qual1, qual2); + region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1), p, false); + result = region.get(new Get(row)); + c = result.getColumnLatestCell(fam1, qual1); + assertEquals(c.getTimestamp(), 10L); + + assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), + qual2, 0, qual2.length)); + } + + @Test + public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { + HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(edge); + + edge.setValue(10); + Put p = new Put(row); + p.setDurability(Durability.SKIP_WAL); + p.addColumn(fam1, qual1, qual1); + region.put(p); + + Result result = region.get(new Get(row)); + Cell c = result.getColumnLatestCell(fam1, qual1); + assertNotNull(c); + assertEquals(c.getTimestamp(), 10L); + + edge.setValue(1); // clock goes back + p = new Put(row); + p.setDurability(Durability.SKIP_WAL); + p.addColumn(fam1, qual1, qual2); + RowMutations rm = new RowMutations(row); + rm.add(p); + region.checkAndRowMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1), + rm, false); + result = region.get(new Get(row)); + c = result.getColumnLatestCell(fam1, qual1); + assertEquals(c.getTimestamp(), 10L); + + assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), + qual2, 0, qual2.length)); + } + static HRegion initHRegion(TableName tableName, String callingMethod, byte[]... families) throws IOException { return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),