HBASE-18875 Thrift server supports read-only mode

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Guangxu Cheng 2017-09-26 00:29:24 +08:00 committed by tedyu
parent ad60bc5f60
commit bb81e9f3ca
3 changed files with 503 additions and 1 deletions

View File

@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -94,6 +95,10 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
private static final IOException ioe
= new DoNotRetryIOException("Thrift Server is in Read-only mode.");
private boolean isReadOnly;
public static THBaseService.Iface newInstance(
THBaseService.Iface handler, ThriftMetrics metrics) {
return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
@ -174,6 +179,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCache = new ConnectionCache(
conf, userProvider, cleanInterval, maxIdleTime);
isReadOnly = conf.getBoolean("hbase.thrift.readonly", false);
}
private Table getTable(ByteBuffer tableName) {
@ -294,6 +300,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public void put(ByteBuffer table, TPut put) throws TIOError, TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
htable.put(putFromThrift(put));
@ -307,6 +314,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public boolean checkAndPut(ByteBuffer table, ByteBuffer row, ByteBuffer family,
ByteBuffer qualifier, ByteBuffer value, TPut put) throws TIOError, TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
return htable.checkAndPut(byteBufferToByteArray(row), byteBufferToByteArray(family),
@ -321,6 +329,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public void putMultiple(ByteBuffer table, List<TPut> puts) throws TIOError, TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
htable.put(putsFromThrift(puts));
@ -333,6 +342,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public void deleteSingle(ByteBuffer table, TDelete deleteSingle) throws TIOError, TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
htable.delete(deleteFromThrift(deleteSingle));
@ -346,6 +356,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public List<TDelete> deleteMultiple(ByteBuffer table, List<TDelete> deletes) throws TIOError,
TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
htable.delete(deletesFromThrift(deletes));
@ -361,6 +372,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
public boolean checkAndMutate(ByteBuffer table, ByteBuffer row, ByteBuffer family,
ByteBuffer qualifier, TCompareOp compareOp, ByteBuffer value, TRowMutations rowMutations)
throws TIOError, TException {
checkReadOnlyMode();
try (final Table htable = getTable(table)) {
return htable.checkAndMutate(byteBufferToByteArray(row), byteBufferToByteArray(family),
byteBufferToByteArray(qualifier), compareOpFromThrift(compareOp),
@ -373,8 +385,8 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public boolean checkAndDelete(ByteBuffer table, ByteBuffer row, ByteBuffer family,
ByteBuffer qualifier, ByteBuffer value, TDelete deleteSingle) throws TIOError, TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
if (value == null) {
return htable.checkAndDelete(byteBufferToByteArray(row), byteBufferToByteArray(family),
@ -393,6 +405,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public TResult increment(ByteBuffer table, TIncrement increment) throws TIOError, TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
return resultFromHBase(htable.increment(incrementFromThrift(increment)));
@ -405,6 +418,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public TResult append(ByteBuffer table, TAppend append) throws TIOError, TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
return resultFromHBase(htable.append(appendFromThrift(append)));
@ -485,6 +499,7 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
@Override
public void mutateRow(ByteBuffer table, TRowMutations rowMutations) throws TIOError, TException {
checkReadOnlyMode();
Table htable = getTable(table);
try {
htable.mutateRow(rowMutationsFromThrift(rowMutations));
@ -539,4 +554,14 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
}
}
}
private void checkReadOnlyMode() throws TIOError {
if (isReadOnly()) {
throw getTIOError(ioe);
}
}
private boolean isReadOnly() {
return isReadOnly;
}
}

View File

@ -156,6 +156,8 @@ public class ThriftServer extends Configured implements Tool {
"Amount of time in milliseconds before a server thread will timeout " +
"waiting for client to send data on a connected socket. Currently, " +
"only applies to TBoundedThreadPoolServer");
options.addOption("ro", "readonly", false,
"Respond only to read method requests [default: false]");
OptionGroup servers = new OptionGroup();
servers.addOption(
new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
@ -407,6 +409,14 @@ public class ThriftServer extends Configured implements Tool {
bindAddress = conf.get("hbase.thrift.info.bindAddress");
}
// check if server should only process read requests, if so override the conf
if (cmd.hasOption("readonly")) {
conf.setBoolean("hbase.thrift.readonly", true);
if (log.isDebugEnabled()) {
log.debug("readonly set to true");
}
}
// Get read timeout
int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
if (cmd.hasOption(READ_TIMEOUT_OPTION)) {

View File

@ -0,0 +1,467 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.thrift2.generated.TAppend;
import org.apache.hadoop.hbase.thrift2.generated.TColumnIncrement;
import org.apache.hadoop.hbase.thrift2.generated.TColumnValue;
import org.apache.hadoop.hbase.thrift2.generated.TCompareOp;
import org.apache.hadoop.hbase.thrift2.generated.TDelete;
import org.apache.hadoop.hbase.thrift2.generated.TGet;
import org.apache.hadoop.hbase.thrift2.generated.TIOError;
import org.apache.hadoop.hbase.thrift2.generated.TIncrement;
import org.apache.hadoop.hbase.thrift2.generated.TMutation;
import org.apache.hadoop.hbase.thrift2.generated.TPut;
import org.apache.hadoop.hbase.thrift2.generated.TRowMutations;
import org.apache.hadoop.hbase.thrift2.generated.TScan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.nio.ByteBuffer.wrap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({ClientTests.class, MediumTests.class})
public class TestThriftHBaseServiceHandlerWithReadOnly {
private static final Log LOG = LogFactory.getLog(TestThriftHBaseServiceHandlerWithReadOnly.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
// Static names for tables, columns, rows, and values
private static byte[] tableAname = Bytes.toBytes("tableA");
private static byte[] familyAname = Bytes.toBytes("familyA");
private static byte[] familyBname = Bytes.toBytes("familyB");
private static byte[] qualifierAname = Bytes.toBytes("qualifierA");
private static byte[] qualifierBname = Bytes.toBytes("qualifierB");
private static byte[] valueAname = Bytes.toBytes("valueA");
private static byte[] valueBname = Bytes.toBytes("valueB");
private static HColumnDescriptor[] families = new HColumnDescriptor[] {
new HColumnDescriptor(familyAname).setMaxVersions(3),
new HColumnDescriptor(familyBname).setMaxVersions(2)
};
@BeforeClass
public static void beforeClass() throws Exception {
UTIL.getConfiguration().setBoolean("hbase.thrift.readonly", true);
UTIL.getConfiguration().set("hbase.client.retries.number", "3");
UTIL.startMiniCluster();
Admin admin = UTIL.getAdmin();
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableAname));
for (HColumnDescriptor family : families) {
tableDescriptor.addFamily(family);
}
admin.createTable(tableDescriptor);
admin.close();
}
@AfterClass
public static void afterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Before
public void setup() throws Exception {
}
private ThriftHBaseServiceHandler createHandler() throws TException {
try {
Configuration conf = UTIL.getConfiguration();
return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
} catch (IOException ie) {
throw new TException(ie);
}
}
@Test
public void testExistsWithReadOnly() throws TException {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = "testExists".getBytes();
ByteBuffer table = wrap(tableAname);
TGet get = new TGet(wrap(rowName));
boolean exceptionCaught = false;
try {
handler.exists(table, get);
} catch (TIOError e) {
exceptionCaught = true;
} finally {
assertFalse(exceptionCaught);
}
}
@Test
public void testExistsAllWithReadOnly() throws TException {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName1 = "testExistsAll1".getBytes();
byte[] rowName2 = "testExistsAll2".getBytes();
ByteBuffer table = wrap(tableAname);
List<TGet> gets = new ArrayList<>();
gets.add(new TGet(wrap(rowName1)));
gets.add(new TGet(wrap(rowName2)));
boolean exceptionCaught = false;
try {
handler.existsAll(table, gets);
} catch (TIOError e) {
exceptionCaught = true;
} finally {
assertFalse(exceptionCaught);
}
}
@Test
public void testGetWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = "testGet".getBytes();
ByteBuffer table = wrap(tableAname);
TGet get = new TGet(wrap(rowName));
boolean exceptionCaught = false;
try {
handler.get(table, get);
} catch (TIOError e) {
exceptionCaught = true;
} finally {
assertFalse(exceptionCaught);
}
}
@Test
public void testGetMultipleWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
ByteBuffer table = wrap(tableAname);
byte[] rowName1 = "testGetMultiple1".getBytes();
byte[] rowName2 = "testGetMultiple2".getBytes();
List<TGet> gets = new ArrayList<>(2);
gets.add(new TGet(wrap(rowName1)));
gets.add(new TGet(wrap(rowName2)));
boolean exceptionCaught = false;
try {
handler.getMultiple(table, gets);
} catch (TIOError e) {
exceptionCaught = true;
} finally {
assertFalse(exceptionCaught);
}
}
@Test
public void testPutWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
ByteBuffer table = wrap(tableAname);
byte[] rowName = "testPut".getBytes();
List<TColumnValue> columnValues = new ArrayList<>(2);
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname)));
TPut put = new TPut(wrap(rowName), columnValues);
boolean exceptionCaught = false;
try {
handler.put(table, put);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testCheckAndPutWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = "testCheckAndPut".getBytes();
ByteBuffer table = wrap(tableAname);
List<TColumnValue> columnValuesA = new ArrayList<>(1);
TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
wrap(valueAname));
columnValuesA.add(columnValueA);
TPut putA = new TPut(wrap(rowName), columnValuesA);
putA.setColumnValues(columnValuesA);
List<TColumnValue> columnValuesB = new ArrayList<>(1);
TColumnValue columnValueB = new TColumnValue(wrap(familyBname), wrap(qualifierBname),
wrap(valueBname));
columnValuesB.add(columnValueB);
TPut putB = new TPut(wrap(rowName), columnValuesB);
putB.setColumnValues(columnValuesB);
boolean exceptionCaught = false;
try {
handler.checkAndPut(table, wrap(rowName), wrap(familyAname),
wrap(qualifierAname), wrap(valueAname), putB);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testPutMultipleWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
ByteBuffer table = wrap(tableAname);
byte[] rowName1 = "testPutMultiple1".getBytes();
byte[] rowName2 = "testPutMultiple2".getBytes();
List<TColumnValue> columnValues = new ArrayList<>(2);
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(valueAname)));
columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname), wrap(valueBname)));
List<TPut> puts = new ArrayList<>(2);
puts.add(new TPut(wrap(rowName1), columnValues));
puts.add(new TPut(wrap(rowName2), columnValues));
boolean exceptionCaught = false;
try {
handler.putMultiple(table, puts);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testDeleteWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = "testDelete".getBytes();
ByteBuffer table = wrap(tableAname);
TDelete delete = new TDelete(wrap(rowName));
boolean exceptionCaught = false;
try {
handler.deleteSingle(table, delete);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testDeleteMultipleWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
ByteBuffer table = wrap(tableAname);
byte[] rowName1 = "testDeleteMultiple1".getBytes();
byte[] rowName2 = "testDeleteMultiple2".getBytes();
List<TDelete> deletes = new ArrayList<>(2);
deletes.add(new TDelete(wrap(rowName1)));
deletes.add(new TDelete(wrap(rowName2)));
boolean exceptionCaught = false;
try {
handler.deleteMultiple(table, deletes);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testCheckAndMutateWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
ByteBuffer table = wrap(tableAname);
ByteBuffer row = wrap("row".getBytes());
ByteBuffer family = wrap(familyAname);
ByteBuffer qualifier = wrap(qualifierAname);
ByteBuffer value = wrap(valueAname);
List<TColumnValue> columnValuesB = new ArrayList<>(1);
TColumnValue columnValueB = new TColumnValue(family, wrap(qualifierBname), wrap(valueBname));
columnValuesB.add(columnValueB);
TPut putB = new TPut(row, columnValuesB);
putB.setColumnValues(columnValuesB);
TRowMutations tRowMutations = new TRowMutations(row,
Arrays.<TMutation> asList(TMutation.put(putB)));
boolean exceptionCaught = false;
try {
handler.checkAndMutate(table, row, family, qualifier, TCompareOp.EQUAL, value,
tRowMutations);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testCheckAndDeleteWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = "testCheckAndDelete".getBytes();
ByteBuffer table = wrap(tableAname);
TDelete delete = new TDelete(wrap(rowName));
boolean exceptionCaught = false;
try {
handler.checkAndDelete(table, wrap(rowName), wrap(familyAname),
wrap(qualifierAname), wrap(valueAname), delete);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testIncrementWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = "testIncrement".getBytes();
ByteBuffer table = wrap(tableAname);
List<TColumnIncrement> incrementColumns = new ArrayList<>(1);
incrementColumns.add(new TColumnIncrement(wrap(familyAname), wrap(qualifierAname)));
TIncrement increment = new TIncrement(wrap(rowName), incrementColumns);
boolean exceptionCaught = false;
try {
handler.increment(table, increment);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testAppendWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = "testAppend".getBytes();
ByteBuffer table = wrap(tableAname);
byte[] v1 = Bytes.toBytes("42");
List<TColumnValue> appendColumns = new ArrayList<>(1);
appendColumns.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname), wrap(v1)));
TAppend append = new TAppend(wrap(rowName), appendColumns);
boolean exceptionCaught = false;
try {
handler.append(table, append);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testMutateRowWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
byte[] rowName = "testMutateRow".getBytes();
ByteBuffer table = wrap(tableAname);
List<TColumnValue> columnValuesA = new ArrayList<>(1);
TColumnValue columnValueA = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
wrap(valueAname));
columnValuesA.add(columnValueA);
TPut putA = new TPut(wrap(rowName), columnValuesA);
putA.setColumnValues(columnValuesA);
TDelete delete = new TDelete(wrap(rowName));
List<TMutation> mutations = new ArrayList<>(2);
TMutation mutationA = TMutation.put(putA);
mutations.add(mutationA);
TMutation mutationB = TMutation.deleteSingle(delete);
mutations.add(mutationB);
TRowMutations tRowMutations = new TRowMutations(wrap(rowName),mutations);
boolean exceptionCaught = false;
try {
handler.mutateRow(table,tRowMutations);
} catch (TIOError e) {
exceptionCaught = true;
assertTrue(e.getCause() instanceof DoNotRetryIOException);
assertEquals("Thrift Server is in Read-only mode.", e.getMessage());
} finally {
assertTrue(exceptionCaught);
}
}
@Test
public void testScanWithReadOnly() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();
ByteBuffer table = wrap(tableAname);
TScan scan = new TScan();
boolean exceptionCaught = false;
try {
int scanId = handler.openScanner(table, scan);
handler.getScannerRows(scanId, 10);
handler.closeScanner(scanId);
} catch (TIOError e) {
exceptionCaught = true;
} finally {
assertFalse(exceptionCaught);
}
}
}