HDFS-5335. Merge 1531152 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1531153 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2013-10-11 00:03:28 +00:00
parent 5c0382166f
commit 2a60a8c3c4
3 changed files with 109 additions and 26 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -120,6 +120,9 @@ Release 2.2.1 - UNRELEASED
     HDFS-5337. should do hsync for a commit request even there is no pending
     writes (brandonli)
 
+    HDFS-5335. Hive query failed with possible race in dfs output stream.
+    (Haohui Mai via suresh)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES
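For context on the fix below: with the old volatile IOException lastException field, recording a failure was a non-atomic check-then-act (if (lastException == null) lastException = e;), so two racing threads could each observe null, and a second close() could rethrow an exception that had already been delivered. A minimal sketch of the pattern the patch switches to, using illustrative names rather than the real DFSOutputStream internals:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the pattern adopted by this patch: record only the
// first failure atomically and hand it to exactly one caller.
// Class and method names are illustrative, not the Hadoop API.
class FirstExceptionHolder {
  private final AtomicReference<IOException> lastException =
      new AtomicReference<IOException>();

  void setLastException(IOException e) {
    // compareAndSet keeps the first recorded exception; later ones lose.
    lastException.compareAndSet(null, e);
  }

  void rethrowOnce() throws IOException {
    // getAndSet(null) delivers the failure exactly once, so a repeated
    // close() cannot throw the same exception twice.
    IOException e = lastException.getAndSet(null);
    if (e != null) {
      throw e;
    }
  }
}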

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -38,6 +38,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CanSetDropBehind;
@@ -85,7 +86,6 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
-import org.mortbay.log.Log;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -141,7 +141,7 @@ public class DFSOutputStream extends FSOutputSummer
   private long bytesCurBlock = 0; // bytes writen in current block
   private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
-  private volatile IOException lastException = null;
+  private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
   private long artificialSlowdown = 0;
   private long lastFlushOffset = 0; // offset when flush was invoked
   //persist blocks on namenode
@@ -814,8 +814,8 @@ public class DFSOutputStream extends FSOutputSummer
           if (++pipelineRecoveryCount > 5) {
             DFSClient.LOG.warn("Error recovering pipeline for writing " +
                 block + ". Already retried 5 times for the same packet.");
-            lastException = new IOException("Failing write. Tried pipeline " +
-                "recovery 5 times without success.");
+            lastException.set(new IOException("Failing write. Tried pipeline " +
+                "recovery 5 times without success."));
             streamerClosed = true;
             return false;
           }
@@ -1005,8 +1005,8 @@ public class DFSOutputStream extends FSOutputSummer
         }
       }
       if (nodes.length <= 1) {
-        lastException = new IOException("All datanodes " + pipelineMsg
-            + " are bad. Aborting...");
+        lastException.set(new IOException("All datanodes " + pipelineMsg
+            + " are bad. Aborting..."));
         streamerClosed = true;
         return false;
       }
@@ -1021,7 +1021,7 @@ public class DFSOutputStream extends FSOutputSummer
           newnodes.length-errorIndex);
       nodes = newnodes;
       hasError = false;
-      lastException = null;
+      lastException.set(null);
       errorIndex = -1;
     }
 
@@ -1065,7 +1065,7 @@ public class DFSOutputStream extends FSOutputSummer
       ExtendedBlock oldBlock = block;
       do {
         hasError = false;
-        lastException = null;
+        lastException.set(null);
         errorIndex = -1;
         success = false;
 
@@ -1279,9 +1279,7 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     private void setLastException(IOException e) {
-      if (lastException == null) {
-        lastException = e;
-      }
+      lastException.compareAndSet(null, e);
     }
   }
 
@@ -1313,7 +1311,7 @@ public class DFSOutputStream extends FSOutputSummer
   protected void checkClosed() throws IOException {
     if (closed) {
-      IOException e = lastException;
+      IOException e = lastException.get();
       throw e != null ? e : new ClosedChannelException();
     }
   }
 
@@ -1469,6 +1467,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   private void waitAndQueueCurrentPacket() throws IOException {
     synchronized (dataQueue) {
+      try {
       // If queue is full, then wait till we have enough space
       while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
         try {
@@ -1487,6 +1486,8 @@ public class DFSOutputStream extends FSOutputSummer
       }
       checkClosed();
       queueCurrentPacket();
+      } catch (ClosedChannelException e) {
+      }
     }
   }
 
@@ -1735,7 +1736,7 @@ public class DFSOutputStream extends FSOutputSummer
           DFSClient.LOG.warn("Error while syncing", e);
           synchronized (this) {
             if (!closed) {
-              lastException = new IOException("IOException flush:" + e);
+              lastException.set(new IOException("IOException flush:" + e));
               closeThreads(true);
             }
           }
@@ -1793,21 +1794,25 @@ public class DFSOutputStream extends FSOutputSummer
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Waiting for ack for: " + seqno);
     }
-    synchronized (dataQueue) {
-      while (!closed) {
-        checkClosed();
-        if (lastAckedSeqno >= seqno) {
-          break;
-        }
-        try {
-          dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
-        } catch (InterruptedException ie) {
-          throw new InterruptedIOException(
-            "Interrupted while waiting for data to be acknowledged by pipeline");
-        }
-      }
-    }
-    checkClosed();
+    try {
+      synchronized (dataQueue) {
+        while (!closed) {
+          checkClosed();
+          if (lastAckedSeqno >= seqno) {
+            break;
+          }
+          try {
+            dataQueue.wait(1000); // when we receive an ack, we notify on
+                                  // dataQueue
+          } catch (InterruptedException ie) {
+            throw new InterruptedIOException(
+                "Interrupted while waiting for data to be acknowledged by pipeline");
+          }
+        }
+      }
+      checkClosed();
+    } catch (ClosedChannelException e) {
+    }
   }
 
   private synchronized void start() {
@@ -1853,7 +1858,7 @@ public class DFSOutputStream extends FSOutputSummer
   @Override
   public synchronized void close() throws IOException {
     if (closed) {
-      IOException e = lastException;
+      IOException e = lastException.getAndSet(null);
       if (e == null)
         return;
       else
@@ -1880,6 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer
       closeThreads(false);
       completeFile(lastBlock);
       dfsClient.endFileLease(src);
+    } catch (ClosedChannelException e) {
     } finally {
       closed = true;
     }
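The empty catch (ClosedChannelException e) blocks above are deliberate: after this change, checkClosed() throws a bare ClosedChannelException only when the stream was closed cleanly with no recorded failure, so swallowing it turns a flush or repeated close() on an already-closed stream into a no-op instead of a spurious error. A rough sketch of that control flow, again with illustrative names (SketchStream and flushBuffer are not the real DFSOutputStream internals):

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the close-path behavior after the patch; names are
// illustrative, not the Hadoop API.
class SketchStream {
  private final AtomicReference<IOException> lastException =
      new AtomicReference<IOException>();
  private volatile boolean closed = false;

  private void checkClosed() throws IOException {
    if (closed) {
      IOException e = lastException.get();
      // A recorded failure is rethrown; a clean close surfaces as a
      // plain ClosedChannelException.
      throw e != null ? e : new ClosedChannelException();
    }
  }

  public synchronized void close() throws IOException {
    if (closed) {
      // Deliver any pending failure exactly once; a further close()
      // returns quietly instead of rethrowing the same exception.
      IOException e = lastException.getAndSet(null);
      if (e != null) {
        throw e;
      }
      return;
    }
    try {
      flushBuffer(); // may observe a concurrent close via checkClosed()
    } catch (ClosedChannelException e) {
      // Benign: the stream was already closed cleanly.
    } finally {
      closed = true;
    }
  }

  private void flushBuffer() throws IOException {
    checkClosed();
    // ... queue the last packet and wait for acks ...
  }
}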

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import junit.framework.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

public class TestDFSOutputStream {
  static MiniDFSCluster cluster;

  @BeforeClass
  public static void setup() throws IOException {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster.Builder(conf).build();
  }

  /**
   * The close() method of DFSOutputStream should never throw the same
   * exception twice. See HDFS-5335 for details.
   */
  @Test
  public void testCloseTwice() throws IOException {
    DistributedFileSystem fs = cluster.getFileSystem();
    FSDataOutputStream os = fs.create(new Path("/test"));
    DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
        "wrappedStream");
    @SuppressWarnings("unchecked")
    AtomicReference<IOException> ex = (AtomicReference<IOException>) Whitebox
        .getInternalState(dos, "lastException");
    Assert.assertEquals(null, ex.get());

    dos.close();

    IOException dummy = new IOException("dummy");
    ex.set(dummy);
    try {
      dos.close();
    } catch (IOException e) {
      Assert.assertEquals(e, dummy);
    }
    Assert.assertEquals(null, ex.get());
    dos.close();
  }

  @AfterClass
  public static void tearDown() {
    cluster.shutdown();
  }
}