HDFS-8990. Move RemoteBlockReader to hdfs-client module. Contributed by Mingliang Liu.

This commit is contained in:
Haohui Mai 2015-08-31 13:54:14 -07:00
parent f39c749f2a
commit 94ab3b9e8f
8 changed files with 22 additions and 13 deletions

View File

@ -50,6 +50,8 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.Sampler; import org.apache.htrace.Sampler;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
@ -60,6 +62,8 @@ import org.apache.htrace.TraceScope;
@InterfaceAudience.Private @InterfaceAudience.Private
@Deprecated @Deprecated
public class RemoteBlockReader extends FSInputChecker implements BlockReader { public class RemoteBlockReader extends FSInputChecker implements BlockReader {
static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
private final Peer peer; private final Peer peer;
private final DatanodeID datanodeID; private final DatanodeID datanodeID;
private final DataInputStream in; private final DataInputStream in;
@ -488,7 +492,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public int available() throws IOException { public int available() throws IOException {
// An optimistic estimate of how much data is available // An optimistic estimate of how much data is available
// to us without doing network I/O. // to us without doing network I/O.
return DFSClient.TCP_WINDOW_SIZE; return RemoteBlockReader2.TCP_WINDOW_SIZE;
} }
@Override @Override

View File

@ -28,8 +28,6 @@ import java.nio.channels.ReadableByteChannel;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.UUID; import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
@ -56,6 +54,9 @@ import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This is a wrapper around connection to datanode * This is a wrapper around connection to datanode
* and understands checksum, offset etc. * and understands checksum, offset etc.
@ -85,16 +86,18 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private @InterfaceAudience.Private
public class RemoteBlockReader2 implements BlockReader { public class RemoteBlockReader2 implements BlockReader {
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class); static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
final private Peer peer; final private Peer peer;
final private DatanodeID datanodeID; final private DatanodeID datanodeID;
final private PeerCache peerCache; final private PeerCache peerCache;
final private long blockId; final private long blockId;
private final ReadableByteChannel in; private final ReadableByteChannel in;
private DataChecksum checksum;
private DataChecksum checksum;
private final PacketReceiver packetReceiver = new PacketReceiver(true); private final PacketReceiver packetReceiver = new PacketReceiver(true);
private ByteBuffer curDataSlice = null; private ByteBuffer curDataSlice = null;
/** offset in block of the last chunk received */ /** offset in block of the last chunk received */
@ -457,7 +460,7 @@ public class RemoteBlockReader2 implements BlockReader {
public int available() throws IOException { public int available() throws IOException {
// An optimistic estimate of how much data is available // An optimistic estimate of how much data is available
// to us without doing network I/O. // to us without doing network I/O.
return DFSClient.TCP_WINDOW_SIZE; return TCP_WINDOW_SIZE;
} }
@Override @Override

View File

@ -24,14 +24,14 @@ import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.DirectBufferPool; import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Class to handle reading packets one-at-a-time from the wire. * Class to handle reading packets one-at-a-time from the wire.
@ -47,7 +47,7 @@ public class PacketReceiver implements Closeable {
*/ */
private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024; private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
static final Log LOG = LogFactory.getLog(PacketReceiver.class); static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class);
private static final DirectBufferPool bufferPool = new DirectBufferPool(); private static final DirectBufferPool bufferPool = new DirectBufferPool();
private final boolean useDirectBuffers; private final boolean useDirectBuffers;

View File

@ -522,6 +522,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8980. Remove unnecessary block replacement in INodeFile. (jing9) HDFS-8980. Remove unnecessary block replacement in INodeFile. (jing9)
HDFS-8990. Move RemoteBlockReader to hdfs-client module.
(Mingliang via wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -209,7 +209,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DataEncryptionKeyFactory { DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(DFSClient.class); public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
private final Configuration conf; private final Configuration conf;
private final DfsClientConf dfsClientConf; private final DfsClientConf dfsClientConf;

View File

@ -24,10 +24,10 @@ import static org.mockito.Mockito.verify;
import java.util.List; import java.util.List;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -41,7 +41,7 @@ public class TestClientBlockVerification {
static LocatedBlock testBlock = null; static LocatedBlock testBlock = null;
static { static {
((Log4JLogger)RemoteBlockReader2.LOG).getLogger().setLevel(Level.ALL); GenericTestUtils.setLogLevel(RemoteBlockReader2.LOG, Level.ALL);
} }
@BeforeClass @BeforeClass
public static void setupCluster() throws Exception { public static void setupCluster() throws Exception {