HBASE-27733 hfile split occurs during bulkload, the new HFile file does not specify favored nodes (#5121)
Co-authored-by: alanzhao <alanzhao@126.com>
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
commit 5cea8112fd (parent 5d82d4f746)
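For context before the diff: the patch threads an AsyncTableRegionLocator down into the HFile-split path so that, when bulkload has to split an HFile across a region boundary, the two half files are written with favored nodes pointing at the host currently serving the target region (mirroring what HBASE-12596 already does for freshly generated HFiles). The snippet below is a minimal sketch of that writer-selection logic, not the committed code; the class and method names (FavoredNodesWriterSketch, buildHalfWriter) and the reduced parameter list are assumptions for illustration.

// Minimal sketch (assumptions: class/method names are hypothetical, parameter list is
// simplified) of the writer-selection logic this patch adds to copyHFileHalf(): when
// locality is enabled and the first cell's region location resolves, the half-HFile
// writer is created with that region's host as a favored node; otherwise a plain
// writer is built, exactly as before the patch.
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.FutureUtils;

final class FavoredNodesWriterSketch {
  static StoreFileWriter buildHalfWriter(Configuration conf, FileSystem fs, CacheConfig cacheConf,
    Path outFile, BloomType bloomType, HFileContext ctx, AsyncTableRegionLocator loc,
    Cell firstCell, boolean localitySensitive) throws IOException {
    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, cacheConf, fs)
      .withFilePath(outFile).withBloomType(bloomType).withFileContext(ctx);
    if (localitySensitive) {
      byte[] rowKey = CellUtil.cloneRow(firstCell);
      HRegionLocation location = FutureUtils.get(loc.getRegionLocation(rowKey));
      if (location != null) {
        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
        if (!isa.isUnresolved()) {
          // Hint HDFS to place replicas of the new half file on the host serving the region,
          // so the split output keeps block locality for the subsequent bulkload.
          builder.withFavoredNodes(new InetSocketAddress[] { isa });
        }
      }
    }
    return builder.build();
  }
}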
BulkLoadHFilesTool.java
@@ -22,6 +22,7 @@ import static java.lang.String.format;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -56,13 +57,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncAdmin;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -114,6 +119,13 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
   private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);
 
+  /**
+   * Keep locality while generating HFiles for bulkload. See HBASE-12596
+   */
+  public static final String LOCALITY_SENSITIVE_CONF_KEY =
+    "hbase.bulkload.locality.sensitive.enabled";
+  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+
   public static final String NAME = "completebulkload";
   /**
    * Whether to run validation on hfiles before loading.
@@ -540,7 +552,6 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
     while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();
-
      final Callable<Pair<List<LoadQueueItem>, String>> call =
        () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
      splittingFutures.add(pool.submit(call));
@@ -578,8 +589,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     return UUID.randomUUID().toString().replaceAll("-", "");
   }
 
-  private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc,
-    byte[] splitKey) throws IOException {
+  private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item,
+    TableDescriptor tableDesc, byte[] splitKey) throws IOException {
     Path hfilePath = item.getFilePath();
     byte[] family = item.getFamily();
     Path tmpDir = hfilePath.getParent();
@@ -594,7 +605,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
     Path botOut = new Path(tmpDir, uniqueName + ".bottom");
     Path topOut = new Path(tmpDir, uniqueName + ".top");
-    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
+
+    splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
 
     FileSystem fs = tmpDir.getFileSystem(getConf());
     fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -718,8 +730,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       checkRegionIndexValid(splitIdx, startEndKeys, tableName);
     }
     byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
-    List<LoadQueueItem> lqis =
-      splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
+    List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item,
+      FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
+
     return new Pair<>(lqis, null);
   }
 
@@ -729,25 +742,27 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   }
 
   /**
-   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
-   * filters, etc.
+   * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata,
+   * recreating bloom filters, etc.
    */
   @InterfaceAudience.Private
-  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
-    byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
+  static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile,
+    ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)
+    throws IOException {
     // Open reader with no block cache, and not in-memory
     Reference topReference = Reference.createTopReference(splitKey);
     Reference bottomReference = Reference.createBottomReference(splitKey);
 
-    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
-    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc);
+    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
   }
 
   /**
-   * Copy half of an HFile into a new HFile.
+   * Copy half of an HFile into a new HFile with favored nodes.
    */
   private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
-    Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
+    Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc)
+    throws IOException {
     FileSystem fs = inFile.getFileSystem(conf);
     CacheConfig cacheConf = CacheConfig.DISABLED;
     HalfStoreFileReader halfReader = null;
@@ -769,12 +784,50 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
         .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
         .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
       do {
-        halfWriter.append(scanner.getCell());
+        final Cell cell = scanner.getCell();
+        if (null != halfWriter) {
+          halfWriter.append(cell);
+        } else {
+          // init halfwriter
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
+            InetSocketAddress[] favoredNodes = null;
+            if (null == hRegionLocation) {
+              LOG.warn(
+                "Failed get region location for rowkey {} , Using writer without favoured nodes.",
+                Bytes.toString(rowKey));
+              halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+            } else {
+              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
+              InetSocketAddress initialIsa =
+                new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
+              if (initialIsa.isUnresolved()) {
+                LOG.warn("Failed get location for region {} , Using writer without favoured nodes.",
+                  hRegionLocation);
+                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                  .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+              } else {
+                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
+                favoredNodes = new InetSocketAddress[] { initialIsa };
+                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                  .withBloomType(bloomFilterType).withFileContext(hFileContext)
+                  .withFavoredNodes(favoredNodes).build();
+              }
+            }
+          } else {
+            halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+              .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+          }
+          halfWriter.append(cell);
+        }
+
       } while (scanner.next());
 
       for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
TestBulkLoadHFiles.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,10 +44,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Table;
@@ -63,7 +66,12 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.hamcrest.MatcherAssert;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -555,34 +563,70 @@ public class TestBulkLoadHFiles {
     FileSystem fs = util.getTestFileSystem();
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
 
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
 
     int rowCount = verifyHFile(bottomOut);
     rowCount += verifyHFile(topOut);
     assertEquals(1000, rowCount);
   }
 
+  /**
+   * Test hfile splits with the favored nodes
+   */
+  @Test
+  public void testSplitStoreFileWithFavoriteNodes() throws IOException {
+
+    Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
+    FileSystem fs = util.getDFSCluster().getFileSystem();
+
+    Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
+    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
+      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+
+    final AsyncTableRegionLocator regionLocator =
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
+    BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
+      Bytes.toBytes("ggg"), bottomOut, topOut);
+    verifyHFileFavoriteNode(topOut, regionLocator, fs);
+    verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
+    int rowCount = verifyHFile(bottomOut);
+    rowCount += verifyHFile(topOut);
+    assertEquals(1000, rowCount);
+  }
+
   @Test
   public void testSplitStoreFileWithCreateTimeTS() throws IOException {
     Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS");
     FileSystem fs = util.getTestFileSystem();
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
 
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
 
     verifyHFileCreateTimeTS(bottomOut);
     verifyHFileCreateTimeTS(topOut);
@@ -615,14 +659,17 @@ public class TestBulkLoadHFiles {
     Path testIn = new Path(dir, "testhfile");
     ColumnFamilyDescriptor familyDesc =
       ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
+    String tableName = tn.getMethodName();
+    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
     HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
       bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
 
     Path bottomOut = new Path(dir, "bottom.out");
     Path topOut = new Path(dir, "top.out");
 
-    BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
-      Bytes.toBytes("ggg"), bottomOut, topOut);
+    BulkLoadHFilesTool.splitStoreFile(
+      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
+      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
 
     int rowCount = verifyHFile(bottomOut);
     rowCount += verifyHFile(topOut);
@@ -654,6 +701,61 @@ public class TestBulkLoadHFiles {
     }
   }
 
+  /**
+   * test split storefile with favorite node information
+   */
+  private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
+    throws IOException {
+    Configuration conf = util.getConfiguration();
+
+    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);) {
+
+      final byte[] firstRowkey = reader.getFirstRowKey().get();
+      final HRegionLocation hRegionLocation =
+        FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));
+
+      final String targetHostName = hRegionLocation.getHostname();
+
+      if (fs instanceof DistributedFileSystem) {
+        String pathStr = p.toUri().getPath();
+        LocatedBlocks blocks =
+          ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);
+
+        boolean isFavoriteNode = false;
+        List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
+        int index = 0;
+        do {
+          if (index > 0) {
+            assertTrue("failed use favored nodes", isFavoriteNode);
+          }
+          isFavoriteNode = false;
+          final LocatedBlock block = locatedBlocks.get(index);
+
+          final DatanodeInfo[] locations = block.getLocations();
+          for (DatanodeInfo location : locations) {
+
+            final String hostName = location.getHostName();
+            if (
+              targetHostName.equals(hostName.equals("127.0.0.1")
+                ? InetAddress.getLocalHost().getHostName()
+                : "127.0.0.1") || targetHostName.equals(hostName)
+            ) {
+              isFavoriteNode = true;
+              break;
+            }
+          }
+
+          index++;
+        } while (index < locatedBlocks.size());
+        if (index > 0) {
+          assertTrue("failed use favored nodes", isFavoriteNode);
+        }
+
+      }
+
+    }
+  }
+
   private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
     Integer value = map.containsKey(first) ? map.get(first) : 0;
     map.put(first, value + 1);