HBASE-22122 Change to release mob hfile's block after rpc server shipped response to client
This commit is contained in:
parent
7dedb5625a
commit
ca92378e42
|
@ -244,19 +244,21 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
|||
writer.append(c);
|
||||
} else {
|
||||
// If the value is not larger than the threshold, it's not regarded a mob. Retrieve
|
||||
// the mob cell from the mob file, and write it back to the store file.
|
||||
Cell mobCell = mobStore.resolve(c, false);
|
||||
if (mobCell.getValueLength() != 0) {
|
||||
// put the mob data back to the store file
|
||||
PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
|
||||
writer.append(mobCell);
|
||||
cellsCountCompactedFromMob++;
|
||||
cellsSizeCompactedFromMob += mobCell.getValueLength();
|
||||
} else {
|
||||
// If the value of a file is empty, there might be issues when retrieving,
|
||||
// directly write the cell to the store file, and leave it to be handled by the
|
||||
// next compaction.
|
||||
writer.append(c);
|
||||
// the mob cell from the mob file, and write it back to the store file. Must
|
||||
// close the mob scanner once the life cycle finished.
|
||||
try (MobCell mobCell = mobStore.resolve(c, false)) {
|
||||
if (mobCell.getCell().getValueLength() != 0) {
|
||||
// put the mob data back to the store file
|
||||
PrivateCellUtil.setSequenceId(mobCell.getCell(), c.getSequenceId());
|
||||
writer.append(mobCell.getCell());
|
||||
cellsCountCompactedFromMob++;
|
||||
cellsSizeCompactedFromMob += mobCell.getCell().getValueLength();
|
||||
} else {
|
||||
// If the value of a file is empty, there might be issues when retrieving,
|
||||
// directly write the cell to the store file, and leave it to be handled by the
|
||||
// next compaction.
|
||||
writer.append(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.mob;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The MobCell will maintain a {@link Cell} and a {@link StoreFileScanner} inside. Now, the mob cell
|
||||
* is backend by NIO ByteBuffers which are allocated from ByteBuffAllocator, so we cannot just read
|
||||
* the cell and close the MOB file scanner because the MOB file scanner closing will deallocate the
|
||||
* NIO ByteBuffers, which resulting memory leak.
|
||||
* <p>
|
||||
* Actually, the right solution is: <br>
|
||||
* 1. Read the normal cell; <br>
|
||||
* 2. Parse the value of normal cell and get MOB fileName,offset,length; <br>
|
||||
* 3. Open scanner to read the mob value; <br>
|
||||
* 4. Construct the response cell whose key is from the normal cell and value is from the mob cell.
|
||||
* <br>
|
||||
* 5. Ship the response cell to HBase client. <br>
|
||||
* 6. Release both normal cell's block and mob cell's block. <br>
|
||||
* <p>
|
||||
* For mob cell, the block releasing just means closing the the mob scanner, so here we need to keep
|
||||
* the {@link StoreFileScanner} inside and close only when we're ensure that the MobCell has been
|
||||
* shipped to RPC client.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MobCell implements Closeable {
|
||||
|
||||
private final Cell cell;
|
||||
private final StoreFileScanner sfScanner;
|
||||
|
||||
public MobCell(Cell cell) {
|
||||
this.cell = cell;
|
||||
this.sfScanner = null;
|
||||
}
|
||||
|
||||
public MobCell(Cell cell, StoreFileScanner sfScanner) {
|
||||
this.cell = cell;
|
||||
this.sfScanner = sfScanner;
|
||||
}
|
||||
|
||||
public Cell getCell() {
|
||||
return cell;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (this.sfScanner != null) {
|
||||
this.sfScanner.close();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mob;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -70,7 +71,7 @@ public class MobFile {
|
|||
* @return The cell in the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
|
||||
public MobCell readCell(Cell search, boolean cacheMobBlocks) throws IOException {
|
||||
return readCell(search, cacheMobBlocks, sf.getMaxMemStoreTS());
|
||||
}
|
||||
|
||||
|
@ -82,26 +83,26 @@ public class MobFile {
|
|||
* @return The cell in the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException {
|
||||
Cell result = null;
|
||||
public MobCell readCell(Cell search, boolean cacheMobBlocks, long readPt) throws IOException {
|
||||
StoreFileScanner scanner = null;
|
||||
List<HStoreFile> sfs = new ArrayList<>();
|
||||
sfs.add(sf);
|
||||
boolean succ = false;
|
||||
try {
|
||||
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(sfs,
|
||||
cacheMobBlocks, true, false, false, readPt);
|
||||
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(
|
||||
Collections.singletonList(sf), cacheMobBlocks, true, false, false, readPt);
|
||||
if (!sfScanners.isEmpty()) {
|
||||
scanner = sfScanners.get(0);
|
||||
if (scanner.seek(search)) {
|
||||
result = scanner.peek();
|
||||
MobCell mobCell = new MobCell(scanner.peek(), scanner);
|
||||
succ = true;
|
||||
return mobCell;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
if (scanner != null) {
|
||||
if (scanner != null && !succ) {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.CellComparator;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
|
@ -49,6 +48,7 @@ import org.apache.hadoop.hbase.filter.Filter;
|
|||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
|
||||
import org.apache.hadoop.hbase.mob.MobCell;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobFile;
|
||||
import org.apache.hadoop.hbase.mob.MobFileCache;
|
||||
|
@ -298,14 +298,14 @@ public class HMobStore extends HStore {
|
|||
}
|
||||
|
||||
/**
|
||||
* Reads the cell from the mob file, and the read point does not count.
|
||||
* This is used for DefaultMobStoreCompactor where we can read empty value for the missing cell.
|
||||
* Reads the cell from the mob file, and the read point does not count. This is used for
|
||||
* DefaultMobStoreCompactor where we can read empty value for the missing cell.
|
||||
* @param reference The cell found in the HBase, its value is a path to a mob file.
|
||||
* @param cacheBlocks Whether the scanner should cache blocks.
|
||||
* @return The cell found in the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
|
||||
public MobCell resolve(Cell reference, boolean cacheBlocks) throws IOException {
|
||||
return resolve(reference, cacheBlocks, -1, true);
|
||||
}
|
||||
|
||||
|
@ -314,14 +314,14 @@ public class HMobStore extends HStore {
|
|||
* @param reference The cell found in the HBase, its value is a path to a mob file.
|
||||
* @param cacheBlocks Whether the scanner should cache blocks.
|
||||
* @param readPt the read point.
|
||||
* @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is
|
||||
* missing or corrupt.
|
||||
* @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is missing or
|
||||
* corrupt.
|
||||
* @return The cell found in the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Cell resolve(Cell reference, boolean cacheBlocks, long readPt,
|
||||
boolean readEmptyValueOnMobCellMiss) throws IOException {
|
||||
Cell result = null;
|
||||
public MobCell resolve(Cell reference, boolean cacheBlocks, long readPt,
|
||||
boolean readEmptyValueOnMobCellMiss) throws IOException {
|
||||
MobCell mobCell = null;
|
||||
if (MobUtils.hasValidMobRefCellValue(reference)) {
|
||||
String fileName = MobUtils.getMobFileName(reference);
|
||||
Tag tableNameTag = MobUtils.getTableNameTag(reference);
|
||||
|
@ -336,35 +336,34 @@ public class HMobStore extends HStore {
|
|||
locations = new ArrayList<>(2);
|
||||
TableName tn = TableName.valueOf(tableNameString);
|
||||
locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
|
||||
locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
|
||||
.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
|
||||
locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
|
||||
MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
|
||||
map.put(tableNameString, locations);
|
||||
}
|
||||
} finally {
|
||||
keyLock.releaseLockEntry(lockEntry);
|
||||
}
|
||||
}
|
||||
result = readCell(locations, fileName, reference, cacheBlocks, readPt,
|
||||
mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
|
||||
readEmptyValueOnMobCellMiss);
|
||||
}
|
||||
}
|
||||
if (result == null) {
|
||||
if (mobCell == null) {
|
||||
LOG.warn("The Cell result is null, assemble a new Cell with the same row,family,"
|
||||
+ "qualifier,timestamp,type and tags but with an empty value to return.");
|
||||
result = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
|
||||
.setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength())
|
||||
.setFamily(reference.getFamilyArray(), reference.getFamilyOffset(),
|
||||
reference.getFamilyLength())
|
||||
.setQualifier(reference.getQualifierArray(),
|
||||
reference.getQualifierOffset(), reference.getQualifierLength())
|
||||
.setTimestamp(reference.getTimestamp())
|
||||
.setType(reference.getTypeByte())
|
||||
.setValue(HConstants.EMPTY_BYTE_ARRAY)
|
||||
.setTags(reference.getTagsArray(), reference.getTagsOffset(),
|
||||
reference.getTagsLength())
|
||||
.build();
|
||||
Cell cell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
|
||||
.setRow(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength())
|
||||
.setFamily(reference.getFamilyArray(), reference.getFamilyOffset(),
|
||||
reference.getFamilyLength())
|
||||
.setQualifier(reference.getQualifierArray(), reference.getQualifierOffset(),
|
||||
reference.getQualifierLength())
|
||||
.setTimestamp(reference.getTimestamp()).setType(reference.getTypeByte())
|
||||
.setValue(HConstants.EMPTY_BYTE_ARRAY)
|
||||
.setTags(reference.getTagsArray(), reference.getTagsOffset(), reference.getTagsLength())
|
||||
.build();
|
||||
mobCell = new MobCell(cell);
|
||||
}
|
||||
return result;
|
||||
return mobCell;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -383,8 +382,8 @@ public class HMobStore extends HStore {
|
|||
* @return The found cell. Null if there's no such a cell.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
|
||||
long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
|
||||
private MobCell readCell(List<Path> locations, String fileName, Cell search,
|
||||
boolean cacheMobBlocks, long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
|
||||
FileSystem fs = getFileSystem();
|
||||
Throwable throwable = null;
|
||||
for (Path location : locations) {
|
||||
|
@ -392,12 +391,8 @@ public class HMobStore extends HStore {
|
|||
Path path = new Path(location, fileName);
|
||||
try {
|
||||
file = mobFileCache.openFile(fs, path, cacheConf);
|
||||
Cell cell = readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt)
|
||||
return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt)
|
||||
: file.readCell(search, cacheMobBlocks);
|
||||
// Now we will return blocks to allocator for mob cells before shipping to rpc client.
|
||||
// it will be memory leak. so just copy cell as an on-heap KV here. will remove this in
|
||||
// HBASE-22122 (TODO)
|
||||
return KeyValueUtil.copyToNewKeyValue(cell);
|
||||
} catch (IOException e) {
|
||||
mobFileCache.evictFile(fileName);
|
||||
throwable = e;
|
||||
|
@ -425,7 +420,7 @@ public class HMobStore extends HStore {
|
|||
}
|
||||
}
|
||||
LOG.error("The mob file " + fileName + " could not be found in the locations " + locations
|
||||
+ " or it is corrupt");
|
||||
+ " or it is corrupt");
|
||||
if (readEmptyValueOnMobCellMiss) {
|
||||
return null;
|
||||
} else if ((throwable instanceof FileNotFoundException)
|
||||
|
|
|
@ -19,13 +19,17 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.mob.MobCell;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Scanner scans both the memstore and the MOB Store. Coalesce KeyValue stream into
|
||||
|
@ -34,10 +38,13 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
@InterfaceAudience.Private
|
||||
public class MobStoreScanner extends StoreScanner {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MobStoreScanner.class);
|
||||
|
||||
private boolean cacheMobBlocks = false;
|
||||
private boolean rawMobScan = false;
|
||||
private boolean readEmptyValueOnMobCellMiss = false;
|
||||
private final HMobStore mobStore;
|
||||
private final List<MobCell> referencedMobCells;
|
||||
|
||||
public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
|
||||
final NavigableSet<byte[]> columns, long readPt) throws IOException {
|
||||
|
@ -49,6 +56,7 @@ public class MobStoreScanner extends StoreScanner {
|
|||
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
|
||||
}
|
||||
mobStore = (HMobStore) store;
|
||||
this.referencedMobCells = new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -69,11 +77,13 @@ public class MobStoreScanner extends StoreScanner {
|
|||
for (int i = 0; i < outResult.size(); i++) {
|
||||
Cell cell = outResult.get(i);
|
||||
if (MobUtils.isMobReferenceCell(cell)) {
|
||||
Cell mobCell = mobStore
|
||||
.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
|
||||
MobCell mobCell =
|
||||
mobStore.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
|
||||
mobKVCount++;
|
||||
mobKVSize += mobCell.getValueLength();
|
||||
outResult.set(i, mobCell);
|
||||
mobKVSize += mobCell.getCell().getValueLength();
|
||||
outResult.set(i, mobCell.getCell());
|
||||
// Keep the MobCell here unless we shipped the RPC or close the scanner.
|
||||
referencedMobCells.add(mobCell);
|
||||
}
|
||||
}
|
||||
mobStore.updateMobScanCellsCount(mobKVCount);
|
||||
|
@ -81,4 +91,27 @@ public class MobStoreScanner extends StoreScanner {
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void freeAllReferencedMobCells() throws IOException {
|
||||
for (MobCell cell : referencedMobCells) {
|
||||
cell.close();
|
||||
}
|
||||
referencedMobCells.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shipped() throws IOException {
|
||||
super.shipped();
|
||||
this.freeAllReferencedMobCells();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
try {
|
||||
this.freeAllReferencedMobCells();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to free referenced mob cells: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -291,7 +291,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
*/
|
||||
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
|
||||
|
||||
protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
|
||||
public static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";
|
||||
|
||||
// Request counter. (Includes requests that are not serviced by regions.)
|
||||
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
|
||||
|
|
|
@ -19,26 +19,31 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.mob.MobCell;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support
|
||||
* reversed scanning in both the memstore and the MOB store.
|
||||
*
|
||||
* ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support reversed
|
||||
* scanning in both the memstore and the MOB store.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReversedMobStoreScanner extends ReversedStoreScanner {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReversedMobStoreScanner.class);
|
||||
private boolean cacheMobBlocks = false;
|
||||
private boolean rawMobScan = false;
|
||||
private boolean readEmptyValueOnMobCellMiss = false;
|
||||
protected final HMobStore mobStore;
|
||||
private final HMobStore mobStore;
|
||||
private final List<MobCell> referencedMobCells;
|
||||
|
||||
ReversedMobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
|
||||
long readPt) throws IOException {
|
||||
|
@ -50,6 +55,7 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
|
|||
throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
|
||||
}
|
||||
mobStore = (HMobStore) store;
|
||||
this.referencedMobCells = new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -70,11 +76,13 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
|
|||
for (int i = 0; i < outResult.size(); i++) {
|
||||
Cell cell = outResult.get(i);
|
||||
if (MobUtils.isMobReferenceCell(cell)) {
|
||||
Cell mobCell = mobStore
|
||||
.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
|
||||
MobCell mobCell =
|
||||
mobStore.resolve(cell, cacheMobBlocks, readPt, readEmptyValueOnMobCellMiss);
|
||||
mobKVCount++;
|
||||
mobKVSize += mobCell.getValueLength();
|
||||
outResult.set(i, mobCell);
|
||||
mobKVSize += mobCell.getCell().getValueLength();
|
||||
outResult.set(i, mobCell.getCell());
|
||||
// Keep the MobCell here unless we shipped the RPC or close the scanner.
|
||||
referencedMobCells.add(mobCell);
|
||||
}
|
||||
}
|
||||
mobStore.updateMobScanCellsCount(mobKVCount);
|
||||
|
@ -82,4 +90,27 @@ public class ReversedMobStoreScanner extends ReversedStoreScanner {
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void freeAllReferencedMobCells() throws IOException {
|
||||
for (MobCell mobCell : referencedMobCells) {
|
||||
mobCell.close();
|
||||
}
|
||||
referencedMobCells.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shipped() throws IOException {
|
||||
super.shipped();
|
||||
this.freeAllReferencedMobCells();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
try {
|
||||
this.freeAllReferencedMobCells();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to free referenced mob cells: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,18 +97,6 @@ public class StoreFileScanner implements KeyValueScanner {
|
|||
this.reader.incrementRefCount();
|
||||
}
|
||||
|
||||
boolean isPrimaryReplica() {
|
||||
return reader.isPrimaryReplicaReader();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an array of scanners corresponding to the given set of store files.
|
||||
*/
|
||||
public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
|
||||
boolean cacheBlocks, boolean usePread, long readPt) throws IOException {
|
||||
return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an array of scanners corresponding to the given set of store files.
|
||||
*/
|
||||
|
|
|
@ -113,46 +113,45 @@ public class TestCachedMobFile {
|
|||
Path testDir = TEST_UTIL.getDataTestDir();
|
||||
FileSystem fs = testDir.getFileSystem(conf);
|
||||
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
|
||||
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
|
||||
.withOutputDir(testDir).withFileContext(meta).build();
|
||||
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(testDir)
|
||||
.withFileContext(meta).build();
|
||||
String caseName = testName.getMethodName();
|
||||
MobTestUtil.writeStoreFile(writer, caseName);
|
||||
CachedMobFile cachedMobFile = CachedMobFile.create(fs, writer.getPath(), conf, cacheConf);
|
||||
byte[] family = Bytes.toBytes(caseName);
|
||||
byte[] qualify = Bytes.toBytes(caseName);
|
||||
// Test the start key
|
||||
byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
|
||||
byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
|
||||
KeyValue expectedKey =
|
||||
new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
|
||||
KeyValue seekKey = expectedKey.createKeyOnly(false);
|
||||
Cell cell = cachedMobFile.readCell(seekKey, false);
|
||||
Cell cell = cachedMobFile.readCell(seekKey, false).getCell();
|
||||
MobTestUtil.assertCellEquals(expectedKey, cell);
|
||||
|
||||
// Test the end key
|
||||
byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
|
||||
byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
|
||||
expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
|
||||
seekKey = expectedKey.createKeyOnly(false);
|
||||
cell = cachedMobFile.readCell(seekKey, false);
|
||||
cell = cachedMobFile.readCell(seekKey, false).getCell();
|
||||
MobTestUtil.assertCellEquals(expectedKey, cell);
|
||||
|
||||
// Test the random key
|
||||
byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
|
||||
expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
|
||||
seekKey = expectedKey.createKeyOnly(false);
|
||||
cell = cachedMobFile.readCell(seekKey, false);
|
||||
cell = cachedMobFile.readCell(seekKey, false).getCell();
|
||||
MobTestUtil.assertCellEquals(expectedKey, cell);
|
||||
|
||||
// Test the key which is less than the start key
|
||||
byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
|
||||
expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
|
||||
seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
|
||||
cell = cachedMobFile.readCell(seekKey, false);
|
||||
cell = cachedMobFile.readCell(seekKey, false).getCell();
|
||||
MobTestUtil.assertCellEquals(expectedKey, cell);
|
||||
|
||||
// Test the key which is more than the end key
|
||||
byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
|
||||
seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
|
||||
cell = cachedMobFile.readCell(seekKey, false);
|
||||
Assert.assertNull(cell);
|
||||
Assert.assertNull(cachedMobFile.readCell(seekKey, false));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,8 +43,6 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestMobFile {
|
||||
|
@ -53,7 +51,6 @@ public class TestMobFile {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMobFile.class);
|
||||
|
||||
static final Logger LOG = LoggerFactory.getLogger(TestMobFile.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private Configuration conf = TEST_UTIL.getConfiguration();
|
||||
private CacheConfig cacheConf = new CacheConfig(conf);
|
||||
|
@ -64,11 +61,9 @@ public class TestMobFile {
|
|||
public void testReadKeyValue() throws Exception {
|
||||
Path testDir = TEST_UTIL.getDataTestDir();
|
||||
FileSystem fs = testDir.getFileSystem(conf);
|
||||
HFileContext meta = new HFileContextBuilder().withBlockSize(8*1024).build();
|
||||
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs)
|
||||
.withOutputDir(testDir)
|
||||
.withFileContext(meta)
|
||||
.build();
|
||||
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
|
||||
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, fs).withOutputDir(testDir)
|
||||
.withFileContext(meta).build();
|
||||
String caseName = testName.getMethodName();
|
||||
MobTestUtil.writeStoreFile(writer, caseName);
|
||||
|
||||
|
@ -78,39 +73,38 @@ public class TestMobFile {
|
|||
byte[] qualify = Bytes.toBytes(caseName);
|
||||
|
||||
// Test the start key
|
||||
byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
|
||||
byte[] startKey = Bytes.toBytes("aa"); // The start key bytes
|
||||
KeyValue expectedKey =
|
||||
new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
|
||||
KeyValue seekKey = expectedKey.createKeyOnly(false);
|
||||
Cell cell = mobFile.readCell(seekKey, false);
|
||||
Cell cell = mobFile.readCell(seekKey, false).getCell();
|
||||
MobTestUtil.assertCellEquals(expectedKey, cell);
|
||||
|
||||
// Test the end key
|
||||
byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
|
||||
byte[] endKey = Bytes.toBytes("zz"); // The end key bytes
|
||||
expectedKey = new KeyValue(endKey, family, qualify, Long.MAX_VALUE, Type.Put, endKey);
|
||||
seekKey = expectedKey.createKeyOnly(false);
|
||||
cell = mobFile.readCell(seekKey, false);
|
||||
cell = mobFile.readCell(seekKey, false).getCell();
|
||||
MobTestUtil.assertCellEquals(expectedKey, cell);
|
||||
|
||||
// Test the random key
|
||||
byte[] randomKey = Bytes.toBytes(MobTestUtil.generateRandomString(2));
|
||||
expectedKey = new KeyValue(randomKey, family, qualify, Long.MAX_VALUE, Type.Put, randomKey);
|
||||
seekKey = expectedKey.createKeyOnly(false);
|
||||
cell = mobFile.readCell(seekKey, false);
|
||||
cell = mobFile.readCell(seekKey, false).getCell();
|
||||
MobTestUtil.assertCellEquals(expectedKey, cell);
|
||||
|
||||
// Test the key which is less than the start key
|
||||
byte[] lowerKey = Bytes.toBytes("a1"); // Smaller than "aa"
|
||||
expectedKey = new KeyValue(startKey, family, qualify, Long.MAX_VALUE, Type.Put, startKey);
|
||||
seekKey = new KeyValue(lowerKey, family, qualify, Long.MAX_VALUE, Type.Put, lowerKey);
|
||||
cell = mobFile.readCell(seekKey, false);
|
||||
cell = mobFile.readCell(seekKey, false).getCell();
|
||||
MobTestUtil.assertCellEquals(expectedKey, cell);
|
||||
|
||||
// Test the key which is more than the end key
|
||||
byte[] upperKey = Bytes.toBytes("z{"); // Bigger than "zz"
|
||||
seekKey = new KeyValue(upperKey, family, qualify, Long.MAX_VALUE, Type.Put, upperKey);
|
||||
cell = mobFile.readCell(seekKey, false);
|
||||
assertNull(cell);
|
||||
assertNull(mobFile.readCell(seekKey, false));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.mob;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||
import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test the MOB feature when enable RPC ByteBuffAllocator (HBASE-22122)
|
||||
*/
|
||||
@Category({ MediumTests.class })
|
||||
public class TestMobWithByteBuffAllocator {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMobWithByteBuffAllocator.class);
|
||||
|
||||
private static final String TABLE_NAME = "TestMobWithByteBuffAllocator";
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestMobWithByteBuffAllocator.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
private static final Configuration CONF = UTIL.getConfiguration();
|
||||
private static final byte[] FAMILY = Bytes.toBytes("f");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
// Must use the ByteBuffAllocator here
|
||||
CONF.setBoolean(RSRpcServices.RESERVOIR_ENABLED_KEY, true);
|
||||
// Must use OFF-HEAP BucketCache here.
|
||||
CONF.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.1f);
|
||||
CONF.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
|
||||
// 32MB for BucketCache.
|
||||
CONF.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 32);
|
||||
CONF.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
|
||||
UTIL.startMiniCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadingCellsFromHFile() throws Exception {
|
||||
TableName tableName = TableName.valueOf(TABLE_NAME);
|
||||
MobSnapshotTestingUtils.createMobTable(UTIL, tableName, 1, FAMILY);
|
||||
LOG.info("Create an mob table {} successfully.", tableName);
|
||||
|
||||
int expectedRows = 500;
|
||||
SnapshotTestingUtils.loadData(UTIL, tableName, expectedRows, FAMILY);
|
||||
LOG.info("Load 500 rows data into table {} successfully.", tableName);
|
||||
|
||||
// Flush all the data into HFiles.
|
||||
try (Admin admin = UTIL.getConnection().getAdmin()) {
|
||||
admin.flush(tableName);
|
||||
}
|
||||
|
||||
// Scan the rows
|
||||
MobSnapshotTestingUtils.verifyMobRowCount(UTIL, tableName, expectedRows);
|
||||
|
||||
// Reversed scan the rows
|
||||
int rows = 0;
|
||||
try (Table table = UTIL.getConnection().getTable(tableName)) {
|
||||
try (ResultScanner scanner = table.getScanner(new Scan().setReversed(true))) {
|
||||
for (Result res; (res = scanner.next()) != null;) {
|
||||
rows++;
|
||||
for (Cell cell : res.listCells()) {
|
||||
Assert.assertTrue(CellUtil.cloneValue(cell).length > 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(expectedRows, rows);
|
||||
}
|
||||
}
|
|
@ -448,17 +448,14 @@ public class TestHMobStore {
|
|||
String targetPathName = MobUtils.formatDate(currentDate);
|
||||
Path targetPath = new Path(store.getPath(), targetPathName);
|
||||
store.commitFile(mobFilePath, targetPath);
|
||||
//resolve
|
||||
Cell resultCell1 = store.resolve(seekKey1, false);
|
||||
Cell resultCell2 = store.resolve(seekKey2, false);
|
||||
Cell resultCell3 = store.resolve(seekKey3, false);
|
||||
//compare
|
||||
Assert.assertEquals(Bytes.toString(value),
|
||||
Bytes.toString(CellUtil.cloneValue(resultCell1)));
|
||||
Assert.assertEquals(Bytes.toString(value),
|
||||
Bytes.toString(CellUtil.cloneValue(resultCell2)));
|
||||
Assert.assertEquals(Bytes.toString(value2),
|
||||
Bytes.toString(CellUtil.cloneValue(resultCell3)));
|
||||
// resolve
|
||||
Cell resultCell1 = store.resolve(seekKey1, false).getCell();
|
||||
Cell resultCell2 = store.resolve(seekKey2, false).getCell();
|
||||
Cell resultCell3 = store.resolve(seekKey3, false).getCell();
|
||||
// compare
|
||||
Assert.assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell1)));
|
||||
Assert.assertEquals(Bytes.toString(value), Bytes.toString(CellUtil.cloneValue(resultCell2)));
|
||||
Assert.assertEquals(Bytes.toString(value2), Bytes.toString(CellUtil.cloneValue(resultCell3)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue