HBASE-11397 When merging expired stripes, we need to create an empty file to preserve metadata (Victor Xu)
This commit is contained in:
parent
235aaee4fe
commit
3a097784b5
|
@ -382,6 +382,17 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
|
||||||
sanityCheckRight(
|
sanityCheckRight(
|
||||||
right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
|
right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When expired stripes were going to be merged into one, and if no writer was created during
|
||||||
|
// the compaction, we need to create an empty file to preserve metadata.
|
||||||
|
if (existingWriters.isEmpty() && 1 == targetCount) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata.");
|
||||||
|
}
|
||||||
|
boundaries.add(left);
|
||||||
|
existingWriters.add(writerFactory.createWriter());
|
||||||
|
}
|
||||||
|
|
||||||
this.boundaries.add(right);
|
this.boundaries.add(right);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,15 +18,23 @@
|
||||||
package org.apache.hadoop.hbase.regionserver.compactions;
|
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
|
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.AdditionalMatchers.aryEq;
|
import static org.mockito.AdditionalMatchers.aryEq;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Matchers.anyLong;
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Matchers.argThat;
|
import static org.mockito.Matchers.argThat;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Matchers.isNull;
|
import static org.mockito.Matchers.isNull;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.only;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -36,21 +44,30 @@ import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.SmallTests;
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
|
||||||
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
|
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
|
||||||
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
|
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
|
||||||
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
|
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
|
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
|
||||||
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
|
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ConcatenatedLists;
|
import org.apache.hadoop.hbase.util.ConcatenatedLists;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||||
import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
|
@ -317,6 +334,38 @@ public class TestStripeCompactionPolicy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test
|
||||||
|
public void testMergeExpiredStripes() throws Exception {
|
||||||
|
// HBASE-11397
|
||||||
|
ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
|
||||||
|
long now = defaultTtl + 2;
|
||||||
|
edge.setValue(now);
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
try {
|
||||||
|
StoreFile expiredFile = createFile(), notExpiredFile = createFile();
|
||||||
|
when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
|
||||||
|
when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
|
||||||
|
List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
|
||||||
|
List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
|
||||||
|
|
||||||
|
StripeCompactionPolicy policy =
|
||||||
|
createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
|
||||||
|
defaultInitialCount, true);
|
||||||
|
|
||||||
|
// Merge all three expired stripes into one.
|
||||||
|
StripeCompactionPolicy.StripeInformationProvider si =
|
||||||
|
createStripesWithFiles(expired, expired, expired);
|
||||||
|
verifyMergeCompatcion(policy, si, 0, 2);
|
||||||
|
|
||||||
|
// Merge two adjacent expired stripes into one.
|
||||||
|
si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
|
||||||
|
verifyMergeCompatcion(policy, si, 3, 4);
|
||||||
|
} finally {
|
||||||
|
EnvironmentEdgeManager.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
|
private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
|
||||||
List<StoreFile>... stripeFiles) throws Exception {
|
List<StoreFile>... stripeFiles) throws Exception {
|
||||||
return createStripesWithFiles(createBoundaries(stripeFiles.length),
|
return createStripesWithFiles(createBoundaries(stripeFiles.length),
|
||||||
|
@ -368,6 +417,19 @@ public class TestStripeCompactionPolicy {
|
||||||
return new ArrayList<StoreFile>(Arrays.asList(sfs));
|
return new ArrayList<StoreFile>(Arrays.asList(sfs));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si,
|
||||||
|
int from, int to) throws Exception {
|
||||||
|
StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
|
||||||
|
Collection<StoreFile> sfs = getAllFiles(si, from, to);
|
||||||
|
verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
|
||||||
|
|
||||||
|
// All the Stripes are expired, so the Compactor will not create any Writers. We need to create
|
||||||
|
// an empty file to preserve metadata
|
||||||
|
StripeCompactor sc = createCompactor();
|
||||||
|
List<Path> paths = scr.execute(sc);
|
||||||
|
assertEquals(1, paths.size());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify the compaction that includes several entire stripes.
|
* Verify the compaction that includes several entire stripes.
|
||||||
* @param policy Policy to test.
|
* @param policy Policy to test.
|
||||||
|
@ -629,7 +691,12 @@ public class TestStripeCompactionPolicy {
|
||||||
StoreFile.Reader r = mock(StoreFile.Reader.class);
|
StoreFile.Reader r = mock(StoreFile.Reader.class);
|
||||||
when(r.getEntries()).thenReturn(size);
|
when(r.getEntries()).thenReturn(size);
|
||||||
when(r.length()).thenReturn(size);
|
when(r.length()).thenReturn(size);
|
||||||
|
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
|
||||||
|
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
|
||||||
|
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
|
||||||
|
mock(StoreFileScanner.class));
|
||||||
when(sf.getReader()).thenReturn(r);
|
when(sf.getReader()).thenReturn(r);
|
||||||
|
when(sf.createReader()).thenReturn(r);
|
||||||
return sf;
|
return sf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -641,4 +708,55 @@ public class TestStripeCompactionPolicy {
|
||||||
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
|
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
|
||||||
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
|
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static StripeCompactor createCompactor() throws Exception {
|
||||||
|
HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
|
||||||
|
StoreFileWritersCapture writers = new StoreFileWritersCapture();
|
||||||
|
Store store = mock(Store.class);
|
||||||
|
when(store.getFamily()).thenReturn(col);
|
||||||
|
when(
|
||||||
|
store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
|
||||||
|
anyBoolean(), anyBoolean())).thenAnswer(writers);
|
||||||
|
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
final Scanner scanner = new Scanner();
|
||||||
|
return new StripeCompactor(conf, store) {
|
||||||
|
@Override
|
||||||
|
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
|
||||||
|
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
|
||||||
|
byte[] dropDeletesToRow) throws IOException {
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
|
||||||
|
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class Scanner implements InternalScanner {
|
||||||
|
private final ArrayList<KeyValue> kvs;
|
||||||
|
|
||||||
|
public Scanner(KeyValue... kvs) {
|
||||||
|
this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean next(List<Cell> results) throws IOException {
|
||||||
|
if (kvs.isEmpty()) return false;
|
||||||
|
results.add(kvs.remove(0));
|
||||||
|
return !kvs.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean next(List<Cell> result, int limit) throws IOException {
|
||||||
|
return next(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue