HBASE-6004. Adding more logging to help debugging MR job (Jimmy Xiang)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1339806 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
289205c517
commit
da7ff6c6c4
|
@ -25,10 +25,13 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
|
@ -48,6 +51,10 @@ import com.google.protobuf.ServiceException;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class ScannerCallable extends ServerCallable<Result[]> {
|
public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
|
public static final String LOG_SCANNER_LATENCY_CUTOFF
|
||||||
|
= "hbase.client.log.scanner.latency.cutoff";
|
||||||
|
public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ScannerCallable.class);
|
private static final Log LOG = LogFactory.getLog(ScannerCallable.class);
|
||||||
private long scannerId = -1L;
|
private long scannerId = -1L;
|
||||||
private boolean instantiated = false;
|
private boolean instantiated = false;
|
||||||
|
@ -55,6 +62,8 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
private Scan scan;
|
private Scan scan;
|
||||||
private int caching = 1;
|
private int caching = 1;
|
||||||
private ScanMetrics scanMetrics;
|
private ScanMetrics scanMetrics;
|
||||||
|
private boolean logScannerActivity = false;
|
||||||
|
private int logCutOffLatency = 1000;
|
||||||
|
|
||||||
// indicate if it is a remote server call
|
// indicate if it is a remote server call
|
||||||
private boolean isRegionServerRemote = true;
|
private boolean isRegionServerRemote = true;
|
||||||
|
@ -71,6 +80,9 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
super(connection, tableName, scan.getStartRow());
|
super(connection, tableName, scan.getStartRow());
|
||||||
this.scan = scan;
|
this.scan = scan;
|
||||||
this.scanMetrics = scanMetrics;
|
this.scanMetrics = scanMetrics;
|
||||||
|
Configuration conf = connection.getConfiguration();
|
||||||
|
logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
|
||||||
|
logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -129,7 +141,16 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
RequestConverter.buildScanRequest(scannerId, caching, false);
|
RequestConverter.buildScanRequest(scannerId, caching, false);
|
||||||
try {
|
try {
|
||||||
ScanResponse response = server.scan(null, request);
|
ScanResponse response = server.scan(null, request);
|
||||||
|
long timestamp = System.currentTimeMillis();
|
||||||
rrs = ResponseConverter.getResults(response);
|
rrs = ResponseConverter.getResults(response);
|
||||||
|
if (logScannerActivity) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now - timestamp > logCutOffLatency) {
|
||||||
|
int rows = rrs == null ? 0 : rrs.length;
|
||||||
|
LOG.info("Took " + (now-timestamp) + "ms to fetch "
|
||||||
|
+ rows + " rows from scanner=" + scannerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (response.hasMoreResults()
|
if (response.hasMoreResults()
|
||||||
&& !response.getMoreResults()) {
|
&& !response.getMoreResults()) {
|
||||||
scannerId = -1L;
|
scannerId = -1L;
|
||||||
|
@ -141,10 +162,25 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
}
|
}
|
||||||
updateResultsMetrics(rrs);
|
updateResultsMetrics(rrs);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
if (logScannerActivity) {
|
||||||
|
LOG.info("Got exception in fetching from scanner="
|
||||||
|
+ scannerId, e);
|
||||||
|
}
|
||||||
IOException ioe = e;
|
IOException ioe = e;
|
||||||
if (e instanceof RemoteException) {
|
if (e instanceof RemoteException) {
|
||||||
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
|
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
|
||||||
}
|
}
|
||||||
|
if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
|
||||||
|
try {
|
||||||
|
HRegionLocation location =
|
||||||
|
connection.relocateRegion(tableName, scan.getStartRow());
|
||||||
|
LOG.info("Scanner=" + scannerId
|
||||||
|
+ " expired, current region location is " + location.toString()
|
||||||
|
+ " ip:" + location.getServerAddress().getBindAddress());
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.info("Failed to relocate region", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
if (ioe instanceof NotServingRegionException) {
|
if (ioe instanceof NotServingRegionException) {
|
||||||
// Throw a DNRE so that we break out of cycle of calling NSRE
|
// Throw a DNRE so that we break out of cycle of calling NSRE
|
||||||
// when what we need is to open scanner against new location.
|
// when what we need is to open scanner against new location.
|
||||||
|
@ -221,7 +257,13 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
this.scan, 0, false);
|
this.scan, 0, false);
|
||||||
try {
|
try {
|
||||||
ScanResponse response = server.scan(null, request);
|
ScanResponse response = server.scan(null, request);
|
||||||
return response.getScannerId();
|
long id = response.getScannerId();
|
||||||
|
if (logScannerActivity) {
|
||||||
|
LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
|
||||||
|
+ " on region " + this.location.toString() + " ip:"
|
||||||
|
+ this.location.getServerAddress().getBindAddress());
|
||||||
|
}
|
||||||
|
return id;
|
||||||
} catch (ServiceException se) {
|
} catch (ServiceException se) {
|
||||||
throw ProtobufUtil.getRemoteException(se);
|
throw ProtobufUtil.getRemoteException(se);
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,11 +23,13 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.ScannerCallable;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
|
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
|
||||||
|
@ -35,6 +37,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.LOG_PER_ROW_COUNT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Iterate over an HBase table data, return (Text, RowResult) pairs
|
* Iterate over an HBase table data, return (Text, RowResult) pairs
|
||||||
|
@ -49,6 +52,10 @@ public class TableRecordReaderImpl {
|
||||||
private ResultScanner scanner;
|
private ResultScanner scanner;
|
||||||
private HTable htable;
|
private HTable htable;
|
||||||
private byte [][] trrInputColumns;
|
private byte [][] trrInputColumns;
|
||||||
|
private long timestamp;
|
||||||
|
private int rowcount;
|
||||||
|
private boolean logScannerActivity = false;
|
||||||
|
private int logPerRowCount = 100;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart from survivable exceptions by creating a new scanner.
|
* Restart from survivable exceptions by creating a new scanner.
|
||||||
|
@ -57,6 +64,7 @@ public class TableRecordReaderImpl {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void restart(byte[] firstRow) throws IOException {
|
public void restart(byte[] firstRow) throws IOException {
|
||||||
|
Scan currentScan;
|
||||||
if ((endRow != null) && (endRow.length > 0)) {
|
if ((endRow != null) && (endRow.length > 0)) {
|
||||||
if (trrRowFilter != null) {
|
if (trrRowFilter != null) {
|
||||||
Scan scan = new Scan(firstRow, endRow);
|
Scan scan = new Scan(firstRow, endRow);
|
||||||
|
@ -64,6 +72,7 @@ public class TableRecordReaderImpl {
|
||||||
scan.setFilter(trrRowFilter);
|
scan.setFilter(trrRowFilter);
|
||||||
scan.setCacheBlocks(false);
|
scan.setCacheBlocks(false);
|
||||||
this.scanner = this.htable.getScanner(scan);
|
this.scanner = this.htable.getScanner(scan);
|
||||||
|
currentScan = scan;
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("TIFB.restart, firstRow: " +
|
LOG.debug("TIFB.restart, firstRow: " +
|
||||||
Bytes.toStringBinary(firstRow) + ", endRow: " +
|
Bytes.toStringBinary(firstRow) + ", endRow: " +
|
||||||
|
@ -71,6 +80,7 @@ public class TableRecordReaderImpl {
|
||||||
Scan scan = new Scan(firstRow, endRow);
|
Scan scan = new Scan(firstRow, endRow);
|
||||||
TableInputFormat.addColumns(scan, trrInputColumns);
|
TableInputFormat.addColumns(scan, trrInputColumns);
|
||||||
this.scanner = this.htable.getScanner(scan);
|
this.scanner = this.htable.getScanner(scan);
|
||||||
|
currentScan = scan;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("TIFB.restart, firstRow: " +
|
LOG.debug("TIFB.restart, firstRow: " +
|
||||||
|
@ -80,6 +90,12 @@ public class TableRecordReaderImpl {
|
||||||
TableInputFormat.addColumns(scan, trrInputColumns);
|
TableInputFormat.addColumns(scan, trrInputColumns);
|
||||||
scan.setFilter(trrRowFilter);
|
scan.setFilter(trrRowFilter);
|
||||||
this.scanner = this.htable.getScanner(scan);
|
this.scanner = this.htable.getScanner(scan);
|
||||||
|
currentScan = scan;
|
||||||
|
}
|
||||||
|
if (logScannerActivity) {
|
||||||
|
LOG.info("Current scan=" + currentScan.toString());
|
||||||
|
timestamp = System.currentTimeMillis();
|
||||||
|
rowcount = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,6 +115,10 @@ public class TableRecordReaderImpl {
|
||||||
* @param htable the {@link HTable} to scan.
|
* @param htable the {@link HTable} to scan.
|
||||||
*/
|
*/
|
||||||
public void setHTable(HTable htable) {
|
public void setHTable(HTable htable) {
|
||||||
|
Configuration conf = htable.getConfiguration();
|
||||||
|
logScannerActivity = conf.getBoolean(
|
||||||
|
ScannerCallable.LOG_SCANNER_ACTIVITY, false);
|
||||||
|
logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
|
||||||
this.htable = htable;
|
this.htable = htable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,32 +194,55 @@ public class TableRecordReaderImpl {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Result result;
|
Result result;
|
||||||
try {
|
try {
|
||||||
result = this.scanner.next();
|
try {
|
||||||
} catch (DoNotRetryIOException e) {
|
result = this.scanner.next();
|
||||||
throw e;
|
if (logScannerActivity) {
|
||||||
} catch (IOException e) {
|
rowcount ++;
|
||||||
LOG.debug("recovered from " + StringUtils.stringifyException(e));
|
if (rowcount >= logPerRowCount) {
|
||||||
if (lastSuccessfulRow == null) {
|
long now = System.currentTimeMillis();
|
||||||
LOG.warn("We are restarting the first next() invocation," +
|
LOG.info("Mapper took " + (now-timestamp)
|
||||||
" if your mapper's restarted a few other times like this" +
|
+ "ms to process " + rowcount + " rows");
|
||||||
" then you should consider killing this job and investigate" +
|
timestamp = now;
|
||||||
" why it's taking so long.");
|
rowcount = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (DoNotRetryIOException e) {
|
||||||
|
throw e;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.debug("recovered from " + StringUtils.stringifyException(e));
|
||||||
|
if (lastSuccessfulRow == null) {
|
||||||
|
LOG.warn("We are restarting the first next() invocation," +
|
||||||
|
" if your mapper has restarted a few other times like this" +
|
||||||
|
" then you should consider killing this job and investigate" +
|
||||||
|
" why it's taking so long.");
|
||||||
|
}
|
||||||
|
if (lastSuccessfulRow == null) {
|
||||||
|
restart(startRow);
|
||||||
|
} else {
|
||||||
|
restart(lastSuccessfulRow);
|
||||||
|
this.scanner.next(); // skip presumed already mapped row
|
||||||
|
}
|
||||||
|
result = this.scanner.next();
|
||||||
}
|
}
|
||||||
if (lastSuccessfulRow == null) {
|
|
||||||
restart(startRow);
|
|
||||||
} else {
|
|
||||||
restart(lastSuccessfulRow);
|
|
||||||
this.scanner.next(); // skip presumed already mapped row
|
|
||||||
}
|
|
||||||
result = this.scanner.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (result != null && result.size() > 0) {
|
if (result != null && result.size() > 0) {
|
||||||
key.set(result.getRow());
|
key.set(result.getRow());
|
||||||
lastSuccessfulRow = key.get();
|
lastSuccessfulRow = key.get();
|
||||||
Writables.copyWritable(result, value);
|
Writables.copyWritable(result, value);
|
||||||
return true;
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
if (logScannerActivity) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
LOG.info("Mapper took " + (now-timestamp)
|
||||||
|
+ "ms to process " + rowcount + " rows");
|
||||||
|
LOG.info(ioe);
|
||||||
|
String lastRow = lastSuccessfulRow == null ?
|
||||||
|
"null" : Bytes.toStringBinary(lastSuccessfulRow);
|
||||||
|
LOG.info("lastSuccessfulRow=" + lastRow);
|
||||||
|
}
|
||||||
|
throw ioe;
|
||||||
}
|
}
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,11 +24,13 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.ScannerCallable;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -46,7 +48,8 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public class TableRecordReaderImpl {
|
public class TableRecordReaderImpl {
|
||||||
|
public static final String LOG_PER_ROW_COUNT
|
||||||
|
= "hbase.mapreduce.log.scanner.rowcount";
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(TableRecordReader.class);
|
static final Log LOG = LogFactory.getLog(TableRecordReader.class);
|
||||||
|
|
||||||
|
@ -62,6 +65,10 @@ public class TableRecordReaderImpl {
|
||||||
private Result value = null;
|
private Result value = null;
|
||||||
private TaskAttemptContext context = null;
|
private TaskAttemptContext context = null;
|
||||||
private Method getCounter = null;
|
private Method getCounter = null;
|
||||||
|
private long timestamp;
|
||||||
|
private int rowcount;
|
||||||
|
private boolean logScannerActivity = false;
|
||||||
|
private int logPerRowCount = 100;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart from survivable exceptions by creating a new scanner.
|
* Restart from survivable exceptions by creating a new scanner.
|
||||||
|
@ -75,6 +82,11 @@ public class TableRecordReaderImpl {
|
||||||
currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
|
currentScan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
|
||||||
Bytes.toBytes(Boolean.TRUE));
|
Bytes.toBytes(Boolean.TRUE));
|
||||||
this.scanner = this.htable.getScanner(currentScan);
|
this.scanner = this.htable.getScanner(currentScan);
|
||||||
|
if (logScannerActivity) {
|
||||||
|
LOG.info("Current scan=" + currentScan.toString());
|
||||||
|
timestamp = System.currentTimeMillis();
|
||||||
|
rowcount = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -103,6 +115,10 @@ public class TableRecordReaderImpl {
|
||||||
* @param htable The {@link HTable} to scan.
|
* @param htable The {@link HTable} to scan.
|
||||||
*/
|
*/
|
||||||
public void setHTable(HTable htable) {
|
public void setHTable(HTable htable) {
|
||||||
|
Configuration conf = htable.getConfiguration();
|
||||||
|
logScannerActivity = conf.getBoolean(
|
||||||
|
ScannerCallable.LOG_SCANNER_ACTIVITY, false);
|
||||||
|
logPerRowCount = conf.getInt(LOG_PER_ROW_COUNT, 100);
|
||||||
this.htable = htable;
|
this.htable = htable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,33 +190,56 @@ public class TableRecordReaderImpl {
|
||||||
if (key == null) key = new ImmutableBytesWritable();
|
if (key == null) key = new ImmutableBytesWritable();
|
||||||
if (value == null) value = new Result();
|
if (value == null) value = new Result();
|
||||||
try {
|
try {
|
||||||
value = this.scanner.next();
|
try {
|
||||||
} catch (DoNotRetryIOException e) {
|
value = this.scanner.next();
|
||||||
throw e;
|
if (logScannerActivity) {
|
||||||
} catch (IOException e) {
|
rowcount ++;
|
||||||
LOG.info("recovered from " + StringUtils.stringifyException(e));
|
if (rowcount >= logPerRowCount) {
|
||||||
if (lastSuccessfulRow == null) {
|
long now = System.currentTimeMillis();
|
||||||
LOG.warn("We are restarting the first next() invocation," +
|
LOG.info("Mapper took " + (now-timestamp)
|
||||||
" if your mapper's restarted a few other times like this" +
|
+ "ms to process " + rowcount + " rows");
|
||||||
" then you should consider killing this job and investigate" +
|
timestamp = now;
|
||||||
" why it's taking so long.");
|
rowcount = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (DoNotRetryIOException e) {
|
||||||
|
throw e;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("recovered from " + StringUtils.stringifyException(e));
|
||||||
|
if (lastSuccessfulRow == null) {
|
||||||
|
LOG.warn("We are restarting the first next() invocation," +
|
||||||
|
" if your mapper has restarted a few other times like this" +
|
||||||
|
" then you should consider killing this job and investigate" +
|
||||||
|
" why it's taking so long.");
|
||||||
|
}
|
||||||
|
if (lastSuccessfulRow == null) {
|
||||||
|
restart(scan.getStartRow());
|
||||||
|
} else {
|
||||||
|
restart(lastSuccessfulRow);
|
||||||
|
scanner.next(); // skip presumed already mapped row
|
||||||
|
}
|
||||||
|
value = scanner.next();
|
||||||
}
|
}
|
||||||
if (lastSuccessfulRow == null) {
|
if (value != null && value.size() > 0) {
|
||||||
restart(scan.getStartRow());
|
key.set(value.getRow());
|
||||||
} else {
|
lastSuccessfulRow = key.get();
|
||||||
restart(lastSuccessfulRow);
|
return true;
|
||||||
scanner.next(); // skip presumed already mapped row
|
|
||||||
}
|
}
|
||||||
value = scanner.next();
|
|
||||||
}
|
|
||||||
if (value != null && value.size() > 0) {
|
|
||||||
key.set(value.getRow());
|
|
||||||
lastSuccessfulRow = key.get();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
updateCounters();
|
updateCounters();
|
||||||
return false;
|
return false;
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
if (logScannerActivity) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
LOG.info("Mapper took " + (now-timestamp)
|
||||||
|
+ "ms to process " + rowcount + " rows");
|
||||||
|
LOG.info(ioe);
|
||||||
|
String lastRow = lastSuccessfulRow == null ?
|
||||||
|
"null" : Bytes.toStringBinary(lastSuccessfulRow);
|
||||||
|
LOG.info("lastSuccessfulRow=" + lastRow);
|
||||||
|
}
|
||||||
|
throw ioe;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue