From 643ba90185f20419016080d6d32adba9fe7019dd Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 12 Aug 2015 16:32:37 -0700 Subject: [PATCH] HBASE-14196 Thrift server idle connection timeout issue (Vladimir Rodionov) --- .../hbase/thrift/IncrementCoalescer.java | 8 +- .../hbase/thrift/ThriftServerRunner.java | 137 +++++++++++++----- 2 files changed, 109 insertions(+), 36 deletions(-) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java index 13a2e5034ab..e937f2db740 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java @@ -264,8 +264,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { if (counter == null) { continue; } + Table table = null; try { - Table table = handler.getTable(row.getTable()); + table = handler.getTable(row.getTable()); if (failures > 2) { throw new IOException("Auto-Fail rest of ICVs"); } @@ -278,8 +279,11 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { + Bytes.toStringBinary(row.getRowKey()) + ", " + Bytes.toStringBinary(row.getFamily()) + ", " + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e); + } finally{ + if(table != null){ + table.close(); + } } - } return failures; } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java index a5239ed5ba4..668aeb6ac23 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -638,15 +638,6 @@ public class ThriftServerRunner implements Runnable { private ThriftMetrics metrics = null; private final ConnectionCache connectionCache; - - private static ThreadLocal> threadLocalTables = - new ThreadLocal>() { - @Override - protected Map initialValue() { - return new TreeMap(); - } - }; - IncrementCoalescer coalescer = null; static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; @@ -679,11 +670,7 @@ public class ThriftServerRunner implements Runnable { public Table getTable(final byte[] tableName) throws IOException { String table = Bytes.toString(tableName); - Map tables = threadLocalTables.get(); - if (!tables.containsKey(table)) { - tables.put(table, (Table)connectionCache.getTable(table)); - } - return tables.get(table); + return connectionCache.getTable(table); } public Table getTable(final ByteBuffer tableName) throws IOException { @@ -879,8 +866,9 @@ public class ThriftServerRunner implements Runnable { byte[] family, byte[] qualifier, Map attributes) throws IOError { + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Get get = new Get(getBytes(row)); addAttributes(get, attributes); if (qualifier == null) { @@ -893,6 +881,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally { + closeTable(table); } } @@ -920,8 +910,10 @@ public class ThriftServerRunner implements Runnable { */ public List getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier, int numVersions, Map attributes) throws IOError { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Get get = new Get(getBytes(row)); addAttributes(get, attributes); if (null == qualifier) { @@ -935,6 +927,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -962,8 +956,10 @@ public class ThriftServerRunner implements Runnable { protected List getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier, long timestamp, int numVersions, Map attributes) throws IOError { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Get get = new Get(getBytes(row)); addAttributes(get, attributes); if (null == qualifier) { @@ -978,6 +974,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1010,8 +1008,10 @@ public class ThriftServerRunner implements Runnable { public List getRowWithColumnsTs( ByteBuffer tableName, ByteBuffer row, List columns, long timestamp, Map attributes) throws IOError { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); if (columns == null) { Get get = new Get(getBytes(row)); addAttributes(get, attributes); @@ -1035,6 +1035,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1072,9 +1074,11 @@ public class ThriftServerRunner implements Runnable { List rows, List columns, long timestamp, Map attributes) throws IOError { + + Table table= null; try { List gets = new ArrayList(rows.size()); - Table table = getTable(tableName); + table = getTable(tableName); if (metrics != null) { metrics.incNumRowKeysInBatchGet(rows.size()); } @@ -1100,6 +1104,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1117,8 +1123,9 @@ public class ThriftServerRunner implements Runnable { ByteBuffer row, ByteBuffer column, long timestamp, Map attributes) throws IOError { + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Delete delete = new Delete(getBytes(row)); addAttributes(delete, attributes); byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); @@ -1132,6 +1139,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally { + closeTable(table); } } @@ -1146,14 +1155,17 @@ public class ThriftServerRunner implements Runnable { public void deleteAllRowTs( ByteBuffer tableName, ByteBuffer row, long timestamp, Map attributes) throws IOError { + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Delete delete = new Delete(getBytes(row), timestamp); addAttributes(delete, attributes); table.delete(delete); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally { + closeTable(table); } } @@ -1260,6 +1272,8 @@ public class ThriftServerRunner implements Runnable { } catch (IllegalArgumentException e) { LOG.warn(e.getMessage(), e); throw new IllegalArgument(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1331,6 +1345,8 @@ public class ThriftServerRunner implements Runnable { } catch (IllegalArgumentException e) { LOG.warn(e.getMessage(), e); throw new IllegalArgument(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1348,7 +1364,7 @@ public class ThriftServerRunner implements Runnable { protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte [] family, byte [] qualifier, long amount) throws IOError, IllegalArgument, TException { - Table table; + Table table = null; try { table = getTable(tableName); return table.incrementColumnValue( @@ -1356,6 +1372,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally { + closeTable(table); } } @@ -1405,8 +1423,10 @@ public class ThriftServerRunner implements Runnable { public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan, Map attributes) throws IOError { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Scan scan = new Scan(); addAttributes(scan, attributes); if (tScan.isSetStartRow()) { @@ -1446,6 +1466,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1453,8 +1475,10 @@ public class ThriftServerRunner implements Runnable { public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, List columns, Map attributes) throws IOError { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Scan scan = new Scan(getBytes(startRow)); addAttributes(scan, attributes); if(columns != null && columns.size() != 0) { @@ -1471,6 +1495,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1479,8 +1505,10 @@ public class ThriftServerRunner implements Runnable { ByteBuffer stopRow, List columns, Map attributes) throws IOError, TException { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); addAttributes(scan, attributes); if(columns != null && columns.size() != 0) { @@ -1497,6 +1525,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1506,8 +1536,10 @@ public class ThriftServerRunner implements Runnable { List columns, Map attributes) throws IOError, TException { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Scan scan = new Scan(getBytes(startAndPrefix)); addAttributes(scan, attributes); Filter f = new WhileMatchFilter( @@ -1527,6 +1559,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1534,8 +1568,10 @@ public class ThriftServerRunner implements Runnable { public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, List columns, long timestamp, Map attributes) throws IOError, TException { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Scan scan = new Scan(getBytes(startRow)); addAttributes(scan, attributes); scan.setTimeRange(0, timestamp); @@ -1553,6 +1589,8 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1561,8 +1599,10 @@ public class ThriftServerRunner implements Runnable { ByteBuffer stopRow, List columns, long timestamp, Map attributes) throws IOError, TException { + + Table table = null; try { - Table table = getTable(tableName); + table = getTable(tableName); Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); addAttributes(scan, attributes); scan.setTimeRange(0, timestamp); @@ -1581,17 +1621,21 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @Override public Map getColumnDescriptors( ByteBuffer tableName) throws IOError, TException { + + Table table = null; try { TreeMap columns = new TreeMap(); - Table table = getTable(tableName); + table = getTable(tableName); HTableDescriptor desc = table.getTableDescriptor(); for (HColumnDescriptor e : desc.getFamilies()) { @@ -1602,9 +1646,23 @@ public class ThriftServerRunner implements Runnable { } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally { + closeTable(table); } } - + + private void closeTable(Table table) throws IOError + { + try{ + if(table != null){ + table.close(); + } + } catch (IOException e){ + LOG.error(e.getMessage(), e); + throw new IOError(Throwables.getStackTraceAsString(e)); + } + } + @Override public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError { try { @@ -1650,10 +1708,13 @@ public class ThriftServerRunner implements Runnable { scan.setReversed(true); scan.addFamily(family); scan.setStartRow(row); - - Table table = getTable(tableName); + Table table = getTable(tableName); try (ResultScanner scanner = table.getScanner(scan)) { return scanner.next(); + } finally{ + if(table != null){ + table.close(); + } } } @@ -1673,13 +1734,16 @@ public class ThriftServerRunner implements Runnable { return; } + Table table = null; try { - Table table = getTable(tincrement.getTable()); + table = getTable(tincrement.getTable()); Increment inc = ThriftUtilities.incrementFromThrift(tincrement); table.increment(inc); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1700,14 +1764,17 @@ public class ThriftServerRunner implements Runnable { throw new TException("Must supply a table and a row key; can't append"); } + Table table = null; try { - Table table = getTable(tappend.getTable()); + table = getTable(tappend.getTable()); Append append = ThriftUtilities.appendFromThrift(tappend); Result result = table.append(append); return ThriftUtilities.cellFromHBase(result.rawCells()); } catch (IOException e) { LOG.warn(e.getMessage(), e); throw new IOError(Throwables.getStackTraceAsString(e)); + } finally{ + closeTable(table); } } @@ -1743,6 +1810,8 @@ public class ThriftServerRunner implements Runnable { } catch (IllegalArgumentException e) { LOG.warn(e.getMessage(), e); throw new IllegalArgument(Throwables.getStackTraceAsString(e)); + } finally { + closeTable(table); } } }