diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.java new file mode 100644 index 00000000000..0401959b68f --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.example; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This client class is for invoking the refresh HFile function deployed on the + * Region Server side via the RefreshHFilesService. + */ +public class RefreshHFilesClient implements Closeable { + private static final Log LOG = LogFactory.getLog(RefreshHFilesClient.class); + private final Connection connection; + + /** + * Constructor with Conf object + * + * @param cfg + */ + public RefreshHFilesClient(Configuration cfg) { + try { + this.connection = ConnectionFactory.createConnection(cfg); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + if (this.connection != null && !this.connection.isClosed()) { + this.connection.close(); + } + } + + public void refreshHFiles(final TableName tableName) throws Throwable { + try (Table table = connection.getTable(tableName)) { + refreshHFiles(table); + } + } + + public void refreshHFiles(final Table table) throws Throwable { + final RefreshHFilesProtos.RefreshHFilesRequest request = RefreshHFilesProtos.RefreshHFilesRequest + .getDefaultInstance(); + table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class, HConstants.EMPTY_START_ROW, + HConstants.EMPTY_END_ROW, + new Batch.Call() { + @Override + public RefreshHFilesProtos.RefreshHFilesResponse call( + RefreshHFilesProtos.RefreshHFilesService refreshHFilesService) + throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback rpcCallback = + new BlockingRpcCallback<>(); + refreshHFilesService.refreshHFiles(controller, request, rpcCallback); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + return rpcCallback.get(); + } + }); + LOG.debug("Done refreshing HFiles"); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java new file mode 100644 index 00000000000..5b974112a4a --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor.example; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.CoprocessorException; +import org.apache.hadoop.hbase.coprocessor.CoprocessorService; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos; +import org.apache.hadoop.hbase.regionserver.Store; + +import java.io.IOException; + +/** + * Coprocessor endpoint to refresh HFiles on replica. + *

+ *

+ * For the protocol buffer definition of the RefreshHFilesService, see the source file located under + * hbase-protocol/src/main/protobuf/RefreshHFiles.proto. + *

+ */ +public class RefreshHFilesEndpoint extends RefreshHFilesProtos.RefreshHFilesService + implements Coprocessor, CoprocessorService { + protected static final Log LOG = LogFactory.getLog(RefreshHFilesEndpoint.class); + private RegionCoprocessorEnvironment env; + + public RefreshHFilesEndpoint() { + } + + @Override + public Service getService() { + return this; + } + + @Override + public void refreshHFiles(RpcController controller, RefreshHFilesProtos.RefreshHFilesRequest request, + RpcCallback done) { + try { + for (Store store : env.getRegion().getStores()) { + LOG.debug("Refreshing HFiles for region: " + store.getRegionInfo().getRegionNameAsString() + + " and store: " + store.getColumnFamilyName() + "class:" + store.getClass()); + store.refreshStoreFiles(); + } + } catch (IOException ioe) { + LOG.error("Exception while trying to refresh store files: ", ioe); + CoprocessorRpcUtils.setControllerException(controller, ioe); + } + done.run(RefreshHFilesProtos.RefreshHFilesResponse.getDefaultInstance()); + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment) env; + } else { + throw new CoprocessorException("Must be loaded on a table region!"); + } + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + } +} diff --git a/hbase-examples/src/main/protobuf/RefreshHFiles.proto b/hbase-examples/src/main/protobuf/RefreshHFiles.proto new file mode 100644 index 00000000000..11cbab048bc --- /dev/null +++ b/hbase-examples/src/main/protobuf/RefreshHFiles.proto @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "RefreshHFilesProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +message RefreshHFilesRequest { +} + +message RefreshHFilesResponse { +} + +service RefreshHFilesService { + rpc refreshHFiles(RefreshHFilesRequest) + returns (RefreshHFilesResponse); +} diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java new file mode 100644 index 00000000000..a037f852ad3 --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor.example; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.example.RefreshHFilesClient; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileTestUtil; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@Category(MediumTests.class) +public class TestRefreshHFilesEndpoint { + private static final Log LOG = LogFactory.getLog(TestRefreshHFilesEndpoint.class); + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + private static final int NUM_MASTER = 1; + private static final int NUM_RS = 2; + private static final TableName TABLE_NAME = TableName.valueOf("testRefreshRegionHFilesEP"); + private static final byte[] FAMILY = Bytes.toBytes("family"); + private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); + private static final byte[][] SPLIT_KEY = new byte[][] { Bytes.toBytes("30") }; + private static final int NUM_ROWS = 5; + private static final String HFILE_NAME = "123abcdef"; + + private static Configuration CONF = HTU.getConfiguration(); + private static MiniHBaseCluster cluster; + private static Table table; + + public static void setUp(String regionImpl) { + try { + CONF.set(HConstants.REGION_IMPL, regionImpl); + CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + + CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, RefreshHFilesEndpoint.class.getName()); + cluster = HTU.startMiniCluster(NUM_MASTER, NUM_RS); + + // Create table + table = HTU.createTable(TABLE_NAME, FAMILY, SPLIT_KEY); + + // this will create 2 regions spread across slaves + HTU.loadNumericRows(table, FAMILY, 1, 20); + HTU.flush(TABLE_NAME); + } catch (Exception ex) { + LOG.error("Couldn't finish setup", ex); + } + } + + @After + public void tearDown() throws Exception { + HTU.shutdownMiniCluster(); + } + + @Test + public void testRefreshRegionHFilesEndpoint() throws Exception { + setUp(HRegion.class.getName()); + MasterFileSystem mfs = HTU.getMiniHBaseCluster().getMaster().getMasterFileSystem(); + Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), TABLE_NAME); + for (Region region : cluster.getRegions(TABLE_NAME)) { + Path regionDir = new Path(tableDir, region.getRegionInfo().getEncodedName()); + Path familyDir = new Path(regionDir, Bytes.toString(FAMILY)); + HFileTestUtil + .createHFile(HTU.getConfiguration(), HTU.getTestFileSystem(), new Path(familyDir, HFILE_NAME), FAMILY, + QUALIFIER, Bytes.toBytes("50"), Bytes.toBytes("60"), NUM_ROWS); + } + assertEquals(2, HTU.getNumHFiles(TABLE_NAME, FAMILY)); + callRefreshRegionHFilesEndPoint(); + assertEquals(4, HTU.getNumHFiles(TABLE_NAME, FAMILY)); + } + + @Test(expected = IOException.class) + public void testRefreshRegionHFilesEndpointWithException() throws IOException { + setUp(HRegionForRefreshHFilesEP.class.getName()); + callRefreshRegionHFilesEndPoint(); + } + + private void callRefreshRegionHFilesEndPoint() throws IOException { + try { + RefreshHFilesClient refreshHFilesClient = new RefreshHFilesClient(CONF); + refreshHFilesClient.refreshHFiles(TABLE_NAME); + } catch (RetriesExhaustedException rex) { + if (rex.getCause() instanceof IOException) + throw new IOException(); + } catch (Throwable ex) { + LOG.error(ex); + fail("Couldn't call the RefreshRegionHFilesEndpoint"); + } + } + + public static class HRegionForRefreshHFilesEP extends HRegion { + HStoreWithFaultyRefreshHFilesAPI store; + + public HRegionForRefreshHFilesEP(final Path tableDir, final WAL wal, final FileSystem fs, + final Configuration confParam, final HRegionInfo regionInfo, + final TableDescriptor htd, final RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + @Override + public List getStores() { + List list = new ArrayList(stores.size()); + /** + * This is used to trigger the custom definition (faulty) + * of refresh HFiles API. + */ + try { + if (this.store == null) + store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf); + list.add(store); + } catch (IOException ioe) { + LOG.info("Couldn't instantiate custom store implementation", ioe); + } + + list.addAll(stores.values()); + return list; + } + } + + public static class HStoreWithFaultyRefreshHFilesAPI extends HStore { + public HStoreWithFaultyRefreshHFilesAPI(final HRegion region, final ColumnFamilyDescriptor family, + final Configuration confParam) throws IOException { + super(region, family, confParam); + } + + @Override + public void refreshStoreFiles() throws IOException { + throw new IOException(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 28d2a242265..1ec5ecdb647 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -4425,4 +4425,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath()); return kdc; } + + public int getNumHFiles(final TableName tableName, final byte[] family) { + int numHFiles = 0; + for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) { + numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName, + family); + } + return numHFiles; + } + + public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName, + final byte[] family) { + int numHFiles = 0; + for (Region region : rs.getOnlineRegions(tableName)) { + numHFiles += region.getStore(family).getStorefilesCount(); + } + return numHFiles; + } }