HBASE-16799 CP exposed Store should not expose unwanted APIs.

This commit is contained in:
anoopsamjohn 2016-10-14 22:16:54 +05:30
parent 39d43ab779
commit 5f75fa0c4f
9 changed files with 91 additions and 142 deletions

View File

@ -60,7 +60,7 @@ public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
super.configureForRegion(region);
this.flushSizeLowerBound = getFlushSizeLowerBound(region);
for(Store store : region.stores.values()) {
if(store.getMemStore().isSloppy()) {
if(store.isSloppyMemstore()) {
sloppyStores.add(store);
} else {
regularStores.add(store);

View File

@ -511,13 +511,6 @@ public class HMobStore extends HStore {
}
}
@Override public void finalizeFlush() {
}
@Override public MemStore getMemStore() {
return null;
}
public void updateCellsCountCompactedToMob(long count) {
cellsCountCompactedToMob += count;
}

View File

@ -942,8 +942,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Future<HStore> future = completionService.take();
HStore store = future.get();
this.stores.put(store.getFamily().getName(), store);
MemStore memStore = store.getMemStore();
if(memStore != null && memStore.isSloppy()) {
if (store.isSloppyMemstore()) {
hasSloppyStores = true;
}
@ -2561,7 +2560,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If we get to here, the HStores have been written.
for(Store storeToFlush :storesToFlush) {
storeToFlush.finalizeFlush();
((HStore) storeToFlush).finalizeFlush();
}
if (wal != null) {
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
@ -3863,9 +3862,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
if (upsert) {
return store.upsert(cells, getSmallestReadPoint());
return ((HStore) store).upsert(cells, getSmallestReadPoint());
} else {
return store.add(cells);
return ((HStore) store).add(cells);
}
}
@ -3880,7 +3879,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkFamily(CellUtil.cloneFamily(cell));
// Unreachable because checkFamily will throw exception
}
return store.add(cell);
return ((HStore) store).add(cell);
}
@Override
@ -4121,7 +4120,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long editsCount = 0;
long intervalEdits = 0;
WAL.Entry entry;
Store store = null;
HStore store = null;
boolean reported_once = false;
ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();
@ -4217,7 +4216,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Figure which store the edit is meant for.
if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
store = getStore(cell);
store = getHStore(cell);
}
if (store == null) {
// This should never happen. Perhaps schema was changed between
@ -4344,7 +4343,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.REPLAY_EVENT);
try {
Store store = this.getStore(compaction.getFamilyName().toByteArray());
HStore store = this.getHStore(compaction.getFamilyName().toByteArray());
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Found Compaction WAL edit for deleted family:"
@ -4927,7 +4926,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
// stores of primary may be different now
family = storeDescriptor.getFamilyName().toByteArray();
Store store = getStore(family);
HStore store = getHStore(family);
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a bulk load marker from primary, but the family is not found. "
@ -5129,7 +5128,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param cell Cell to add.
* @return True if we should flush.
*/
protected boolean restoreEdit(final Store s, final Cell cell) {
protected boolean restoreEdit(final HStore s, final Cell cell) {
long kvSize = s.add(cell);
if (this.rsAccounting != null) {
rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
@ -5167,19 +5166,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public Store getStore(final byte[] column) {
return this.stores.get(column);
return getHStore(column);
}
public HStore getHStore(final byte[] column) {
return (HStore) this.stores.get(column);
}
/**
* Return HStore instance. Does not do any copy: as the number of store is limited, we
* iterate on the list.
*/
private Store getStore(Cell cell) {
private HStore getHStore(Cell cell) {
for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
if (Bytes.equals(
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
famStore.getKey(), 0, famStore.getKey().length)) {
return famStore.getValue();
return (HStore) famStore.getValue();
}
}
@ -5484,7 +5487,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
byte[] familyName = p.getFirst();
String path = p.getSecond();
Store store = getStore(familyName);
HStore store = getHStore(familyName);
if (store == null) {
IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
"No such column family " + Bytes.toStringBinary(familyName));
@ -5542,7 +5545,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Pair<byte[], String> p : familyPaths) {
byte[] familyName = p.getFirst();
String path = p.getSecond();
Store store = getStore(familyName);
HStore store = getHStore(familyName);
try {
String finalPath = path;
if (bulkLoadListener != null) {
@ -7089,8 +7092,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If no WAL, need to stamp it here.
CellUtil.setSequenceId(cell, sequenceId);
}
Store store = getStore(cell);
addedSize += applyToMemstore(store, cell);
addedSize += applyToMemstore(getHStore(cell), cell);
}
}
// STEP 8. Complete mvcc.

View File

@ -28,13 +28,11 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
@ -63,9 +61,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
@ -638,7 +633,11 @@ public class HStore implements Store {
return storeFile;
}
@Override
/**
* Adds a value to the memstore
* @param cell
* @return memstore size delta
*/
public long add(final Cell cell) {
lock.readLock().lock();
try {
@ -648,7 +647,11 @@ public class HStore implements Store {
}
}
@Override
/**
* Adds the specified value to the memstore
* @param cells
* @return memstore size delta
*/
public long add(final Iterable<Cell> cells) {
lock.readLock().lock();
try {
@ -686,7 +689,10 @@ public class HStore implements Store {
return this.storeEngine.getStoreFileManager().getStorefiles();
}
@Override
/**
* This throws a WrongRegionException if the HFile does not fit in this region, or an
* InvalidHFileException if the HFile is not valid.
*/
public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
HFile.Reader reader = null;
try {
@ -757,7 +763,13 @@ public class HStore implements Store {
}
}
@Override
/**
* This method should only be called from Region. It is assumed that the ranges of values in the
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
*
* @param srcPathStr
* @param seqNum sequence Id associated with the HFile
*/
public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
Path srcPath = new Path(srcPathStr);
Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
@ -774,7 +786,6 @@ public class HStore implements Store {
return dstPath;
}
@Override
public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
StoreFile sf = createStoreFileAndReader(fileInfo);
bulkLoadHFile(sf);
@ -1415,7 +1426,6 @@ public class HStore implements Store {
* See HBASE-2231.
* @param compaction
*/
@Override
public void replayCompactionMarker(CompactionDescriptor compaction,
boolean pickCompactionFiles, boolean removeFiles)
throws IOException {
@ -2089,7 +2099,19 @@ public class HStore implements Store {
}
}
@Override
/**
* Adds or replaces the specified KeyValues.
* <p>
* For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
* MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore.
* <p>
* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
* across all of them.
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @return memstore size delta
* @throws IOException
*/
public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
this.lock.readLock().lock();
try {
@ -2454,12 +2476,13 @@ public class HStore implements Store {
}
}
@Override public void finalizeFlush() {
public void finalizeFlush() {
memstore.finalizeFlush();
}
@Override public MemStore getMemStore() {
return memstore;
@Override
public boolean isSloppyMemstore() {
return this.memstore.isSloppy();
}
private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {

View File

@ -23,8 +23,6 @@ import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -38,7 +36,6 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -128,35 +125,6 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
ScanInfo getScanInfo();
/**
* Adds or replaces the specified KeyValues.
* <p>
* For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
* MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore.
* <p>
* This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
* across all of them.
* @param cells
* @param readpoint readpoint below which we can safely remove duplicate KVs
* @return memstore size delta
* @throws IOException
*/
long upsert(Iterable<Cell> cells, long readpoint) throws IOException;
/**
* Adds a value to the memstore
* @param cell
* @return memstore size delta
*/
long add(Cell cell);
/**
* Adds the specified value to the memstore
* @param cells
* @return memstore size delta
*/
long add(Iterable<Cell> cells);
/**
* When was the last edit done in the memstore
*/
@ -267,19 +235,6 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
StoreFlushContext createFlushContext(long cacheFlushId);
/**
* Call to complete a compaction. Its for the case where we find in the WAL a compaction
* that was not finished. We could find one recovering a WAL after a regionserver crash.
* See HBASE-2331.
* @param compaction the descriptor for compaction
* @param pickCompactionFiles whether or not pick up the new compaction output files and
* add it to the store
* @param removeFiles whether to remove/archive files from filesystem
*/
void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
boolean removeFiles)
throws IOException;
// Split oriented methods
boolean canSplit();
@ -290,23 +245,6 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/
byte[] getSplitPoint();
// Bulk Load methods
/**
* This throws a WrongRegionException if the HFile does not fit in this region, or an
* InvalidHFileException if the HFile is not valid.
*/
void assertBulkLoadHFileOk(Path srcPath) throws IOException;
/**
* This method should only be called from Region. It is assumed that the ranges of values in the
* HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
*
* @param srcPathStr
* @param sequenceId sequence Id associated with the HFile
*/
Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
// General accessors into the state of the store
// TODO abstract some of this out into a metrics class
@ -528,8 +466,6 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/
void refreshStoreFiles(Collection<String> newFiles) throws IOException;
void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
boolean isPrimaryReplicaStore();
/**
@ -538,11 +474,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
void closeAndArchiveCompactedFiles() throws IOException;
/**
* This method is called when it is clear that the flush to disk is completed.
* The store may do any post-flush actions at this point.
* One example is to update the wal with sequence number that is known only at the store level.
* @return true if the memstore may need some extra memory space
*/
void finalizeFlush();
MemStore getMemStore();
boolean isSloppyMemstore();
}

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
@ -206,13 +205,6 @@ public class TestIOFencing {
}
super.completeCompaction(compactedFiles);
}
@Override public void finalizeFlush() {
}
@Override public MemStore getMemStore() {
return null;
}
}
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -957,14 +958,13 @@ public class TestDefaultMemStore {
EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
EnvironmentEdgeManager.injectEdge(edge);
HBaseTestingUtility hbaseUtility = HBaseTestingUtility.createLocalHTU(conf);
HRegion region = hbaseUtility.createTestRegion("foobar", new HColumnDescriptor("foo"));
String cf = "foo";
HRegion region = hbaseUtility.createTestRegion("foobar", new HColumnDescriptor(cf));
List<Store> stores = region.getStores();
assertTrue(stores.size() == 1);
Store s = stores.iterator().next();
edge.setCurrentTimeMillis(1234);
s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
Put p = new Put(Bytes.toBytes("r"));
p.add(KeyValueTestUtil.create("r", cf, "q", 100, "v"));
region.put(p);
edge.setCurrentTimeMillis(1234 + 100);
StringBuffer sb = new StringBuffer();
assertTrue(!region.shouldFlush(sb));

View File

@ -171,11 +171,11 @@ public class TestWalAndCompactingMemStoreFlush {
String s = "\n\n----------------------------------\n"
+ "Upon initial insert and before any flush, size of CF1 is:"
+ cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
+ region.getStore(FAMILY1).getMemStore().isSloppy() + ". Size of CF2 is:"
+ region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:"
+ cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
+ region.getStore(FAMILY2).getMemStore().isSloppy() + ". Size of CF3 is:"
+ region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:"
+ cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
+ region.getStore(FAMILY3).getMemStore().isSloppy() + "\n";
+ region.getStore(FAMILY3).isSloppyMemstore() + "\n";
// The overall smallest LSN in the region's memstores should be the same as
// the LSN of the smallest edit in CF1
@ -208,8 +208,10 @@ public class TestWalAndCompactingMemStoreFlush {
// Since CF1 and CF3 should be flushed to memory (not to disk),
// CF2 is going to be flushed to disk.
// CF1 - nothing to compact (but flattening), CF3 - should be twice compacted
((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
cms1.flushInMemory();
cms3.flushInMemory();
region.flush(false);
// Recalculate everything
@ -423,11 +425,11 @@ public class TestWalAndCompactingMemStoreFlush {
String s = "\n\n----------------------------------\n"
+ "Upon initial insert and before any flush, size of CF1 is:"
+ cf1MemstoreSizePhaseI + ", is CF1 compacted memstore?:"
+ region.getStore(FAMILY1).getMemStore().isSloppy() + ". Size of CF2 is:"
+ region.getStore(FAMILY1).isSloppyMemstore() + ". Size of CF2 is:"
+ cf2MemstoreSizePhaseI + ", is CF2 compacted memstore?:"
+ region.getStore(FAMILY2).getMemStore().isSloppy() + ". Size of CF3 is:"
+ region.getStore(FAMILY2).isSloppyMemstore() + ". Size of CF3 is:"
+ cf3MemstoreSizePhaseI + ", is CF3 compacted memstore?:"
+ region.getStore(FAMILY3).getMemStore().isSloppy() + "\n";
+ region.getStore(FAMILY3).isSloppyMemstore() + "\n";
// The overall smallest LSN in the region's memstores should be the same as
// the LSN of the smallest edit in CF1
@ -459,8 +461,10 @@ public class TestWalAndCompactingMemStoreFlush {
// Since CF1 and CF3 should be flushed to memory (not to disk),
// CF2 is going to be flushed to disk.
// CF1 - nothing to compact, CF3 - should be twice compacted
((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
cms1.flushInMemory();
cms3.flushInMemory();
region.flush(false);
// Recalculate everything
@ -670,8 +674,10 @@ public class TestWalAndCompactingMemStoreFlush {
cf1MemstoreSizePhaseI + cf2MemstoreSizePhaseI + cf3MemstoreSizePhaseI);
// Flush!
((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
CompactingMemStore cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
CompactingMemStore cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
cms1.flushInMemory();
cms3.flushInMemory();
region.flush(false);
long cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
@ -720,8 +726,10 @@ public class TestWalAndCompactingMemStoreFlush {
+ smallestSeqCF2PhaseIII +", the smallest sequence in CF3:" + smallestSeqCF3PhaseIII + "\n";
// Flush!
((CompactingMemStore) region.getStore(FAMILY1).getMemStore()).flushInMemory();
((CompactingMemStore) region.getStore(FAMILY3).getMemStore()).flushInMemory();
cms1 = (CompactingMemStore) ((HStore) region.getStore(FAMILY1)).memstore;
cms3 = (CompactingMemStore) ((HStore) region.getStore(FAMILY3)).memstore;
cms1.flushInMemory();
cms3.flushInMemory();
region.flush(false);
long smallestSeqInRegionCurrentMemstorePhaseIV =

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
@ -95,8 +96,6 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@ -561,7 +560,7 @@ public abstract class AbstractTestWALReplay {
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
@Override
protected boolean restoreEdit(Store s, Cell cell) {
protected boolean restoreEdit(HStore s, Cell cell) {
boolean b = super.restoreEdit(s, cell);
countOfRestoredEdits.incrementAndGet();
return b;