HBASE-15876 Remove doBulkLoad(Path hfofDir, final HTable table) though it has not been through a full deprecation cycle
Signed-off-by: stack <stack@apache.org>
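Callers of the removed overload can move to the remaining doBulkLoad(Path, Admin, Table, RegionLocator) overload by deriving the extra arguments from a Connection, which is what the removed wrapper did internally and what the updated PartitionedMobCompactor in this commit now does. A minimal sketch, assuming an existing target table and an HFileOutputFormat output directory supplied as program arguments (class name and arguments are illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadMigrationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Path hfofDir = new Path(args[0]);                  // HFileOutputFormat output directory
    TableName tableName = TableName.valueOf(args[1]);  // existing target table
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        Table table = connection.getTable(tableName);
        RegionLocator locator = connection.getRegionLocator(tableName)) {
      // Previously: new LoadIncrementalHFiles(conf).doBulkLoad(hfofDir, (HTable) table);
      // Now the caller supplies the Admin, Table and RegionLocator explicitly.
      new LoadIncrementalHFiles(conf).doBulkLoad(hfofDir, admin, table, locator);
    }
  }
}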
commit 7130a222ce
parent ae42c65cfd
LoadIncrementalHFiles.java

@@ -25,6 +25,32 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +71,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
@@ -76,32 +101,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
  * @see #usage()
@@ -165,7 +164,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       + "\n");
   }
 
-  private static interface BulkHFileVisitor<TFamily> {
+  private interface BulkHFileVisitor<TFamily> {
     TFamily bulkFamily(final byte[] familyName)
       throws IOException;
     void bulkHFile(final TFamily family, final FileStatus hfileStatus)
@@ -308,25 +307,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * pre-existing table. This method is not threadsafe.
    *
    * @param hfofDir the directory that was provided as the output path
-   * of a job using HFileOutputFormat
-   * @param table the table to load into
-   * @throws TableNotFoundException if table does not yet exist
-   */
-  @SuppressWarnings("deprecation")
-  public void doBulkLoad(Path hfofDir, final HTable table)
-      throws TableNotFoundException, IOException {
-    try (Admin admin = table.getConnection().getAdmin();
-        RegionLocator rl = table.getRegionLocator()) {
-      doBulkLoad(hfofDir, admin, table, rl);
-    }
-  }
-
-  /**
-   * Perform a bulk load of the given directory into the given
-   * pre-existing table. This method is not threadsafe.
-   *
-   * @param hfofDir the directory that was provided as the output path
-   * of a job using HFileOutputFormat
+   * of a job using HFileOutputFormat
    * @param table the table to load into
    * @throws TableNotFoundException if table does not yet exist
    */
@@ -341,7 +322,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
     // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
-    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
+    Deque<LoadQueueItem> queue = new LinkedList<>();
     try {
       /*
        * Checking hfile format is a time-consuming operation, we should have an option to skip
@@ -426,8 +407,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
 
       if (queue != null && !queue.isEmpty()) {
-        throw new RuntimeException("Bulk load aborted with some files not yet loaded."
-          + "Please check log for more details.");
+        throw new RuntimeException("Bulk load aborted with some files not yet loaded."
+          + "Please check log for more details.");
       }
     }
 
@@ -563,9 +544,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private boolean checkHFilesCountPerRegionPerFamily(
     final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
     for (Entry<ByteBuffer,
-        ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
+        ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
       final Collection<LoadQueueItem> lqis = e.getValue();
-      HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
+      HashMap<byte[], MutableInt> filesMap = new HashMap<>();
       for (LoadQueueItem lqi: lqis) {
         MutableInt count = filesMap.get(lqi.family);
         if (count == null) {
@@ -597,7 +578,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
 
     // drain LQIs and figure out bulk load groups
-    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<>();
     while (!queue.isEmpty()) {
       final LoadQueueItem item = queue.remove();
 
@@ -650,7 +631,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
 
       LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-          "region. Splitting...");
+          "region. Splitting...");
 
       String uniqueName = getUniqueName();
       HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
@@ -692,7 +673,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * LQI's corresponding to the resultant hfiles.
    *
    * protected for testing
-   * @throws IOException
+   * @throws IOException if an IO failure is encountered
    */
   protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
       final LoadQueueItem item, final Table table,
@@ -786,13 +767,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * Protected for testing.
    *
    * @return empty list if success, list of items to retry on recoverable
-   * failure
+   * failure
    */
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
       final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
       throws IOException {
     final List<Pair<byte[], String>> famPaths =
-        new ArrayList<Pair<byte[], String>>(lqis.size());
+        new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
     }
@@ -857,7 +838,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     };
 
     try {
-      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
+      List<LoadQueueItem> toRetry = new ArrayList<>();
       Configuration conf = getConf();
       boolean success = RpcRetryingCallerFactory.instantiate(conf,
           null).<Boolean> newCaller()
@@ -890,8 +871,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   static void splitStoreFile(
       Configuration conf, Path inFile,
       HColumnDescriptor familyDesc, byte[] splitKey,
-      Path bottomOut, Path topOut) throws IOException
-  {
+      Path bottomOut, Path topOut) throws IOException {
     // Open reader with no block cache, and not in-memory
     Reference topReference = Reference.createTopReference(splitKey);
     Reference bottomReference = Reference.createBottomReference(splitKey);
@@ -944,8 +924,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         }
       }
     } finally {
-      if (halfWriter != null) halfWriter.close();
-      if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
+      if (halfWriter != null) {
+        halfWriter.close();
+      }
+      if (halfReader != null) {
+        halfReader.close(cacheConf.shouldEvictOnClose());
+      }
     }
   }
 
@@ -972,16 +956,20 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * 2) Return the boundary list.
    */
   public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
-    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
+    ArrayList<byte[]> keysArray = new ArrayList<>();
     int runningValue = 0;
     byte[] currStartKey = null;
     boolean firstBoundary = true;
 
     for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
-      if (runningValue == 0) currStartKey = item.getKey();
+      if (runningValue == 0) {
+        currStartKey = item.getKey();
+      }
       runningValue += item.getValue();
       if (runningValue == 0) {
-        if (!firstBoundary) keysArray.add(currStartKey);
+        if (!firstBoundary) {
+          keysArray.add(currStartKey);
+        }
         firstBoundary = false;
       }
     }
@@ -1000,7 +988,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // Add column families
     // Build a set of keys
     final HTableDescriptor htd = new HTableDescriptor(tableName);
-    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    final TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
       @Override
       public HColumnDescriptor bulkFamily(final byte[] familyName) {
@@ -1073,8 +1061,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     Path hfofDir = new Path(dirPath);
 
     try (Table table = connection.getTable(tableName);
-        RegionLocator locator = connection.getRegionLocator(tableName)) {
-      doBulkLoad(hfofDir, admin, table, locator);
+        RegionLocator locator = connection.getRegionLocator(tableName)) {
+      doBulkLoad(hfofDir, admin, table, locator);
     }
   }
 
PartitionedMobCompactor.java

@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.HFileLink;
@@ -90,10 +89,10 @@ public class PartitionedMobCompactor extends MobCompactor {
   protected int compactionBatchSize;
   protected int compactionKVMax;
 
-  private Path tempPath;
-  private Path bulkloadPath;
-  private CacheConfig compactionCacheConfig;
-  private Tag tableNameTag;
+  private final Path tempPath;
+  private final Path bulkloadPath;
+  private final CacheConfig compactionCacheConfig;
+  private final Tag tableNameTag;
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
 
   public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
@@ -137,13 +136,12 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @param candidates All the candidates.
    * @param allFiles Whether add all mob files into the compaction.
    * @return A compaction request.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
       boolean allFiles) throws IOException {
-    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
-    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
-        new HashMap<CompactionPartitionId, CompactionPartition>();
+    Collection<FileStatus> allDelFiles = new ArrayList<>();
+    Map<CompactionPartitionId, CompactionPartition> filesToCompact = new HashMap<>();
     int selectedFileCount = 0;
     int irrelevantFileCount = 0;
     for (FileStatus file : candidates) {
@@ -202,17 +200,17 @@ public class PartitionedMobCompactor extends MobCompactor {
    * </ol>
    * @param request The compaction request.
    * @return The paths of new mob files generated in the compaction.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
       throws IOException {
     // merge the del files
-    List<Path> delFilePaths = new ArrayList<Path>();
+    List<Path> delFilePaths = new ArrayList<>();
     for (FileStatus delFile : request.delFiles) {
       delFilePaths.add(delFile.getPath());
     }
     List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
-    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
+    List<StoreFile> newDelFiles = new ArrayList<>();
     List<Path> paths = null;
     try {
       for (Path newDelPath : newDelPaths) {
@@ -247,7 +245,7 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @param request The compaction request.
    * @param delFiles The del files.
    * @return The paths of new mob files after compactions.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
       final List<StoreFile> delFiles) throws IOException {
@@ -256,24 +254,23 @@ public class PartitionedMobCompactor extends MobCompactor {
       LOG.info("No partitions of mob files");
       return Collections.emptyList();
     }
-    List<Path> paths = new ArrayList<Path>();
-    Connection c = ConnectionFactory.createConnection(conf);
+    List<Path> paths = new ArrayList<>();
+    final Connection c = ConnectionFactory.createConnection(conf);
     final Table table = c.getTable(tableName);
     try {
-      Map<CompactionPartitionId, Future<List<Path>>> results =
-          new HashMap<CompactionPartitionId, Future<List<Path>>>();
+      Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>();
       // compact the mob files by partitions in parallel.
       for (final CompactionPartition partition : partitions) {
         results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
           @Override
           public List<Path> call() throws Exception {
             LOG.info("Compacting mob files for partition " + partition.getPartitionId());
-            return compactMobFilePartition(request, partition, delFiles, table);
+            return compactMobFilePartition(request, partition, delFiles, c, table);
           }
         }));
       }
       // compact the partitions in parallel.
-      List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>();
+      List<CompactionPartitionId> failedPartitions = new ArrayList<>();
       for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
         try {
           paths.addAll(result.getValue().get());
@@ -291,7 +288,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       try {
         table.close();
       } catch (IOException e) {
-        LOG.error("Failed to close the HTable", e);
+        LOG.error("Failed to close the Table", e);
       }
     }
     return paths;
@@ -302,13 +299,16 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @param request The compaction request.
    * @param partition A compaction partition.
    * @param delFiles The del files.
-   * @param table The current table.
-   * @return The paths of new mob files after compactions.
-   * @throws IOException
+   * @param connection to use
+   * @param table The current table. @return The paths of new mob files after compactions.
+   * @throws IOException if IO failure is encountered
    */
   private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
-      CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
-    List<Path> newFiles = new ArrayList<Path>();
+      CompactionPartition partition,
+      List<StoreFile> delFiles,
+      Connection connection,
+      Table table) throws IOException {
+    List<Path> newFiles = new ArrayList<>();
     List<FileStatus> files = partition.listFiles();
     int offset = 0;
     Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
@@ -328,7 +328,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       // clean the bulkload directory to avoid loading old files.
       fs.delete(bulkloadPathOfPartition, true);
       // add the selected mob files and del files into filesToCompact
-      List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+      List<StoreFile> filesToCompact = new ArrayList<>();
       for (int i = offset; i < batch + offset; i++) {
         StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
             BloomType.NONE);
@@ -336,7 +336,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       }
       filesToCompact.addAll(delFiles);
       // compact the mob files in a batch.
-      compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+      compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch,
           bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
       // move to the next batch.
       offset += batch;
@@ -364,19 +364,23 @@ public class PartitionedMobCompactor extends MobCompactor {
    * Compacts a partition of selected small mob files and all the del files in a batch.
    * @param request The compaction request.
    * @param partition A compaction partition.
+   * @param connection To use for transport
    * @param table The current table.
    * @param filesToCompact The files to be compacted.
    * @param batch The number of mob files to be compacted in a batch.
    * @param bulkloadPathOfPartition The directory where the bulkload column of the current
-   * partition is saved.
+   * partition is saved.
   * @param bulkloadColumnPath The directory where the bulkload files of current partition
-   * are saved.
+   * are saved.
    * @param newFiles The paths of new mob files after compactions.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
-      CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
-      Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+      CompactionPartition partition,
+      Connection connection, Table table,
+      List<StoreFile> filesToCompact, int batch,
+      Path bulkloadPathOfPartition, Path bulkloadColumnPath,
+      List<Path> newFiles)
       throws IOException {
     // open scanner to the selected mob files and del files.
     StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
@@ -400,8 +404,8 @@ public class PartitionedMobCompactor extends MobCompactor {
       refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
          .getSecond().longValue(), compactionCacheConfig, cryptoContext);
       refFilePath = refFileWriter.getPath();
-      List<Cell> cells = new ArrayList<Cell>();
-      boolean hasMore = false;
+      List<Cell> cells = new ArrayList<>();
+      boolean hasMore;
       ScannerContext scannerContext =
           ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
       do {
@@ -428,7 +432,7 @@ public class PartitionedMobCompactor extends MobCompactor {
         // commit mob file
         MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
         // bulkload the ref file
-        bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+        bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
         newFiles.add(new Path(mobFamilyDir, filePath.getName()));
       } else {
         // remove the new files
@@ -450,10 +454,10 @@ public class PartitionedMobCompactor extends MobCompactor {
   /**
    * Compacts the del files in batches which avoids opening too many files.
    * @param request The compaction request.
-   * @param delFilePaths
+   * @param delFilePaths Del file paths to compact
    * @return The paths of new del files after merging or the original files if no merging
    * is necessary.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
       List<Path> delFilePaths) throws IOException {
@@ -462,14 +466,14 @@ public class PartitionedMobCompactor extends MobCompactor {
     }
     // when there are more del files than the number that is allowed, merge it firstly.
     int offset = 0;
-    List<Path> paths = new ArrayList<Path>();
+    List<Path> paths = new ArrayList<>();
     while (offset < delFilePaths.size()) {
       // get the batch
       int batch = compactionBatchSize;
       if (delFilePaths.size() - offset < compactionBatchSize) {
         batch = delFilePaths.size() - offset;
       }
-      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+      List<StoreFile> batchedDelFiles = new ArrayList<>();
       if (batch == 1) {
         // only one file left, do not compact it, directly add it to the new files.
         paths.add(delFilePaths.get(offset));
@@ -493,7 +497,7 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @param request The compaction request.
    * @param delFiles The del files.
    * @return The path of new del file after merging.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
       List<StoreFile> delFiles) throws IOException {
@@ -507,8 +511,8 @@ public class PartitionedMobCompactor extends MobCompactor {
         column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
         cryptoContext);
       filePath = writer.getPath();
-      List<Cell> cells = new ArrayList<Cell>();
-      boolean hasMore = false;
+      List<Cell> cells = new ArrayList<>();
+      boolean hasMore;
       ScannerContext scannerContext =
           ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
       do {
@@ -544,7 +548,7 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @param filesToCompact The files to be compacted.
    * @param scanType The scan type.
    * @return The store scanner.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
       throws IOException {
@@ -561,17 +565,23 @@ public class PartitionedMobCompactor extends MobCompactor {
 
   /**
    * Bulkloads the current file.
+   *
+   * @param connection to use to get admin/RegionLocator
    * @param table The current table.
    * @param bulkloadDirectory The path of bulkload directory.
    * @param fileName The current file name.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
-  private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
+  private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
+      String fileName)
       throws IOException {
     // bulkload the ref file
     try {
       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
-      bulkload.doBulkLoad(bulkloadDirectory, (HTable)table);
+      bulkload.doBulkLoad(bulkloadDirectory,
+          connection.getAdmin(),
+          table,
+          connection.getRegionLocator(table.getName()));
     } catch (Exception e) {
       // delete the committed mob file
       deletePath(new Path(mobFamilyDir, fileName));
@@ -587,7 +597,7 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @param writer The mob file writer.
    * @param maxSeqId Maximum sequence id.
    * @param mobCellsCount The number of mob cells.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount)
       throws IOException {
@@ -606,7 +616,7 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @param writer The ref file writer.
    * @param maxSeqId Maximum sequence id.
    * @param bulkloadTime The timestamp at which the bulk load file is created.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime)
       throws IOException {
@@ -626,7 +636,7 @@ public class PartitionedMobCompactor extends MobCompactor {
    * Gets the max seqId and number of cells of the store files.
    * @param storeFiles The store files.
    * @return The pair of the max seqId and number of cells of the store files.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
     long maxSeqId = 0;
@@ -639,7 +649,7 @@ public class PartitionedMobCompactor extends MobCompactor {
         maxKeyCount += Bytes.toLong(count);
       }
     }
-    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
+    return new Pair<>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
   }
 
   /**
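The PartitionedMobCompactor changes above all serve one purpose: the single Connection opened in compactMobFiles() is threaded down through compactMobFilePartition() and compactMobFilesInBatch() to bulkloadRefFile(), which feeds connection.getAdmin() and connection.getRegionLocator(table.getName()) to the remaining doBulkLoad overload instead of casting the Table to HTable. A condensed, self-contained sketch of that call shape (class and method names are illustrative; the real compactor adds partitioning, a thread pool, and cleanup of committed mob files on failure):

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public final class RefFileBulkLoadSketch {

  /** Opens one Connection and reuses it for every per-directory bulk load. */
  public static void bulkLoadAll(Configuration conf, TableName tableName,
      List<Path> bulkloadDirs) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(tableName)) {
      for (Path bulkloadDir : bulkloadDirs) {
        bulkloadRefFile(conf, connection, table, bulkloadDir);
      }
    }
  }

  /** Mirrors the updated bulkloadRefFile call shape: Admin and RegionLocator come from the Connection. */
  private static void bulkloadRefFile(Configuration conf, Connection connection, Table table,
      Path bulkloadDirectory) throws Exception {
    try (Admin admin = connection.getAdmin();
        RegionLocator locator = connection.getRegionLocator(table.getName())) {
      new LoadIncrementalHFiles(conf).doBulkLoad(bulkloadDirectory, admin, table, locator);
    }
  }
}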