svn merge -c 1330535 from trunk for HDFS-3319. Change DFSOutputStream to not to start a thread in constructors.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1330536 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f65f193d78
commit
ab658346fb
|
@ -431,6 +431,9 @@ Release 2.0.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-3314. HttpFS operation for getHomeDirectory is incorrect. (tucu)
|
HDFS-3314. HttpFS operation for getHomeDirectory is incorrect. (tucu)
|
||||||
|
|
||||||
|
HDFS-3319. Change DFSOutputStream to not to start a thread in constructors.
|
||||||
|
(szetszwo)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-1623 SUBTASKS
|
BREAKDOWN OF HDFS-1623 SUBTASKS
|
||||||
|
|
||||||
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
|
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
|
||||||
|
|
|
@ -1085,9 +1085,9 @@ public class DFSClient implements java.io.Closeable {
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
LOG.debug(src + ": masked=" + masked);
|
LOG.debug(src + ": masked=" + masked);
|
||||||
}
|
}
|
||||||
final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
|
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
|
||||||
createParent, replication, blockSize, progress, buffersize,
|
src, masked, flag, createParent, replication, blockSize, progress,
|
||||||
dfsClientConf.createChecksum());
|
buffersize, dfsClientConf.createChecksum());
|
||||||
leaserenewer.put(src, result, this);
|
leaserenewer.put(src, result, this);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -1134,7 +1134,7 @@ public class DFSClient implements java.io.Closeable {
|
||||||
DataChecksum checksum = DataChecksum.newDataChecksum(
|
DataChecksum checksum = DataChecksum.newDataChecksum(
|
||||||
dfsClientConf.checksumType,
|
dfsClientConf.checksumType,
|
||||||
bytesPerChecksum);
|
bytesPerChecksum);
|
||||||
result = new DFSOutputStream(this, src, absPermission,
|
result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
|
||||||
flag, createParent, replication, blockSize, progress, buffersize,
|
flag, createParent, replication, blockSize, progress, buffersize,
|
||||||
checksum);
|
checksum);
|
||||||
}
|
}
|
||||||
|
@ -1193,7 +1193,7 @@ public class DFSClient implements java.io.Closeable {
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
UnresolvedPathException.class);
|
UnresolvedPathException.class);
|
||||||
}
|
}
|
||||||
return new DFSOutputStream(this, src, buffersize, progress,
|
return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
|
||||||
lastBlock, stat, dfsClientConf.createChecksum());
|
lastBlock, stat, dfsClientConf.createChecksum());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ import org.apache.hadoop.util.Progressable;
|
||||||
* starts sending packets from the dataQueue.
|
* starts sending packets from the dataQueue.
|
||||||
****************************************************************/
|
****************************************************************/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class DFSOutputStream extends FSOutputSummer implements Syncable {
|
public class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||||
private final DFSClient dfsClient;
|
private final DFSClient dfsClient;
|
||||||
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
|
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
|
||||||
private Socket s;
|
private Socket s;
|
||||||
|
@ -1234,14 +1234,11 @@ public final class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||||
this.checksum = checksum;
|
this.checksum = checksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/** Construct a new output stream for creating a file. */
|
||||||
* Create a new output stream to the given DataNode.
|
private DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked,
|
||||||
* @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
|
EnumSet<CreateFlag> flag, boolean createParent, short replication,
|
||||||
*/
|
long blockSize, Progressable progress, int buffersize,
|
||||||
DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
|
DataChecksum checksum) throws IOException {
|
||||||
boolean createParent, short replication, long blockSize, Progressable progress,
|
|
||||||
int buffersize, DataChecksum checksum)
|
|
||||||
throws IOException {
|
|
||||||
this(dfsClient, src, blockSize, progress, checksum, replication);
|
this(dfsClient, src, blockSize, progress, checksum, replication);
|
||||||
|
|
||||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||||
|
@ -1261,14 +1258,21 @@ public final class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||||
UnresolvedPathException.class);
|
UnresolvedPathException.class);
|
||||||
}
|
}
|
||||||
streamer = new DataStreamer();
|
streamer = new DataStreamer();
|
||||||
streamer.start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
|
||||||
* Create a new output stream to the given DataNode.
|
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
|
||||||
* @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
|
short replication, long blockSize, Progressable progress, int buffersize,
|
||||||
*/
|
DataChecksum checksum) throws IOException {
|
||||||
DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
|
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, masked,
|
||||||
|
flag, createParent, replication, blockSize, progress, buffersize,
|
||||||
|
checksum);
|
||||||
|
out.streamer.start();
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Construct a new output stream for append. */
|
||||||
|
private DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
|
||||||
LocatedBlock lastBlock, HdfsFileStatus stat,
|
LocatedBlock lastBlock, HdfsFileStatus stat,
|
||||||
DataChecksum checksum) throws IOException {
|
DataChecksum checksum) throws IOException {
|
||||||
this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
|
this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
|
||||||
|
@ -1286,7 +1290,15 @@ public final class DFSOutputStream extends FSOutputSummer implements Syncable {
|
||||||
checksum.getBytesPerChecksum());
|
checksum.getBytesPerChecksum());
|
||||||
streamer = new DataStreamer();
|
streamer = new DataStreamer();
|
||||||
}
|
}
|
||||||
streamer.start();
|
}
|
||||||
|
|
||||||
|
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
|
||||||
|
int buffersize, Progressable progress, LocatedBlock lastBlock,
|
||||||
|
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
|
||||||
|
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, buffersize,
|
||||||
|
progress, lastBlock, stat, checksum);
|
||||||
|
out.streamer.start();
|
||||||
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void computePacketChunkSize(int psize, int csize) {
|
private void computePacketChunkSize(int psize, int csize) {
|
||||||
|
|
|
@ -99,7 +99,7 @@ public class TestLeaseRenewer {
|
||||||
clientName.startsWith("DFSClient_NONMAPREDUCE_"));
|
clientName.startsWith("DFSClient_NONMAPREDUCE_"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Test
|
@Test
|
||||||
public void testRenewal() throws Exception {
|
public void testRenewal() throws Exception {
|
||||||
// Keep track of how many times the lease gets renewed
|
// Keep track of how many times the lease gets renewed
|
||||||
final AtomicInteger leaseRenewalCount = new AtomicInteger();
|
final AtomicInteger leaseRenewalCount = new AtomicInteger();
|
||||||
|
@ -135,7 +135,7 @@ public class TestLeaseRenewer {
|
||||||
* to several DFSClients with the same name, the first of which has no files
|
* to several DFSClients with the same name, the first of which has no files
|
||||||
* open. Previously, this was causing the lease to not get renewed.
|
* open. Previously, this was causing the lease to not get renewed.
|
||||||
*/
|
*/
|
||||||
// @Test
|
@Test
|
||||||
public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
|
public void testManyDfsClientsWhereSomeNotOpen() throws Exception {
|
||||||
// First DFSClient has no files open so doesn't renew leases.
|
// First DFSClient has no files open so doesn't renew leases.
|
||||||
final DFSClient mockClient1 = createMockClient();
|
final DFSClient mockClient1 = createMockClient();
|
||||||
|
@ -181,7 +181,7 @@ public class TestLeaseRenewer {
|
||||||
renewer.closeFile(filePath, mockClient2);
|
renewer.closeFile(filePath, mockClient2);
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Test
|
@Test
|
||||||
public void testThreadName() throws Exception {
|
public void testThreadName() throws Exception {
|
||||||
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
|
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
|
||||||
String filePath = "/foo";
|
String filePath = "/foo";
|
||||||
|
|
Loading…
Reference in New Issue