HBASE-27464 In memory compaction 'COMPACT' may cause data corruption when adding cells large than maxAlloc(default 256k) size (#4881)

Co-authored-by: comnetwork <comnetwork@163.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
(cherry picked from commit 7b0ac6451d)
This commit is contained in:
chenglei 2022-11-17 23:13:12 +08:00 committed by Duo Zhang
parent 309ed58f07
commit daa39a46ab
2 changed files with 107 additions and 2 deletions

View File

@ -159,8 +159,11 @@ public class CellChunkImmutableSegment extends ImmutableSegment {
offsetInCurentChunk = ChunkCreator.SIZEOF_CHUNK_HEADER;
}
if (action == MemStoreCompactionStrategy.Action.COMPACT && !alreadyCopied) {
// for compaction copy cell to the new segment (MSLAB copy)
c = maybeCloneWithAllocator(c, false);
// For compaction copy cell to the new segment (MSLAB copy),here we set forceCloneOfBigCell
// to true, because the chunk which the cell is allocated may be freed after the compaction
// is completed, see HBASE-27464.
c = maybeCloneWithAllocator(c, true);
}
offsetInCurentChunk = // add the Cell reference to the index chunk
createCellReference((ByteBufferKeyValue) c, chunks[currentChunkIdx].getData(),

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -100,6 +101,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.quotas.RegionSizeStoreImpl;
import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType;
import org.apache.hadoop.hbase.regionserver.MemStoreCompactionStrategy.Action;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@ -1896,6 +1898,106 @@ public class TestHStore {
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
}
/**
* This test is for HBASE-27464, before this JIRA,when init {@link CellChunkImmutableSegment} for
* 'COMPACT' action, we not force copy to current MSLab. When cell size bigger than
* {@link MemStoreLABImpl#maxAlloc}, cell will stay in previous chunk which will recycle after
* segment replace, and we may read wrong data when these chunk reused by others.
*/
@Test
public void testForceCloneOfBigCellForCellChunkImmutableSegment() throws Exception {
Configuration conf = HBaseConfiguration.create();
int maxAllocByteSize = conf.getInt(MemStoreLAB.MAX_ALLOC_KEY, MemStoreLAB.MAX_ALLOC_DEFAULT);
// Construct big cell,which is large than {@link MemStoreLABImpl#maxAlloc}.
byte[] cellValue = new byte[maxAllocByteSize + 1];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final byte[] rowKey1 = Bytes.toBytes("rowKey1");
final Cell originalCell1 = createCell(rowKey1, qf1, timestamp, seqId, cellValue);
final byte[] rowKey2 = Bytes.toBytes("rowKey2");
final Cell originalCell2 = createCell(rowKey2, qf1, timestamp, seqId, cellValue);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
int cellByteSize = MutableSegment.getCellLength(originalCell1);
int inMemoryFlushByteSize = cellByteSize - 1;
// set CompactingMemStore.inmemoryFlushSize to flushByteSize.
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore6.class.getName());
conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.005);
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(inMemoryFlushByteSize * 200));
conf.setBoolean(WALFactory.WAL_ENABLED, false);
// Use {@link MemoryCompactionPolicy#EAGER} for always compacting.
init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family)
.setInMemoryCompaction(MemoryCompactionPolicy.EAGER).build());
MyCompactingMemStore6 myCompactingMemStore = ((MyCompactingMemStore6) store.memstore);
assertTrue((int) (myCompactingMemStore.getInmemoryFlushSize()) == inMemoryFlushByteSize);
// Data chunk Pool is disabled.
assertTrue(ChunkCreator.getInstance().getMaxCount(ChunkType.DATA_CHUNK) == 0);
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
// First compact
store.add(originalCell1, memStoreSizing);
// Waiting for the first in-memory compaction finished
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
StoreScanner storeScanner =
(StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
Cell resultCell1 = segmentScanner.next();
assertTrue(CellUtil.equals(resultCell1, originalCell1));
int cell1ChunkId = ((ExtendedCell) resultCell1).getChunkId();
assertTrue(cell1ChunkId != ExtendedCell.CELL_NOT_BASED_ON_CHUNK);
assertNull(segmentScanner.next());
segmentScanner.close();
storeScanner.close();
Segment segment = segmentScanner.segment;
assertTrue(segment instanceof CellChunkImmutableSegment);
MemStoreLABImpl memStoreLAB1 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
assertTrue(!memStoreLAB1.isClosed());
assertTrue(!memStoreLAB1.chunks.isEmpty());
assertTrue(!memStoreLAB1.isReclaimed());
// Second compact
store.add(originalCell2, memStoreSizing);
// Waiting for the second in-memory compaction finished
myCompactingMemStore.inMemoryCompactionEndCyclicBarrier.await();
// Before HBASE-27464, here may throw java.lang.IllegalArgumentException: In CellChunkMap, cell
// must be associated with chunk.. We were looking for a cell at index 0.
// The cause for this exception is because the data chunk Pool is disabled,when the data chunks
// are recycled after the second in-memory compaction finished,the
// {@link ChunkCreator.putbackChunks} method does not put the chunks back to the data chunk
// pool,it just removes them from {@link ChunkCreator#chunkIdMap},so in
// {@link CellChunkMap#getCell} we could not get the data chunk by chunkId.
storeScanner = (StoreScanner) store.getScanner(new Scan(new Get(rowKey1)), quals, seqId + 1);
segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
Cell newResultCell1 = segmentScanner.next();
assertTrue(newResultCell1 != resultCell1);
assertTrue(CellUtil.equals(newResultCell1, originalCell1));
Cell resultCell2 = segmentScanner.next();
assertTrue(CellUtil.equals(resultCell2, originalCell2));
assertNull(segmentScanner.next());
segmentScanner.close();
storeScanner.close();
segment = segmentScanner.segment;
assertTrue(segment instanceof CellChunkImmutableSegment);
MemStoreLABImpl memStoreLAB2 = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
assertTrue(!memStoreLAB2.isClosed());
assertTrue(!memStoreLAB2.chunks.isEmpty());
assertTrue(!memStoreLAB2.isReclaimed());
assertTrue(memStoreLAB1.isClosed());
assertTrue(memStoreLAB1.chunks.isEmpty());
assertTrue(memStoreLAB1.isReclaimed());
}
// This test is for HBASE-26210 also, test write large cell and small cell concurrently when
// InmemoryFlushSize is smaller,equal with and larger than cell size.
@Test