HBASE-18448 Added refresh HFiles coprocessor endpoint
Signed-off-by: anoopsamjohn <anoopsamjohn@gmail.com>
This commit is contained in:
parent
08212e50ff
commit
e5a8f162a2
|
@ -0,0 +1,95 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.client.example;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
|
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
|
||||||
|
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This client class is for invoking the refresh HFile function deployed on the
|
||||||
|
* Region Server side via the RefreshHFilesService.
|
||||||
|
*/
|
||||||
|
public class RefreshHFilesClient implements Closeable {
|
||||||
|
private static final Log LOG = LogFactory.getLog(RefreshHFilesClient.class);
|
||||||
|
private final Connection connection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor with Conf object
|
||||||
|
*
|
||||||
|
* @param cfg
|
||||||
|
*/
|
||||||
|
public RefreshHFilesClient(Configuration cfg) {
|
||||||
|
try {
|
||||||
|
this.connection = ConnectionFactory.createConnection(cfg);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (this.connection != null && !this.connection.isClosed()) {
|
||||||
|
this.connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void refreshHFiles(final TableName tableName) throws Throwable {
|
||||||
|
try (Table table = connection.getTable(tableName)) {
|
||||||
|
refreshHFiles(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void refreshHFiles(final Table table) throws Throwable {
|
||||||
|
final RefreshHFilesProtos.RefreshHFilesRequest request = RefreshHFilesProtos.RefreshHFilesRequest
|
||||||
|
.getDefaultInstance();
|
||||||
|
table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class, HConstants.EMPTY_START_ROW,
|
||||||
|
HConstants.EMPTY_END_ROW,
|
||||||
|
new Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
|
||||||
|
RefreshHFilesProtos.RefreshHFilesResponse>() {
|
||||||
|
@Override
|
||||||
|
public RefreshHFilesProtos.RefreshHFilesResponse call(
|
||||||
|
RefreshHFilesProtos.RefreshHFilesService refreshHFilesService)
|
||||||
|
throws IOException {
|
||||||
|
ServerRpcController controller = new ServerRpcController();
|
||||||
|
BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback =
|
||||||
|
new BlockingRpcCallback<>();
|
||||||
|
refreshHFilesService.refreshHFiles(controller, request, rpcCallback);
|
||||||
|
if (controller.failedOnException()) {
|
||||||
|
throw controller.getFailedOn();
|
||||||
|
}
|
||||||
|
return rpcCallback.get();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
LOG.debug("Done refreshing HFiles");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,86 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor.example;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcCallback;
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.Service;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.Coprocessor;
|
||||||
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Coprocessor endpoint to refresh HFiles on replica.
|
||||||
|
* <p>
|
||||||
|
* <p>
|
||||||
|
* For the protocol buffer definition of the RefreshHFilesService, see the source file located under
|
||||||
|
* hbase-protocol/src/main/protobuf/RefreshHFiles.proto.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
public class RefreshHFilesEndpoint extends RefreshHFilesProtos.RefreshHFilesService
|
||||||
|
implements Coprocessor, CoprocessorService {
|
||||||
|
protected static final Log LOG = LogFactory.getLog(RefreshHFilesEndpoint.class);
|
||||||
|
private RegionCoprocessorEnvironment env;
|
||||||
|
|
||||||
|
public RefreshHFilesEndpoint() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Service getService() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void refreshHFiles(RpcController controller, RefreshHFilesProtos.RefreshHFilesRequest request,
|
||||||
|
RpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> done) {
|
||||||
|
try {
|
||||||
|
for (Store store : env.getRegion().getStores()) {
|
||||||
|
LOG.debug("Refreshing HFiles for region: " + store.getRegionInfo().getRegionNameAsString() +
|
||||||
|
" and store: " + store.getColumnFamilyName() + "class:" + store.getClass());
|
||||||
|
store.refreshStoreFiles();
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("Exception while trying to refresh store files: ", ioe);
|
||||||
|
CoprocessorRpcUtils.setControllerException(controller, ioe);
|
||||||
|
}
|
||||||
|
done.run(RefreshHFilesProtos.RefreshHFilesResponse.getDefaultInstance());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(CoprocessorEnvironment env) throws IOException {
|
||||||
|
if (env instanceof RegionCoprocessorEnvironment) {
|
||||||
|
this.env = (RegionCoprocessorEnvironment) env;
|
||||||
|
} else {
|
||||||
|
throw new CoprocessorException("Must be loaded on a table region!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package hbase.pb;
|
||||||
|
|
||||||
|
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
|
||||||
|
option java_outer_classname = "RefreshHFilesProtos";
|
||||||
|
option java_generic_services = true;
|
||||||
|
option java_generate_equals_and_hash = true;
|
||||||
|
option optimize_for = SPEED;
|
||||||
|
|
||||||
|
message RefreshHFilesRequest {
|
||||||
|
}
|
||||||
|
|
||||||
|
message RefreshHFilesResponse {
|
||||||
|
}
|
||||||
|
|
||||||
|
service RefreshHFilesService {
|
||||||
|
rpc refreshHFiles(RefreshHFilesRequest)
|
||||||
|
returns (RefreshHFilesResponse);
|
||||||
|
}
|
|
@ -0,0 +1,177 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor.example;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.example.RefreshHFilesClient;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
||||||
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestRefreshHFilesEndpoint {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestRefreshHFilesEndpoint.class);
|
||||||
|
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
|
||||||
|
private static final int NUM_MASTER = 1;
|
||||||
|
private static final int NUM_RS = 2;
|
||||||
|
private static final TableName TABLE_NAME = TableName.valueOf("testRefreshRegionHFilesEP");
|
||||||
|
private static final byte[] FAMILY = Bytes.toBytes("family");
|
||||||
|
private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
|
||||||
|
private static final byte[][] SPLIT_KEY = new byte[][] { Bytes.toBytes("30") };
|
||||||
|
private static final int NUM_ROWS = 5;
|
||||||
|
private static final String HFILE_NAME = "123abcdef";
|
||||||
|
|
||||||
|
private static Configuration CONF = HTU.getConfiguration();
|
||||||
|
private static MiniHBaseCluster cluster;
|
||||||
|
private static Table table;
|
||||||
|
|
||||||
|
public static void setUp(String regionImpl) {
|
||||||
|
try {
|
||||||
|
CONF.set(HConstants.REGION_IMPL, regionImpl);
|
||||||
|
CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||||
|
|
||||||
|
CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, RefreshHFilesEndpoint.class.getName());
|
||||||
|
cluster = HTU.startMiniCluster(NUM_MASTER, NUM_RS);
|
||||||
|
|
||||||
|
// Create table
|
||||||
|
table = HTU.createTable(TABLE_NAME, FAMILY, SPLIT_KEY);
|
||||||
|
|
||||||
|
// this will create 2 regions spread across slaves
|
||||||
|
HTU.loadNumericRows(table, FAMILY, 1, 20);
|
||||||
|
HTU.flush(TABLE_NAME);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error("Couldn't finish setup", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
HTU.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefreshRegionHFilesEndpoint() throws Exception {
|
||||||
|
setUp(HRegion.class.getName());
|
||||||
|
MasterFileSystem mfs = HTU.getMiniHBaseCluster().getMaster().getMasterFileSystem();
|
||||||
|
Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), TABLE_NAME);
|
||||||
|
for (Region region : cluster.getRegions(TABLE_NAME)) {
|
||||||
|
Path regionDir = new Path(tableDir, region.getRegionInfo().getEncodedName());
|
||||||
|
Path familyDir = new Path(regionDir, Bytes.toString(FAMILY));
|
||||||
|
HFileTestUtil
|
||||||
|
.createHFile(HTU.getConfiguration(), HTU.getTestFileSystem(), new Path(familyDir, HFILE_NAME), FAMILY,
|
||||||
|
QUALIFIER, Bytes.toBytes("50"), Bytes.toBytes("60"), NUM_ROWS);
|
||||||
|
}
|
||||||
|
assertEquals(2, HTU.getNumHFiles(TABLE_NAME, FAMILY));
|
||||||
|
callRefreshRegionHFilesEndPoint();
|
||||||
|
assertEquals(4, HTU.getNumHFiles(TABLE_NAME, FAMILY));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testRefreshRegionHFilesEndpointWithException() throws IOException {
|
||||||
|
setUp(HRegionForRefreshHFilesEP.class.getName());
|
||||||
|
callRefreshRegionHFilesEndPoint();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void callRefreshRegionHFilesEndPoint() throws IOException {
|
||||||
|
try {
|
||||||
|
RefreshHFilesClient refreshHFilesClient = new RefreshHFilesClient(CONF);
|
||||||
|
refreshHFilesClient.refreshHFiles(TABLE_NAME);
|
||||||
|
} catch (RetriesExhaustedException rex) {
|
||||||
|
if (rex.getCause() instanceof IOException)
|
||||||
|
throw new IOException();
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
LOG.error(ex);
|
||||||
|
fail("Couldn't call the RefreshRegionHFilesEndpoint");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class HRegionForRefreshHFilesEP extends HRegion {
|
||||||
|
HStoreWithFaultyRefreshHFilesAPI store;
|
||||||
|
|
||||||
|
public HRegionForRefreshHFilesEP(final Path tableDir, final WAL wal, final FileSystem fs,
|
||||||
|
final Configuration confParam, final HRegionInfo regionInfo,
|
||||||
|
final TableDescriptor htd, final RegionServerServices rsServices) {
|
||||||
|
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Store> getStores() {
|
||||||
|
List<Store> list = new ArrayList<Store>(stores.size());
|
||||||
|
/**
|
||||||
|
* This is used to trigger the custom definition (faulty)
|
||||||
|
* of refresh HFiles API.
|
||||||
|
*/
|
||||||
|
try {
|
||||||
|
if (this.store == null)
|
||||||
|
store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf);
|
||||||
|
list.add(store);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.info("Couldn't instantiate custom store implementation", ioe);
|
||||||
|
}
|
||||||
|
|
||||||
|
list.addAll(stores.values());
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class HStoreWithFaultyRefreshHFilesAPI extends HStore {
|
||||||
|
public HStoreWithFaultyRefreshHFilesAPI(final HRegion region, final ColumnFamilyDescriptor family,
|
||||||
|
final Configuration confParam) throws IOException {
|
||||||
|
super(region, family, confParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void refreshStoreFiles() throws IOException {
|
||||||
|
throw new IOException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -4425,4 +4425,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
||||||
HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
|
HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
|
||||||
return kdc;
|
return kdc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getNumHFiles(final TableName tableName, final byte[] family) {
|
||||||
|
int numHFiles = 0;
|
||||||
|
for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
|
||||||
|
numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
|
||||||
|
family);
|
||||||
|
}
|
||||||
|
return numHFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
|
||||||
|
final byte[] family) {
|
||||||
|
int numHFiles = 0;
|
||||||
|
for (Region region : rs.getOnlineRegions(tableName)) {
|
||||||
|
numHFiles += region.getStore(family).getStorefilesCount();
|
||||||
|
}
|
||||||
|
return numHFiles;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue