HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov)

This commit is contained in:
Andrew Purtell 2015-08-12 16:32:37 -07:00
parent 5e5bcceb53
commit 643ba90185
2 changed files with 109 additions and 36 deletions

View File

@ -264,8 +264,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
if (counter == null) { if (counter == null) {
continue; continue;
} }
Table table = null;
try { try {
Table table = handler.getTable(row.getTable()); table = handler.getTable(row.getTable());
if (failures > 2) { if (failures > 2) {
throw new IOException("Auto-Fail rest of ICVs"); throw new IOException("Auto-Fail rest of ICVs");
} }
@ -278,8 +279,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
+ Bytes.toStringBinary(row.getRowKey()) + ", " + Bytes.toStringBinary(row.getRowKey()) + ", "
+ Bytes.toStringBinary(row.getFamily()) + ", " + Bytes.toStringBinary(row.getFamily()) + ", "
+ Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e); + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
} finally{
if(table != null){
table.close();
}
} }
} }
return failures; return failures;
} }

View File

@ -638,15 +638,6 @@ public class ThriftServerRunner implements Runnable {
private ThriftMetrics metrics = null; private ThriftMetrics metrics = null;
private final ConnectionCache connectionCache; private final ConnectionCache connectionCache;
private static ThreadLocal<Map<String, Table>> threadLocalTables =
new ThreadLocal<Map<String, Table>>() {
@Override
protected Map<String, Table> initialValue() {
return new TreeMap<String, Table>();
}
};
IncrementCoalescer coalescer = null; IncrementCoalescer coalescer = null;
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
@ -679,11 +670,7 @@ public class ThriftServerRunner implements Runnable {
public Table getTable(final byte[] tableName) throws public Table getTable(final byte[] tableName) throws
IOException { IOException {
String table = Bytes.toString(tableName); String table = Bytes.toString(tableName);
Map<String, Table> tables = threadLocalTables.get(); return connectionCache.getTable(table);
if (!tables.containsKey(table)) {
tables.put(table, (Table)connectionCache.getTable(table));
}
return tables.get(table);
} }
public Table getTable(final ByteBuffer tableName) throws IOException { public Table getTable(final ByteBuffer tableName) throws IOException {
@ -879,8 +866,9 @@ public class ThriftServerRunner implements Runnable {
byte[] family, byte[] family,
byte[] qualifier, byte[] qualifier,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError { Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Get get = new Get(getBytes(row)); Get get = new Get(getBytes(row));
addAttributes(get, attributes); addAttributes(get, attributes);
if (qualifier == null) { if (qualifier == null) {
@ -893,6 +881,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally {
closeTable(table);
} }
} }
@ -920,8 +910,10 @@ public class ThriftServerRunner implements Runnable {
*/ */
public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Get get = new Get(getBytes(row)); Get get = new Get(getBytes(row));
addAttributes(get, attributes); addAttributes(get, attributes);
if (null == qualifier) { if (null == qualifier) {
@ -935,6 +927,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -962,8 +956,10 @@ public class ThriftServerRunner implements Runnable {
protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family, protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
throws IOError { throws IOError {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Get get = new Get(getBytes(row)); Get get = new Get(getBytes(row));
addAttributes(get, attributes); addAttributes(get, attributes);
if (null == qualifier) { if (null == qualifier) {
@ -978,6 +974,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1010,8 +1008,10 @@ public class ThriftServerRunner implements Runnable {
public List<TRowResult> getRowWithColumnsTs( public List<TRowResult> getRowWithColumnsTs(
ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns, ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
if (columns == null) { if (columns == null) {
Get get = new Get(getBytes(row)); Get get = new Get(getBytes(row));
addAttributes(get, attributes); addAttributes(get, attributes);
@ -1035,6 +1035,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1072,9 +1074,11 @@ public class ThriftServerRunner implements Runnable {
List<ByteBuffer> rows, List<ByteBuffer> rows,
List<ByteBuffer> columns, long timestamp, List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError { Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
Table table= null;
try { try {
List<Get> gets = new ArrayList<Get>(rows.size()); List<Get> gets = new ArrayList<Get>(rows.size());
Table table = getTable(tableName); table = getTable(tableName);
if (metrics != null) { if (metrics != null) {
metrics.incNumRowKeysInBatchGet(rows.size()); metrics.incNumRowKeysInBatchGet(rows.size());
} }
@ -1100,6 +1104,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1117,8 +1123,9 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer row, ByteBuffer row,
ByteBuffer column, ByteBuffer column,
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError { long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Delete delete = new Delete(getBytes(row)); Delete delete = new Delete(getBytes(row));
addAttributes(delete, attributes); addAttributes(delete, attributes);
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
@ -1132,6 +1139,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally {
closeTable(table);
} }
} }
@ -1146,14 +1155,17 @@ public class ThriftServerRunner implements Runnable {
public void deleteAllRowTs( public void deleteAllRowTs(
ByteBuffer tableName, ByteBuffer row, long timestamp, ByteBuffer tableName, ByteBuffer row, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError { Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Delete delete = new Delete(getBytes(row), timestamp); Delete delete = new Delete(getBytes(row), timestamp);
addAttributes(delete, attributes); addAttributes(delete, attributes);
table.delete(delete); table.delete(delete);
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally {
closeTable(table);
} }
} }
@ -1260,6 +1272,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IllegalArgument(Throwables.getStackTraceAsString(e)); throw new IllegalArgument(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1331,6 +1345,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IllegalArgument(Throwables.getStackTraceAsString(e)); throw new IllegalArgument(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1348,7 +1364,7 @@ public class ThriftServerRunner implements Runnable {
protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
byte [] family, byte [] qualifier, long amount) byte [] family, byte [] qualifier, long amount)
throws IOError, IllegalArgument, TException { throws IOError, IllegalArgument, TException {
Table table; Table table = null;
try { try {
table = getTable(tableName); table = getTable(tableName);
return table.incrementColumnValue( return table.incrementColumnValue(
@ -1356,6 +1372,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally {
closeTable(table);
} }
} }
@ -1405,8 +1423,10 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan, public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError { throws IOError {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Scan scan = new Scan(); Scan scan = new Scan();
addAttributes(scan, attributes); addAttributes(scan, attributes);
if (tScan.isSetStartRow()) { if (tScan.isSetStartRow()) {
@ -1446,6 +1466,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1453,8 +1475,10 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns, List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError { Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow)); Scan scan = new Scan(getBytes(startRow));
addAttributes(scan, attributes); addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) { if(columns != null && columns.size() != 0) {
@ -1471,6 +1495,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1479,8 +1505,10 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer stopRow, List<ByteBuffer> columns, ByteBuffer stopRow, List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException { throws IOError, TException {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
addAttributes(scan, attributes); addAttributes(scan, attributes);
if(columns != null && columns.size() != 0) { if(columns != null && columns.size() != 0) {
@ -1497,6 +1525,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1506,8 +1536,10 @@ public class ThriftServerRunner implements Runnable {
List<ByteBuffer> columns, List<ByteBuffer> columns,
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException { throws IOError, TException {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Scan scan = new Scan(getBytes(startAndPrefix)); Scan scan = new Scan(getBytes(startAndPrefix));
addAttributes(scan, attributes); addAttributes(scan, attributes);
Filter f = new WhileMatchFilter( Filter f = new WhileMatchFilter(
@ -1527,6 +1559,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1534,8 +1568,10 @@ public class ThriftServerRunner implements Runnable {
public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns, long timestamp, List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException { Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow)); Scan scan = new Scan(getBytes(startRow));
addAttributes(scan, attributes); addAttributes(scan, attributes);
scan.setTimeRange(0, timestamp); scan.setTimeRange(0, timestamp);
@ -1553,6 +1589,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1561,8 +1599,10 @@ public class ThriftServerRunner implements Runnable {
ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp, ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
Map<ByteBuffer, ByteBuffer> attributes) Map<ByteBuffer, ByteBuffer> attributes)
throws IOError, TException { throws IOError, TException {
Table table = null;
try { try {
Table table = getTable(tableName); table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
addAttributes(scan, attributes); addAttributes(scan, attributes);
scan.setTimeRange(0, timestamp); scan.setTimeRange(0, timestamp);
@ -1581,17 +1621,21 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@Override @Override
public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors( public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
ByteBuffer tableName) throws IOError, TException { ByteBuffer tableName) throws IOError, TException {
Table table = null;
try { try {
TreeMap<ByteBuffer, ColumnDescriptor> columns = TreeMap<ByteBuffer, ColumnDescriptor> columns =
new TreeMap<ByteBuffer, ColumnDescriptor>(); new TreeMap<ByteBuffer, ColumnDescriptor>();
Table table = getTable(tableName); table = getTable(tableName);
HTableDescriptor desc = table.getTableDescriptor(); HTableDescriptor desc = table.getTableDescriptor();
for (HColumnDescriptor e : desc.getFamilies()) { for (HColumnDescriptor e : desc.getFamilies()) {
@ -1602,6 +1646,20 @@ public class ThriftServerRunner implements Runnable {
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally {
closeTable(table);
}
}
private void closeTable(Table table) throws IOError
{
try{
if(table != null){
table.close();
}
} catch (IOException e){
LOG.error(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e));
} }
} }
@ -1650,10 +1708,13 @@ public class ThriftServerRunner implements Runnable {
scan.setReversed(true); scan.setReversed(true);
scan.addFamily(family); scan.addFamily(family);
scan.setStartRow(row); scan.setStartRow(row);
Table table = getTable(tableName); Table table = getTable(tableName);
try (ResultScanner scanner = table.getScanner(scan)) { try (ResultScanner scanner = table.getScanner(scan)) {
return scanner.next(); return scanner.next();
} finally{
if(table != null){
table.close();
}
} }
} }
@ -1673,13 +1734,16 @@ public class ThriftServerRunner implements Runnable {
return; return;
} }
Table table = null;
try { try {
Table table = getTable(tincrement.getTable()); table = getTable(tincrement.getTable());
Increment inc = ThriftUtilities.incrementFromThrift(tincrement); Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
table.increment(inc); table.increment(inc);
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1700,14 +1764,17 @@ public class ThriftServerRunner implements Runnable {
throw new TException("Must supply a table and a row key; can't append"); throw new TException("Must supply a table and a row key; can't append");
} }
Table table = null;
try { try {
Table table = getTable(tappend.getTable()); table = getTable(tappend.getTable());
Append append = ThriftUtilities.appendFromThrift(tappend); Append append = ThriftUtilities.appendFromThrift(tappend);
Result result = table.append(append); Result result = table.append(append);
return ThriftUtilities.cellFromHBase(result.rawCells()); return ThriftUtilities.cellFromHBase(result.rawCells());
} catch (IOException e) { } catch (IOException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IOError(Throwables.getStackTraceAsString(e)); throw new IOError(Throwables.getStackTraceAsString(e));
} finally{
closeTable(table);
} }
} }
@ -1743,6 +1810,8 @@ public class ThriftServerRunner implements Runnable {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
LOG.warn(e.getMessage(), e); LOG.warn(e.getMessage(), e);
throw new IllegalArgument(Throwables.getStackTraceAsString(e)); throw new IllegalArgument(Throwables.getStackTraceAsString(e));
} finally {
closeTable(table);
} }
} }
} }