HBASE-26598 Fix excessive connections in MajorCompactor (#3961)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
f16b7b1bfa
commit
bf258cd68f
|
@ -22,13 +22,11 @@ import java.util.Collection;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
|
@ -45,26 +43,26 @@ class MajorCompactionRequest {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
|
||||
|
||||
protected final Configuration configuration;
|
||||
protected final Connection connection;
|
||||
protected final RegionInfo region;
|
||||
private Set<String> stores;
|
||||
|
||||
MajorCompactionRequest(Configuration configuration, RegionInfo region) {
|
||||
this.configuration = configuration;
|
||||
MajorCompactionRequest(Connection connection, RegionInfo region) {
|
||||
this.connection = connection;
|
||||
this.region = region;
|
||||
}
|
||||
|
||||
MajorCompactionRequest(Configuration configuration, RegionInfo region,
|
||||
MajorCompactionRequest(Connection connection, RegionInfo region,
|
||||
Set<String> stores) {
|
||||
this(configuration, region);
|
||||
this(connection, region);
|
||||
this.stores = stores;
|
||||
}
|
||||
|
||||
static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
|
||||
static Optional<MajorCompactionRequest> newRequest(Connection connection, RegionInfo info,
|
||||
Set<String> stores, long timestamp) throws IOException {
|
||||
MajorCompactionRequest request =
|
||||
new MajorCompactionRequest(configuration, info, stores);
|
||||
return request.createRequest(configuration, stores, timestamp);
|
||||
new MajorCompactionRequest(connection, info, stores);
|
||||
return request.createRequest(connection, stores, timestamp);
|
||||
}
|
||||
|
||||
RegionInfo getRegion() {
|
||||
|
@ -79,28 +77,26 @@ class MajorCompactionRequest {
|
|||
this.stores = stores;
|
||||
}
|
||||
|
||||
Optional<MajorCompactionRequest> createRequest(Configuration configuration,
|
||||
Optional<MajorCompactionRequest> createRequest(Connection connection,
|
||||
Set<String> stores, long timestamp) throws IOException {
|
||||
Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp);
|
||||
MajorCompactionRequest request = null;
|
||||
if (!familiesToCompact.isEmpty()) {
|
||||
request = new MajorCompactionRequest(configuration, region, familiesToCompact);
|
||||
request = new MajorCompactionRequest(connection, region, familiesToCompact);
|
||||
}
|
||||
return Optional.ofNullable(request);
|
||||
}
|
||||
|
||||
Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp)
|
||||
throws IOException {
|
||||
try(Connection connection = getConnection(configuration)) {
|
||||
HRegionFileSystem fileSystem = getFileSystem(connection);
|
||||
Set<String> familiesToCompact = Sets.newHashSet();
|
||||
for (String family : requestedStores) {
|
||||
if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
|
||||
familiesToCompact.add(family);
|
||||
}
|
||||
HRegionFileSystem fileSystem = getFileSystem();
|
||||
Set<String> familiesToCompact = Sets.newHashSet();
|
||||
for (String family : requestedStores) {
|
||||
if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
|
||||
familiesToCompact.add(family);
|
||||
}
|
||||
return familiesToCompact;
|
||||
}
|
||||
return familiesToCompact;
|
||||
}
|
||||
|
||||
boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
|
||||
|
@ -141,10 +137,6 @@ class MajorCompactionRequest {
|
|||
return false;
|
||||
}
|
||||
|
||||
Connection getConnection(Configuration configuration) throws IOException {
|
||||
return ConnectionFactory.createConnection(configuration);
|
||||
}
|
||||
|
||||
protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
|
||||
throws IOException {
|
||||
List<Path> referenceFiles =
|
||||
|
@ -166,12 +158,13 @@ class MajorCompactionRequest {
|
|||
return FSUtils.getReferenceFilePaths(fileSystem, familyDir);
|
||||
}
|
||||
|
||||
HRegionFileSystem getFileSystem(Connection connection) throws IOException {
|
||||
Admin admin = connection.getAdmin();
|
||||
return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
|
||||
CommonFSUtils.getCurrentFileSystem(admin.getConfiguration()), CommonFSUtils.getTableDir(
|
||||
CommonFSUtils.getRootDir(admin.getConfiguration()), region.getTable()),
|
||||
region, true);
|
||||
HRegionFileSystem getFileSystem() throws IOException {
|
||||
try (Admin admin = connection.getAdmin()) {
|
||||
return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
|
||||
CommonFSUtils.getCurrentFileSystem(admin.getConfiguration()),
|
||||
CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(admin.getConfiguration()),
|
||||
region.getTable()), region, true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.io.IOException;
|
|||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
|
@ -45,40 +44,38 @@ public class MajorCompactionTTLRequest extends MajorCompactionRequest {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);
|
||||
|
||||
MajorCompactionTTLRequest(Configuration conf, RegionInfo region) {
|
||||
super(conf, region);
|
||||
MajorCompactionTTLRequest(Connection connection, RegionInfo region) {
|
||||
super(connection, region);
|
||||
}
|
||||
|
||||
static Optional<MajorCompactionRequest> newRequest(Configuration conf, RegionInfo info,
|
||||
static Optional<MajorCompactionRequest> newRequest(Connection connection, RegionInfo info,
|
||||
TableDescriptor htd) throws IOException {
|
||||
MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info);
|
||||
return request.createRequest(conf, htd);
|
||||
MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(connection, info);
|
||||
return request.createRequest(connection, htd);
|
||||
}
|
||||
|
||||
private Optional<MajorCompactionRequest> createRequest(Configuration conf, TableDescriptor htd)
|
||||
private Optional<MajorCompactionRequest> createRequest(Connection connection, TableDescriptor htd)
|
||||
throws IOException {
|
||||
Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
|
||||
MajorCompactionRequest request = null;
|
||||
if (!familiesToCompact.isEmpty()) {
|
||||
LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet());
|
||||
request = new MajorCompactionTTLRequest(conf, region);
|
||||
request = new MajorCompactionTTLRequest(connection, region);
|
||||
}
|
||||
return Optional.ofNullable(request);
|
||||
}
|
||||
|
||||
Map<String, Long> getStoresRequiringCompaction(TableDescriptor htd) throws IOException {
|
||||
try(Connection connection = getConnection(configuration)) {
|
||||
HRegionFileSystem fileSystem = getFileSystem(connection);
|
||||
Map<String, Long> familyTTLMap = Maps.newHashMap();
|
||||
for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
|
||||
long ts = getColFamilyCutoffTime(descriptor);
|
||||
// If the table's TTL is forever, lets not compact any of the regions.
|
||||
if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
|
||||
familyTTLMap.put(descriptor.getNameAsString(), ts);
|
||||
}
|
||||
HRegionFileSystem fileSystem = getFileSystem();
|
||||
Map<String, Long> familyTTLMap = Maps.newHashMap();
|
||||
for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
|
||||
long ts = getColFamilyCutoffTime(descriptor);
|
||||
// If the table's TTL is forever, lets not compact any of the regions.
|
||||
if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
|
||||
familyTTLMap.put(descriptor.getNameAsString(), ts);
|
||||
}
|
||||
return familyTTLMap;
|
||||
}
|
||||
return familyTTLMap;
|
||||
}
|
||||
|
||||
// If the CF has no TTL, return -1, else return the current time - TTL.
|
||||
|
|
|
@ -202,8 +202,7 @@ public class MajorCompactor extends Configured implements Tool {
|
|||
|
||||
protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
|
||||
throws IOException {
|
||||
return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
|
||||
timestamp);
|
||||
return MajorCompactionRequest.newRequest(connection, hri, storesToCompact, timestamp);
|
||||
}
|
||||
|
||||
private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
|
||||
|
@ -352,8 +351,7 @@ public class MajorCompactor extends Configured implements Tool {
|
|||
for (HRegionLocation location : locations) {
|
||||
if (location.getRegion().getRegionId() > timestamp) {
|
||||
Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
|
||||
.newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
|
||||
timestamp);
|
||||
.newRequest(connection, location.getRegion(), storesToCompact, timestamp);
|
||||
compactionRequest.ifPresent(request -> clusterCompactionQueues
|
||||
.addToCompactionQueue(location.getServerName(), request));
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ public class MajorCompactorTTL extends MajorCompactor {
|
|||
@Override
|
||||
protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
|
||||
throws IOException {
|
||||
return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd);
|
||||
return MajorCompactionTTLRequest.newRequest(connection, hri, htd);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -171,4 +171,4 @@ public class MajorCompactorTTL extends MajorCompactor {
|
|||
public static void main(String[] args) throws Exception {
|
||||
ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(), args);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
|
@ -36,7 +35,6 @@ import java.util.Set;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -81,13 +79,13 @@ public class TestMajorCompactionRequest {
|
|||
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
|
||||
MajorCompactionRequest request = makeMockRequest(storeFiles, false);
|
||||
Optional<MajorCompactionRequest> result =
|
||||
request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
|
||||
request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
|
||||
assertTrue(result.isPresent());
|
||||
|
||||
// store files newer than timestamp
|
||||
storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
|
||||
request = makeMockRequest(storeFiles, false);
|
||||
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
|
||||
result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
|
||||
assertFalse(result.isPresent());
|
||||
}
|
||||
|
||||
|
@ -100,19 +98,18 @@ public class TestMajorCompactionRequest {
|
|||
HRegion region =
|
||||
HBaseTestingUtil.createRegionAndWAL(hri, rootRegionDir, UTILITY.getConfiguration(), htd);
|
||||
|
||||
Configuration configuration = mock(Configuration.class);
|
||||
Connection connection = mock(Connection.class);
|
||||
// the reference file timestamp is newer
|
||||
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
|
||||
List<Path> paths = storeFiles.stream().map(StoreFileInfo::getPath).collect(Collectors.toList());
|
||||
// the files that are referenced are older, thus we still compact.
|
||||
HRegionFileSystem fileSystem =
|
||||
mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
|
||||
MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
|
||||
MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(connection,
|
||||
region.getRegionInfo(), Sets.newHashSet(FAMILY)));
|
||||
doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
|
||||
doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
|
||||
any(Path.class));
|
||||
doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
|
||||
doReturn(fileSystem).when(majorCompactionRequest).getFileSystem();
|
||||
Set<String> result =
|
||||
majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"), 100);
|
||||
assertEquals(FAMILY, Iterables.getOnlyElement(result));
|
||||
|
@ -158,16 +155,15 @@ public class TestMajorCompactionRequest {
|
|||
|
||||
private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles,
|
||||
boolean references) throws IOException {
|
||||
Configuration configuration = mock(Configuration.class);
|
||||
Connection connection = mock(Connection.class);
|
||||
RegionInfo regionInfo = mock(RegionInfo.class);
|
||||
when(regionInfo.getEncodedName()).thenReturn("HBase");
|
||||
when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
|
||||
MajorCompactionRequest request =
|
||||
new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"));
|
||||
new MajorCompactionRequest(connection, regionInfo, Sets.newHashSet("a"));
|
||||
MajorCompactionRequest spy = spy(request);
|
||||
HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
|
||||
doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
|
||||
doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
|
||||
doReturn(fileSystem).when(spy).getFileSystem();
|
||||
return spy;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.util.compaction;
|
|||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
@ -31,7 +29,6 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -72,29 +69,28 @@ public class TestMajorCompactionTTLRequest extends TestMajorCompactionRequest {
|
|||
MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
|
||||
// All files are <= 100, so region should not be compacted.
|
||||
Optional<MajorCompactionRequest> result =
|
||||
request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10);
|
||||
request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 10);
|
||||
assertFalse(result.isPresent());
|
||||
|
||||
// All files are <= 100, so region should not be compacted yet.
|
||||
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
|
||||
result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
|
||||
assertFalse(result.isPresent());
|
||||
|
||||
// All files are <= 100, so they should be considered for compaction
|
||||
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101);
|
||||
result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 101);
|
||||
assertTrue(result.isPresent());
|
||||
}
|
||||
|
||||
private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles)
|
||||
throws IOException {
|
||||
Configuration configuration = mock(Configuration.class);
|
||||
Connection connection = mock(Connection.class);
|
||||
RegionInfo regionInfo = mock(RegionInfo.class);
|
||||
when(regionInfo.getEncodedName()).thenReturn("HBase");
|
||||
when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
|
||||
MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo);
|
||||
MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(connection, regionInfo);
|
||||
MajorCompactionTTLRequest spy = spy(request);
|
||||
HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles);
|
||||
doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
|
||||
doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
|
||||
doReturn(fileSystem).when(spy).getFileSystem();
|
||||
return spy;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue