HBASE-27464 In memory compaction 'COMPACT' may cause data corruption when adding cells large than maxAlloc(default 256k) size (#4881)
Co-authored-by: comnetwork <comnetwork@163.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
(cherry picked from commit 7b0ac6451d
)
This commit is contained in:
parent
fd92057f22
commit
acc4d13e7f
|
@ -159,8 +159,11 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
|
|||
offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
|
||||
}
|
||||
if (action == MemStoreCompactionStrategy.Action.COMPACT && !alreadyCopied) {
|
||||
// for compaction copy cell to the new segment (MSLAB copy)
|
||||
c = maybeCloneWithAllocator(c, false);
|
||||
|
||||
// For compaction copy cell to the new segment (MSLAB copy),here we set forceCloneOfBigCell
|
||||
// to true, because the chunk which the cell is allocated may be freed after the compaction
|
||||
// is completed, see HBASE-27464.
|
||||
c = maybeCloneWithAllocator(c, true);
|
||||
}
|
||||
offsetInCurentChunk = // add the Cell reference to the index chunk
|
||||
createCellReference((ByteBufferKeyValue) c, chunks[currentChunkIdx].getData(),
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.CellBuilderType;
|
|||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.ExtendedCell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -97,6 +98,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
|||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
|
||||
|
@ -1807,6 +1809,106 @@ public class TestHStore {
|
|||
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for
|
||||
* 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than
|
||||
* {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after
|
||||
* segment replace, and we may read wrong data when these chunk reused by others.
|
||||
*/
|
||||
@Test
|
||||
public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
|
||||
|
||||
// Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
|
||||
byte[] cellValue = new byte[maxAllocByteSize + 1];
|
||||
final long timestamp = EnvironmentEdgeManager.currentTime();
|
||||
final long seqId = 100;
|
||||
final byte[] rowKey1 = Bytes.toBytes("rowKey1");
|
||||
final Cell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
|
||||
final byte[] rowKey2 = Bytes.toBytes("rowKey2");
|
||||
final Cell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
|
||||
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||
quals.add(qf1);
|
||||
|
||||
int cellByteSize = MutableSegment.getCellLength(originalCell1);
|
||||
int inMemoryFlushByteSize = cellByteSize - 1;
|
||||
|
||||
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
|
||||
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
|
||||
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
|
||||
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
|
||||
conf.setBoolean(WALFactory.WAL_ENABLED, false);
|
||||
|
||||
// Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
|
||||
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
|
||||
.setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
|
||||
|
||||
MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
|
||||
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
|
||||
|
||||
// Data chunk Pool is disabled.
|
||||
assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
|
||||
|
||||
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
|
||||
|
||||
// First compact
|
||||
store.add(originalCell1, memStoreSizing);
|
||||
// Waiting for the first in-memory compaction finished
|
||||
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
|
||||
|
||||
StoreScanner storeScanner =
|
||||
(StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
|
||||
SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
|
||||
Cell resultCell1 = segmentScanner.next();
|
||||
assertTrue(CellUtil.equals(resultCell1, originalCell1));
|
||||
int cell1ChunkId = ((ExtendedCell) resultCell1).getChunkId();
|
||||
assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
|
||||
assertNull(segmentScanner.next());
|
||||
segmentScanner.close();
|
||||
storeScanner.close();
|
||||
Segment segment = segmentScanner.segment;
|
||||
assertTrue(segment instanceof CellChunkImmutableSegment);
|
||||
MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
|
||||
assertTrue(!memStoreLAB1.isClosed());
|
||||
assertTrue(!memStoreLAB1.chunks.isEmpty());
|
||||
assertTrue(!memStoreLAB1.isReclaimed());
|
||||
|
||||
// Second compact
|
||||
store.add(originalCell2, memStoreSizing);
|
||||
// Waiting for the second in-memory compaction finished
|
||||
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
|
||||
|
||||
// Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
|
||||
// must be associated with chunk.. We were looking for a cell at index 0.
|
||||
// The cause for this exception is because the data chunk Pool is disabled,when the data chunks
|
||||
// are recycled after the second in-memory compaction finished,the
|
||||
// {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
|
||||
// pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
|
||||
// {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
|
||||
storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
|
||||
segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
|
||||
Cell newResultCell1 = segmentScanner.next();
|
||||
assertTrue(newResultCell1 != resultCell1);
|
||||
assertTrue(CellUtil.equals(newResultCell1, originalCell1));
|
||||
|
||||
Cell resultCell2 = segmentScanner.next();
|
||||
assertTrue(CellUtil.equals(resultCell2, originalCell2));
|
||||
assertNull(segmentScanner.next());
|
||||
segmentScanner.close();
|
||||
storeScanner.close();
|
||||
|
||||
segment = segmentScanner.segment;
|
||||
assertTrue(segment instanceof CellChunkImmutableSegment);
|
||||
MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
|
||||
assertTrue(!memStoreLAB2.isClosed());
|
||||
assertTrue(!memStoreLAB2.chunks.isEmpty());
|
||||
assertTrue(!memStoreLAB2.isReclaimed());
|
||||
assertTrue(memStoreLAB1.isClosed());
|
||||
assertTrue(memStoreLAB1.chunks.isEmpty());
|
||||
assertTrue(memStoreLAB1.isReclaimed());
|
||||
}
|
||||
|
||||
// This test is for HBASE-26210 also, test write large cell and small cell concurrently when
|
||||
// InmemoryFlushSize is smaller,equal with and larger than cell size.
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue