diff --git a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 46b1c5659b6..7f1efc6d91a 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -25,10 +25,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -48,6 +51,10 @@ import com.google.protobuf.ServiceException; @InterfaceAudience.Public @InterfaceStability.Stable public class ScannerCallable extends ServerCallable { + public static final String LOG_SCANNER_LATENCY_CUTOFF + = "hbase.client.log.scanner.latency.cutoff"; + public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity"; + private static final Log LOG = LogFactory.getLog(ScannerCallable.class); private long scannerId = -1L; private boolean instantiated = false; @@ -55,6 +62,8 @@ public class ScannerCallable extends ServerCallable { private Scan scan; private int caching = 1; private ScanMetrics scanMetrics; + private boolean logScannerActivity = false; + private int logCutOffLatency = 1000; // indicate if it is a remote server call private boolean isRegionServerRemote = true; @@ -71,6 +80,9 @@ public class ScannerCallable extends ServerCallable { super(connection, tableName, scan.getStartRow()); this.scan = scan; this.scanMetrics = scanMetrics; + Configuration conf = connection.getConfiguration(); + logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false); + logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000); } /** @@ -129,7 +141,16 @@ public class ScannerCallable extends ServerCallable { RequestConverter.buildScanRequest(scannerId, caching, false); try { ScanResponse response = server.scan(null, request); + long timestamp = System.currentTimeMillis(); rrs = ResponseConverter.getResults(response); + if (logScannerActivity) { + long now = System.currentTimeMillis(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 0 : rrs.length; + LOG.info("Took " + (now-timestamp) + "ms to fetch " + + rows + " rows from scanner=" + scannerId); + } + } if (response.hasMoreResults() && !response.getMoreResults()) { scannerId = -1L; @@ -141,10 +162,25 @@ public class ScannerCallable extends ServerCallable { } updateResultsMetrics(rrs); } catch (IOException e) { + if (logScannerActivity) { + LOG.info("Got exception in fetching from scanner=" + + scannerId, e); + } IOException ioe = e; if (e instanceof RemoteException) { ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e); } + if (logScannerActivity && (ioe instanceof UnknownScannerException)) { + try { + HRegionLocation location = + connection.relocateRegion(tableName, scan.getStartRow()); + LOG.info("Scanner=" + scannerId + + " expired, current region location is " + location.toString() + + " ip:" + location.getServerAddress().getBindAddress()); + } catch (Throwable t) { + LOG.info("Failed to relocate region", t); + } + } if (ioe instanceof NotServingRegionException) { // Throw a DNRE so that we break out of cycle of calling NSRE // when what we need is to open scanner against new location. @@ -221,7 +257,13 @@ public class ScannerCallable extends ServerCallable { this.scan, 0, false); try { ScanResponse response = server.scan(null, request); - return response.getScannerId(); + long id = response.getScannerId(); + if (logScannerActivity) { + LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + + " on region " + this.location.toString() + " ip:" + + this.location.getServerAddress().getBindAddress()); + } + return id; } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } diff --git a/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java b/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java index 42569fbdd25..a3639716cc8 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java +++ b/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java @@ -23,11 +23,13 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; @@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT; /** * Iterate over an HBase table data, return (Text, RowResult) pairs @@ -49,6 +52,10 @@ public class TableRecordReaderImpl { private ResultScanner scanner; private HTable htable; private byte [][] trrInputColumns; + private long timestamp; + private int rowcount; + private boolean logScannerActivity = false; + private int logPerRowCount = 100; /** * Restart from survivable exceptions by creating a new scanner. @@ -57,6 +64,7 @@ public class TableRecordReaderImpl { * @throws IOException */ public void restart(byte[] firstRow) throws IOException { + Scan currentScan; if ((endRow != null) && (endRow.length > 0)) { if (trrRowFilter != null) { Scan scan = new Scan(firstRow, endRow); @@ -64,6 +72,7 @@ public class TableRecordReaderImpl { scan.setFilter(trrRowFilter); scan.setCacheBlocks(false); this.scanner = this.htable.getScanner(scan); + currentScan = scan; } else { LOG.debug("TIFB.restart, firstRow: " + Bytes.toStringBinary(firstRow) + ", endRow: " + @@ -71,6 +80,7 @@ public class TableRecordReaderImpl { Scan scan = new Scan(firstRow, endRow); TableInputFormat.addColumns(scan, trrInputColumns); this.scanner = this.htable.getScanner(scan); + currentScan = scan; } } else { LOG.debug("TIFB.restart, firstRow: " + @@ -80,6 +90,12 @@ public class TableRecordReaderImpl { TableInputFormat.addColumns(scan, trrInputColumns); scan.setFilter(trrRowFilter); this.scanner = this.htable.getScanner(scan); + currentScan = scan; + } + if (logScannerActivity) { + LOG.info("Current scan=" + currentScan.toString()); + timestamp = System.currentTimeMillis(); + rowcount = 0; } } @@ -99,6 +115,10 @@ public class TableRecordReaderImpl { * @param htable the {@link HTable} to scan. */ public void setHTable(HTable htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean( + ScannerCallable.LOG_SCANNER_ACTIVITY, false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); this.htable = htable; } @@ -174,32 +194,55 @@ public class TableRecordReaderImpl { throws IOException { Result result; try { - result = this.scanner.next(); - } catch (DoNotRetryIOException e) { - throw e; - } catch (IOException e) { - LOG.debug("recovered from " + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," + - " if your mapper's restarted a few other times like this" + - " then you should consider killing this job and investigate" + - " why it's taking so long."); + try { + result = this.scanner.next(); + if (logScannerActivity) { + rowcount ++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + timestamp = now; + rowcount = 0; + } + } + } catch (DoNotRetryIOException e) { + throw e; + } catch (IOException e) { + LOG.debug("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restart(startRow); + } else { + restart(lastSuccessfulRow); + this.scanner.next(); // skip presumed already mapped row + } + result = this.scanner.next(); } - if (lastSuccessfulRow == null) { - restart(startRow); - } else { - restart(lastSuccessfulRow); - this.scanner.next(); // skip presumed already mapped row - } - result = this.scanner.next(); - } - if (result != null && result.size() > 0) { - key.set(result.getRow()); - lastSuccessfulRow = key.get(); - Writables.copyWritable(result, value); - return true; + if (result != null && result.size() > 0) { + key.set(result.getRow()); + lastSuccessfulRow = key.get(); + Writables.copyWritable(result, value); + return true; + } + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + LOG.info(ioe); + String lastRow = lastSuccessfulRow == null ? + "null" : Bytes.toStringBinary(lastSuccessfulRow); + LOG.info("lastSuccessfulRow=" + lastRow); + } + throw ioe; } - return false; } } diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 1c8a393ac51..29f8915215b 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -24,11 +24,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -46,7 +48,8 @@ import org.apache.hadoop.util.StringUtils; @InterfaceAudience.Public @InterfaceStability.Stable public class TableRecordReaderImpl { - + public static final String LOG_PER_ROW_COUNT + = "hbase.mapreduce.log.scanner.rowcount"; static final Log LOG = LogFactory.getLog(TableRecordReader.class); @@ -62,6 +65,10 @@ public class TableRecordReaderImpl { private Result value = null; private TaskAttemptContext context = null; private Method getCounter = null; + private long timestamp; + private int rowcount; + private boolean logScannerActivity = false; + private int logPerRowCount = 100; /** * Restart from survivable exceptions by creating a new scanner. @@ -75,6 +82,11 @@ public class TableRecordReaderImpl { currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); this.scanner = this.htable.getScanner(currentScan); + if (logScannerActivity) { + LOG.info("Current scan=" + currentScan.toString()); + timestamp = System.currentTimeMillis(); + rowcount = 0; + } } /** @@ -103,6 +115,10 @@ public class TableRecordReaderImpl { * @param htable The {@link HTable} to scan. */ public void setHTable(HTable htable) { + Configuration conf = htable.getConfiguration(); + logScannerActivity = conf.getBoolean( + ScannerCallable.LOG_SCANNER_ACTIVITY, false); + logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100); this.htable = htable; } @@ -174,33 +190,56 @@ public class TableRecordReaderImpl { if (key == null) key = new ImmutableBytesWritable(); if (value == null) value = new Result(); try { - value = this.scanner.next(); - } catch (DoNotRetryIOException e) { - throw e; - } catch (IOException e) { - LOG.info("recovered from " + StringUtils.stringifyException(e)); - if (lastSuccessfulRow == null) { - LOG.warn("We are restarting the first next() invocation," + - " if your mapper's restarted a few other times like this" + - " then you should consider killing this job and investigate" + - " why it's taking so long."); + try { + value = this.scanner.next(); + if (logScannerActivity) { + rowcount ++; + if (rowcount >= logPerRowCount) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + timestamp = now; + rowcount = 0; + } + } + } catch (DoNotRetryIOException e) { + throw e; + } catch (IOException e) { + LOG.info("recovered from " + StringUtils.stringifyException(e)); + if (lastSuccessfulRow == null) { + LOG.warn("We are restarting the first next() invocation," + + " if your mapper has restarted a few other times like this" + + " then you should consider killing this job and investigate" + + " why it's taking so long."); + } + if (lastSuccessfulRow == null) { + restart(scan.getStartRow()); + } else { + restart(lastSuccessfulRow); + scanner.next(); // skip presumed already mapped row + } + value = scanner.next(); } - if (lastSuccessfulRow == null) { - restart(scan.getStartRow()); - } else { - restart(lastSuccessfulRow); - scanner.next(); // skip presumed already mapped row + if (value != null && value.size() > 0) { + key.set(value.getRow()); + lastSuccessfulRow = key.get(); + return true; } - value = scanner.next(); - } - if (value != null && value.size() > 0) { - key.set(value.getRow()); - lastSuccessfulRow = key.get(); - return true; - } - updateCounters(); - return false; + updateCounters(); + return false; + } catch (IOException ioe) { + if (logScannerActivity) { + long now = System.currentTimeMillis(); + LOG.info("Mapper took " + (now-timestamp) + + "ms to process " + rowcount + " rows"); + LOG.info(ioe); + String lastRow = lastSuccessfulRow == null ? + "null" : Bytes.toStringBinary(lastSuccessfulRow); + LOG.info("lastSuccessfulRow=" + lastRow); + } + throw ioe; + } } /**