HBASE-24791 Improve HFileOutputFormat2 to avoid always call getTableRelativePath method (#2167)

Signed-off-by: Anoop <anoopsamjohn@apache.org>
Signed-off-by: Ted Yu <tyu@apache.org>
This commit is contained in:
YeChao Chen 2020-08-03 16:02:48 +08:00 committed by GitHub
parent 9a1bad84bf
commit 8e33bb04bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 28 deletions

View File

@ -222,6 +222,7 @@ public class HFileOutputFormat2
private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final Map<byte[], WriterLength> writers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR); private final Map<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final long now = EnvironmentEdgeManager.currentTime(); private final long now = EnvironmentEdgeManager.currentTime();
private byte[] tableNameBytes = writeMultipleTables ? null : Bytes.toBytes(writeTableNames);
@Override @Override
public void write(ImmutableBytesWritable row, V cell) throws IOException { public void write(ImmutableBytesWritable row, V cell) throws IOException {
@ -235,7 +236,6 @@ public class HFileOutputFormat2
byte[] rowKey = CellUtil.cloneRow(kv); byte[] rowKey = CellUtil.cloneRow(kv);
int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT; int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
byte[] family = CellUtil.cloneFamily(kv); byte[] family = CellUtil.cloneFamily(kv);
byte[] tableNameBytes = null;
if (writeMultipleTables) { if (writeMultipleTables) {
tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get()); tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString() tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
@ -244,11 +244,7 @@ public class HFileOutputFormat2
throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) + throw new IllegalArgumentException("TableName " + Bytes.toString(tableNameBytes) +
" not expected"); " not expected");
} }
} else {
tableNameBytes = Bytes.toBytes(writeTableNames);
} }
String tableName = Bytes.toString(tableNameBytes);
Path tableRelPath = getTableRelativePath(tableNameBytes);
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family); byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
WriterLength wl = this.writers.get(tableAndFamily); WriterLength wl = this.writers.get(tableAndFamily);
@ -257,9 +253,9 @@ public class HFileOutputFormat2
if (wl == null) { if (wl == null) {
Path writerPath = null; Path writerPath = null;
if (writeMultipleTables) { if (writeMultipleTables) {
writerPath = new Path(outputDir,new Path(tableRelPath, Bytes.toString(family))); Path tableRelPath = getTableRelativePath(tableNameBytes);
} writerPath = new Path(outputDir, new Path(tableRelPath, Bytes.toString(family)));
else { } else {
writerPath = new Path(outputDir, Bytes.toString(family)); writerPath = new Path(outputDir, Bytes.toString(family));
} }
fs.mkdirs(writerPath); fs.mkdirs(writerPath);
@ -274,39 +270,37 @@ public class HFileOutputFormat2
// create a new WAL writer, if necessary // create a new WAL writer, if necessary
if (wl == null || wl.writer == null) { if (wl == null || wl.writer == null) {
InetSocketAddress[] favoredNodes = null;
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
HRegionLocation loc = null; HRegionLocation loc = null;
String tableName = Bytes.toString(tableNameBytes);
if (tableName != null) { if (tableName != null) {
try (Connection connection = ConnectionFactory.createConnection(conf); try (Connection connection = ConnectionFactory.createConnection(conf);
RegionLocator locator = RegionLocator locator =
connection.getRegionLocator(TableName.valueOf(tableName))) { connection.getRegionLocator(TableName.valueOf(tableName))) {
loc = locator.getRegionLocation(rowKey); loc = locator.getRegionLocation(rowKey);
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Something wrong locating rowkey {} in {}", LOG.warn("Something wrong locating rowkey {} in {}", Bytes.toString(rowKey),
Bytes.toString(rowKey), tableName, e); tableName, e);
loc = null; loc = null;
} } }
}
if (null == loc) { if (null == loc) {
LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey)); LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
wl = getNewWriter(tableNameBytes, family, conf, null);
} else { } else {
LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey)); LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
InetSocketAddress initialIsa = InetSocketAddress initialIsa =
new InetSocketAddress(loc.getHostname(), loc.getPort()); new InetSocketAddress(loc.getHostname(), loc.getPort());
if (initialIsa.isUnresolved()) { if (initialIsa.isUnresolved()) {
LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort()); LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
wl = getNewWriter(tableNameBytes, family, conf, null);
} else { } else {
LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString()); LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
wl = getNewWriter(tableNameBytes, family, conf, new InetSocketAddress[] { initialIsa favoredNodes = new InetSocketAddress[] { initialIsa };
});
} }
} }
} else {
wl = getNewWriter(tableNameBytes, family, conf, null);
} }
wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
} }
// we now have the proper WAL writer. full steam ahead // we now have the proper WAL writer. full steam ahead
@ -321,9 +315,9 @@ public class HFileOutputFormat2
private Path getTableRelativePath(byte[] tableNameBytes) { private Path getTableRelativePath(byte[] tableNameBytes) {
String tableName = Bytes.toString(tableNameBytes); String tableName = Bytes.toString(tableNameBytes);
String[] tableNameParts = tableName.split(":"); String[] tableNameParts = tableName.split(":");
Path tableRelPath = new Path(tableName.split(":")[0]); Path tableRelPath = new Path(tableNameParts[0]);
if (tableNameParts.length > 1) { if (tableNameParts.length > 1) {
tableRelPath = new Path(tableRelPath, tableName.split(":")[1]); tableRelPath = new Path(tableRelPath, tableNameParts[1]);
} }
return tableRelPath; return tableRelPath;
} }
@ -376,16 +370,15 @@ public class HFileOutputFormat2
DataBlockEncoding encoding = overriddenEncoding; DataBlockEncoding encoding = overriddenEncoding;
encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding; encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
encoding = encoding == null ? DataBlockEncoding.NONE : encoding; encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
HFileContextBuilder contextBuilder = new HFileContextBuilder() HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
.withCompression(compression).withChecksumType(HStore.getChecksumType(conf)) .withDataBlockEncoding(encoding).withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize) .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize)
.withColumnFamily(family).withTableName(tableName); .withColumnFamily(family).withTableName(tableName);
if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) { if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
contextBuilder.withIncludesTags(true); contextBuilder.withIncludesTags(true);
} }
contextBuilder.withDataBlockEncoding(encoding);
HFileContext hFileContext = contextBuilder.build(); HFileContext hFileContext = contextBuilder.build();
if (null == favoredNodes) { if (null == favoredNodes) {
wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs) wl.writer = new StoreFileWriter.Builder(conf, CacheConfig.DISABLED, fs)