HBASE-18448 Added refresh HFiles coprocessor endpoint
Signed-off-by: anoopsamjohn <anoopsamjohn@gmail.com>
This commit is contained in:
parent
321bc55f91
commit
612c23556d
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.client.example;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
|
||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* This client class is for invoking the refresh HFile function deployed on the
|
||||
* Region Server side via the RefreshHFilesService.
|
||||
*/
|
||||
public class RefreshHFilesClient implements Closeable {
|
||||
private static final Log LOG = LogFactory.getLog(RefreshHFilesClient.class);
|
||||
private final Connection connection;
|
||||
|
||||
/**
|
||||
* Constructor with Conf object
|
||||
*
|
||||
* @param cfg
|
||||
*/
|
||||
public RefreshHFilesClient(Configuration cfg) {
|
||||
try {
|
||||
this.connection = ConnectionFactory.createConnection(cfg);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (this.connection != null && !this.connection.isClosed()) {
|
||||
this.connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshHFiles(final TableName tableName) throws Throwable {
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
refreshHFiles(table);
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshHFiles(final Table table) throws Throwable {
|
||||
final RefreshHFilesProtos.RefreshHFilesRequest request = RefreshHFilesProtos.RefreshHFilesRequest
|
||||
.getDefaultInstance();
|
||||
table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class, HConstants.EMPTY_START_ROW,
|
||||
HConstants.EMPTY_END_ROW,
|
||||
new Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
|
||||
RefreshHFilesProtos.RefreshHFilesResponse>() {
|
||||
@Override
|
||||
public RefreshHFilesProtos.RefreshHFilesResponse call(
|
||||
RefreshHFilesProtos.RefreshHFilesService refreshHFilesService)
|
||||
throws IOException {
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback =
|
||||
new BlockingRpcCallback<>();
|
||||
refreshHFilesService.refreshHFiles(controller, request, rpcCallback);
|
||||
if (controller.failedOnException()) {
|
||||
throw controller.getFailedOn();
|
||||
}
|
||||
return rpcCallback.get();
|
||||
}
|
||||
});
|
||||
LOG.debug("Done refreshing HFiles");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor.example;
|
||||
|
||||
import com.google.protobuf.RpcCallback;
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.Service;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Coprocessor endpoint to refresh HFiles on replica.
|
||||
* <p>
|
||||
* <p>
|
||||
* For the protocol buffer definition of the RefreshHFilesService, see the source file located under
|
||||
* hbase-protocol/src/main/protobuf/RefreshHFiles.proto.
|
||||
* </p>
|
||||
*/
|
||||
public class RefreshHFilesEndpoint extends RefreshHFilesProtos.RefreshHFilesService
|
||||
implements Coprocessor, CoprocessorService {
|
||||
protected static final Log LOG = LogFactory.getLog(RefreshHFilesEndpoint.class);
|
||||
private RegionCoprocessorEnvironment env;
|
||||
|
||||
public RefreshHFilesEndpoint() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Service getService() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshHFiles(RpcController controller, RefreshHFilesProtos.RefreshHFilesRequest request,
|
||||
RpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> done) {
|
||||
try {
|
||||
for (Store store : env.getRegion().getStores()) {
|
||||
LOG.debug("Refreshing HFiles for region: " + store.getRegionInfo().getRegionNameAsString() +
|
||||
" and store: " + store.getColumnFamilyName() + "class:" + store.getClass());
|
||||
store.refreshStoreFiles();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Exception while trying to refresh store files: ", ioe);
|
||||
CoprocessorRpcUtils.setControllerException(controller, ioe);
|
||||
}
|
||||
done.run(RefreshHFilesProtos.RefreshHFilesResponse.getDefaultInstance());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(CoprocessorEnvironment env) throws IOException {
|
||||
if (env instanceof RegionCoprocessorEnvironment) {
|
||||
this.env = (RegionCoprocessorEnvironment) env;
|
||||
} else {
|
||||
throw new CoprocessorException("Must be loaded on a table region!");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package hbase.pb;
|
||||
|
||||
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
|
||||
option java_outer_classname = "RefreshHFilesProtos";
|
||||
option java_generic_services = true;
|
||||
option java_generate_equals_and_hash = true;
|
||||
option optimize_for = SPEED;
|
||||
|
||||
message RefreshHFilesRequest {
|
||||
}
|
||||
|
||||
message RefreshHFilesResponse {
|
||||
}
|
||||
|
||||
service RefreshHFilesService {
|
||||
rpc refreshHFiles(RefreshHFilesRequest)
|
||||
returns (RefreshHFilesResponse);
|
||||
}
|
|
@ -0,0 +1,177 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.coprocessor.example;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.example.RefreshHFilesClient;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestRefreshHFilesEndpoint {
|
||||
private static final Log LOG = LogFactory.getLog(TestRefreshHFilesEndpoint.class);
|
||||
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
|
||||
private static final int NUM_MASTER = 1;
|
||||
private static final int NUM_RS = 2;
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("testRefreshRegionHFilesEP");
|
||||
private static final byte[] FAMILY = Bytes.toBytes("family");
|
||||
private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
|
||||
private static final byte[][] SPLIT_KEY = new byte[][] { Bytes.toBytes("30") };
|
||||
private static final int NUM_ROWS = 5;
|
||||
private static final String HFILE_NAME = "123abcdef";
|
||||
|
||||
private static Configuration CONF = HTU.getConfiguration();
|
||||
private static MiniHBaseCluster cluster;
|
||||
private static Table table;
|
||||
|
||||
public static void setUp(String regionImpl) {
|
||||
try {
|
||||
CONF.set(HConstants.REGION_IMPL, regionImpl);
|
||||
CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
|
||||
CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, RefreshHFilesEndpoint.class.getName());
|
||||
cluster = HTU.startMiniCluster(NUM_MASTER, NUM_RS);
|
||||
|
||||
// Create table
|
||||
table = HTU.createTable(TABLE_NAME, FAMILY, SPLIT_KEY);
|
||||
|
||||
// this will create 2 regions spread across slaves
|
||||
HTU.loadNumericRows(table, FAMILY, 1, 20);
|
||||
HTU.flush(TABLE_NAME);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Couldn't finish setup", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
HTU.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshRegionHFilesEndpoint() throws Exception {
|
||||
setUp(HRegion.class.getName());
|
||||
MasterFileSystem mfs = HTU.getMiniHBaseCluster().getMaster().getMasterFileSystem();
|
||||
Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), TABLE_NAME);
|
||||
for (Region region : cluster.getRegions(TABLE_NAME)) {
|
||||
Path regionDir = new Path(tableDir, region.getRegionInfo().getEncodedName());
|
||||
Path familyDir = new Path(regionDir, Bytes.toString(FAMILY));
|
||||
HFileTestUtil
|
||||
.createHFile(HTU.getConfiguration(), HTU.getTestFileSystem(), new Path(familyDir, HFILE_NAME), FAMILY,
|
||||
QUALIFIER, Bytes.toBytes("50"), Bytes.toBytes("60"), NUM_ROWS);
|
||||
}
|
||||
assertEquals(2, HTU.getNumHFiles(TABLE_NAME, FAMILY));
|
||||
callRefreshRegionHFilesEndPoint();
|
||||
assertEquals(4, HTU.getNumHFiles(TABLE_NAME, FAMILY));
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
public void testRefreshRegionHFilesEndpointWithException() throws IOException {
|
||||
setUp(HRegionForRefreshHFilesEP.class.getName());
|
||||
callRefreshRegionHFilesEndPoint();
|
||||
}
|
||||
|
||||
private void callRefreshRegionHFilesEndPoint() throws IOException {
|
||||
try {
|
||||
RefreshHFilesClient refreshHFilesClient = new RefreshHFilesClient(CONF);
|
||||
refreshHFilesClient.refreshHFiles(TABLE_NAME);
|
||||
} catch (RetriesExhaustedException rex) {
|
||||
if (rex.getCause() instanceof IOException)
|
||||
throw new IOException();
|
||||
} catch (Throwable ex) {
|
||||
LOG.error(ex);
|
||||
fail("Couldn't call the RefreshRegionHFilesEndpoint");
|
||||
}
|
||||
}
|
||||
|
||||
public static class HRegionForRefreshHFilesEP extends HRegion {
|
||||
HStoreWithFaultyRefreshHFilesAPI store;
|
||||
|
||||
public HRegionForRefreshHFilesEP(final Path tableDir, final WAL wal, final FileSystem fs,
|
||||
final Configuration confParam, final HRegionInfo regionInfo,
|
||||
final TableDescriptor htd, final RegionServerServices rsServices) {
|
||||
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Store> getStores() {
|
||||
List<Store> list = new ArrayList<Store>(stores.size());
|
||||
/**
|
||||
* This is used to trigger the custom definition (faulty)
|
||||
* of refresh HFiles API.
|
||||
*/
|
||||
try {
|
||||
if (this.store == null)
|
||||
store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf);
|
||||
list.add(store);
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Couldn't instantiate custom store implementation", ioe);
|
||||
}
|
||||
|
||||
list.addAll(stores.values());
|
||||
return list;
|
||||
}
|
||||
}
|
||||
|
||||
public static class HStoreWithFaultyRefreshHFilesAPI extends HStore {
|
||||
public HStoreWithFaultyRefreshHFilesAPI(final HRegion region, final ColumnFamilyDescriptor family,
|
||||
final Configuration confParam) throws IOException {
|
||||
super(region, family, confParam);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshStoreFiles() throws IOException {
|
||||
throw new IOException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -4425,4 +4425,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
|
||||
return kdc;
|
||||
}
|
||||
|
||||
public int getNumHFiles(final TableName tableName, final byte[] family) {
|
||||
int numHFiles = 0;
|
||||
for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
|
||||
family);
|
||||
}
|
||||
return numHFiles;
|
||||
}
|
||||
|
||||
public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
|
||||
final byte[] family) {
|
||||
int numHFiles = 0;
|
||||
for (Region region : rs.getOnlineRegions(tableName)) {
|
||||
numHFiles += region.getStore(family).getStorefilesCount();
|
||||
}
|
||||
return numHFiles;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue