HDFS-9829. Erasure Coding: Improve few exception handling logic of ErasureCodingWorker. Contributed by Rakesh R.

This commit is contained in:
Jing Zhao 2016-02-19 10:40:11 -08:00
parent ba1c9d484a
commit 6546d9e7ff
2 changed files with 17 additions and 23 deletions

View File

@ -225,6 +225,9 @@ Trunk (Unreleased)
HDFS-9795. OIV Delimited should show which files are ACL-enabled (lei) HDFS-9795. OIV Delimited should show which files are ACL-enabled (lei)
HDFS-9829. Erasure Coding: Improve few exception handling logic of
ErasureCodingWorker. (Rakesh R via jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -124,7 +124,7 @@ public final class ErasureCodingWorker {
} }
private void initializeStripedReadThreadPool(int num) { private void initializeStripedReadThreadPool(int num) {
LOG.debug("Using striped reads; pool threads=" + num); LOG.debug("Using striped reads; pool threads={}", num);
EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num,
60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
@ -150,7 +150,7 @@ public final class ErasureCodingWorker {
} }
private void initializeStripedBlkReconstructionThreadPool(int num) { private void initializeStripedBlkReconstructionThreadPool(int num) {
LOG.debug("Using striped block reconstruction; pool threads=" + num); LOG.debug("Using striped block reconstruction; pool threads={}" + num);
EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL = new ThreadPoolExecutor(2, num, EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL = new ThreadPoolExecutor(2, num,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new Daemon.DaemonFactory() { new Daemon.DaemonFactory() {
@ -180,8 +180,8 @@ public final class ErasureCodingWorker {
EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL
.submit(new ReconstructAndTransferBlock(reconstructionInfo)); .submit(new ReconstructAndTransferBlock(reconstructionInfo));
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Failed to reconstruct striped block " LOG.warn("Failed to reconstruct striped block {}",
+ reconstructionInfo.getExtendedBlock().getLocalBlock(), e); reconstructionInfo.getExtendedBlock().getLocalBlock(), e);
} }
} }
} }
@ -476,12 +476,12 @@ public final class ErasureCodingWorker {
// Currently we don't check the acks for packets, this is similar as // Currently we don't check the acks for packets, this is similar as
// block replication. // block replication.
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Failed to reconstruct striped block: " + blockGroup, e); LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e);
} finally { } finally {
datanode.decrementXmitsInProgress(); datanode.decrementXmitsInProgress();
// close block readers // close block readers
for (StripedReader stripedReader : stripedReaders) { for (StripedReader stripedReader : stripedReaders) {
closeBlockReader(stripedReader.blockReader); IOUtils.closeStream(stripedReader.blockReader);
} }
for (int i = 0; i < targets.length; i++) { for (int i = 0; i < targets.length; i++) {
IOUtils.closeStream(targetOutputStreams[i]); IOUtils.closeStream(targetOutputStreams[i]);
@ -588,7 +588,7 @@ public final class ErasureCodingWorker {
// If read failed for some source DN, we should not use it anymore // If read failed for some source DN, we should not use it anymore
// and schedule read from another source DN. // and schedule read from another source DN.
StripedReader failedReader = stripedReaders.get(result.index); StripedReader failedReader = stripedReaders.get(result.index);
closeBlockReader(failedReader.blockReader); IOUtils.closeStream(failedReader.blockReader);
failedReader.blockReader = null; failedReader.blockReader = null;
resultIndex = scheduleNewRead(used, reconstructLength, resultIndex = scheduleNewRead(used, reconstructLength,
corruptionMap); corruptionMap);
@ -609,6 +609,8 @@ public final class ErasureCodingWorker {
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Read data interrupted.", e); LOG.info("Read data interrupted.", e);
cancelReads(futures.keySet());
futures.clear();
break; break;
} }
} }
@ -736,7 +738,7 @@ public final class ErasureCodingWorker {
StripedReader r = stripedReaders.get(i); StripedReader r = stripedReaders.get(i);
toRead = getReadLength(liveIndices[i], reconstructLen); toRead = getReadLength(liveIndices[i], reconstructLen);
if (toRead > 0) { if (toRead > 0) {
closeBlockReader(r.blockReader); IOUtils.closeStream(r.blockReader);
r.blockReader = newBlockReader( r.blockReader = newBlockReader(
getBlock(blockGroup, liveIndices[i]), positionInBlock, getBlock(blockGroup, liveIndices[i]), positionInBlock,
sources[i]); sources[i]);
@ -784,8 +786,8 @@ public final class ErasureCodingWorker {
actualReadFromBlock(reader.blockReader, buf); actualReadFromBlock(reader.blockReader, buf);
return null; return null;
} catch (ChecksumException e) { } catch (ChecksumException e) {
LOG.warn("Found Checksum error for " + reader.block + " from " LOG.warn("Found Checksum error for {} from {} at {}", reader.block,
+ reader.source + " at " + e.getPos()); reader.source, e.getPos());
addCorruptedBlock(reader.block, reader.source, corruptionMap); addCorruptedBlock(reader.block, reader.source, corruptionMap);
throw e; throw e;
} catch (IOException e) { } catch (IOException e) {
@ -837,17 +839,6 @@ public final class ErasureCodingWorker {
} }
} }
/**
 * Closes the given block reader on a best-effort basis.
 *
 * A {@code null} reader is accepted and treated as a no-op. Any
 * {@link IOException} raised by {@code close()} is discarded, so this
 * method never propagates a close failure to its caller.
 *
 * @param blockReader the reader to close; may be {@code null}
 */
private void closeBlockReader(BlockReader blockReader) {
  if (blockReader == null) {
    // nothing was opened, so there is nothing to release
    return;
  }
  try {
    blockReader.close();
  } catch (IOException ignored) {
    // best-effort close: a failure here is deliberately not reported
  }
}
private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) { private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
return NetUtils.createSocketAddr(dnInfo.getXferAddr( return NetUtils.createSocketAddr(dnInfo.getXferAddr(
datanode.getDnConf().getConnectToDnViaHostname())); datanode.getDnConf().getConnectToDnViaHostname()));
@ -867,8 +858,6 @@ public final class ErasureCodingWorker {
* read directly from DN and need to check the replica is FINALIZED * read directly from DN and need to check the replica is FINALIZED
* state, notice we should not use short-circuit local read which * state, notice we should not use short-circuit local read which
* requires config for domain-socket in UNIX or legacy config in Windows. * requires config for domain-socket in UNIX or legacy config in Windows.
*
* TODO: add proper tracer
*/ */
return RemoteBlockReader2.newBlockReader( return RemoteBlockReader2.newBlockReader(
"dummy", block, blockToken, offsetInBlock, "dummy", block, blockToken, offsetInBlock,
@ -876,6 +865,8 @@ public final class ErasureCodingWorker {
"", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo, "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
null, cachingStrategy, datanode.getTracer()); null, cachingStrategy, datanode.getTracer());
} catch (IOException e) { } catch (IOException e) {
LOG.debug("Exception while creating remote block reader, datanode {}",
dnInfo, e);
return null; return null;
} }
} }