HBASE-27060 Allow sharing connections between AggregationClient instances (#4566)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Bryan Beaudreault 2022-06-24 10:47:33 -04:00 committed by GitHub
parent b1691a5318
commit 25c56caec8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 185 additions and 12 deletions

View File

@ -80,6 +80,7 @@ public class AggregationClient implements Closeable {
// TODO: This class is not used. Move to examples? // TODO: This class is not used. Move to examples?
private static final Logger log = LoggerFactory.getLogger(AggregationClient.class); private static final Logger log = LoggerFactory.getLogger(AggregationClient.class);
private final Connection connection; private final Connection connection;
private final boolean manageConnection;
/** /**
* An RpcController implementation for use here in this endpoint. * An RpcController implementation for use here in this endpoint.
@ -129,25 +130,73 @@ public class AggregationClient implements Closeable {
} }
/** /**
* Constructor with Conf object * Creates AggregationClient with no underlying Connection. Users of this constructor should limit
* @param cfg Configuration to use * themselves to methods here which take a {@link Table} argument, such as
* {@link #rowCount(Table, ColumnInterpreter, Scan)}. Use of methods which instead take a
* TableName, such as {@link #rowCount(TableName, ColumnInterpreter, Scan)}, will throw an
* IOException.
*/
public AggregationClient() {
this(null, false);
}
/**
* Creates AggregationClient using the passed in Connection, which will be used by methods taking
* a {@link TableName} to create the necessary {@link Table} for the call. The Connection is
* externally managed by the caller and will not be closed if {@link #close()} is called. There is
* no need to call {@link #close()} for AggregationClients created this way.
* @param connection the connection to use
*/
public AggregationClient(Connection connection) {
this(connection, false);
}
/**
* Creates AggregationClient with internally managed Connection, which will be used by methods
* taking a {@link TableName} to create the necessary {@link Table} for the call. The Connection
* will immediately be created will be closed when {@link #close()} is called. It's important to
* call {@link #close()} when done with this AggregationClient and to otherwise treat it as a
* shared Singleton.
* @param cfg Configuration to use to create connection
*/ */
public AggregationClient(Configuration cfg) { public AggregationClient(Configuration cfg) {
try {
// Create a connection on construction. Will use it making each of the calls below. // Create a connection on construction. Will use it making each of the calls below.
this.connection = ConnectionFactory.createConnection(cfg); this(createConnection(cfg), true);
}
private static Connection createConnection(Configuration cfg) {
try {
return ConnectionFactory.createConnection(cfg);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private AggregationClient(Connection connection, boolean manageConnection) {
this.connection = connection;
this.manageConnection = manageConnection;
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (this.connection != null && !this.connection.isClosed()) { if (manageConnection && this.connection != null && !this.connection.isClosed()) {
this.connection.close(); this.connection.close();
} }
} }
// visible for tests
boolean isClosed() {
return manageConnection && this.connection != null && this.connection.isClosed();
}
private Connection getConnection() throws IOException {
if (connection == null) {
throw new IOException(
"Connection not initialized. Use the correct constructor, or use the methods taking a Table");
}
return connection;
}
/** /**
* It gives the maximum value of a column for a given column family for the given range. In case * It gives the maximum value of a column for a given column family for the given range. In case
* qualifier is null, a max of all values for the given family is returned. * qualifier is null, a max of all values for the given family is returned.
@ -161,7 +210,7 @@ public class AggregationClient implements Closeable {
public <R, S, P extends Message, Q extends Message, T extends Message> R public <R, S, P extends Message, Q extends Message, T extends Message> R
max(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) max(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable { throws Throwable {
try (Table table = connection.getTable(tableName)) { try (Table table = getConnection().getTable(tableName)) {
return max(table, ci, scan); return max(table, ci, scan);
} }
} }
@ -228,7 +277,7 @@ public class AggregationClient implements Closeable {
public <R, S, P extends Message, Q extends Message, T extends Message> R public <R, S, P extends Message, Q extends Message, T extends Message> R
min(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) min(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable { throws Throwable {
try (Table table = connection.getTable(tableName)) { try (Table table = getConnection().getTable(tableName)) {
return min(table, ci, scan); return min(table, ci, scan);
} }
} }
@ -300,7 +349,7 @@ public class AggregationClient implements Closeable {
public <R, S, P extends Message, Q extends Message, T extends Message> long public <R, S, P extends Message, Q extends Message, T extends Message> long
rowCount(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) rowCount(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable { throws Throwable {
try (Table table = connection.getTable(tableName)) { try (Table table = getConnection().getTable(tableName)) {
return rowCount(table, ci, scan); return rowCount(table, ci, scan);
} }
} }
@ -370,7 +419,7 @@ public class AggregationClient implements Closeable {
public <R, S, P extends Message, Q extends Message, T extends Message> S public <R, S, P extends Message, Q extends Message, T extends Message> S
sum(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) sum(final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable { throws Throwable {
try (Table table = connection.getTable(tableName)) { try (Table table = getConnection().getTable(tableName)) {
return sum(table, ci, scan); return sum(table, ci, scan);
} }
} }
@ -439,7 +488,7 @@ public class AggregationClient implements Closeable {
private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs( private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable { throws Throwable {
try (Table table = connection.getTable(tableName)) { try (Table table = getConnection().getTable(tableName)) {
return getAvgArgs(table, ci, scan); return getAvgArgs(table, ci, scan);
} }
} }
@ -625,7 +674,7 @@ public class AggregationClient implements Closeable {
*/ */
public <R, S, P extends Message, Q extends Message, T extends Message> double std( public <R, S, P extends Message, Q extends Message, T extends Message> double std(
final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
try (Table table = connection.getTable(tableName)) { try (Table table = getConnection().getTable(tableName)) {
return std(table, ci, scan); return std(table, ci, scan);
} }
} }
@ -729,7 +778,7 @@ public class AggregationClient implements Closeable {
*/ */
public <R, S, P extends Message, Q extends Message, T extends Message> R median( public <R, S, P extends Message, Q extends Message, T extends Message> R median(
final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
try (Table table = connection.getTable(tableName)) { try (Table table = getConnection().getTable(tableName)) {
return median(table, ci, scan); return median(table, ci, scan);
} }
} }

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.coprocessor;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, CoprocessorTests.class })
public class TestAggregationClient {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAggregationClient.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static final TableName TABLE_NAME = TableName.valueOf("TestAggregationClient");
private static final byte[] CF = Bytes.toBytes("CF");
private static Connection CONN;
private static Table TABLE;
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
AggregateImplementation.class.getName());
UTIL.startMiniCluster(1);
UTIL.createTable(TABLE_NAME, CF);
CONN = ConnectionFactory.createConnection(conf);
TABLE = CONN.getTable(TABLE_NAME);
}
@AfterClass
public static void tearDown() throws Exception {
CONN.close();
UTIL.shutdownMiniCluster();
}
@Test
public void itCreatesConnectionless() throws Throwable {
AggregationClient client = new AggregationClient();
assertFalse(client.isClosed());
try {
client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
fail("Expected IOException");
} catch (Throwable e) {
assertTrue(e instanceof IOException);
assertTrue(e.getMessage().contains("Connection not initialized"));
}
client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
client.close();
assertFalse(CONN.isClosed());
assertFalse(client.isClosed());
}
@Test
public void itCreatesExternalConnection() throws Throwable {
AggregationClient client = new AggregationClient(CONN);
assertFalse(client.isClosed());
client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
client.close();
assertFalse(CONN.isClosed());
assertFalse(client.isClosed());
}
@Test
public void itCreatesManagedConnection() throws Throwable {
AggregationClient client = new AggregationClient(CONN.getConfiguration());
assertFalse(client.isClosed());
client.rowCount(TABLE_NAME, new LongColumnInterpreter(), new Scan());
client.rowCount(TABLE, new LongColumnInterpreter(), new Scan());
client.close();
assertFalse(CONN.isClosed());
assertTrue(client.isClosed());
}
}