HBASE-18145 The flush may cause the corrupt data for reading
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
858bccfcb8
commit
da3c023635
|
@ -95,9 +95,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
private final long maxRowSize;
|
||||
private final long cellsPerHeartbeatCheck;
|
||||
|
||||
// Collects all the KVHeap that are eagerly getting closed during the
|
||||
// course of a scan
|
||||
private final List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
|
||||
// 1) Collects all the KVHeap that are eagerly getting closed during the
|
||||
// course of a scan
|
||||
// 2) Collects the unused memstore scanners. If we close the memstore scanners
|
||||
// before sending data to client, the chunk may be reclaimed by other
|
||||
// updates and the data will be corrupt.
|
||||
private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
|
||||
|
@ -485,23 +488,20 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
close(true);
|
||||
}
|
||||
|
||||
private void close(boolean withHeapClose) {
|
||||
private void close(boolean withDelayedScannersClose) {
|
||||
if (this.closing) {
|
||||
return;
|
||||
}
|
||||
if (withHeapClose) {
|
||||
if (withDelayedScannersClose) {
|
||||
this.closing = true;
|
||||
}
|
||||
// Under test, we dont have a this.store
|
||||
if (this.store != null) {
|
||||
this.store.deleteChangedReaderObserver(this);
|
||||
}
|
||||
if (withHeapClose) {
|
||||
if (withDelayedScannersClose) {
|
||||
clearAndClose(scannersForDelayedClose);
|
||||
clearAndClose(memStoreScannersAfterFlush);
|
||||
for (KeyValueHeap h : this.heapsForDelayedClose) {
|
||||
h.close();
|
||||
}
|
||||
this.heapsForDelayedClose.clear();
|
||||
if (this.heap != null) {
|
||||
this.heap.close();
|
||||
this.currentScanners.clear();
|
||||
|
@ -509,7 +509,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
}
|
||||
} else {
|
||||
if (this.heap != null) {
|
||||
this.heapsForDelayedClose.add(this.heap);
|
||||
this.scannersForDelayedClose.add(this.heap);
|
||||
this.currentScanners.clear();
|
||||
this.heap = null;
|
||||
}
|
||||
|
@ -879,7 +879,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
// remove the older memstore scanner
|
||||
for (int i = currentScanners.size() - 1; i >=0; i--) {
|
||||
if (!currentScanners.get(i).isFileScanner()) {
|
||||
currentScanners.remove(i).close();
|
||||
scannersForDelayedClose.add(currentScanners.remove(i));
|
||||
} else {
|
||||
// we add the memstore scanner to the end of currentScanners
|
||||
break;
|
||||
|
@ -1121,8 +1121,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
|||
}
|
||||
matcher.beforeShipped();
|
||||
// There wont be further fetch of Cells from these scanners. Just close.
|
||||
this.heapsForDelayedClose.forEach(KeyValueHeap::close);
|
||||
this.heapsForDelayedClose.clear();
|
||||
clearAndClose(scannersForDelayedClose);
|
||||
if (this.heap != null) {
|
||||
this.heap.shipped();
|
||||
// When switching from pread to stream, we will open a new scanner for each store file, but
|
||||
|
|
|
@ -36,6 +36,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
|
@ -44,6 +45,7 @@ import java.util.concurrent.ExecutorService;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -193,7 +195,7 @@ public class TestStore {
|
|||
} else {
|
||||
htd.addFamily(hcd);
|
||||
}
|
||||
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
|
||||
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
|
||||
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
|
||||
final Configuration walConf = new Configuration(conf);
|
||||
FSUtils.setRootDir(walConf, basedir);
|
||||
|
@ -1155,6 +1157,62 @@ public class TestStore {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReclaimChunkWhenScaning() throws IOException {
|
||||
init("testReclaimChunkWhenScaning");
|
||||
long ts = EnvironmentEdgeManager.currentTime();
|
||||
long seqId = 100;
|
||||
byte[] value = Bytes.toBytes("value");
|
||||
// older data whihc shouldn't be "seen" by client
|
||||
store.add(createCell(qf1, ts, seqId, value), null);
|
||||
store.add(createCell(qf2, ts, seqId, value), null);
|
||||
store.add(createCell(qf3, ts, seqId, value), null);
|
||||
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||
quals.add(qf1);
|
||||
quals.add(qf2);
|
||||
quals.add(qf3);
|
||||
try (InternalScanner scanner = (InternalScanner) store.getScanner(
|
||||
new Scan(new Get(row)), quals, seqId)) {
|
||||
List<Cell> results = new MyList<>(size -> {
|
||||
switch (size) {
|
||||
// 1) we get the first cell (qf1)
|
||||
// 2) flush the data to have StoreScanner update inner scanners
|
||||
// 3) the chunk will be reclaimed after updaing
|
||||
case 1:
|
||||
try {
|
||||
flushStore(store, id++);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
break;
|
||||
// 1) we get the second cell (qf2)
|
||||
// 2) add some cell to fill some byte into the chunk (we have only one chunk)
|
||||
case 2:
|
||||
try {
|
||||
byte[] newValue = Bytes.toBytes("newValue");
|
||||
// older data whihc shouldn't be "seen" by client
|
||||
store.add(createCell(qf1, ts + 1, seqId + 1, newValue), null);
|
||||
store.add(createCell(qf2, ts + 1, seqId + 1, newValue), null);
|
||||
store.add(createCell(qf3, ts + 1, seqId + 1, newValue), null);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
});
|
||||
scanner.next(results);
|
||||
assertEquals(3, results.size());
|
||||
for (Cell c : results) {
|
||||
byte[] actualValue = CellUtil.cloneValue(c);
|
||||
assertTrue("expected:" + Bytes.toStringBinary(value)
|
||||
+ ", actual:" + Bytes.toStringBinary(actualValue)
|
||||
, Bytes.equals(actualValue, value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
|
@ -1222,4 +1280,82 @@ public class TestStore {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
private static class MyList<T> implements List<T> {
|
||||
private final List<T> delegatee = new ArrayList<>();
|
||||
private final Consumer<Integer> hookAtAdd;
|
||||
MyList(final Consumer<Integer> hookAtAdd) {
|
||||
this.hookAtAdd = hookAtAdd;
|
||||
}
|
||||
@Override
|
||||
public int size() {return delegatee.size();}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {return delegatee.isEmpty();}
|
||||
|
||||
@Override
|
||||
public boolean contains(Object o) {return delegatee.contains(o);}
|
||||
|
||||
@Override
|
||||
public Iterator<T> iterator() {return delegatee.iterator();}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {return delegatee.toArray();}
|
||||
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {return delegatee.toArray(a);}
|
||||
|
||||
@Override
|
||||
public boolean add(T e) {
|
||||
hookAtAdd.accept(size());
|
||||
return delegatee.add(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object o) {return delegatee.remove(o);}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {return delegatee.containsAll(c);}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends T> c) {return delegatee.addAll(c);}
|
||||
|
||||
@Override
|
||||
public boolean addAll(int index, Collection<? extends T> c) {return delegatee.addAll(index, c);}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(Collection<?> c) {return delegatee.removeAll(c);}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(Collection<?> c) {return delegatee.retainAll(c);}
|
||||
|
||||
@Override
|
||||
public void clear() {delegatee.clear();}
|
||||
|
||||
@Override
|
||||
public T get(int index) {return delegatee.get(index);}
|
||||
|
||||
@Override
|
||||
public T set(int index, T element) {return delegatee.set(index, element);}
|
||||
|
||||
@Override
|
||||
public void add(int index, T element) {delegatee.add(index, element);}
|
||||
|
||||
@Override
|
||||
public T remove(int index) {return delegatee.remove(index);}
|
||||
|
||||
@Override
|
||||
public int indexOf(Object o) {return delegatee.indexOf(o);}
|
||||
|
||||
@Override
|
||||
public int lastIndexOf(Object o) {return delegatee.lastIndexOf(o);}
|
||||
|
||||
@Override
|
||||
public ListIterator<T> listIterator() {return delegatee.listIterator();}
|
||||
|
||||
@Override
|
||||
public ListIterator<T> listIterator(int index) {return delegatee.listIterator(index);}
|
||||
|
||||
@Override
|
||||
public List<T> subList(int fromIndex, int toIndex) {return delegatee.subList(fromIndex, toIndex);}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue