From 2a60a8c3c409083d0727ac306e0376be8e3d4ede Mon Sep 17 00:00:00 2001
From: Suresh Srinivas <suresh@apache.org>
Date: Fri, 11 Oct 2013 00:03:28 +0000
Subject: [PATCH] HDFS-5335. Merge 1531152 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1531153 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 .../apache/hadoop/hdfs/DFSOutputStream.java | 58 ++++++++-------
 .../hadoop/hdfs/TestDFSOutputStream.java    | 74 +++++++++++++++++++
 3 files changed, 109 insertions(+), 26 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 25171c6bd29..daaf0a2ddc3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -120,6 +120,9 @@ Release 2.2.1 - UNRELEASED
     HDFS-5337. should do hsync for a commit request even there is no pending
     writes (brandonli)
 
+    HDFS-5335. Hive query failed with possible race in dfs output stream.
+    (Haohui Mai via suresh)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 0747f418a21..80294af440c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -38,6 +38,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CanSetDropBehind;
@@ -85,7 +86,6 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
-import org.mortbay.log.Log;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -141,7 +141,7 @@ public class DFSOutputStream extends FSOutputSummer
   private long bytesCurBlock = 0; // bytes writen in current block
   private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
-  private volatile IOException lastException = null;
+  private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
   private long artificialSlowdown = 0;
   private long lastFlushOffset = 0; // offset when flush was invoked
   //persist blocks on namenode
@@ -814,8 +814,8 @@ public class DFSOutputStream extends FSOutputSummer
         if (++pipelineRecoveryCount > 5) {
           DFSClient.LOG.warn("Error recovering pipeline for writing " +
               block + ". Already retried 5 times for the same packet.");
-          lastException = new IOException("Failing write. Tried pipeline " +
-              "recovery 5 times without success.");
+          lastException.set(new IOException("Failing write. Tried pipeline " +
+              "recovery 5 times without success."));
           streamerClosed = true;
           return false;
         }
@@ -1005,8 +1005,8 @@ public class DFSOutputStream extends FSOutputSummer
           }
         }
         if (nodes.length <= 1) {
-          lastException = new IOException("All datanodes " + pipelineMsg
-              + " are bad. Aborting...");
+          lastException.set(new IOException("All datanodes " + pipelineMsg
+              + " are bad. Aborting..."));
           streamerClosed = true;
           return false;
         }
@@ -1021,7 +1021,7 @@ public class DFSOutputStream extends FSOutputSummer
             newnodes.length-errorIndex);
         nodes = newnodes;
         hasError = false;
-        lastException = null;
+        lastException.set(null);
         errorIndex = -1;
       }
 
@@ -1065,7 +1065,7 @@ public class DFSOutputStream extends FSOutputSummer
       ExtendedBlock oldBlock = block;
       do {
         hasError = false;
-        lastException = null;
+        lastException.set(null);
         errorIndex = -1;
         success = false;
 
@@ -1279,9 +1279,7 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     private void setLastException(IOException e) {
-      if (lastException == null) {
-        lastException = e;
-      }
+      lastException.compareAndSet(null, e);
     }
   }
 
@@ -1313,7 +1311,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   protected void checkClosed() throws IOException {
     if (closed) {
-      IOException e = lastException;
+      IOException e = lastException.get();
       throw e != null ? e : new ClosedChannelException();
     }
   }
@@ -1469,6 +1467,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   private void waitAndQueueCurrentPacket() throws IOException {
     synchronized (dataQueue) {
+      try {
       // If queue is full, then wait till we have enough space
       while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
         try {
@@ -1487,6 +1486,8 @@ public class DFSOutputStream extends FSOutputSummer
       }
       checkClosed();
       queueCurrentPacket();
+      } catch (ClosedChannelException e) {
+      }
     }
   }
 
@@ -1735,7 +1736,7 @@ public class DFSOutputStream extends FSOutputSummer
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
         if (!closed) {
-          lastException = new IOException("IOException flush:" + e);
+          lastException.set(new IOException("IOException flush:" + e));
           closeThreads(true);
         }
       }
@@ -1793,21 +1794,25 @@ public class DFSOutputStream extends FSOutputSummer
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Waiting for ack for: " + seqno);
     }
-    synchronized (dataQueue) {
-      while (!closed) {
-        checkClosed();
-        if (lastAckedSeqno >= seqno) {
-          break;
-        }
-        try {
-          dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
-        } catch (InterruptedException ie) {
-          throw new InterruptedIOException(
-            "Interrupted while waiting for data to be acknowledged by pipeline");
+    try {
+      synchronized (dataQueue) {
+        while (!closed) {
+          checkClosed();
+          if (lastAckedSeqno >= seqno) {
+            break;
+          }
+          try {
+            dataQueue.wait(1000); // when we receive an ack, we notify on
+                                  // dataQueue
+          } catch (InterruptedException ie) {
+            throw new InterruptedIOException(
+              "Interrupted while waiting for data to be acknowledged by pipeline");
+          }
         }
       }
+      checkClosed();
+    } catch (ClosedChannelException e) {
     }
-    checkClosed();
   }
 
   private synchronized void start() {
@@ -1853,7 +1858,7 @@ public class DFSOutputStream extends FSOutputSummer
   @Override
   public synchronized void close() throws IOException {
     if (closed) {
-      IOException e = lastException;
+      IOException e = lastException.getAndSet(null);
       if (e == null)
         return;
       else
@@ -1880,6 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer
       closeThreads(false);
       completeFile(lastBlock);
       dfsClient.endFileLease(src);
+    } catch (ClosedChannelException e) {
     } finally {
       closed = true;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
new file mode 100644
index 00000000000..86b732f992a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+public class TestDFSOutputStream {
+  static MiniDFSCluster cluster;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    Configuration conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  /**
+   * The close() method of DFSOutputStream should never throw the same
+   * exception twice. See HDFS-5335 for details.
+   */
+  @Test
+  public void testCloseTwice() throws IOException {
+    DistributedFileSystem fs = cluster.getFileSystem();
+    FSDataOutputStream os = fs.create(new Path("/test"));
+    DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
+        "wrappedStream");
+    @SuppressWarnings("unchecked")
+    AtomicReference<IOException> ex = (AtomicReference<IOException>) Whitebox
+        .getInternalState(dos, "lastException");
+    Assert.assertEquals(null, ex.get());
+
+    dos.close();
+
+    IOException dummy = new IOException("dummy");
+    ex.set(dummy);
+    try {
+      dos.close();
+    } catch (IOException e) {
+      Assert.assertEquals(e, dummy);
+    }
+    Assert.assertEquals(null, ex.get());
+    dos.close();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+}
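A note on the pattern this patch applies. The race it fixes is twofold:
setLastException() performed a non-atomic check-then-act on the volatile
lastException field, and close() rethrew whatever exception was stored on
every subsequent call. Switching the field to an AtomicReference makes the
first writer win (compareAndSet) and lets a repeated close() consume the
pending exception at most once (getAndSet), while the new catch blocks
swallow the bare ClosedChannelException that checkClosed() raises for a
stream that was already closed cleanly. Below is a minimal, self-contained
sketch of that idea; IdempotentCloser and its members are illustrative
stand-ins, not code from DFSOutputStream:

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical distillation of the DFSOutputStream fix above.
    class IdempotentCloser {
      // Holds the first failure seen by any thread; later ones are dropped.
      private final AtomicReference<IOException> lastException =
          new AtomicReference<IOException>();
      private boolean closed = false;

      void setLastException(IOException e) {
        // Atomic replacement for "if (lastException == null) lastException = e;"
        lastException.compareAndSet(null, e);
      }

      public synchronized void close() throws IOException {
        if (closed) {
          // getAndSet(null) hands a pending exception to exactly one caller,
          // so close() can never throw the same exception twice.
          IOException e = lastException.getAndSet(null);
          if (e != null) {
            throw e;
          }
          return;
        }
        try {
          // ... flush buffered packets and complete the file here ...
        } finally {
          closed = true;
        }
      }
    }

This is exactly what the new testCloseTwice exercises: it plants a dummy
IOException in lastException after the stream is closed, expects the next
close() to throw it, and then asserts the reference is back to null so a
third close() returns quietly.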