diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index 50da9bccd86..8896eb08389 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -49,6 +49,7 @@ import org.apache.hadoop.util.StringUtils; public class TableInputFormat extends TableInputFormatBase implements Configurable { + @SuppressWarnings("hiding") private static final Log LOG = LogFactory.getLog(TableInputFormat.class); /** Job parameter that specifies the input table. */ @@ -112,13 +113,6 @@ implements Configurable { @Override public void setConf(Configuration configuration) { this.conf = configuration; - TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); - try { - // NOTE: This connection doesn't currently get closed explicit1ly. - initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } Scan scan = null; @@ -180,6 +174,16 @@ implements Configurable { setScan(scan); } + @Override + protected void initialize() { + TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE)); + try { + initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } + /** * Parses a combined family and qualifier and adds either both or just the * family in case there is no qualifier. This assumes the older colon diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index d6e814d4476..6ab7ba8d2cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -67,12 +67,10 @@ import org.apache.hadoop.util.StringUtils; *
  *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
  *
+ *     private JobConf job;
+ *
  *     public void configure(JobConf job) {
- *       Connection connection =
- *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
- *       TableName tableName = TableName.valueOf("exampleTable");
- *       // mandatory
- *       initializeTable(connection, tableName);
+ *       this.job = job;
  *       Text[] inputColumns = new byte [][] { Bytes.toBytes("cf1:columnA"),
  *         Bytes.toBytes("cf2") };
  *       // mandatory
@@ -81,6 +79,14 @@ import org.apache.hadoop.util.StringUtils;
  *       // optional
  *       setRowFilter(exampleFilter);
  *     }
+ *     
+ *     protected void initialize() {
+ *       Connection connection =
+ *          ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ *       TableName tableName = TableName.valueOf("exampleTable");
+ *       // mandatory
+ *       initializeTable(connection, tableName);
+ *    }
  *
  *     public void validateInput(JobConf job) throws IOException {
  *     }
@@ -116,13 +122,14 @@ extends InputFormat {
   private RegionLocator regionLocator;
   /** The reader scanning the table, can be a custom one. */
   private TableRecordReader tableRecordReader = null;
+  /** The underlying {@link Connection} of the table. */
+  private Connection connection;
+
   
   /** The reverse DNS lookup cache mapping: IPAddress => HostName */
   private HashMap reverseDNSCacheMap =
     new HashMap();
 
-  private Connection connection;
-
   /**
    * Builds a {@link TableRecordReader}. If no {@link TableRecordReader} was provided, uses
    * the default.
@@ -140,6 +147,10 @@ extends InputFormat {
       InputSplit split, TaskAttemptContext context)
   throws IOException {
     if (table == null) {
+      initialize();
+    }
+    if (getTable() == null) {
+      // initialize() must not have been implemented in the subclass.
       throw new IOException("Cannot create a record reader because of a" +
           " previous error. Please look at the previous logs lines from" +
           " the task's full log for more details.");
@@ -152,19 +163,13 @@ extends InputFormat {
     sc.setStartRow(tSplit.getStartRow());
     sc.setStopRow(tSplit.getEndRow());
     trr.setScan(sc);
-    trr.setTable(table);
+    trr.setTable(getTable());
     return new RecordReader() {
 
       @Override
       public void close() throws IOException {
         trr.close();
-        close(admin, table, regionLocator, connection);
-      }
-
-      private void close(Closeable... closables) throws IOException {
-        for (Closeable c : closables) {
-          if(c != null) { c.close(); }
-        }
+        closeTable();
       }
 
       @Override
@@ -196,7 +201,7 @@ extends InputFormat {
   }
 
   protected Pair getStartEndKeys() throws IOException {
-    return regionLocator.getStartEndKeys();
+    return getRegionLocator().getStartEndKeys();
   }
 
   /**
@@ -211,91 +216,109 @@ extends InputFormat {
    */
   @Override
   public List getSplits(JobContext context) throws IOException {
+    boolean closeOnFinish = false;
+
     if (table == null) {
+      initialize();
+      closeOnFinish = true;
+    }
+
+    if (getTable() == null) {
+      // initialize() wasn't implemented, so the table is null.
       throw new IOException("No table was provided.");
     }
 
-    RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);
-
-    Pair keys = getStartEndKeys();
-    if (keys == null || keys.getFirst() == null ||
-        keys.getFirst().length == 0) {
-      HRegionLocation regLoc = regionLocator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
-      if (null == regLoc) {
-        throw new IOException("Expecting at least one region.");
-      }
-      List splits = new ArrayList(1);
-      long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
-      TableSplit split = new TableSplit(table.getName(),
-          HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
-              .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
-      splits.add(split);
-      return splits;
-    }
-    List splits = new ArrayList(keys.getFirst().length);
-    for (int i = 0; i < keys.getFirst().length; i++) {
-      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
-        continue;
-      }
-      HRegionLocation location = regionLocator.getRegionLocation(keys.getFirst()[i], false);
-      // The below InetSocketAddress creation does a name resolution.
-      InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
-      if (isa.isUnresolved()) {
-        LOG.warn("Failed resolve " + isa);
-      }
-      InetAddress regionAddress = isa.getAddress();
-      String regionLocation;
-      try {
-        regionLocation = reverseDNS(regionAddress);
-      } catch (NamingException e) {
-        LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e);
-        regionLocation = location.getHostname();
-      }
-
-      byte[] startRow = scan.getStartRow();
-      byte[] stopRow = scan.getStopRow();
-      // determine if the given start an stop key fall into the region
-      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
-          Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
-          (stopRow.length == 0 ||
-           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
-        byte[] splitStart = startRow.length == 0 ||
-          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
-            keys.getFirst()[i] : startRow;
-        byte[] splitStop = (stopRow.length == 0 ||
-          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
-          keys.getSecond()[i].length > 0 ?
-            keys.getSecond()[i] : stopRow;
-
-        byte[] regionName = location.getRegionInfo().getRegionName();
-        long regionSize = sizeCalculator.getRegionSize(regionName);
-        TableSplit split = new TableSplit(table.getName(),
-          splitStart, splitStop, regionLocation, regionSize);
+    try {
+      RegionSizeCalculator sizeCalculator =
+          new RegionSizeCalculator(getRegionLocator(), getAdmin());
+      
+      TableName tableName = getTable().getName();
+  
+      Pair keys = getStartEndKeys();
+      if (keys == null || keys.getFirst() == null ||
+          keys.getFirst().length == 0) {
+        HRegionLocation regLoc =
+            getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
+        if (null == regLoc) {
+          throw new IOException("Expecting at least one region.");
+        }
+        List splits = new ArrayList(1);
+        long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
+        TableSplit split = new TableSplit(tableName,
+            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
+                .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
         splits.add(split);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("getSplits: split -> " + i + " -> " + split);
+        return splits;
+      }
+      List splits = new ArrayList(keys.getFirst().length);
+      for (int i = 0; i < keys.getFirst().length; i++) {
+        if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+          continue;
+        }
+        HRegionLocation location = getRegionLocator().getRegionLocation(keys.getFirst()[i], false);
+        // The below InetSocketAddress creation does a name resolution.
+        InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
+        if (isa.isUnresolved()) {
+          LOG.warn("Failed resolve " + isa);
+        }
+        InetAddress regionAddress = isa.getAddress();
+        String regionLocation;
+        try {
+          regionLocation = reverseDNS(regionAddress);
+        } catch (NamingException e) {
+          LOG.warn("Cannot resolve the host name for " + regionAddress + " because of " + e);
+          regionLocation = location.getHostname();
+        }
+  
+        byte[] startRow = scan.getStartRow();
+        byte[] stopRow = scan.getStopRow();
+        // determine if the given start an stop key fall into the region
+        if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+            Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+            (stopRow.length == 0 ||
+             Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
+          byte[] splitStart = startRow.length == 0 ||
+            Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+              keys.getFirst()[i] : startRow;
+          byte[] splitStop = (stopRow.length == 0 ||
+            Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+            keys.getSecond()[i].length > 0 ?
+              keys.getSecond()[i] : stopRow;
+  
+          byte[] regionName = location.getRegionInfo().getRegionName();
+          long regionSize = sizeCalculator.getRegionSize(regionName);
+          TableSplit split = new TableSplit(tableName,
+            splitStart, splitStop, regionLocation, regionSize);
+          splits.add(split);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("getSplits: split -> " + i + " -> " + split);
+          }
         }
       }
-    }
-    //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
-    boolean enableAutoBalance = context.getConfiguration()
-      .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
-    if (enableAutoBalance) {
-      long totalRegionSize=0;
-      for (int i = 0; i < splits.size(); i++){
-        TableSplit ts = (TableSplit)splits.get(i);
-        totalRegionSize += ts.getLength();
+      //The default value of "hbase.mapreduce.input.autobalance" is false, which means not enabled.
+      boolean enableAutoBalance = context.getConfiguration()
+        .getBoolean(MAPREDUCE_INPUT_AUTOBALANCE, false);
+      if (enableAutoBalance) {
+        long totalRegionSize=0;
+        for (int i = 0; i < splits.size(); i++){
+          TableSplit ts = (TableSplit)splits.get(i);
+          totalRegionSize += ts.getLength();
+        }
+        long averageRegionSize = totalRegionSize / splits.size();
+        // the averageRegionSize must be positive.
+        if (averageRegionSize <= 0) {
+            LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
+                    "set it to 1.");
+            averageRegionSize = 1;
+        }
+        return calculateRebalancedSplits(splits, context, averageRegionSize);
+      } else {
+        return splits;
       }
-      long averageRegionSize = totalRegionSize / splits.size();
-      // the averageRegionSize must be positive.
-      if (averageRegionSize <= 0) {
-          LOG.warn("The averageRegionSize is not positive: "+ averageRegionSize + ", " +
-                  "set it to 1.");
-          averageRegionSize = 1;
+    } finally {
+      if (closeOnFinish) {
+        closeTable();
       }
-      return calculateRebalancedSplits(splits, context, averageRegionSize);
-    } else {
-      return splits;
     }
   }
 
@@ -343,6 +366,7 @@ extends InputFormat {
     int count = 0;
     while (count < list.size()) {
       TableSplit ts = (TableSplit)list.get(count);
+      TableName tableName = ts.getTable();
       String regionLocation = ts.getRegionLocation();
       long regionSize = ts.getLength();
       if (regionSize >= dataSkewThreshold) {
@@ -351,9 +375,9 @@ extends InputFormat {
         byte[] splitKey = getSplitKey(ts.getStartRow(), ts.getEndRow(), isTextKey);
          //Set the size of child TableSplit as 1/2 of the region size. The exact size of the
          // MapReduce input splits is not far off.
-        TableSplit t1 = new TableSplit(table.getName(), ts.getStartRow(), splitKey, regionLocation,
+        TableSplit t1 = new TableSplit(tableName, ts.getStartRow(), splitKey, regionLocation,
                 regionSize / 2);
-        TableSplit t2 = new TableSplit(table.getName(), splitKey, ts.getEndRow(), regionLocation,
+        TableSplit t2 = new TableSplit(tableName, splitKey, ts.getEndRow(), regionLocation,
                 regionSize - regionSize / 2);
         resultList.add(t1);
         resultList.add(t2);
@@ -380,7 +404,7 @@ extends InputFormat {
             break;
           }
         }
-        TableSplit t = new TableSplit(table.getName(), splitStartKey, splitEndKey,
+        TableSplit t = new TableSplit(tableName, splitStartKey, splitEndKey,
                 regionLocation, totalSize);
         resultList.add(t);
       }
@@ -515,13 +539,16 @@ extends InputFormat {
    */
   @Deprecated
   protected HTable getHTable() {
-    return (HTable) this.table;
+    return (HTable) this.getTable();
   }
 
   /**
    * Allows subclasses to get the {@link RegionLocator}.
    */
   protected RegionLocator getRegionLocator() {
+    if (regionLocator == null) {
+      initialize();
+    }
     return regionLocator;
   }
   
@@ -529,6 +556,9 @@ extends InputFormat {
    * Allows subclasses to get the {@link Table}.
    */
   protected Table getTable() {
+    if (table == null) {
+      initialize();
+    }
     return table;
   }
 
@@ -536,6 +566,9 @@ extends InputFormat {
    * Allows subclasses to get the {@link Admin}.
    */
   protected Admin getAdmin() {
+    if (admin == null) {
+      initialize();
+    }
     return admin;
   }
 
@@ -550,7 +583,8 @@ extends InputFormat {
   protected void setHTable(HTable table) throws IOException {
     this.table = table;
     this.regionLocator = table.getRegionLocator();
-    this.admin = table.getConnection().getAdmin();
+    this.connection = table.getConnection();
+    this.admin = this.connection.getAdmin();
   }
 
   /**
@@ -595,4 +629,34 @@ extends InputFormat {
   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
     this.tableRecordReader = tableRecordReader;
   }
+  
+  /**
+   * This method will be called when any of the following are referenced, but not yet initialized:
+   * admin, regionLocator, table. Subclasses will have the opportunity to call
+   * {@link #initializeTable(Connection, TableName)}
+   */
+  protected void initialize() {
+   
+  }
+
+  /**
+   * Close the Table and related objects that were initialized via
+   * {@link #initializeTable(Connection, TableName)}.
+   *
+   * @throws IOException
+   */
+  protected void closeTable() throws IOException {
+    close(admin, table, regionLocator, connection);
+    admin = null;
+    table = null;
+    regionLocator = null;
+    connection = null;
+  }
+
+  private void close(Closeable... closables) throws IOException {
+    for (Closeable c : closables) {
+      if(c != null) { c.close(); }
+    }
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 107e7b60a64..80ba4727350 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -78,15 +78,26 @@ implements Configurable {
   /** The configuration. */
   private Configuration conf = null;
 
-  private Table table;
-  private Connection connection;
-
   /**
    * Writes the reducer output to an HBase table.
    */
   protected class TableRecordWriter
   extends RecordWriter {
 
+    private Connection connection;
+    private Table table;
+
+    /**
+     * @throws IOException 
+     * 
+     */
+    public TableRecordWriter() throws IOException {
+      String tableName = conf.get(OUTPUT_TABLE);
+      this.connection = ConnectionFactory.createConnection(conf);
+      this.table = connection.getTable(TableName.valueOf(tableName));
+      this.table.setAutoFlushTo(false);
+      LOG.info("Created table instance for "  + tableName);
+    }
     /**
      * Closes the writer, in this case flush table commits.
      *
@@ -164,6 +175,7 @@ implements Configurable {
     return new TableOutputCommitter();
   }
 
+  @Override
   public Configuration getConf() {
     return conf;
   }
@@ -192,10 +204,6 @@ implements Configurable {
       if (zkClientPort != 0) {
         this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
       }
-      this.connection = ConnectionFactory.createConnection(this.conf);
-      this.table = connection.getTable(TableName.valueOf(tableName));
-      this.table.setAutoFlushTo(false);
-      LOG.info("Created table instance for "  + tableName);
     } catch(IOException e) {
       LOG.error(e);
       throw new RuntimeException(e);