HDFS-8300. Fix unit test failures and findbugs warning caused by HDFS-8283. Contributed by Jing Zhao.
(cherry picked from commit 98a6176628
)
This commit is contained in:
parent
34d686e6e9
commit
ec7b392965
|
@ -268,6 +268,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-8214. Secondary NN Web UI shows wrong date for Last Checkpoint. (clamb via wang)
|
HDFS-8214. Secondary NN Web UI shows wrong date for Last Checkpoint. (clamb via wang)
|
||||||
|
|
||||||
|
HDFS-8300. Fix unit test failures and findbugs warning caused by HDFS-8283.
|
||||||
|
(jing9)
|
||||||
|
|
||||||
Release 2.7.1 - UNRELEASED
|
Release 2.7.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -767,7 +767,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
|
|
||||||
protected synchronized void closeImpl() throws IOException {
|
protected synchronized void closeImpl() throws IOException {
|
||||||
if (isClosed()) {
|
if (isClosed()) {
|
||||||
streamer.getLastException().check();
|
streamer.getLastException().check(true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -174,13 +174,13 @@ class DataStreamer extends Daemon {
|
||||||
packets.clear();
|
packets.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
static class LastException {
|
static class LastExceptionInStreamer {
|
||||||
private Throwable thrown;
|
private IOException thrown;
|
||||||
|
|
||||||
synchronized void set(Throwable t) {
|
synchronized void set(Throwable t) {
|
||||||
Preconditions.checkNotNull(t);
|
assert t != null;
|
||||||
Preconditions.checkState(thrown == null);
|
this.thrown = t instanceof IOException ?
|
||||||
this.thrown = t;
|
(IOException) t : new IOException(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void clear() {
|
synchronized void clear() {
|
||||||
|
@ -188,17 +188,23 @@ class DataStreamer extends Daemon {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check if there already is an exception. */
|
/** Check if there already is an exception. */
|
||||||
synchronized void check() throws IOException {
|
synchronized void check(boolean resetToNull) throws IOException {
|
||||||
if (thrown != null) {
|
if (thrown != null) {
|
||||||
throw new IOException(thrown);
|
if (LOG.isTraceEnabled()) {
|
||||||
|
// wrap and print the exception to know when the check is called
|
||||||
|
LOG.trace("Got Exception while checking", new Throwable(thrown));
|
||||||
|
}
|
||||||
|
final IOException e = thrown;
|
||||||
|
if (resetToNull) {
|
||||||
|
thrown = null;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void throwException4Close() throws IOException {
|
synchronized void throwException4Close() throws IOException {
|
||||||
check();
|
check(false);
|
||||||
final IOException ioe = new ClosedChannelException();
|
throw new ClosedChannelException();
|
||||||
thrown = ioe;
|
|
||||||
throw ioe;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,7 +240,7 @@ class DataStreamer extends Daemon {
|
||||||
private long lastQueuedSeqno = -1;
|
private long lastQueuedSeqno = -1;
|
||||||
private long lastAckedSeqno = -1;
|
private long lastAckedSeqno = -1;
|
||||||
private long bytesCurBlock = 0; // bytes written in current block
|
private long bytesCurBlock = 0; // bytes written in current block
|
||||||
private final LastException lastException = new LastException();
|
private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
|
||||||
private Socket s;
|
private Socket s;
|
||||||
|
|
||||||
private final DFSClient dfsClient;
|
private final DFSClient dfsClient;
|
||||||
|
@ -1741,7 +1747,7 @@ class DataStreamer extends Daemon {
|
||||||
/**
|
/**
|
||||||
* @return the last exception
|
* @return the last exception
|
||||||
*/
|
*/
|
||||||
LastException getLastException(){
|
LastExceptionInStreamer getLastException(){
|
||||||
return lastException;
|
return lastException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
|
||||||
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -65,9 +66,10 @@ public class TestDFSOutputStream {
|
||||||
DataStreamer streamer = (DataStreamer) Whitebox
|
DataStreamer streamer = (DataStreamer) Whitebox
|
||||||
.getInternalState(dos, "streamer");
|
.getInternalState(dos, "streamer");
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
AtomicReference<IOException> ex = (AtomicReference<IOException>) Whitebox
|
LastExceptionInStreamer ex = (LastExceptionInStreamer) Whitebox
|
||||||
.getInternalState(streamer, "lastException");
|
.getInternalState(streamer, "lastException");
|
||||||
Assert.assertEquals(null, ex.get());
|
Throwable thrown = (Throwable) Whitebox.getInternalState(ex, "thrown");
|
||||||
|
Assert.assertNull(thrown);
|
||||||
|
|
||||||
dos.close();
|
dos.close();
|
||||||
|
|
||||||
|
@ -78,7 +80,8 @@ public class TestDFSOutputStream {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
Assert.assertEquals(e, dummy);
|
Assert.assertEquals(e, dummy);
|
||||||
}
|
}
|
||||||
Assert.assertEquals(null, ex.get());
|
thrown = (Throwable) Whitebox.getInternalState(ex, "thrown");
|
||||||
|
Assert.assertNull(thrown);
|
||||||
dos.close();
|
dos.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue