Revert "HBASE-18448 Added refresh HFiles coprocessor endpoint"
This reverts commit 612c23556d
.
This commit is contained in:
parent
612c23556d
commit
88356029f1
|
@ -1,95 +0,0 @@
|
||||||
/*
|
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.client.example;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
|
|
||||||
import org.apache.hadoop.hbase.ipc.ServerRpcController;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
|
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This client class is for invoking the refresh HFile function deployed on the
|
|
||||||
* Region Server side via the RefreshHFilesService.
|
|
||||||
*/
|
|
||||||
public class RefreshHFilesClient implements Closeable {
|
|
||||||
private static final Log LOG = LogFactory.getLog(RefreshHFilesClient.class);
|
|
||||||
private final Connection connection;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor with Conf object
|
|
||||||
*
|
|
||||||
* @param cfg
|
|
||||||
*/
|
|
||||||
public RefreshHFilesClient(Configuration cfg) {
|
|
||||||
try {
|
|
||||||
this.connection = ConnectionFactory.createConnection(cfg);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
if (this.connection != null && !this.connection.isClosed()) {
|
|
||||||
this.connection.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void refreshHFiles(final TableName tableName) throws Throwable {
|
|
||||||
try (Table table = connection.getTable(tableName)) {
|
|
||||||
refreshHFiles(table);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void refreshHFiles(final Table table) throws Throwable {
|
|
||||||
final RefreshHFilesProtos.RefreshHFilesRequest request = RefreshHFilesProtos.RefreshHFilesRequest
|
|
||||||
.getDefaultInstance();
|
|
||||||
table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class, HConstants.EMPTY_START_ROW,
|
|
||||||
HConstants.EMPTY_END_ROW,
|
|
||||||
new Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
|
|
||||||
RefreshHFilesProtos.RefreshHFilesResponse>() {
|
|
||||||
@Override
|
|
||||||
public RefreshHFilesProtos.RefreshHFilesResponse call(
|
|
||||||
RefreshHFilesProtos.RefreshHFilesService refreshHFilesService)
|
|
||||||
throws IOException {
|
|
||||||
ServerRpcController controller = new ServerRpcController();
|
|
||||||
BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback =
|
|
||||||
new BlockingRpcCallback<>();
|
|
||||||
refreshHFilesService.refreshHFiles(controller, request, rpcCallback);
|
|
||||||
if (controller.failedOnException()) {
|
|
||||||
throw controller.getFailedOn();
|
|
||||||
}
|
|
||||||
return rpcCallback.get();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
LOG.debug("Done refreshing HFiles");
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,86 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.coprocessor.example;
|
|
||||||
|
|
||||||
import com.google.protobuf.RpcCallback;
|
|
||||||
import com.google.protobuf.RpcController;
|
|
||||||
import com.google.protobuf.Service;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.Coprocessor;
|
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.Store;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Coprocessor endpoint to refresh HFiles on replica.
|
|
||||||
* <p>
|
|
||||||
* <p>
|
|
||||||
* For the protocol buffer definition of the RefreshHFilesService, see the source file located under
|
|
||||||
* hbase-protocol/src/main/protobuf/RefreshHFiles.proto.
|
|
||||||
* </p>
|
|
||||||
*/
|
|
||||||
public class RefreshHFilesEndpoint extends RefreshHFilesProtos.RefreshHFilesService
|
|
||||||
implements Coprocessor, CoprocessorService {
|
|
||||||
protected static final Log LOG = LogFactory.getLog(RefreshHFilesEndpoint.class);
|
|
||||||
private RegionCoprocessorEnvironment env;
|
|
||||||
|
|
||||||
public RefreshHFilesEndpoint() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Service getService() {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void refreshHFiles(RpcController controller, RefreshHFilesProtos.RefreshHFilesRequest request,
|
|
||||||
RpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> done) {
|
|
||||||
try {
|
|
||||||
for (Store store : env.getRegion().getStores()) {
|
|
||||||
LOG.debug("Refreshing HFiles for region: " + store.getRegionInfo().getRegionNameAsString() +
|
|
||||||
" and store: " + store.getColumnFamilyName() + "class:" + store.getClass());
|
|
||||||
store.refreshStoreFiles();
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.error("Exception while trying to refresh store files: ", ioe);
|
|
||||||
CoprocessorRpcUtils.setControllerException(controller, ioe);
|
|
||||||
}
|
|
||||||
done.run(RefreshHFilesProtos.RefreshHFilesResponse.getDefaultInstance());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start(CoprocessorEnvironment env) throws IOException {
|
|
||||||
if (env instanceof RegionCoprocessorEnvironment) {
|
|
||||||
this.env = (RegionCoprocessorEnvironment) env;
|
|
||||||
} else {
|
|
||||||
throw new CoprocessorException("Must be loaded on a table region!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package hbase.pb;
|
|
||||||
|
|
||||||
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
|
|
||||||
option java_outer_classname = "RefreshHFilesProtos";
|
|
||||||
option java_generic_services = true;
|
|
||||||
option java_generate_equals_and_hash = true;
|
|
||||||
option optimize_for = SPEED;
|
|
||||||
|
|
||||||
message RefreshHFilesRequest {
|
|
||||||
}
|
|
||||||
|
|
||||||
message RefreshHFilesResponse {
|
|
||||||
}
|
|
||||||
|
|
||||||
service RefreshHFilesService {
|
|
||||||
rpc refreshHFiles(RefreshHFilesRequest)
|
|
||||||
returns (RefreshHFilesResponse);
|
|
||||||
}
|
|
|
@ -1,177 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.coprocessor.example;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.client.example.RefreshHFilesClient;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
|
||||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.Region;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.Store;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
|
||||||
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
@Category(MediumTests.class)
|
|
||||||
public class TestRefreshHFilesEndpoint {
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestRefreshHFilesEndpoint.class);
|
|
||||||
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
|
|
||||||
private static final int NUM_MASTER = 1;
|
|
||||||
private static final int NUM_RS = 2;
|
|
||||||
private static final TableName TABLE_NAME = TableName.valueOf("testRefreshRegionHFilesEP");
|
|
||||||
private static final byte[] FAMILY = Bytes.toBytes("family");
|
|
||||||
private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
|
|
||||||
private static final byte[][] SPLIT_KEY = new byte[][] { Bytes.toBytes("30") };
|
|
||||||
private static final int NUM_ROWS = 5;
|
|
||||||
private static final String HFILE_NAME = "123abcdef";
|
|
||||||
|
|
||||||
private static Configuration CONF = HTU.getConfiguration();
|
|
||||||
private static MiniHBaseCluster cluster;
|
|
||||||
private static Table table;
|
|
||||||
|
|
||||||
public static void setUp(String regionImpl) {
|
|
||||||
try {
|
|
||||||
CONF.set(HConstants.REGION_IMPL, regionImpl);
|
|
||||||
CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
|
||||||
|
|
||||||
CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, RefreshHFilesEndpoint.class.getName());
|
|
||||||
cluster = HTU.startMiniCluster(NUM_MASTER, NUM_RS);
|
|
||||||
|
|
||||||
// Create table
|
|
||||||
table = HTU.createTable(TABLE_NAME, FAMILY, SPLIT_KEY);
|
|
||||||
|
|
||||||
// this will create 2 regions spread across slaves
|
|
||||||
HTU.loadNumericRows(table, FAMILY, 1, 20);
|
|
||||||
HTU.flush(TABLE_NAME);
|
|
||||||
} catch (Exception ex) {
|
|
||||||
LOG.error("Couldn't finish setup", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
HTU.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRefreshRegionHFilesEndpoint() throws Exception {
|
|
||||||
setUp(HRegion.class.getName());
|
|
||||||
MasterFileSystem mfs = HTU.getMiniHBaseCluster().getMaster().getMasterFileSystem();
|
|
||||||
Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), TABLE_NAME);
|
|
||||||
for (Region region : cluster.getRegions(TABLE_NAME)) {
|
|
||||||
Path regionDir = new Path(tableDir, region.getRegionInfo().getEncodedName());
|
|
||||||
Path familyDir = new Path(regionDir, Bytes.toString(FAMILY));
|
|
||||||
HFileTestUtil
|
|
||||||
.createHFile(HTU.getConfiguration(), HTU.getTestFileSystem(), new Path(familyDir, HFILE_NAME), FAMILY,
|
|
||||||
QUALIFIER, Bytes.toBytes("50"), Bytes.toBytes("60"), NUM_ROWS);
|
|
||||||
}
|
|
||||||
assertEquals(2, HTU.getNumHFiles(TABLE_NAME, FAMILY));
|
|
||||||
callRefreshRegionHFilesEndPoint();
|
|
||||||
assertEquals(4, HTU.getNumHFiles(TABLE_NAME, FAMILY));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = IOException.class)
|
|
||||||
public void testRefreshRegionHFilesEndpointWithException() throws IOException {
|
|
||||||
setUp(HRegionForRefreshHFilesEP.class.getName());
|
|
||||||
callRefreshRegionHFilesEndPoint();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void callRefreshRegionHFilesEndPoint() throws IOException {
|
|
||||||
try {
|
|
||||||
RefreshHFilesClient refreshHFilesClient = new RefreshHFilesClient(CONF);
|
|
||||||
refreshHFilesClient.refreshHFiles(TABLE_NAME);
|
|
||||||
} catch (RetriesExhaustedException rex) {
|
|
||||||
if (rex.getCause() instanceof IOException)
|
|
||||||
throw new IOException();
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
LOG.error(ex);
|
|
||||||
fail("Couldn't call the RefreshRegionHFilesEndpoint");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class HRegionForRefreshHFilesEP extends HRegion {
|
|
||||||
HStoreWithFaultyRefreshHFilesAPI store;
|
|
||||||
|
|
||||||
public HRegionForRefreshHFilesEP(final Path tableDir, final WAL wal, final FileSystem fs,
|
|
||||||
final Configuration confParam, final HRegionInfo regionInfo,
|
|
||||||
final TableDescriptor htd, final RegionServerServices rsServices) {
|
|
||||||
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Store> getStores() {
|
|
||||||
List<Store> list = new ArrayList<Store>(stores.size());
|
|
||||||
/**
|
|
||||||
* This is used to trigger the custom definition (faulty)
|
|
||||||
* of refresh HFiles API.
|
|
||||||
*/
|
|
||||||
try {
|
|
||||||
if (this.store == null)
|
|
||||||
store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf);
|
|
||||||
list.add(store);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.info("Couldn't instantiate custom store implementation", ioe);
|
|
||||||
}
|
|
||||||
|
|
||||||
list.addAll(stores.values());
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class HStoreWithFaultyRefreshHFilesAPI extends HStore {
|
|
||||||
public HStoreWithFaultyRefreshHFilesAPI(final HRegion region, final ColumnFamilyDescriptor family,
|
|
||||||
final Configuration confParam) throws IOException {
|
|
||||||
super(region, family, confParam);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void refreshStoreFiles() throws IOException {
|
|
||||||
throw new IOException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -4425,22 +4425,4 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
||||||
HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
|
HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
|
||||||
return kdc;
|
return kdc;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getNumHFiles(final TableName tableName, final byte[] family) {
|
|
||||||
int numHFiles = 0;
|
|
||||||
for (RegionServerThread regionServerThread : getMiniHBaseCluster().getRegionServerThreads()) {
|
|
||||||
numHFiles+= getNumHFilesForRS(regionServerThread.getRegionServer(), tableName,
|
|
||||||
family);
|
|
||||||
}
|
|
||||||
return numHFiles;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
|
|
||||||
final byte[] family) {
|
|
||||||
int numHFiles = 0;
|
|
||||||
for (Region region : rs.getOnlineRegions(tableName)) {
|
|
||||||
numHFiles += region.getStore(family).getStorefilesCount();
|
|
||||||
}
|
|
||||||
return numHFiles;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue