HDFS-7515. Fix new findbugs warnings in hadoop-hdfs. Contributed by Haohui Mai.
This commit is contained in:
parent
41f0d20fcb
commit
c8eb139977
|
@ -315,6 +315,8 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7475. Make TestLazyPersistFiles#testLazyPersistBlocksAreSaved
|
||||
deterministic. (Xiaoyu Yao via Arpit Agarwal)
|
||||
|
||||
HDFS-7515. Fix new findbugs warnings in hadoop-hdfs. (wheat9)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -668,7 +668,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
Peer peer = null;
|
||||
try {
|
||||
curPeer = nextTcpPeer();
|
||||
if (curPeer == null) break;
|
||||
if (curPeer.fromCache) remainingCacheTries--;
|
||||
peer = curPeer.peer;
|
||||
blockReader = getRemoteBlockReader(peer);
|
||||
|
@ -699,7 +698,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static class BlockReaderPeer {
|
||||
|
|
|
@ -39,6 +39,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
|
@ -241,8 +242,6 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
/**
|
||||
* Create a new packet.
|
||||
*
|
||||
* @param pktSize maximum size of the packet,
|
||||
* including checksum data and actual data.
|
||||
* @param chunksPerPkt maximum number of chunks per packet.
|
||||
* @param offsetInBlock offset in bytes into the HDFS block.
|
||||
*/
|
||||
|
@ -405,7 +404,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private String[] favoredNodes;
|
||||
volatile boolean hasError = false;
|
||||
volatile int errorIndex = -1;
|
||||
volatile int restartingNodeIndex = -1; // Restarting node index
|
||||
// Restarting node index
|
||||
AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
|
||||
private long restartDeadline = 0; // Deadline of DN restart
|
||||
private BlockConstructionStage stage; // block construction stage
|
||||
private long bytesSent = 0; // number of bytes that've been sent
|
||||
|
@ -556,7 +556,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
try {
|
||||
// process datanode IO errors if any
|
||||
boolean doSleep = false;
|
||||
if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
|
||||
if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
|
||||
doSleep = processDatanodeError();
|
||||
}
|
||||
|
||||
|
@ -699,7 +699,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
} catch (Throwable e) {
|
||||
// Log warning if there was a real error.
|
||||
if (restartingNodeIndex == -1) {
|
||||
if (restartingNodeIndex.get() == -1) {
|
||||
DFSClient.LOG.warn("DataStreamer Exception", e);
|
||||
}
|
||||
if (e instanceof IOException) {
|
||||
|
@ -708,7 +708,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
setLastException(new IOException("DataStreamer Exception: ",e));
|
||||
}
|
||||
hasError = true;
|
||||
if (errorIndex == -1 && restartingNodeIndex == -1) {
|
||||
if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
|
||||
// Not a datanode issue
|
||||
streamerClosed = true;
|
||||
}
|
||||
|
@ -806,7 +806,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
/** Set the restarting node index. Called by responder */
|
||||
synchronized void setRestartingNodeIndex(int idx) {
|
||||
restartingNodeIndex = idx;
|
||||
restartingNodeIndex.set(idx);
|
||||
// If the data streamer has already set the primary node
|
||||
// bad, clear it. It is likely that the write failed due to
|
||||
// the DN shutdown. Even if it was a real failure, the pipeline
|
||||
|
@ -821,7 +821,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
*/
|
||||
synchronized void tryMarkPrimaryDatanodeFailed() {
|
||||
// There should be no existing error and no ongoing restart.
|
||||
if ((errorIndex == -1) && (restartingNodeIndex == -1)) {
|
||||
if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
|
||||
errorIndex = 0;
|
||||
}
|
||||
}
|
||||
|
@ -962,7 +962,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
synchronized (dataQueue) {
|
||||
dataQueue.notifyAll();
|
||||
}
|
||||
if (restartingNodeIndex == -1) {
|
||||
if (restartingNodeIndex.get() == -1) {
|
||||
DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
|
||||
+ " for block " + block, e);
|
||||
}
|
||||
|
@ -1186,7 +1186,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// Sleep before reconnect if a dn is restarting.
|
||||
// This process will be repeated until the deadline or the datanode
|
||||
// starts back up.
|
||||
if (restartingNodeIndex >= 0) {
|
||||
if (restartingNodeIndex.get() >= 0) {
|
||||
// 4 seconds or the configured deadline period, whichever is shorter.
|
||||
// This is the retry interval and recovery will be retried in this
|
||||
// interval until timeout or success.
|
||||
|
@ -1196,7 +1196,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
Thread.sleep(delay);
|
||||
} catch (InterruptedException ie) {
|
||||
lastException.set(new IOException("Interrupted while waiting for " +
|
||||
"datanode to restart. " + nodes[restartingNodeIndex]));
|
||||
"datanode to restart. " + nodes[restartingNodeIndex.get()]));
|
||||
streamerClosed = true;
|
||||
return false;
|
||||
}
|
||||
|
@ -1237,21 +1237,21 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
setPipeline(newnodes, newStorageTypes, newStorageIDs);
|
||||
|
||||
// Just took care of a node error while waiting for a node restart
|
||||
if (restartingNodeIndex >= 0) {
|
||||
if (restartingNodeIndex.get() >= 0) {
|
||||
// If the error came from a node further away than the restarting
|
||||
// node, the restart must have been complete.
|
||||
if (errorIndex > restartingNodeIndex) {
|
||||
restartingNodeIndex = -1;
|
||||
} else if (errorIndex < restartingNodeIndex) {
|
||||
if (errorIndex > restartingNodeIndex.get()) {
|
||||
restartingNodeIndex.set(-1);
|
||||
} else if (errorIndex < restartingNodeIndex.get()) {
|
||||
// the node index has shifted.
|
||||
restartingNodeIndex--;
|
||||
restartingNodeIndex.decrementAndGet();
|
||||
} else {
|
||||
// this shouldn't happen...
|
||||
assert false;
|
||||
}
|
||||
}
|
||||
|
||||
if (restartingNodeIndex == -1) {
|
||||
if (restartingNodeIndex.get() == -1) {
|
||||
hasError = false;
|
||||
}
|
||||
lastException.set(null);
|
||||
|
@ -1293,10 +1293,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
||||
}
|
||||
|
||||
if (restartingNodeIndex >= 0) {
|
||||
if (restartingNodeIndex.get() >= 0) {
|
||||
assert hasError == true;
|
||||
// check errorIndex set above
|
||||
if (errorIndex == restartingNodeIndex) {
|
||||
if (errorIndex == restartingNodeIndex.get()) {
|
||||
// ignore, if came from the restarting node
|
||||
errorIndex = -1;
|
||||
}
|
||||
|
@ -1306,8 +1306,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
// expired. declare the restarting node dead
|
||||
restartDeadline = 0;
|
||||
int expiredNodeIndex = restartingNodeIndex;
|
||||
restartingNodeIndex = -1;
|
||||
int expiredNodeIndex = restartingNodeIndex.get();
|
||||
restartingNodeIndex.set(-1);
|
||||
DFSClient.LOG.warn("Datanode did not restart in time: " +
|
||||
nodes[expiredNodeIndex]);
|
||||
// Mark the restarting node as failed. If there is any other failed
|
||||
|
@ -1459,7 +1459,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// from the local datanode. Thus it is safe to treat this as a
|
||||
// regular node error.
|
||||
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
|
||||
restartingNodeIndex == -1) {
|
||||
restartingNodeIndex.get() == -1) {
|
||||
checkRestart = true;
|
||||
throw new IOException("A datanode is restarting.");
|
||||
}
|
||||
|
@ -1476,10 +1476,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
assert null == blockStream : "Previous blockStream unclosed";
|
||||
blockStream = out;
|
||||
result = true; // success
|
||||
restartingNodeIndex = -1;
|
||||
restartingNodeIndex.set(-1);
|
||||
hasError = false;
|
||||
} catch (IOException ie) {
|
||||
if (restartingNodeIndex == -1) {
|
||||
if (restartingNodeIndex.get() == -1) {
|
||||
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
|
||||
}
|
||||
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
|
||||
|
@ -1511,10 +1511,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
if (checkRestart && shouldWaitForRestart(errorIndex)) {
|
||||
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
|
||||
Time.now();
|
||||
restartingNodeIndex = errorIndex;
|
||||
restartingNodeIndex.set(errorIndex);
|
||||
errorIndex = -1;
|
||||
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
|
||||
nodes[restartingNodeIndex]);
|
||||
nodes[restartingNodeIndex.get()]);
|
||||
}
|
||||
hasError = true;
|
||||
setLastException(ie);
|
||||
|
|
|
@ -233,6 +233,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
|
|||
Preconditions.checkArgument(jid != null &&
|
||||
!jid.isEmpty(),
|
||||
"bad journal identifier: %s", jid);
|
||||
assert jid != null;
|
||||
return new File(new File(dir), jid);
|
||||
}
|
||||
|
||||
|
|
|
@ -727,7 +727,7 @@ public abstract class Storage extends StorageInfo {
|
|||
file.close();
|
||||
throw e;
|
||||
}
|
||||
if (res != null && !deletionHookAdded) {
|
||||
if (!deletionHookAdded) {
|
||||
// If the file existed prior to our startup, we didn't
|
||||
// call deleteOnExit above. But since we successfully locked
|
||||
// the dir, we can take care of cleaning it up.
|
||||
|
|
|
@ -29,6 +29,8 @@ import java.io.FileOutputStream;
|
|||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Writer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
|
@ -836,9 +838,8 @@ class BlockReceiver implements Closeable {
|
|||
LOG.warn("Failed to delete restart meta file: " +
|
||||
restartMeta.getPath());
|
||||
}
|
||||
FileWriter out = null;
|
||||
try {
|
||||
out = new FileWriter(restartMeta);
|
||||
try (Writer out = new OutputStreamWriter(
|
||||
new FileOutputStream(restartMeta), "UTF-8")) {
|
||||
// write out the current time.
|
||||
out.write(Long.toString(Time.now() + restartBudget));
|
||||
out.flush();
|
||||
|
|
|
@ -583,7 +583,8 @@ public class DataNode extends ReconfigurableBase
|
|||
try {
|
||||
IOException ioe = ioExceptionFuture.get();
|
||||
if (ioe != null) {
|
||||
errorMessageBuilder.append(String.format("FAILED TO ADD: %s: %s\n",
|
||||
errorMessageBuilder.append(
|
||||
String.format("FAILED TO ADD: %s: %s%n",
|
||||
volume, ioe.getMessage()));
|
||||
LOG.error("Failed to add volume: " + volume, ioe);
|
||||
} else {
|
||||
|
@ -591,8 +592,9 @@ public class DataNode extends ReconfigurableBase
|
|||
LOG.info("Successfully added volume: " + volume);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
errorMessageBuilder.append(String.format("FAILED to ADD: %s: %s\n",
|
||||
volume, e.getMessage()));
|
||||
errorMessageBuilder.append(
|
||||
String.format("FAILED to ADD: %s: %s%n", volume,
|
||||
e.toString()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -412,7 +412,7 @@ public class DataStorage extends Storage {
|
|||
LOG.warn(String.format(
|
||||
"I/O error attempting to unlock storage directory %s.",
|
||||
sd.getRoot()), e);
|
||||
errorMsgBuilder.append(String.format("Failed to remove %s: %s\n",
|
||||
errorMsgBuilder.append(String.format("Failed to remove %s: %s%n",
|
||||
sd.getRoot(), e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,10 +22,13 @@ import java.io.DataInputStream;
|
|||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.Writer;
|
||||
import java.util.Scanner;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
@ -192,7 +195,7 @@ class BlockPoolSlice {
|
|||
Scanner sc;
|
||||
|
||||
try {
|
||||
sc = new Scanner(new File(currentDir, DU_CACHE_FILE));
|
||||
sc = new Scanner(new File(currentDir, DU_CACHE_FILE), "UTF-8");
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -233,23 +236,18 @@ class BlockPoolSlice {
|
|||
outFile.getParent());
|
||||
}
|
||||
|
||||
FileWriter out = null;
|
||||
try {
|
||||
long used = getDfsUsed();
|
||||
if (used > 0) {
|
||||
out = new FileWriter(outFile);
|
||||
try (Writer out = new OutputStreamWriter(
|
||||
new FileOutputStream(outFile), "UTF-8")) {
|
||||
// mtime is written last, so that truncated writes won't be valid.
|
||||
out.write(Long.toString(used) + " " + Long.toString(Time.now()));
|
||||
out.flush();
|
||||
out.close();
|
||||
out = null;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
// If write failed, the volume might be bad. Since the cache file is
|
||||
// not critical, log the error and continue.
|
||||
FsDatasetImpl.LOG.warn("Failed to write dfsUsed to " + outFile, ioe);
|
||||
} finally {
|
||||
IOUtils.cleanup(null, out);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -453,7 +451,7 @@ class BlockPoolSlice {
|
|||
File.pathSeparator + "." + file.getName() + ".restart");
|
||||
Scanner sc = null;
|
||||
try {
|
||||
sc = new Scanner(restartMeta);
|
||||
sc = new Scanner(restartMeta, "UTF-8");
|
||||
// The restart meta file exists
|
||||
if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
|
||||
// It didn't expire. Load the replica as a RBW.
|
||||
|
|
|
@ -770,7 +770,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
|
||||
|
||||
DataOutputStream metaOut = null;
|
||||
InputStream dataIn = null;
|
||||
try {
|
||||
File parentFile = dstMeta.getParentFile();
|
||||
if (parentFile != null) {
|
||||
|
@ -783,22 +782,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
BlockMetadataHeader.writeHeader(metaOut, checksum);
|
||||
|
||||
dataIn = isNativeIOAvailable ?
|
||||
NativeIO.getShareDeleteFileInputStream(blockFile) :
|
||||
new FileInputStream(blockFile);
|
||||
|
||||
int offset = 0;
|
||||
for(int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
|
||||
if (n > 0) {
|
||||
n += offset;
|
||||
offset = n % checksum.getBytesPerChecksum();
|
||||
final int length = n - offset;
|
||||
try (InputStream dataIn = isNativeIOAvailable ?
|
||||
NativeIO.getShareDeleteFileInputStream(blockFile) :
|
||||
new FileInputStream(blockFile)) {
|
||||
|
||||
if (length > 0) {
|
||||
checksum.calculateChunkedSums(data, 0, length, crcs, 0);
|
||||
metaOut.write(crcs, 0, checksum.getChecksumSize(length));
|
||||
for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
|
||||
if (n > 0) {
|
||||
n += offset;
|
||||
offset = n % checksum.getBytesPerChecksum();
|
||||
final int length = n - offset;
|
||||
|
||||
System.arraycopy(data, length, data, 0, offset);
|
||||
if (length > 0) {
|
||||
checksum.calculateChunkedSums(data, 0, length, crcs, 0);
|
||||
metaOut.write(crcs, 0, checksum.getChecksumSize(length));
|
||||
|
||||
System.arraycopy(data, length, data, 0, offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -807,7 +807,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
|
||||
metaOut.write(crcs, 0, 4);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, dataIn, metaOut);
|
||||
IOUtils.cleanup(LOG, metaOut);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1600,11 +1600,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
f = info.getBlockFile();
|
||||
v = (FsVolumeImpl)info.getVolume();
|
||||
if (f == null) {
|
||||
errors.add("Failed to delete replica " + invalidBlks[i]
|
||||
+ ": File not found, volume=" + v);
|
||||
continue;
|
||||
}
|
||||
if (v == null) {
|
||||
errors.add("Failed to delete replica " + invalidBlks[i]
|
||||
+ ". No volume for this replica, file=" + f);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.sun.jersey.api.ParamException;
|
||||
import com.sun.jersey.api.container.ContainerException;
|
||||
import io.netty.buffer.Unpooled;
|
||||
|
@ -39,7 +40,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
|||
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
|
||||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON;
|
||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
|
||||
|
||||
class ExceptionHandler {
|
||||
static Log LOG = WebHdfsHandler.LOG;
|
||||
|
@ -82,11 +83,11 @@ class ExceptionHandler {
|
|||
s = INTERNAL_SERVER_ERROR;
|
||||
}
|
||||
|
||||
final byte[] js = JsonUtil.toJsonString(e).getBytes();
|
||||
final byte[] js = JsonUtil.toJsonString(e).getBytes(Charsets.UTF_8);
|
||||
DefaultFullHttpResponse resp =
|
||||
new DefaultFullHttpResponse(HTTP_1_1, s, Unpooled.wrappedBuffer(js));
|
||||
|
||||
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON);
|
||||
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
|
||||
resp.headers().set(CONTENT_LENGTH, js.length);
|
||||
return resp;
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import io.netty.handler.codec.http.HttpMethod;
|
|||
import io.netty.handler.codec.http.HttpRequest;
|
||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||
import io.netty.handler.stream.ChunkedStream;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -77,7 +78,8 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||
public static final int WEBHDFS_PREFIX_LENGTH = WEBHDFS_PREFIX.length();
|
||||
public static final String APPLICATION_OCTET_STREAM =
|
||||
"application/octet-stream";
|
||||
public static final String APPLICATION_JSON = "application/json";
|
||||
public static final String APPLICATION_JSON_UTF8 =
|
||||
"application/json; charset=utf-8";
|
||||
|
||||
private final Configuration conf;
|
||||
private final Configuration confForCreate;
|
||||
|
@ -224,11 +226,11 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||
} finally {
|
||||
IOUtils.cleanup(LOG, dfsclient);
|
||||
}
|
||||
final byte[] js = JsonUtil.toJsonString(checksum).getBytes();
|
||||
final byte[] js = JsonUtil.toJsonString(checksum).getBytes(Charsets.UTF_8);
|
||||
DefaultFullHttpResponse resp =
|
||||
new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(js));
|
||||
|
||||
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON);
|
||||
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
|
||||
resp.headers().set(CONTENT_LENGTH, js.length);
|
||||
resp.headers().set(CONNECTION, CLOSE);
|
||||
ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
|
||||
|
|
|
@ -48,8 +48,10 @@ import org.apache.hadoop.util.Tool;
|
|||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
import java.text.DateFormat;
|
||||
import java.util.*;
|
||||
|
@ -579,7 +581,8 @@ public class Mover {
|
|||
|
||||
private static String[] readPathFile(String file) throws IOException {
|
||||
List<String> list = Lists.newArrayList();
|
||||
BufferedReader reader = new BufferedReader(new FileReader(file));
|
||||
BufferedReader reader = new BufferedReader(
|
||||
new InputStreamReader(new FileInputStream(file), "UTF-8"));
|
||||
try {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
|
|
|
@ -433,7 +433,6 @@ class FSDirRenameOp {
|
|||
} else {
|
||||
fsd.addLastINodeNoQuotaCheck(dstIIP, removedDst);
|
||||
}
|
||||
assert removedDst != null;
|
||||
if (removedDst.isReference()) {
|
||||
final INodeReference removedDstRef = removedDst.asReference();
|
||||
final INodeReference.WithCount wc = (INodeReference.WithCount)
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
|
@ -50,7 +51,7 @@ class FSDirStatAndListingOp {
|
|||
FSPermissionChecker pc = fsd.getPermissionChecker();
|
||||
byte[][] pathComponents = FSDirectory
|
||||
.getPathComponentsForReservedPath(srcArg);
|
||||
final String startAfterString = new String(startAfter);
|
||||
final String startAfterString = new String(startAfter, Charsets.UTF_8);
|
||||
final String src = fsd.resolvePath(pc, srcArg, pathComponents);
|
||||
final INodesInPath iip = fsd.getINodesInPath(src, true);
|
||||
|
||||
|
@ -195,8 +196,7 @@ class FSDirStatAndListingOp {
|
|||
cur.getLocalStoragePolicyID():
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
listing[i] = createFileStatus(fsd, cur.getLocalNameBytes(), cur,
|
||||
needLocation, fsd.getStoragePolicyID(curPolicy,
|
||||
parentStoragePolicy), snapshot, isRawPath, inodesInPath);
|
||||
needLocation, fsd.getStoragePolicyID(curPolicy, parentStoragePolicy), snapshot, isRawPath, inodesInPath);
|
||||
listingCnt++;
|
||||
if (needLocation) {
|
||||
// Once we hit lsLimit locations, stop.
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.InputStream;
|
|||
import java.io.RandomAccessFile;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||
|
@ -32,7 +33,8 @@ import org.apache.hadoop.io.compress.CompressionCodec;
|
|||
|
||||
@InterfaceAudience.Private
|
||||
public final class FSImageUtil {
|
||||
public static final byte[] MAGIC_HEADER = "HDFSIMG1".getBytes();
|
||||
public static final byte[] MAGIC_HEADER =
|
||||
"HDFSIMG1".getBytes(Charsets.UTF_8);
|
||||
public static final int FILE_VERSION = 1;
|
||||
|
||||
public static boolean checkFileFormat(RandomAccessFile file)
|
||||
|
|
|
@ -299,7 +299,7 @@ public class FileJournalManager implements JournalManager {
|
|||
.matcher(name);
|
||||
if (staleInprogressEditsMatch.matches()) {
|
||||
try {
|
||||
long startTxId = Long.valueOf(staleInprogressEditsMatch.group(1));
|
||||
long startTxId = Long.parseLong(staleInprogressEditsMatch.group(1));
|
||||
ret.add(new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID,
|
||||
true));
|
||||
continue;
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
import java.io.PrintStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -769,8 +770,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
|
|||
|
||||
@VisibleForTesting
|
||||
public final void dumpTreeRecursively(PrintStream out) {
|
||||
dumpTreeRecursively(new PrintWriter(out, true), new StringBuilder(),
|
||||
Snapshot.CURRENT_STATE_ID);
|
||||
out.println(dumpTreeRecursively().toString());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -969,10 +969,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
|
||||
throws IOException {
|
||||
DatanodeInfo results[] = namesystem.datanodeReport(type);
|
||||
if (results == null ) {
|
||||
throw new IOException("Failed to get datanode report for " + type
|
||||
+ " datanodes.");
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
|
@ -980,10 +976,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
public DatanodeStorageReport[] getDatanodeStorageReport(
|
||||
DatanodeReportType type) throws IOException {
|
||||
final DatanodeStorageReport[] reports = namesystem.getDatanodeStorageReport(type);
|
||||
if (reports == null ) {
|
||||
throw new IOException("Failed to get datanode storage report for " + type
|
||||
+ " datanodes.");
|
||||
}
|
||||
return reports;
|
||||
}
|
||||
|
||||
|
|
|
@ -638,10 +638,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
|||
}
|
||||
if (fos == null) {
|
||||
fos = dfs.create(target + "/" + chain, true);
|
||||
if (fos == null) {
|
||||
throw new IOException("Failed to copy " + fullName +
|
||||
" to /lost+found: could not store chain " + chain);
|
||||
}
|
||||
chain++;
|
||||
}
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ public class XAttrPermissionFilter {
|
|||
static List<XAttr> filterXAttrsForApi(FSPermissionChecker pc,
|
||||
List<XAttr> xAttrs, boolean isRawPath) {
|
||||
assert xAttrs != null : "xAttrs can not be null";
|
||||
if (xAttrs == null || xAttrs.isEmpty()) {
|
||||
if (xAttrs.isEmpty()) {
|
||||
return xAttrs;
|
||||
}
|
||||
|
||||
|
|
|
@ -1476,7 +1476,7 @@ public class DFSAdmin extends FsShell {
|
|||
} else {
|
||||
out.print("FAILED: ");
|
||||
}
|
||||
out.printf("Change property %s\n\tFrom: \"%s\"\n\tTo: \"%s\"\n",
|
||||
out.printf("Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
|
||||
result.getKey().prop, result.getKey().oldVal,
|
||||
result.getKey().newVal);
|
||||
if (result.getValue().isPresent()) {
|
||||
|
|
|
@ -144,7 +144,7 @@ class DelimitedImageVisitor extends TextWriterImageVisitor {
|
|||
|
||||
// Special case of file size, which is sum of the num bytes in each block
|
||||
if(element == ImageElement.NUM_BYTES)
|
||||
fileSize += Long.valueOf(value);
|
||||
fileSize += Long.parseLong(value);
|
||||
|
||||
if(elements.containsKey(element) && element != ImageElement.NUM_BYTES)
|
||||
elements.put(element, value);
|
||||
|
|
|
@ -17,11 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
|
@ -30,19 +26,31 @@ import io.netty.channel.SimpleChannelInboundHandler;
|
|||
import io.netty.channel.group.ChannelGroup;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.*;
|
||||
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
import io.netty.handler.codec.http.HttpRequest;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import static io.netty.handler.codec.http.HttpVersion.*;
|
||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||
import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
|
||||
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
|
||||
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
|
||||
import static io.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
|
||||
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
|
||||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.APPLICATION_JSON_UTF8;
|
||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX_LENGTH;
|
||||
/**
|
||||
* Implement the read-only WebHDFS API for fsimage.
|
||||
*/
|
||||
|
@ -67,7 +75,7 @@ class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||
if (request.getMethod() != HttpMethod.GET) {
|
||||
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1,
|
||||
METHOD_NOT_ALLOWED);
|
||||
resp.headers().set("Connection", "close");
|
||||
resp.headers().set(CONNECTION, CLOSE);
|
||||
ctx.write(resp).addListener(ChannelFutureListener.CLOSE);
|
||||
return;
|
||||
}
|
||||
|
@ -77,24 +85,29 @@ class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||
|
||||
final String content;
|
||||
String path = getPath(decoder);
|
||||
if ("GETFILESTATUS".equals(op)) {
|
||||
content = image.getFileStatus(path);
|
||||
} else if ("LISTSTATUS".equals(op)) {
|
||||
content = image.listStatus(path);
|
||||
} else if ("GETACLSTATUS".equals(op)) {
|
||||
content = image.getAclStatus(path);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Invalid value for webhdfs parameter" + " \"op\"");
|
||||
switch (op) {
|
||||
case "GETFILESTATUS":
|
||||
content = image.getFileStatus(path);
|
||||
break;
|
||||
case "LISTSTATUS":
|
||||
content = image.listStatus(path);
|
||||
break;
|
||||
case "GETACLSTATUS":
|
||||
content = image.getAclStatus(path);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid value for webhdfs parameter" + " \"op\"");
|
||||
}
|
||||
|
||||
LOG.info("op=" + op + " target=" + path);
|
||||
|
||||
DefaultFullHttpResponse resp = new DefaultFullHttpResponse(
|
||||
HTTP_1_1, HttpResponseStatus.OK,
|
||||
Unpooled.wrappedBuffer(content.getBytes()));
|
||||
resp.headers().set("Content-Type", "application/json");
|
||||
resp.headers().set("Content-Length", resp.content().readableBytes());
|
||||
resp.headers().set("Connection", "close");
|
||||
Unpooled.wrappedBuffer(content.getBytes(Charsets.UTF_8)));
|
||||
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
|
||||
resp.headers().set(CONTENT_LENGTH, resp.content().readableBytes());
|
||||
resp.headers().set(CONNECTION, CLOSE);
|
||||
ctx.write(resp).addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
|
||||
|
@ -109,19 +122,19 @@ class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||
Exception e = cause instanceof Exception ? (Exception) cause : new
|
||||
Exception(cause);
|
||||
final String output = JsonUtil.toJsonString(e);
|
||||
ByteBuf content = Unpooled.wrappedBuffer(output.getBytes());
|
||||
ByteBuf content = Unpooled.wrappedBuffer(output.getBytes(Charsets.UTF_8));
|
||||
final DefaultFullHttpResponse resp = new DefaultFullHttpResponse(
|
||||
HTTP_1_1, INTERNAL_SERVER_ERROR, content);
|
||||
|
||||
resp.headers().set("Content-Type", "application/json");
|
||||
resp.headers().set(CONTENT_TYPE, APPLICATION_JSON_UTF8);
|
||||
if (e instanceof IllegalArgumentException) {
|
||||
resp.setStatus(BAD_REQUEST);
|
||||
} else if (e instanceof FileNotFoundException) {
|
||||
resp.setStatus(NOT_FOUND);
|
||||
}
|
||||
|
||||
resp.headers().set("Content-Length", resp.content().readableBytes());
|
||||
resp.headers().set("Connection", "close");
|
||||
resp.headers().set(CONTENT_LENGTH, resp.content().readableBytes());
|
||||
resp.headers().set(CONNECTION, CLOSE);
|
||||
ctx.write(resp).addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
|
||||
|
@ -134,11 +147,11 @@ class FSImageHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
|||
private static String getPath(QueryStringDecoder decoder)
|
||||
throws FileNotFoundException {
|
||||
String path = decoder.path();
|
||||
if (path.startsWith("/webhdfs/v1/")) {
|
||||
return path.substring(11);
|
||||
if (path.startsWith(WEBHDFS_PREFIX)) {
|
||||
return path.substring(WEBHDFS_PREFIX_LENGTH);
|
||||
} else {
|
||||
throw new FileNotFoundException("Path: " + path + " should " +
|
||||
"start with \"/webhdfs/v1/\"");
|
||||
"start with " + WEBHDFS_PREFIX);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,17 +111,15 @@ class FSImageLoader {
|
|||
}
|
||||
|
||||
FsImageProto.FileSummary summary = FSImageUtil.loadSummary(file);
|
||||
FileInputStream fin = null;
|
||||
|
||||
try {
|
||||
|
||||
try (FileInputStream fin = new FileInputStream(file.getFD())) {
|
||||
// Map to record INodeReference to the referred id
|
||||
ImmutableList<Long> refIdList = null;
|
||||
String[] stringTable = null;
|
||||
byte[][] inodes = null;
|
||||
Map<Long, long[]> dirmap = null;
|
||||
|
||||
fin = new FileInputStream(file.getFD());
|
||||
|
||||
ArrayList<FsImageProto.FileSummary.Section> sections =
|
||||
Lists.newArrayList(summary.getSectionsList());
|
||||
Collections.sort(sections,
|
||||
|
@ -169,8 +167,6 @@ class FSImageLoader {
|
|||
}
|
||||
}
|
||||
return new FSImageLoader(stringTable, inodes, dirmap);
|
||||
} finally {
|
||||
IOUtils.cleanup(null, fin);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.io.BufferedInputStream;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.PrintStream;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
|
|||
import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.LimitInputStream;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -67,7 +66,7 @@ final class FileDistributionCalculator {
|
|||
private final Configuration conf;
|
||||
private final long maxSize;
|
||||
private final int steps;
|
||||
private final PrintWriter out;
|
||||
private final PrintStream out;
|
||||
|
||||
private final int[] distribution;
|
||||
private int totalFiles;
|
||||
|
@ -77,7 +76,7 @@ final class FileDistributionCalculator {
|
|||
private long maxFileSize;
|
||||
|
||||
FileDistributionCalculator(Configuration conf, long maxSize, int steps,
|
||||
PrintWriter out) {
|
||||
PrintStream out) {
|
||||
this.conf = conf;
|
||||
this.maxSize = maxSize == 0 ? MAX_SIZE_DEFAULT : maxSize;
|
||||
this.steps = steps == 0 ? INTERVAL_DEFAULT : steps;
|
||||
|
@ -96,9 +95,7 @@ final class FileDistributionCalculator {
|
|||
}
|
||||
|
||||
FileSummary summary = FSImageUtil.loadSummary(file);
|
||||
FileInputStream in = null;
|
||||
try {
|
||||
in = new FileInputStream(file.getFD());
|
||||
try (FileInputStream in = new FileInputStream(file.getFD())) {
|
||||
for (FileSummary.Section s : summary.getSectionsList()) {
|
||||
if (SectionName.fromString(s.getName()) != SectionName.INODE) {
|
||||
continue;
|
||||
|
@ -111,8 +108,6 @@ final class FileDistributionCalculator {
|
|||
run(is);
|
||||
output();
|
||||
}
|
||||
} finally {
|
||||
IOUtils.cleanup(null, in);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -159,10 +159,10 @@ class FileDistributionVisitor extends TextWriterImageVisitor {
|
|||
current.path = (value.equals("") ? "/" : value);
|
||||
break;
|
||||
case REPLICATION:
|
||||
current.replication = Integer.valueOf(value);
|
||||
current.replication = Integer.parseInt(value);
|
||||
break;
|
||||
case NUM_BYTES:
|
||||
current.fileSize += Long.valueOf(value);
|
||||
current.fileSize += Long.parseLong(value);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -135,7 +135,7 @@ class LsImageVisitor extends TextWriterImageVisitor {
|
|||
perms = value;
|
||||
break;
|
||||
case REPLICATION:
|
||||
replication = Integer.valueOf(value);
|
||||
replication = Integer.parseInt(value);
|
||||
break;
|
||||
case USER_NAME:
|
||||
username = value;
|
||||
|
@ -144,7 +144,7 @@ class LsImageVisitor extends TextWriterImageVisitor {
|
|||
group = value;
|
||||
break;
|
||||
case NUM_BYTES:
|
||||
filesize += Long.valueOf(value);
|
||||
filesize += Long.parseLong(value);
|
||||
break;
|
||||
case MODIFICATION_TIME:
|
||||
modTime = value;
|
||||
|
@ -173,6 +173,6 @@ class LsImageVisitor extends TextWriterImageVisitor {
|
|||
if(element == ImageElement.INODE)
|
||||
newLine();
|
||||
else if (element == ImageElement.BLOCKS)
|
||||
numBlocks = Integer.valueOf(value);
|
||||
numBlocks = Integer.parseInt(value);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,8 @@
|
|||
package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.PrintStream;
|
||||
import java.io.RandomAccessFile;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
|
@ -33,7 +32,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
||||
/**
|
||||
|
@ -144,36 +142,33 @@ public class OfflineImageViewerPB {
|
|||
String processor = cmd.getOptionValue("p", "Web");
|
||||
String outputFile = cmd.getOptionValue("o", "-");
|
||||
|
||||
PrintWriter out = outputFile.equals("-") ?
|
||||
new PrintWriter(System.out) : new PrintWriter(new File(outputFile));
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
try {
|
||||
if (processor.equals("FileDistribution")) {
|
||||
long maxSize = Long.parseLong(cmd.getOptionValue("maxSize", "0"));
|
||||
int step = Integer.parseInt(cmd.getOptionValue("step", "0"));
|
||||
new FileDistributionCalculator(conf, maxSize, step, out)
|
||||
.visit(new RandomAccessFile(inputFile, "r"));
|
||||
} else if (processor.equals("XML")) {
|
||||
new PBImageXmlWriter(conf, out).visit(new RandomAccessFile(inputFile,
|
||||
"r"));
|
||||
} else if (processor.equals("Web")) {
|
||||
String addr = cmd.getOptionValue("addr", "localhost:5978");
|
||||
WebImageViewer viewer = new WebImageViewer(NetUtils.createSocketAddr
|
||||
(addr));
|
||||
try {
|
||||
viewer.start(inputFile);
|
||||
} finally {
|
||||
viewer.close();
|
||||
}
|
||||
try (PrintStream out = outputFile.equals("-") ?
|
||||
System.out : new PrintStream(outputFile, "UTF-8")) {
|
||||
switch (processor) {
|
||||
case "FileDistribution":
|
||||
long maxSize = Long.parseLong(cmd.getOptionValue("maxSize", "0"));
|
||||
int step = Integer.parseInt(cmd.getOptionValue("step", "0"));
|
||||
new FileDistributionCalculator(conf, maxSize, step, out).visit(
|
||||
new RandomAccessFile(inputFile, "r"));
|
||||
break;
|
||||
case "XML":
|
||||
new PBImageXmlWriter(conf, out).visit(
|
||||
new RandomAccessFile(inputFile, "r"));
|
||||
break;
|
||||
case "Web":
|
||||
String addr = cmd.getOptionValue("addr", "localhost:5978");
|
||||
try (WebImageViewer viewer = new WebImageViewer(
|
||||
NetUtils.createSocketAddr(addr))) {
|
||||
viewer.start(inputFile);
|
||||
}
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
} catch (EOFException e) {
|
||||
System.err.println("Input file ended unexpectedly. Exiting");
|
||||
} catch (IOException e) {
|
||||
System.err.println("Encountered exception. Exiting: " + e.getMessage());
|
||||
} finally {
|
||||
IOUtils.cleanup(null, out);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.io.BufferedInputStream;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.PrintStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
|
|||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotSection;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
|
||||
import org.apache.hadoop.hdfs.util.XMLUtils;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.LimitInputStream;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -62,10 +61,10 @@ import com.google.common.collect.Lists;
|
|||
@InterfaceAudience.Private
|
||||
public final class PBImageXmlWriter {
|
||||
private final Configuration conf;
|
||||
private final PrintWriter out;
|
||||
private final PrintStream out;
|
||||
private String[] stringTable;
|
||||
|
||||
public PBImageXmlWriter(Configuration conf, PrintWriter out) {
|
||||
public PBImageXmlWriter(Configuration conf, PrintStream out) {
|
||||
this.conf = conf;
|
||||
this.out = out;
|
||||
}
|
||||
|
@ -76,9 +75,7 @@ public final class PBImageXmlWriter {
|
|||
}
|
||||
|
||||
FileSummary summary = FSImageUtil.loadSummary(file);
|
||||
FileInputStream fin = null;
|
||||
try {
|
||||
fin = new FileInputStream(file.getFD());
|
||||
try (FileInputStream fin = new FileInputStream(file.getFD())) {
|
||||
out.print("<?xml version=\"1.0\"?>\n<fsimage>");
|
||||
|
||||
ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
|
||||
|
@ -140,8 +137,6 @@ public final class PBImageXmlWriter {
|
|||
}
|
||||
}
|
||||
out.print("</fsimage>\n");
|
||||
} finally {
|
||||
IOUtils.cleanup(null, fin);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1265,11 +1265,11 @@ public class TestEncryptionZones {
|
|||
}
|
||||
|
||||
// Run the XML OIV processor
|
||||
StringWriter output = new StringWriter();
|
||||
PrintWriter pw = new PrintWriter(output);
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
PrintStream pw = new PrintStream(output);
|
||||
PBImageXmlWriter v = new PBImageXmlWriter(new Configuration(), pw);
|
||||
v.visit(new RandomAccessFile(originalFsimage, "r"));
|
||||
final String xml = output.getBuffer().toString();
|
||||
final String xml = output.toString();
|
||||
SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
|
||||
parser.parse(new InputSource(new StringReader(xml)), new DefaultHandler());
|
||||
}
|
||||
|
|
|
@ -25,15 +25,15 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.PrintStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.io.output.NullOutputStream;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
|
|||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper.TestDirectoryTree;
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper.TestDirectoryTree.Node;
|
||||
import org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -256,8 +257,7 @@ public class TestSnapshot {
|
|||
FSImageTestUtil.getFSImage(
|
||||
cluster.getNameNode()).getStorage().getStorageDir(0));
|
||||
assertNotNull("Didn't generate or can't find fsimage", originalFsimage);
|
||||
StringWriter output = new StringWriter();
|
||||
PrintWriter o = new PrintWriter(output);
|
||||
PrintStream o = new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM);
|
||||
PBImageXmlWriter v = new PBImageXmlWriter(new Configuration(), o);
|
||||
v.visit(new RandomAccessFile(originalFsimage, "r"));
|
||||
}
|
||||
|
|
|
@ -20,18 +20,18 @@ package org.apache.hadoop.hdfs.tools.offlineImageViewer;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
|
@ -43,6 +43,7 @@ import javax.xml.parsers.ParserConfigurationException;
|
|||
import javax.xml.parsers.SAXParser;
|
||||
import javax.xml.parsers.SAXParserFactory;
|
||||
|
||||
import org.apache.commons.io.output.NullOutputStream;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -186,10 +187,10 @@ public class TestOfflineImageViewer {
|
|||
@Test(expected = IOException.class)
|
||||
public void testTruncatedFSImage() throws IOException {
|
||||
File truncatedFile = folder.newFile();
|
||||
StringWriter output = new StringWriter();
|
||||
PrintStream output = new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM);
|
||||
copyPartOfFile(originalFsimage, truncatedFile);
|
||||
new FileDistributionCalculator(new Configuration(), 0, 0, new PrintWriter(
|
||||
output)).visit(new RandomAccessFile(truncatedFile, "r"));
|
||||
new FileDistributionCalculator(new Configuration(), 0, 0, output)
|
||||
.visit(new RandomAccessFile(truncatedFile, "r"));
|
||||
}
|
||||
|
||||
private void copyPartOfFile(File src, File dest) throws IOException {
|
||||
|
@ -208,20 +209,21 @@ public class TestOfflineImageViewer {
|
|||
|
||||
@Test
|
||||
public void testFileDistributionCalculator() throws IOException {
|
||||
StringWriter output = new StringWriter();
|
||||
PrintWriter o = new PrintWriter(output);
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
PrintStream o = new PrintStream(output);
|
||||
new FileDistributionCalculator(new Configuration(), 0, 0, o)
|
||||
.visit(new RandomAccessFile(originalFsimage, "r"));
|
||||
o.close();
|
||||
|
||||
String outputString = output.toString();
|
||||
Pattern p = Pattern.compile("totalFiles = (\\d+)\n");
|
||||
Matcher matcher = p.matcher(output.getBuffer());
|
||||
Matcher matcher = p.matcher(outputString);
|
||||
assertTrue(matcher.find() && matcher.groupCount() == 1);
|
||||
int totalFiles = Integer.parseInt(matcher.group(1));
|
||||
assertEquals(NUM_DIRS * FILES_PER_DIR, totalFiles);
|
||||
|
||||
p = Pattern.compile("totalDirectories = (\\d+)\n");
|
||||
matcher = p.matcher(output.getBuffer());
|
||||
matcher = p.matcher(outputString);
|
||||
assertTrue(matcher.find() && matcher.groupCount() == 1);
|
||||
int totalDirs = Integer.parseInt(matcher.group(1));
|
||||
// totalDirs includes root directory, empty directory, and xattr directory
|
||||
|
@ -236,7 +238,7 @@ public class TestOfflineImageViewer {
|
|||
}
|
||||
});
|
||||
p = Pattern.compile("maxFileSize = (\\d+)\n");
|
||||
matcher = p.matcher(output.getBuffer());
|
||||
matcher = p.matcher(output.toString("UTF-8"));
|
||||
assertTrue(matcher.find() && matcher.groupCount() == 1);
|
||||
assertEquals(maxFile.getLen(), Long.parseLong(matcher.group(1)));
|
||||
}
|
||||
|
@ -252,13 +254,13 @@ public class TestOfflineImageViewer {
|
|||
@Test
|
||||
public void testPBImageXmlWriter() throws IOException, SAXException,
|
||||
ParserConfigurationException {
|
||||
StringWriter output = new StringWriter();
|
||||
PrintWriter o = new PrintWriter(output);
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
PrintStream o = new PrintStream(output);
|
||||
PBImageXmlWriter v = new PBImageXmlWriter(new Configuration(), o);
|
||||
v.visit(new RandomAccessFile(originalFsimage, "r"));
|
||||
SAXParserFactory spf = SAXParserFactory.newInstance();
|
||||
SAXParser parser = spf.newSAXParser();
|
||||
final String xml = output.getBuffer().toString();
|
||||
final String xml = output.toString();
|
||||
parser.parse(new InputSource(new StringReader(xml)), new DefaultHandler());
|
||||
}
|
||||
|
||||
|
@ -298,7 +300,7 @@ public class TestOfflineImageViewer {
|
|||
verifyHttpResponseCode(HttpURLConnection.HTTP_NOT_FOUND, url);
|
||||
|
||||
// LISTSTATUS operation to a invalid prefix
|
||||
url = new URL("http://localhost:" + port + "/webhdfs/v1?op=LISTSTATUS");
|
||||
url = new URL("http://localhost:" + port + "/foo");
|
||||
verifyHttpResponseCode(HttpURLConnection.HTTP_NOT_FOUND, url);
|
||||
|
||||
// GETFILESTATUS operation
|
||||
|
|
Loading…
Reference in New Issue