HBASE-26598 Fix excessive connections in MajorCompactor (#3961)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Samir Khan 2021-12-22 20:29:48 -06:00 committed by Duo Zhang
parent c6aac613a0
commit 55ddcbaaf5
6 changed files with 57 additions and 77 deletions

View File

@ -22,13 +22,11 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@ -45,26 +43,26 @@ class MajorCompactionRequest {
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class); private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
protected final Configuration configuration; protected final Connection connection;
protected final RegionInfo region; protected final RegionInfo region;
private Set<String> stores; private Set<String> stores;
MajorCompactionRequest(Configuration configuration, RegionInfo region) { MajorCompactionRequest(Connection connection, RegionInfo region) {
this.configuration = configuration; this.connection = connection;
this.region = region; this.region = region;
} }
MajorCompactionRequest(Configuration configuration, RegionInfo region, MajorCompactionRequest(Connection connection, RegionInfo region,
Set<String> stores) { Set<String> stores) {
this(configuration, region); this(connection, region);
this.stores = stores; this.stores = stores;
} }
static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info, static Optional<MajorCompactionRequest> newRequest(Connection connection, RegionInfo info,
Set<String> stores, long timestamp) throws IOException { Set<String> stores, long timestamp) throws IOException {
MajorCompactionRequest request = MajorCompactionRequest request =
new MajorCompactionRequest(configuration, info, stores); new MajorCompactionRequest(connection, info, stores);
return request.createRequest(configuration, stores, timestamp); return request.createRequest(connection, stores, timestamp);
} }
RegionInfo getRegion() { RegionInfo getRegion() {
@ -79,20 +77,19 @@ class MajorCompactionRequest {
this.stores = stores; this.stores = stores;
} }
Optional<MajorCompactionRequest> createRequest(Configuration configuration, Optional<MajorCompactionRequest> createRequest(Connection connection,
Set<String> stores, long timestamp) throws IOException { Set<String> stores, long timestamp) throws IOException {
Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp); Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp);
MajorCompactionRequest request = null; MajorCompactionRequest request = null;
if (!familiesToCompact.isEmpty()) { if (!familiesToCompact.isEmpty()) {
request = new MajorCompactionRequest(configuration, region, familiesToCompact); request = new MajorCompactionRequest(connection, region, familiesToCompact);
} }
return Optional.ofNullable(request); return Optional.ofNullable(request);
} }
Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp) Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp)
throws IOException { throws IOException {
try(Connection connection = getConnection(configuration)) { HRegionFileSystem fileSystem = getFileSystem();
HRegionFileSystem fileSystem = getFileSystem(connection);
Set<String> familiesToCompact = Sets.newHashSet(); Set<String> familiesToCompact = Sets.newHashSet();
for (String family : requestedStores) { for (String family : requestedStores) {
if (shouldCFBeCompacted(fileSystem, family, timestamp)) { if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
@ -101,7 +98,6 @@ class MajorCompactionRequest {
} }
return familiesToCompact; return familiesToCompact;
} }
}
boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts) boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
throws IOException { throws IOException {
@ -142,10 +138,6 @@ class MajorCompactionRequest {
return false; return false;
} }
Connection getConnection(Configuration configuration) throws IOException {
return ConnectionFactory.createConnection(configuration);
}
protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts) protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
throws IOException { throws IOException {
List<Path> referenceFiles = List<Path> referenceFiles =
@ -167,12 +159,13 @@ class MajorCompactionRequest {
return FSUtils.getReferenceFilePaths(fileSystem, familyDir); return FSUtils.getReferenceFilePaths(fileSystem, familyDir);
} }
HRegionFileSystem getFileSystem(Connection connection) throws IOException { HRegionFileSystem getFileSystem() throws IOException {
Admin admin = connection.getAdmin(); try (Admin admin = connection.getAdmin()) {
return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(), return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
CommonFSUtils.getCurrentFileSystem(admin.getConfiguration()), CommonFSUtils.getTableDir( CommonFSUtils.getCurrentFileSystem(admin.getConfiguration()),
CommonFSUtils.getRootDir(admin.getConfiguration()), region.getTable()), CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(admin.getConfiguration()),
region, true); region.getTable()), region, true);
}
} }
@Override @Override

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
@ -45,30 +44,29 @@ public class MajorCompactionTTLRequest extends MajorCompactionRequest {
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class); private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);
MajorCompactionTTLRequest(Configuration conf, RegionInfo region) { MajorCompactionTTLRequest(Connection connection, RegionInfo region) {
super(conf, region); super(connection, region);
} }
static Optional<MajorCompactionRequest> newRequest(Configuration conf, RegionInfo info, static Optional<MajorCompactionRequest> newRequest(Connection connection, RegionInfo info,
TableDescriptor htd) throws IOException { TableDescriptor htd) throws IOException {
MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info); MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(connection, info);
return request.createRequest(conf, htd); return request.createRequest(connection, htd);
} }
private Optional<MajorCompactionRequest> createRequest(Configuration conf, TableDescriptor htd) private Optional<MajorCompactionRequest> createRequest(Connection connection, TableDescriptor htd)
throws IOException { throws IOException {
Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd); Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
MajorCompactionRequest request = null; MajorCompactionRequest request = null;
if (!familiesToCompact.isEmpty()) { if (!familiesToCompact.isEmpty()) {
LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet()); LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet());
request = new MajorCompactionTTLRequest(conf, region); request = new MajorCompactionTTLRequest(connection, region);
} }
return Optional.ofNullable(request); return Optional.ofNullable(request);
} }
Map<String, Long> getStoresRequiringCompaction(TableDescriptor htd) throws IOException { Map<String, Long> getStoresRequiringCompaction(TableDescriptor htd) throws IOException {
try(Connection connection = getConnection(configuration)) { HRegionFileSystem fileSystem = getFileSystem();
HRegionFileSystem fileSystem = getFileSystem(connection);
Map<String, Long> familyTTLMap = Maps.newHashMap(); Map<String, Long> familyTTLMap = Maps.newHashMap();
for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) { for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
long ts = getColFamilyCutoffTime(descriptor); long ts = getColFamilyCutoffTime(descriptor);
@ -79,7 +77,6 @@ public class MajorCompactionTTLRequest extends MajorCompactionRequest {
} }
return familyTTLMap; return familyTTLMap;
} }
}
// If the CF has no TTL, return -1, else return the current time - TTL. // If the CF has no TTL, return -1, else return the current time - TTL.
private long getColFamilyCutoffTime(ColumnFamilyDescriptor colDesc) { private long getColFamilyCutoffTime(ColumnFamilyDescriptor colDesc) {

View File

@ -202,8 +202,7 @@ public class MajorCompactor extends Configured implements Tool {
protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri) protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
throws IOException { throws IOException {
return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact, return MajorCompactionRequest.newRequest(connection, hri, storesToCompact, timestamp);
timestamp);
} }
private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) { private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
@ -352,8 +351,7 @@ public class MajorCompactor extends Configured implements Tool {
for (HRegionLocation location : locations) { for (HRegionLocation location : locations) {
if (location.getRegion().getRegionId() > timestamp) { if (location.getRegion().getRegionId() > timestamp) {
Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
.newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact, .newRequest(connection, location.getRegion(), storesToCompact, timestamp);
timestamp);
compactionRequest.ifPresent(request -> clusterCompactionQueues compactionRequest.ifPresent(request -> clusterCompactionQueues
.addToCompactionQueue(location.getServerName(), request)); .addToCompactionQueue(location.getServerName(), request));
} }

View File

@ -76,7 +76,7 @@ public class MajorCompactorTTL extends MajorCompactor {
@Override @Override
protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri) protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
throws IOException { throws IOException {
return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd); return MajorCompactionTTLRequest.newRequest(connection, hri, htd);
} }
@Override @Override

View File

@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@ -36,7 +35,6 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -81,13 +79,13 @@ public class TestMajorCompactionRequest {
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10); List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
MajorCompactionRequest request = makeMockRequest(storeFiles, false); MajorCompactionRequest request = makeMockRequest(storeFiles, false);
Optional<MajorCompactionRequest> result = Optional<MajorCompactionRequest> result =
request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100); request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
assertTrue(result.isPresent()); assertTrue(result.isPresent());
// store files newer than timestamp // store files newer than timestamp
storeFiles = mockStoreFiles(regionStoreDir, 5, 101); storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
request = makeMockRequest(storeFiles, false); request = makeMockRequest(storeFiles, false);
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100); result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
assertFalse(result.isPresent()); assertFalse(result.isPresent());
} }
@ -100,19 +98,18 @@ public class TestMajorCompactionRequest {
HRegion region = HRegion region =
HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getConfiguration(), htd); HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getConfiguration(), htd);
Configuration configuration = mock(Configuration.class); Connection connection = mock(Connection.class);
// the reference file timestamp is newer // the reference file timestamp is newer
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101); List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
List<Path> paths = storeFiles.stream().map(StoreFileInfo::getPath).collect(Collectors.toList()); List<Path> paths = storeFiles.stream().map(StoreFileInfo::getPath).collect(Collectors.toList());
// the files that are referenced are older, thus we still compact. // the files that are referenced are older, thus we still compact.
HRegionFileSystem fileSystem = HRegionFileSystem fileSystem =
mockFileSystem(region.getRegionInfo(), true, storeFiles, 50); mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration, MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(connection,
region.getRegionInfo(), Sets.newHashSet(FAMILY))); region.getRegionInfo(), Sets.newHashSet(FAMILY)));
doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class), doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
any(Path.class)); any(Path.class));
doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class)); doReturn(fileSystem).when(majorCompactionRequest).getFileSystem();
Set<String> result = Set<String> result =
majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"), 100); majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"), 100);
assertEquals(FAMILY, Iterables.getOnlyElement(result)); assertEquals(FAMILY, Iterables.getOnlyElement(result));
@ -158,16 +155,15 @@ public class TestMajorCompactionRequest {
private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles, private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles,
boolean references) throws IOException { boolean references) throws IOException {
Configuration configuration = mock(Configuration.class); Connection connection = mock(Connection.class);
RegionInfo regionInfo = mock(RegionInfo.class); RegionInfo regionInfo = mock(RegionInfo.class);
when(regionInfo.getEncodedName()).thenReturn("HBase"); when(regionInfo.getEncodedName()).thenReturn("HBase");
when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo")); when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
MajorCompactionRequest request = MajorCompactionRequest request =
new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a")); new MajorCompactionRequest(connection, regionInfo, Sets.newHashSet("a"));
MajorCompactionRequest spy = spy(request); MajorCompactionRequest spy = spy(request);
HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles); HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class)); doReturn(fileSystem).when(spy).getFileSystem();
doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
return spy; return spy;
} }
} }

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.util.compaction;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -31,7 +29,6 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -72,29 +69,28 @@ public class TestMajorCompactionTTLRequest extends TestMajorCompactionRequest {
MajorCompactionTTLRequest request = makeMockRequest(storeFiles); MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
// All files are <= 100, so region should not be compacted. // All files are <= 100, so region should not be compacted.
Optional<MajorCompactionRequest> result = Optional<MajorCompactionRequest> result =
request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10); request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 10);
assertFalse(result.isPresent()); assertFalse(result.isPresent());
// All files are <= 100, so region should not be compacted yet. // All files are <= 100, so region should not be compacted yet.
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100); result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
assertFalse(result.isPresent()); assertFalse(result.isPresent());
// All files are <= 100, so they should be considered for compaction // All files are <= 100, so they should be considered for compaction
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101); result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 101);
assertTrue(result.isPresent()); assertTrue(result.isPresent());
} }
private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles) private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles)
throws IOException { throws IOException {
Configuration configuration = mock(Configuration.class); Connection connection = mock(Connection.class);
RegionInfo regionInfo = mock(RegionInfo.class); RegionInfo regionInfo = mock(RegionInfo.class);
when(regionInfo.getEncodedName()).thenReturn("HBase"); when(regionInfo.getEncodedName()).thenReturn("HBase");
when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo")); when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo); MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(connection, regionInfo);
MajorCompactionTTLRequest spy = spy(request); MajorCompactionTTLRequest spy = spy(request);
HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles); HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles);
doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class)); doReturn(fileSystem).when(spy).getFileSystem();
doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
return spy; return spy;
} }
} }