From 8b64fbab1a4c7d65a5daf515c2d170d6a2fd4917 Mon Sep 17 00:00:00 2001
From: Inigo Goiri
Date: Fri, 19 Oct 2018 10:36:03 -0700
Subject: [PATCH] HDFS-13994. Improve DataNode BlockSender waitForMinLength.
 Contributed by BELUGA BEHR.

---
 .../hdfs/server/datanode/BlockSender.java     | 27 +--------
 .../datanode/LocalReplicaInPipeline.java      | 57 ++++++++++++++++---
 .../server/datanode/ReplicaInPipeline.java    | 14 +++++
 .../server/datanode/SimulatedFSDataset.java   | 18 ++++++
 .../extdataset/ExternalReplicaInPipeline.java |  6 ++
 5 files changed, 88 insertions(+), 34 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index bff47fa6ba6..74c1025171f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -30,6 +30,7 @@ import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
@@ -258,7 +259,7 @@ class BlockSender implements java.io.Closeable {
       }
       if (replica.getState() == ReplicaState.RBW) {
         final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
-        waitForMinLength(rbw, startOffset + length);
+        rbw.waitForMinLength(startOffset + length, 3, TimeUnit.SECONDS);
         chunkChecksum = rbw.getLastChecksumAndDataLen();
       }
       if (replica instanceof FinalizedReplica) {
@@ -493,30 +494,6 @@ class BlockSender implements java.io.Closeable {
     }
     return replica;
   }
-
-  /**
-   * Wait for rbw replica to reach the length
-   * @param rbw replica that is being written to
-   * @param len minimum length to reach
-   * @throws IOException on failing to reach the len in given wait time
-   */
-  private static void waitForMinLength(ReplicaInPipeline rbw, long len)
-      throws IOException {
-    // Wait for 3 seconds for rbw replica to reach the minimum length
-    for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
-    }
-    long bytesOnDisk = rbw.getBytesOnDisk();
-    if (bytesOnDisk < len) {
-      throw new IOException(
-          String.format("Need %d bytes, but only %d bytes available", len,
-              bytesOnDisk));
-    }
-  }
 
   /**
    * Converts an IOExcpetion (not subclasses) to SocketException.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
index 345c329c622..99d2fc8e04e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -22,7 +22,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -42,7 +46,11 @@ import org.apache.hadoop.util.StringUtils;
  * The base class implements a temporary replica
  */
 public class LocalReplicaInPipeline extends LocalReplica
-                        implements ReplicaInPipeline {
+    implements ReplicaInPipeline {
+
+  private final Lock lock = new ReentrantLock();
+  private final Condition bytesOnDiskChange = lock.newCondition();
+
   private long bytesAcked;
   private long bytesOnDisk;
   private byte[] lastChecksum;
@@ -167,16 +175,47 @@ public class LocalReplicaInPipeline extends LocalReplica
     bytesReserved = 0;
   }
 
-  @Override // ReplicaInPipeline
-  public synchronized void setLastChecksumAndDataLen(long dataLength,
-      byte[] checksum) {
-    this.bytesOnDisk = dataLength;
-    this.lastChecksum = checksum;
+  @Override
+  public void setLastChecksumAndDataLen(long dataLength, byte[] checksum) {
+    lock.lock();
+    try {
+      this.bytesOnDisk = dataLength;
+      this.lastChecksum = checksum;
+      bytesOnDiskChange.signalAll();
+    } finally {
+      lock.unlock();
+    }
   }
 
-  @Override // ReplicaInPipeline
-  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
-    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
+  @Override
+  public ChunkChecksum getLastChecksumAndDataLen() {
+    lock.lock();
+    try {
+      return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void waitForMinLength(long minLength, long time, TimeUnit unit)
+      throws IOException {
+    long nanos = unit.toNanos(time);
+    lock.lock();
+    try {
+      while (bytesOnDisk < minLength) {
+        if (nanos <= 0L) {
+          throw new IOException(
+              String.format("Need %d bytes, but only %d bytes available",
+                  minLength, bytesOnDisk));
+        }
+        nanos = bytesOnDiskChange.awaitNanos(nanos);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } finally {
+      lock.unlock();
+    }
   }
 
   @Override // ReplicaInPipeline
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index efa6ea686f4..174827b5a20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.util.DataChecksum;
@@ -104,4 +105,17 @@ public interface ReplicaInPipeline extends Replica {
    * @throws IOException the waiting is interrupted
    */
   void stopWriter(long xceiverStopTimeout) throws IOException;
+
+  /**
+   * Causes the current thread to wait until a minimum length is reached, the
+   * thread is interrupted, or the specified waiting time elapses.
+   *
+   * @param minLength The minimum length to achieve
+   * @param time the maximum time to wait
+   * @param unit the time unit of the time argument
+   * @throws IOException if the current thread is interrupted or the minimum
+   *           length is not achieved within the time allowed.
+   */
+  void waitForMinLength(long minLength, long time, TimeUnit unit)
+      throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 0dc2733e94a..cb66c636e93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -33,6 +33,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
@@ -396,6 +397,23 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     @Override
     public void stopWriter(long xceiverStopTimeout) throws IOException {
     }
+
+    @Override
+    public void waitForMinLength(long minLength, long time, TimeUnit unit)
+        throws IOException {
+      final long deadLine = System.currentTimeMillis() + unit.toMillis(time);
+      do {
+        if (getBytesOnDisk() >= minLength) {
+          return;
+        }
+        try {
+          Thread.sleep(100L);
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+      } while (deadLine > System.currentTimeMillis());
+      throw new IOException("Minimum length was not achieved within timeout");
+    }
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index 5c172e5f1ca..b135411203f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.extdataset;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.ChunkChecksum;
@@ -129,4 +130,9 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline {
   @Override
   public void interruptThread() {
   }
+
+  @Override
+  public void waitForMinLength(long minLength, long time, TimeUnit unit)
+      throws IOException {
+  }
 }
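
Note on the synchronization pattern (reference only, not part of the patch):
below is a minimal, self-contained Java sketch of the Lock/Condition timed
wait that LocalReplicaInPipeline.waitForMinLength() now uses. The class,
field, and method names in the sketch are illustrative, not taken from
Hadoop. Compared with the removed BlockSender helper, which polled
getBytesOnDisk() in 100 ms sleeps, Condition.awaitNanos() wakes the waiter
as soon as the writer signals, and its return value carries the remaining
wait time so the deadline stays accurate across spurious wakeups.

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TimedWaitSketch {
  private final Lock lock = new ReentrantLock();
  private final Condition progress = lock.newCondition();
  private long bytesOnDisk; // guarded by lock

  // Writer side: publish progress under the lock and wake all waiters,
  // mirroring what setLastChecksumAndDataLen() does in the patch.
  void advance(long newLength) {
    lock.lock();
    try {
      bytesOnDisk = newLength;
      progress.signalAll();
    } finally {
      lock.unlock();
    }
  }

  // Reader side: block until the minimum length is reached or the deadline
  // expires; awaitNanos() returns the time still remaining, so the loop
  // re-arms correctly after a spurious wakeup.
  void waitForMinLength(long minLength, long time, TimeUnit unit)
      throws IOException {
    long nanos = unit.toNanos(time);
    lock.lock();
    try {
      while (bytesOnDisk < minLength) {
        if (nanos <= 0L) {
          throw new IOException("Need " + minLength + " bytes, but only "
              + bytesOnDisk + " bytes available");
        }
        nanos = progress.awaitNanos(nanos);
      }
    } catch (InterruptedException e) {
      throw new IOException(e); // the patch wraps the interrupt the same way
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    TimedWaitSketch replica = new TimedWaitSketch();
    Thread writer = new Thread(() -> {
      try {
        Thread.sleep(200L);
      } catch (InterruptedException e) {
        return;
      }
      replica.advance(1024L); // the waiter unblocks here, not on a poll tick
    });
    writer.start();
    replica.waitForMinLength(1024L, 3, TimeUnit.SECONDS); // returns in ~200 ms
    writer.join();
  }
}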