HBASE-18145 The flush may cause the corrupt data for reading
Signed-off-by: Andrew Purtell <apurtell@apache.org>
commit 9cb57ae35e
parent e0dbafd7cc
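Why the flush can corrupt an in-flight read, as a toy model (plain Java; none of these names are HBase code): a returned cell is essentially a view into a pooled memstore chunk, so handing the chunk back to the pool before the client has consumed the cell lets a later write rewrite the bytes underneath the reader.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Toy model of the bug: the "cell" is only a reference into a pooled chunk, so once the
// chunk is reclaimed and reused, the cell's bytes silently change under the reader.
public class EarlyReclaimDemo {
  static final Deque<byte[]> chunkPool = new ArrayDeque<>();

  public static void main(String[] args) {
    byte[] chunk = new byte[16];                        // chunk backing the memstore cells
    byte[] oldValue = "value".getBytes(StandardCharsets.UTF_8);
    System.arraycopy(oldValue, 0, chunk, 0, oldValue.length);

    // The scanner has "returned" a cell that is just a view into the chunk.
    byte[] cellView = chunk;
    chunkPool.push(chunk);                              // BUG: reclaimed before the client reads it

    // A later write reuses the pooled chunk and overwrites its bytes.
    byte[] reused = chunkPool.pop();
    byte[] newValue = "newValue".getBytes(StandardCharsets.UTF_8);
    System.arraycopy(newValue, 0, reused, 0, newValue.length);

    // The reader now sees the newer bytes instead of the value it was promised.
    System.out.println(new String(Arrays.copyOf(cellView, oldValue.length), StandardCharsets.UTF_8));
  }
}
```

Running it prints the newer bytes rather than the original value, which is the corruption the title describes; the change below avoids it by not releasing the memstore scanners' chunks until the scan is done with them.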
StoreScanner.java
@@ -91,6 +91,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final long maxRowSize;
   protected final long cellsPerHeartbeatCheck;
 
+  /**
+   * If we close the memstore scanners before sending data to client, the chunk may be reclaimed
+   * by other updates and the data will be corrupt.
+   */
+  private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();
   /**
    * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
    * KVs skipped via seeking to next row/column. TODO: estimate them?
@@ -455,6 +460,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   public void close() {
     if (this.closing) return;
     this.closing = true;
+    clearAndClose(scannersForDelayedClose);
     clearAndClose(memStoreScannersAfterFlush);
     // Under test, we dont have a this.store
     if (this.store != null)
@@ -878,7 +884,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       // remove the older memstore scanner
       for (int i = 0; i < currentScanners.size(); i++) {
         if (!currentScanners.get(i).isFileScanner()) {
-          currentScanners.remove(i).close();
+          scannersForDelayedClose.add(currentScanners.remove(i));
           break;
         }
       }
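The StoreScanner hunks above implement the fix by deferring the close: a memstore scanner replaced after a flush is parked rather than closed, and is only closed when the StoreScanner itself closes. A minimal sketch of that delayed-close idea, with illustrative names rather than HBase's real classes:

```java
import java.util.ArrayList;
import java.util.List;

// Scanners that may still back cells handed to a caller are parked instead of closed,
// and are only closed when the owning scanner closes.
interface SimpleScanner extends AutoCloseable {
  @Override
  void close();                       // releases the scanner's backing chunks
}

class DelayedCloseScanner implements AutoCloseable {
  // Scanners whose backing memory must stay valid until this scanner itself closes.
  private final List<SimpleScanner> scannersForDelayedClose = new ArrayList<>();

  // Called when a flush replaces the memstore scanners: instead of old.close(),
  // park the old scanner so its chunks cannot be reclaimed mid-read.
  void retireAfterFlush(SimpleScanner old) {
    scannersForDelayedClose.add(old);
  }

  @Override
  public void close() {
    // Safe point: the caller is done with every cell returned so far.
    for (SimpleScanner s : scannersForDelayedClose) {
      s.close();
    }
    scannersForDelayedClose.clear();
  }
}
```

The design choice is to tie the retired scanners' lifetime, and therefore the lifetime of their memstore chunks, to the scan itself: reclamation can only happen once no caller can still be holding cells that point into those chunks.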
TestStore.java
@@ -37,6 +37,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.NavigableSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import static org.apache.hadoop.hbase.regionserver.MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
@@ -1228,6 +1230,67 @@ public class TestStore {
 
   }
 
+  @Test
+  public void testReclaimChunkWhenScaning() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setFloat(CHUNK_POOL_MAXSIZE_KEY, 1);
+    init("testReclaimChunkWhenScaning", conf);
+    final long ts = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    byte[] value = Bytes.toBytes("value");
+    // the older data which should be "seen" by the client
+    store.add(createCell(qf1, ts, seqId, value));
+    store.add(createCell(qf2, ts, seqId, value));
+    store.add(createCell(qf3, ts, seqId, value));
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(
+        new Scan(new Get(row)), quals, seqId)) {
+      List<Cell> results = new MyList<>(new MyListHook() {
+        @Override
+        public void hook(int size) {
+          switch (size) {
+            // 1) we get the first cell (qf1)
+            // 2) flush the data to have StoreScanner update inner scanners
+            // 3) the chunk will be reclaimed after updating
+            case 1:
+              try {
+                flushStore(store, id++);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              break;
+            // 1) we get the second cell (qf2)
+            // 2) add some cells to fill some bytes into the chunk (we have only one chunk)
+            case 2:
+              try {
+                byte[] newValue = Bytes.toBytes("newValue");
+                // the newer data which shouldn't be "seen" by the client
+                store.add(createCell(qf1, ts + 1, seqId + 1, newValue));
+                store.add(createCell(qf2, ts + 1, seqId + 1, newValue));
+                store.add(createCell(qf3, ts + 1, seqId + 1, newValue));
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+              break;
+            default:
+              break;
+          }
+        }
+      });
+      scanner.next(results);
+      assertEquals(3, results.size());
+      for (Cell c : results) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value)
+          + ", actual:" + Bytes.toStringBinary(actualValue)
+          , Bytes.equals(actualValue, value));
+      }
+    }
+  }
+
   private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
@@ -1259,4 +1322,87 @@ public class TestStore {
     void hook(MyStore store) throws IOException;
   }
 
+  interface MyListHook {
+    void hook(int currentSize);
+  }
+
+  private static class MyList<T> implements List<T> {
+    private final List<T> delegatee = new ArrayList<>();
+    private final MyListHook hookAtAdd;
+    MyList(final MyListHook hookAtAdd) {
+      this.hookAtAdd = hookAtAdd;
+    }
+    @Override
+    public int size() {return delegatee.size();}
+
+    @Override
+    public boolean isEmpty() {return delegatee.isEmpty();}
+
+    @Override
+    public boolean contains(Object o) {return delegatee.contains(o);}
+
+    @Override
+    public Iterator<T> iterator() {return delegatee.iterator();}
+
+    @Override
+    public Object[] toArray() {return delegatee.toArray();}
+
+    @Override
+    public <T> T[] toArray(T[] a) {return delegatee.toArray(a);}
+
+    @Override
+    public boolean add(T e) {
+      hookAtAdd.hook(size());
+      return delegatee.add(e);
+    }
+
+    @Override
+    public boolean remove(Object o) {return delegatee.remove(o);}
+
+    @Override
+    public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
+
+    @Override
+    public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
+
+    @Override
+    public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
+
+    @Override
+    public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
+
+    @Override
+    public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
+
+    @Override
+    public void clear() {delegatee.clear();}
+
+    @Override
+    public T get(int index) {return delegatee.get(index);}
+
+    @Override
+    public T set(int index, T element) {return delegatee.set(index, element);}
+
+    @Override
+    public void add(int index, T element) {delegatee.add(index, element);}
+
+    @Override
+    public T remove(int index) {return delegatee.remove(index);}
+
+    @Override
+    public int indexOf(Object o) {return delegatee.indexOf(o);}
+
+    @Override
+    public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
+
+    @Override
+    public ListIterator<T> listIterator() {return delegatee.listIterator();}
+
+    @Override
+    public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
+
+    @Override
+    public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
+  }
 }
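The test drives the race with a List decorator: MyList reports its current size to a hook before every add(), so the test can trigger a flush after the first cell is returned and add newer cells after the second, right in the middle of scanner.next(results). A minimal sketch of that hook-on-add technique, with hypothetical names (the real MyList above forwards every List method to a delegate instead of extending ArrayList):

```java
import java.util.ArrayList;
import java.util.function.IntConsumer;

// A List that reports its current size to a callback before each add(), letting a test
// inject a flush or extra writes at an exact point while results are being collected.
class HookOnAddList<T> extends ArrayList<T> {
  private final transient IntConsumer hookBeforeAdd;

  HookOnAddList(IntConsumer hookBeforeAdd) {
    this.hookBeforeAdd = hookBeforeAdd;
  }

  @Override
  public boolean add(T element) {
    hookBeforeAdd.accept(size());  // size == 1: first cell already returned, and so on
    return super.add(element);
  }
}
```

Either shape, delegation or inheritance, works for injecting the flush at cell one and the extra writes at cell two, as the test's switch does; the assertion then checks that all three returned cells still carry the original value rather than bytes from a reclaimed chunk.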