HDFS-5335. Hive query failed with possible race in dfs output stream. Contributed by Haohui Mai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1531152 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f218527fff
commit
f2f5cdb555
|
@@ -364,6 +364,9 @@ Release 2.2.1 - UNRELEASED

    HDFS-5337. should do hsync for a commit request even there is no pending
    writes (brandonli)

    HDFS-5335. Hive query failed with possible race in dfs output stream.
    (Haohui Mai via suresh)

Release 2.2.0 - 2013-10-13

  INCOMPATIBLE CHANGES
|
|
@ -38,6 +38,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||
|
@ -85,7 +86,6 @@ import org.apache.hadoop.util.Daemon;
|
|||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
|
@ -141,7 +141,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private long bytesCurBlock = 0; // bytes writen in current block
|
||||
private int packetSize = 0; // write packet size, not including the header.
|
||||
private int chunksPerPacket = 0;
|
||||
private volatile IOException lastException = null;
|
||||
private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
|
||||
private long artificialSlowdown = 0;
|
||||
private long lastFlushOffset = 0; // offset when flush was invoked
|
||||
//persist blocks on namenode
|
||||
|
@ -809,8 +809,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
if (++pipelineRecoveryCount > 5) {
|
||||
DFSClient.LOG.warn("Error recovering pipeline for writing " +
|
||||
block + ". Already retried 5 times for the same packet.");
|
||||
lastException = new IOException("Failing write. Tried pipeline " +
|
||||
"recovery 5 times without success.");
|
||||
lastException.set(new IOException("Failing write. Tried pipeline " +
|
||||
"recovery 5 times without success."));
|
||||
streamerClosed = true;
|
||||
return false;
|
||||
}
|
||||
|
@ -1000,8 +1000,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
}
|
||||
if (nodes.length <= 1) {
|
||||
lastException = new IOException("All datanodes " + pipelineMsg
|
||||
+ " are bad. Aborting...");
|
||||
lastException.set(new IOException("All datanodes " + pipelineMsg
|
||||
+ " are bad. Aborting..."));
|
||||
streamerClosed = true;
|
||||
return false;
|
||||
}
|
||||
|
@ -1016,7 +1016,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
newnodes.length-errorIndex);
|
||||
nodes = newnodes;
|
||||
hasError = false;
|
||||
lastException = null;
|
||||
lastException.set(null);
|
||||
errorIndex = -1;
|
||||
}
|
||||
|
||||
|
@ -1060,7 +1060,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
ExtendedBlock oldBlock = block;
|
||||
do {
|
||||
hasError = false;
|
||||
lastException = null;
|
||||
lastException.set(null);
|
||||
errorIndex = -1;
|
||||
success = false;
|
||||
|
||||
|
@ -1275,9 +1275,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
private void setLastException(IOException e) {
|
||||
if (lastException == null) {
|
||||
lastException = e;
|
||||
}
|
||||
lastException.compareAndSet(null, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1309,7 +1307,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
protected void checkClosed() throws IOException {
|
||||
if (closed) {
|
||||
IOException e = lastException;
|
||||
IOException e = lastException.get();
|
||||
throw e != null ? e : new ClosedChannelException();
|
||||
}
|
||||
}
|
||||
|
@ -1465,6 +1463,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
private void waitAndQueueCurrentPacket() throws IOException {
|
||||
synchronized (dataQueue) {
|
||||
try {
|
||||
// If queue is full, then wait till we have enough space
|
||||
while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
|
||||
try {
|
||||
|
@ -1483,6 +1482,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
checkClosed();
|
||||
queueCurrentPacket();
|
||||
} catch (ClosedChannelException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1726,7 +1727,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
DFSClient.LOG.warn("Error while syncing", e);
|
||||
synchronized (this) {
|
||||
if (!closed) {
|
||||
lastException = new IOException("IOException flush:" + e);
|
||||
lastException.set(new IOException("IOException flush:" + e));
|
||||
closeThreads(true);
|
||||
}
|
||||
}
|
||||
|
@ -1784,21 +1785,25 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
|
||||
}
|
||||
synchronized (dataQueue) {
|
||||
while (!closed) {
|
||||
checkClosed();
|
||||
if (lastAckedSeqno >= seqno) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
|
||||
} catch (InterruptedException ie) {
|
||||
throw new InterruptedIOException(
|
||||
"Interrupted while waiting for data to be acknowledged by pipeline");
|
||||
try {
|
||||
synchronized (dataQueue) {
|
||||
while (!closed) {
|
||||
checkClosed();
|
||||
if (lastAckedSeqno >= seqno) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
dataQueue.wait(1000); // when we receive an ack, we notify on
|
||||
// dataQueue
|
||||
} catch (InterruptedException ie) {
|
||||
throw new InterruptedIOException(
|
||||
"Interrupted while waiting for data to be acknowledged by pipeline");
|
||||
}
|
||||
}
|
||||
}
|
||||
checkClosed();
|
||||
} catch (ClosedChannelException e) {
|
||||
}
|
||||
checkClosed();
|
||||
}
|
||||
|
||||
private synchronized void start() {
|
||||
|
@ -1844,7 +1849,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (closed) {
|
||||
IOException e = lastException;
|
||||
IOException e = lastException.getAndSet(null);
|
||||
if (e == null)
|
||||
return;
|
||||
else
|
||||
|
@ -1872,6 +1877,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
closeThreads(false);
|
||||
completeFile(lastBlock);
|
||||
dfsClient.endFileLease(src);
|
||||
} catch (ClosedChannelException e) {
|
||||
} finally {
|
||||
closed = true;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

/**
 * Tests for the close semantics of {@link DFSOutputStream} (HDFS-5335).
 */
public class TestDFSOutputStream {
  static MiniDFSCluster cluster;

  @BeforeClass
  public static void setup() throws IOException {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster.Builder(conf).build();
  }

  /**
   * The close() method of DFSOutputStream should never throw the same
   * exception twice: a pending lastException must be thrown exactly once
   * and then cleared. See HDFS-5335 for details.
   */
  @Test
  public void testCloseTwice() throws IOException {
    DistributedFileSystem fs = cluster.getFileSystem();
    FSDataOutputStream os = fs.create(new Path("/test"));
    DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
        "wrappedStream");
    @SuppressWarnings("unchecked")
    AtomicReference<IOException> ex = (AtomicReference<IOException>) Whitebox
        .getInternalState(dos, "lastException");
    // A freshly-created stream has no pending exception.
    Assert.assertEquals(null, ex.get());

    dos.close();

    // Inject a pending exception into the already-closed stream.
    IOException dummy = new IOException("dummy");
    ex.set(dummy);
    try {
      dos.close();
      // Without this fail() the test would pass vacuously if close()
      // swallowed the pending exception.
      Assert.fail("close() should have thrown the pending lastException");
    } catch (IOException e) {
      // The exact injected instance must be rethrown, not a copy.
      Assert.assertSame(dummy, e);
    }
    // close() must clear the exception after throwing it once...
    Assert.assertEquals(null, ex.get());
    // ...so a further close() is a harmless no-op instead of rethrowing.
    dos.close();
  }

  @AfterClass
  public static void tearDown() {
    cluster.shutdown();
  }
}
|
Loading…
Reference in New Issue