From 3a097784b506090eddddac0cfc734b0417dad307 Mon Sep 17 00:00:00 2001
From: Ted Yu
Date: Wed, 25 Jun 2014 03:39:40 +0000
Subject: [PATCH] HBASE-11397 When merging expired stripes, we need to create an empty file to preserve metadata (Victor Xu)

---
 .../regionserver/StripeMultiFileWriter.java   |  11 ++
 .../TestStripeCompactionPolicy.java           | 124 +++++++++++++++++-
 2 files changed, 132 insertions(+), 3 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index e0baf61085a..891bb6b1fa4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -382,6 +382,17 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
         sanityCheckRight(
             right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
       }
+
+      // When all stripes being merged are expired and no writer was created during the
+      // compaction, create an empty file to preserve the stripe metadata.
+      if (existingWriters.isEmpty() && 1 == targetCount) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Merging expired stripes into one; creating an empty file to preserve metadata.");
+        }
+        boundaries.add(left);
+        existingWriters.add(writerFactory.createWriter());
+      }
+
       this.boundaries.add(right);
     }
   }
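For illustration, a minimal, self-contained sketch of the guard this hunk adds. The Writer class and finishMerge method are hypothetical stand-ins, not the HBase API; only the if-condition and the boundary bookkeeping mirror the patch.

import java.util.ArrayList;
import java.util.List;

public class EmptyFileGuardSketch {
  static class Writer {}

  // Mirrors the tail of the patched method: if everything being merged into one
  // target stripe expired away and nothing was written, emit one empty writer so
  // the [left, right) boundary metadata is still recorded.
  static void finishMerge(List<Writer> existingWriters, int targetCount,
      List<byte[]> boundaries, byte[] left, byte[] right) {
    if (existingWriters.isEmpty() && 1 == targetCount) {
      boundaries.add(left);
      existingWriters.add(new Writer()); // the "empty file"
    }
    boundaries.add(right);
  }

  public static void main(String[] args) {
    List<Writer> writers = new ArrayList<Writer>();
    List<byte[]> boundaries = new ArrayList<byte[]>();
    finishMerge(writers, 1, boundaries, new byte[0], new byte[0]);
    // One empty writer and both boundaries survive the all-expired merge.
    System.out.println(writers.size() + " writer(s), " + boundaries.size() + " boundaries");
  }
}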
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index c7365d9edf7..64a41478566 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -18,15 +18,23 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,21 +44,30 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
-import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.ArgumentMatcher;
@@ -317,6 +334,38 @@
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMergeExpiredStripes() throws Exception {
+    // HBASE-11397
+    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
+    long now = defaultTtl + 2;
+    edge.setValue(now);
+    EnvironmentEdgeManager.injectEdge(edge);
+    try {
+      StoreFile expiredFile = createFile(), notExpiredFile = createFile();
+      when(expiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl - 1);
+      when(notExpiredFile.getReader().getMaxTimestamp()).thenReturn(now - defaultTtl + 1);
+      List<StoreFile> expired = Lists.newArrayList(expiredFile, expiredFile);
+      List<StoreFile> notExpired = Lists.newArrayList(notExpiredFile, notExpiredFile);
+
+      StripeCompactionPolicy policy =
+          createPolicy(HBaseConfiguration.create(), defaultSplitSize, defaultSplitCount,
+            defaultInitialCount, true);
+
+      // Merge all three expired stripes into one.
+      StripeCompactionPolicy.StripeInformationProvider si =
+          createStripesWithFiles(expired, expired, expired);
+      verifyMergeCompaction(policy, si, 0, 2);
+
+      // Merge the two adjacent expired stripes (indices 3 and 4) into one.
+      si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired);
+      verifyMergeCompaction(policy, si, 3, 4);
+    } finally {
+      EnvironmentEdgeManager.reset();
+    }
+  }
+
   private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
       List<StoreFile>... stripeFiles) throws Exception {
     return createStripesWithFiles(createBoundaries(stripeFiles.length),
@@ -368,6 +417,19 @@
     return new ArrayList<StoreFile>(Arrays.asList(sfs));
   }
 
+  private void verifyMergeCompaction(StripeCompactionPolicy policy, StripeInformationProvider si,
+      int from, int to) throws Exception {
+    StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
+    Collection<StoreFile> sfs = getAllFiles(si, from, to);
+    verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
+
+    // All the stripes are expired, so the compactor will not create any writers; an empty
+    // file must be created to preserve the stripe metadata.
+    StripeCompactor sc = createCompactor();
+    List<Path> paths = scr.execute(sc);
+    assertEquals(1, paths.size());
+  }
+
   /**
    * Verify the compaction that includes several entire stripes.
    * @param policy Policy to test.
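The timestamps in testMergeExpiredStripes are chosen one tick on either side of the TTL cutoff. A small sketch of that arithmetic; isExpired is a hypothetical helper for illustration, not HBase code.

public class TtlExpirySketch {
  // A store file is expired when its newest cell is older than (now - ttl).
  static boolean isExpired(long maxTimestamp, long now, long ttl) {
    return maxTimestamp < now - ttl;
  }

  public static void main(String[] args) {
    long ttl = 1000L;   // stands in for defaultTtl
    long now = ttl + 2; // mirrors "long now = defaultTtl + 2" in the test
    System.out.println(isExpired(now - ttl - 1, now, ttl)); // true:  expiredFile
    System.out.println(isExpired(now - ttl + 1, now, ttl)); // false: notExpiredFile
  }
}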
@@ -629,7 +691,12 @@
     StoreFile.Reader r = mock(StoreFile.Reader.class);
     when(r.getEntries()).thenReturn(size);
     when(r.length()).thenReturn(size);
+    when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
+    when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
+    when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
+      mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
+    when(sf.createReader()).thenReturn(r);
     return sf;
   }
 
@@ -641,4 +708,55 @@
     when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_START_KEY)).thenReturn(startKey);
     when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
   }
+
+  private static StripeCompactor createCompactor() throws Exception {
+    HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
+    StoreFileWritersCapture writers = new StoreFileWritersCapture();
+    Store store = mock(Store.class);
+    when(store.getFamily()).thenReturn(col);
+    when(
+      store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
+        anyBoolean(), anyBoolean())).thenAnswer(writers);
+
+    Configuration conf = HBaseConfiguration.create();
+    final Scanner scanner = new Scanner();
+    return new StripeCompactor(conf, store) {
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+          byte[] dropDeletesToRow) throws IOException {
+        return scanner;
+      }
+
+      @Override
+      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
+          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+        return scanner;
+      }
+    };
+  }
+
+  private static class Scanner implements InternalScanner {
+    private final ArrayList<KeyValue> kvs;
+
+    public Scanner(KeyValue... kvs) {
+      this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+      if (kvs.isEmpty()) return false;
+      results.add(kvs.remove(0));
+      return !kvs.isEmpty();
+    }
+
+    @Override
+    public boolean next(List<Cell> result, int limit) throws IOException {
+      return next(result);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
 }
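The fake Scanner above follows the InternalScanner iteration contract: each next(...) call appends at most one row's cells and returns true while more rows remain. A stand-alone sketch of that contract and the drain loop a compactor typically runs; MiniScanner and the loop are illustrative stand-ins, not HBase code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ScannerContractSketch {
  interface MiniScanner {
    boolean next(List<String> results);
  }

  public static void main(String[] args) {
    final List<String> rows = new ArrayList<String>(Arrays.asList("r1", "r2", "r3"));
    MiniScanner scanner = new MiniScanner() {
      @Override
      public boolean next(List<String> results) {
        // Same shape as the fake Scanner in the test: emit one row, then report
        // whether any rows remain.
        if (rows.isEmpty()) return false;
        results.add(rows.remove(0));
        return !rows.isEmpty();
      }
    };
    // Typical drain loop: process each batch, including the batch returned
    // together with the final "no more rows" signal.
    List<String> batch = new ArrayList<String>();
    boolean more;
    do {
      batch.clear();
      more = scanner.next(batch);
      System.out.println("batch=" + batch + " more=" + more);
    } while (more);
    // A Scanner built with no KeyValues returns false on the first call, so the
    // compactor writes no cells and the empty-file guard in the first hunk is
    // what yields the single output path asserted in verifyMergeCompaction.
  }
}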