From 9cb57ae35e3f90d7906a565abed64ea021b9dce8 Mon Sep 17 00:00:00 2001
From: Chia-Ping Tsai
Date: Tue, 6 Jun 2017 15:12:25 +0800
Subject: [PATCH] HBASE-18145 The flush may cause the corrupt data for reading

Signed-off-by: Andrew Purtell
---
 .../hbase/regionserver/StoreScanner.java           |   8 +-
 .../hadoop/hbase/regionserver/TestStore.java       | 146 ++++++++++++++++++
 2 files changed, 153 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 05cfe24d77a..b10c37d39ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -91,6 +91,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final long maxRowSize;
   protected final long cellsPerHeartbeatCheck;
 
+  /**
+   * If we close the memstore scanners before sending data to the client, the chunk may be
+   * reclaimed by other updates and the data will be corrupted.
+   */
+  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();
   /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
    * KVs skipped via seeking to next row/column. TODO: estimate them?
@@ -455,6 +460,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   public void close() {
     if (this.closing) return;
     this.closing = true;
+    clearAndClose(scannersForDelayedClose);
     clearAndClose(memStoreScannersAfterFlush);
     // Under test, we dont have a this.store
     if (this.store != null)
@@ -878,7 +884,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // remove the older memstore scanner
     for (int i = 0; i < currentScanners.size(); i++) {
       if (!currentScanners.get(i).isFileScanner()) {
-        currentScanners.remove(i).close();
+        scannersForDelayedClose.add(currentScanners.remove(i));
        break;
       }
     }
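Context for the StoreScanner change above: when a flush replaces the memstore scanners, the old scanner used to be closed immediately, which returns its MSLAB chunks to the chunk pool even though Cells already collected for the in-flight RPC response may still point into those chunks; a concurrent write can then reuse the chunk and overwrite the bytes before they are sent. The patch instead parks the old scanner in scannersForDelayedClose and only closes it when the StoreScanner itself closes. The self-contained sketch below is an illustration only, not HBase code: ChunkPool, PooledScanner and DelayedCloseSketch are made-up names that model the idea of deferring the close until the consumer is done with the data.

import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class DelayedCloseSketch {

  // Toy chunk pool: a reclaimed chunk is handed to the next writer.
  static class ChunkPool {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    byte[] claim() {
      byte[] chunk = free.poll();
      return chunk != null ? chunk : new byte[16];
    }
    void reclaim(byte[] chunk) {
      free.push(chunk);
    }
  }

  // Toy scanner whose "cells" are views into pooled memory; closing it returns the chunk.
  static class PooledScanner implements AutoCloseable {
    private final ChunkPool pool;
    private final byte[] chunk;
    PooledScanner(ChunkPool pool, byte[] value) {
      this.pool = pool;
      this.chunk = pool.claim();
      System.arraycopy(value, 0, chunk, 0, value.length);
    }
    byte[] next() {
      return chunk;   // like a Cell backed by MSLAB memory: a reference, not a copy
    }
    @Override
    public void close() {
      pool.reclaim(chunk);
    }
  }

  public static void main(String[] args) {
    ChunkPool pool = new ChunkPool();
    List<PooledScanner> scannersForDelayedClose = new ArrayList<>();

    PooledScanner memstoreScanner =
        new PooledScanner(pool, "value".getBytes(StandardCharsets.UTF_8));
    byte[] cellHeldForTheClient = memstoreScanner.next();

    // A flush swaps in new scanners. Closing memstoreScanner right here would return its
    // chunk to the pool, and the concurrent writer below would overwrite cellHeldForTheClient.
    scannersForDelayedClose.add(memstoreScanner);   // the patch's approach: defer the close

    PooledScanner concurrentWriter =
        new PooledScanner(pool, "newValue".getBytes(StandardCharsets.UTF_8));

    // Still prints "value" because the old chunk has not been reclaimed yet.
    System.out.println(new String(cellHeldForTheClient, 0, "value".length(), StandardCharsets.UTF_8));

    // Only after the response has been handed off is it safe to release the old scanner.
    scannersForDelayedClose.forEach(PooledScanner::close);
    concurrentWriter.close();
  }
}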
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 1dc6ae5df4d..573de11ffb1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -37,6 +37,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import static org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@@ -1228,6 +1230,67 @@ public class TestStore {
   }
 
+  @Test
+  public void testReclaimChunkWhenScaning() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(CHUNK_POOL_MAXSIZE_KEY, 1);
+    init("testReclaimChunkWhenScaning", conf);
+    final long ts = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    byte[] value = Bytes.toBytes("value");
+    // data which should be "seen" by the client
+    store.add(createCell(qf1, ts, seqId, value));
+    store.add(createCell(qf2, ts, seqId, value));
+    store.add(createCell(qf3, ts, seqId, value));
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(
+        new Scan(new Get(row)), quals, seqId)) {
+      List<Cell> results = new MyList<>(new MyListHook() {
+        @Override
+        public void hook(int size) {
+          switch (size) {
+            // 1) we get the first cell (qf1)
+            // 2) flush the data to have StoreScanner update inner scanners
+            // 3) the chunk will be reclaimed after updating the scanners
+            case 1:
+              try {
+                flushStore(store, id++);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              break;
+            // 1) we get the second cell (qf2)
+            // 2) add some cells to fill bytes into the chunk (we have only one chunk)
+            case 2:
+              try {
+                byte[] newValue = Bytes.toBytes("newValue");
+                // newer data which shouldn't be "seen" by the client
+                store.add(createCell(qf1, ts + 1, seqId + 1, newValue));
+                store.add(createCell(qf2, ts + 1, seqId + 1, newValue));
+                store.add(createCell(qf3, ts + 1, seqId + 1, newValue));
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              break;
+            default:
+              break;
+          }
+        }
+      });
+      scanner.next(results);
+      assertEquals(3, results.size());
+      for (Cell c : results) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value)
+          + ", actual:" + Bytes.toStringBinary(actualValue),
+          Bytes.equals(actualValue, value));
+      }
+    }
+  }
+
   private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
       throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
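The test above hinges on two settings: CHUNK_POOL_MAXSIZE_KEY is set to 1 so the memstore allocates from the reusable chunk pool (making a reclaimed chunk immediately reusable), and the scanner is opened with a read point equal to the first cells' sequence id, so the newValue cells written during the scan (sequence id seqId + 1) must stay invisible. A minimal sketch of that visibility rule follows; it assumes the usual MVCC comparison and is an illustration, not the HBase implementation (ReadPointVisibility is a made-up name).

// Illustration only: a cell is visible to a scanner when its sequence id does not
// exceed the scanner's read point.
final class ReadPointVisibility {
  static boolean isVisible(long cellSequenceId, long readPoint) {
    return cellSequenceId <= readPoint;
  }

  public static void main(String[] args) {
    long readPoint = 100;                            // the seqId passed to store.getScanner(...)
    System.out.println(isVisible(100, readPoint));   // true  -> the original "value" cells
    System.out.println(isVisible(101, readPoint));   // false -> the "newValue" cells added mid-scan
  }
}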
@@ -1259,4 +1322,87 @@ public class TestStore {
     void hook(MyStore store) throws IOException;
   }
 
+
+  interface MyListHook {
+    void hook(int currentSize);
+  }
+
+  private static class MyList<T> implements List<T> {
+    private final List<T> delegatee = new ArrayList<>();
+    private final MyListHook hookAtAdd;
+    MyList(final MyListHook hookAtAdd) {
+      this.hookAtAdd = hookAtAdd;
+    }
+    @Override
+    public int size() {return delegatee.size();}
+
+    @Override
+    public boolean isEmpty() {return delegatee.isEmpty();}
+
+    @Override
+    public boolean contains(Object o) {return delegatee.contains(o);}
+
+    @Override
+    public Iterator<T> iterator() {return delegatee.iterator();}
+
+    @Override
+    public Object[] toArray() {return delegatee.toArray();}
+
+    @Override
+    public <T> T[] toArray(T[] a) {return delegatee.toArray(a);}
+
+    @Override
+    public boolean add(T e) {
+      hookAtAdd.hook(size());
+      return delegatee.add(e);
+    }
+
+    @Override
+    public boolean remove(Object o) {return delegatee.remove(o);}
+
+    @Override
+    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
+
+    @Override
+    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
+
+    @Override
+    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
+
+    @Override
+    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
+
+    @Override
+    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
+
+    @Override
+    public void clear() {delegatee.clear();}
+
+    @Override
+    public T get(int index) {return delegatee.get(index);}
+
+    @Override
+    public T set(int index, T element) {return delegatee.set(index, element);}
+
+    @Override
+    public void add(int index, T element) {delegatee.add(index, element);}
+
+    @Override
+    public T remove(int index) {return delegatee.remove(index);}
+
+    @Override
+    public int indexOf(Object o) {return delegatee.indexOf(o);}
+
+    @Override
+    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
+
+    @Override
+    public ListIterator<T> listIterator() {return delegatee.listIterator();}
+
+    @Override
+    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
+
+    @Override
+    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
+  }
 }
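The MyList helper above is the core of the test harness: scanner.next(results) appends matched cells to the supplied list, so a hook that runs before each add() lets the test trigger a flush after the first cell and concurrent writes after the second, exactly the window in which the old memstore scanner's chunk used to be reclaimed. Below is a compact sketch of the same interception idea; it is an illustration only, AddHookList and AddHookListDemo are made-up names, and it subclasses ArrayList for brevity where the patch implements List and delegates.

import java.util.ArrayList;
import java.util.function.IntConsumer;

// A list that reports its current size to a callback before every add(), so a test can
// inject side effects at an exact point while another component is filling the list.
class AddHookList<E> extends ArrayList<E> {
  private final IntConsumer hookAtAdd;

  AddHookList(IntConsumer hookAtAdd) {
    this.hookAtAdd = hookAtAdd;
  }

  @Override
  public boolean add(E element) {
    hookAtAdd.accept(size());   // size before the element is appended, like MyList.hook(int)
    return super.add(element);
  }
}

class AddHookListDemo {
  public static void main(String[] args) {
    AddHookList<String> results = new AddHookList<>(sizeBeforeAdd -> {
      if (sizeBeforeAdd == 1) {
        // In the test this is where flushStore(...) runs: after the first cell has been
        // collected and before the second one is appended.
        System.out.println("injected side effect between element 1 and element 2");
      }
    });
    results.add("cell-1");
    results.add("cell-2");
    System.out.println(results);
  }
}

Note that ArrayList's bulk operations such as addAll do not route through add(E), so a subclass-based hook can be bypassed; the patch's delegating MyList makes every entry point explicit, and the subclass above is kept only to show where the hook fires.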