HDFS-7854. Separate class DataStreamer out of DFSOutputStream. Contributed by Li Bo.

(cherry picked from commit a16bfff71b)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
This commit is contained in:
Jing Zhao 2015-03-24 11:06:13 -07:00
parent 260dbe96c3
commit 483f77b75b
7 changed files with 1893 additions and 1584 deletions

View File

@ -19,6 +19,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7829. Code clean up for LocatedBlock. (Takanobu Asanuma via jing9) HDFS-7829. Code clean up for LocatedBlock. (Takanobu Asanuma via jing9)
HDFS-7854. Separate class DataStreamer out of DFSOutputStream. (Li Bo via
jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -77,7 +77,7 @@
ResponseProccessor is thread that is designed to catch RuntimeException. ResponseProccessor is thread that is designed to catch RuntimeException.
--> -->
<Match> <Match>
<Class name="org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor" /> <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
<Method name="run" /> <Method name="run" />
<Bug pattern="REC_CATCH_EXCEPTION" /> <Bug pattern="REC_CATCH_EXCEPTION" />
</Match> </Match>

View File

@ -917,7 +917,7 @@ public class DFSTestUtil {
public static BlockOpResponseProto transferRbw(final ExtendedBlock b, public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException { final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
assertEquals(2, datanodes.length); assertEquals(2, datanodes.length);
final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0], final Socket s = DataStreamer.createSocketForPipeline(datanodes[0],
datanodes.length, dfsClient); datanodes.length, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length); final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(

View File

@ -51,8 +51,11 @@ public class TestDFSOutputStream {
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os, DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
"wrappedStream"); "wrappedStream");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
DataStreamer streamer = (DataStreamer) Whitebox
.getInternalState(dos, "streamer");
@SuppressWarnings("unchecked")
AtomicReference<IOException> ex = (AtomicReference<IOException>) Whitebox AtomicReference<IOException> ex = (AtomicReference<IOException>) Whitebox
.getInternalState(dos, "lastException"); .getInternalState(streamer, "lastException");
Assert.assertEquals(null, ex.get()); Assert.assertEquals(null, ex.get());
dos.close(); dos.close();

View File

@ -43,6 +43,8 @@ import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
@ -603,7 +605,8 @@ public class TestFileCreation {
* Test that file leases are persisted across namenode restarts. * Test that file leases are persisted across namenode restarts.
*/ */
@Test @Test
public void testFileCreationNamenodeRestart() throws IOException { public void testFileCreationNamenodeRestart()
throws IOException, NoSuchFieldException, IllegalAccessException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
final int MAX_IDLE_TIME = 2000; // 2s final int MAX_IDLE_TIME = 2000; // 2s
conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME); conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
@ -702,11 +705,18 @@ public class TestFileCreation {
// new blocks for files that were renamed. // new blocks for files that were renamed.
DFSOutputStream dfstream = (DFSOutputStream) DFSOutputStream dfstream = (DFSOutputStream)
(stm.getWrappedStream()); (stm.getWrappedStream());
dfstream.setTestFilename(file1.toString());
Field f = DFSOutputStream.class.getDeclaredField("src");
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
f.setAccessible(true);
f.set(dfstream, file1.toString());
dfstream = (DFSOutputStream) (stm3.getWrappedStream()); dfstream = (DFSOutputStream) (stm3.getWrappedStream());
dfstream.setTestFilename(file3new.toString()); f.set(dfstream, file3new.toString());
dfstream = (DFSOutputStream) (stm4.getWrappedStream()); dfstream = (DFSOutputStream) (stm4.getWrappedStream());
dfstream.setTestFilename(file4new.toString()); f.set(dfstream, file4new.toString());
// write 1 byte to file. This should succeed because the // write 1 byte to file. This should succeed because the
// namenode should have persisted leases. // namenode should have persisted leases.