HDFS-3571. Allow EditLogFileInputStream to read from a remote URL. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1355174 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-06-28 21:46:12 +00:00
parent 9255cb83c8
commit 1d54e2b331
4 changed files with 225 additions and 24 deletions

View File

@ -97,6 +97,8 @@ Trunk (unreleased changes)
BlockPlacementPolicyDefault extensible for reusing code in subclasses. BlockPlacementPolicyDefault extensible for reusing code in subclasses.
(Junping Du via szetszwo) (Junping Du via szetszwo)
HDFS-3571. Allow EditLogFileInputStream to read from a remote URL (todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -18,27 +18,37 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.BufferedInputStream; import java.io.InputStream;
import java.io.EOFException; import java.net.HttpURLConnection;
import java.io.DataInputStream; import java.net.URL;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.SecurityUtil;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
/** /**
* An implementation of the abstract class {@link EditLogInputStream}, which * An implementation of the abstract class {@link EditLogInputStream}, which
* reads edits from a local file. * reads edits from a file. That file may be either on the local disk or
* accessible via a URL.
*/ */
@InterfaceAudience.Private
public class EditLogFileInputStream extends EditLogInputStream { public class EditLogFileInputStream extends EditLogInputStream {
private final File file; private final LogSource log;
private final long firstTxId; private final long firstTxId;
private final long lastTxId; private final long lastTxId;
private final boolean isInProgress; private final boolean isInProgress;
@ -48,7 +58,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
CLOSED CLOSED
} }
private State state = State.UNINIT; private State state = State.UNINIT;
private FileInputStream fStream = null; private InputStream fStream = null;
private int logVersion = 0; private int logVersion = 0;
private FSEditLogOp.Reader reader = null; private FSEditLogOp.Reader reader = null;
private FSEditLogLoader.PositionTrackingInputStream tracker = null; private FSEditLogLoader.PositionTrackingInputStream tracker = null;
@ -81,7 +91,29 @@ public class EditLogFileInputStream extends EditLogInputStream {
*/ */
public EditLogFileInputStream(File name, long firstTxId, long lastTxId, public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
boolean isInProgress) { boolean isInProgress) {
this.file = name; this(new FileLog(name), firstTxId, lastTxId, isInProgress);
}
/**
* Open an EditLogInputStream for the given URL.
*
* @param url the url hosting the log
* @param startTxId the expected starting txid
* @param endTxId the expected ending txid
* @param inProgress whether the log is in-progress
* @return a stream from which edits may be read
*/
public static EditLogInputStream fromUrl(URL url, long startTxId,
long endTxId, boolean inProgress) {
return new EditLogFileInputStream(new URLLog(url),
startTxId, endTxId, inProgress);
}
private EditLogFileInputStream(LogSource log,
long firstTxId, long lastTxId,
boolean isInProgress) {
this.log = log;
this.firstTxId = firstTxId; this.firstTxId = firstTxId;
this.lastTxId = lastTxId; this.lastTxId = lastTxId;
this.isInProgress = isInProgress; this.isInProgress = isInProgress;
@ -91,7 +123,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
Preconditions.checkState(state == State.UNINIT); Preconditions.checkState(state == State.UNINIT);
BufferedInputStream bin = null; BufferedInputStream bin = null;
try { try {
fStream = new FileInputStream(file); fStream = log.getInputStream();
bin = new BufferedInputStream(fStream); bin = new BufferedInputStream(fStream);
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
dataIn = new DataInputStream(tracker); dataIn = new DataInputStream(tracker);
@ -122,7 +154,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
@Override @Override
public String getName() { public String getName() {
return file.getPath(); return log.getName();
} }
private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException { private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
@ -160,7 +192,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
// we were supposed to read out of the stream. // we were supposed to read out of the stream.
// So we force an EOF on all subsequent reads. // So we force an EOF on all subsequent reads.
// //
long skipAmt = file.length() - tracker.getPos(); long skipAmt = log.length() - tracker.getPos();
if (skipAmt > 0) { if (skipAmt > 0) {
LOG.warn("skipping " + skipAmt + " bytes at the end " + LOG.warn("skipping " + skipAmt + " bytes at the end " +
"of edit log '" + getName() + "': reached txid " + txId + "of edit log '" + getName() + "': reached txid " + txId +
@ -220,7 +252,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
@Override @Override
public long length() throws IOException { public long length() throws IOException {
// file size + size of both buffers // file size + size of both buffers
return file.length(); return log.length();
} }
@Override @Override
@ -291,4 +323,85 @@ public class EditLogFileInputStream extends EditLogInputStream {
super(msg); super(msg);
} }
} }
private interface LogSource {
public InputStream getInputStream() throws IOException;
public long length();
public String getName();
}
private static class FileLog implements LogSource {
private final File file;
public FileLog(File file) {
this.file = file;
}
@Override
public InputStream getInputStream() throws IOException {
return new FileInputStream(file);
}
@Override
public long length() {
return file.length();
}
@Override
public String getName() {
return file.getPath();
}
}
private static class URLLog implements LogSource {
private final URL url;
private long advertisedSize = -1;
private final static String CONTENT_LENGTH = "Content-Length";
public URLLog(URL url) {
this.url = url;
}
@Override
public InputStream getInputStream() throws IOException {
HttpURLConnection connection = (HttpURLConnection)
SecurityUtil.openSecureHttpConnection(url);
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException(
"Fetch of " + url +
" failed with status code " + connection.getResponseCode() +
"\nResponse message:\n" + connection.getResponseMessage(),
connection);
}
String contentLength = connection.getHeaderField(CONTENT_LENGTH);
if (contentLength != null) {
advertisedSize = Long.parseLong(contentLength);
if (advertisedSize <= 0) {
throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
contentLength);
}
} else {
throw new IOException(CONTENT_LENGTH + " header is not provided " +
"by the server when trying to fetch " + url);
}
return connection.getInputStream();
}
@Override
public long length() {
Preconditions.checkState(advertisedSize != -1,
"must get input stream before length is available");
return advertisedSize;
}
@Override
public String getName() {
return url.toString();
}
}
} }

View File

@ -229,27 +229,33 @@ public abstract class FSImageTestUtil {
*/ */
public static EnumMap<FSEditLogOpCodes,Holder<Integer>> countEditLogOpTypes( public static EnumMap<FSEditLogOpCodes,Holder<Integer>> countEditLogOpTypes(
File editLog) throws Exception { File editLog) throws Exception {
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
EditLogInputStream elis = new EditLogFileInputStream(editLog); EditLogInputStream elis = new EditLogFileInputStream(editLog);
try { try {
FSEditLogOp op; return countEditLogOpTypes(elis);
while ((op = elis.readOp()) != null) {
Holder<Integer> i = opCounts.get(op.opCode);
if (i == null) {
i = new Holder<Integer>(0);
opCounts.put(op.opCode, i);
}
i.held++;
}
} finally { } finally {
IOUtils.closeStream(elis); IOUtils.closeStream(elis);
} }
return opCounts;
} }
/**
* @see #countEditLogOpTypes(File)
*/
public static EnumMap<FSEditLogOpCodes, Holder<Integer>> countEditLogOpTypes(
EditLogInputStream elis) throws IOException {
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
FSEditLogOp op;
while ((op = elis.readOp()) != null) {
Holder<Integer> i = opCounts.get(op.opCode);
if (i == null) {
i = new Holder<Integer>(0);
opCounts.put(op.opCode, i);
}
i.held++;
}
return opCounts;
}
/** /**
* Assert that all of the given directories have the same newest filename * Assert that all of the given directories have the same newest filename

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.util.EnumMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.http.HttpServer;
import org.junit.Test;
public class TestEditLogFileInputStream {
private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
@Test
public void testReadURL() throws Exception {
// Start a simple web server which hosts the log data.
HttpServer server = new HttpServer("test", "0.0.0.0", 0, true);
server.start();
try {
server.addServlet("fakeLog", "/fakeLog", FakeLogServlet.class);
URL url = new URL("http://localhost:" + server.getPort() + "/fakeLog");
EditLogInputStream elis = EditLogFileInputStream.fromUrl(
url, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID,
false);
// Read the edit log and verify that we got all of the data.
EnumMap<FSEditLogOpCodes, Holder<Integer>> counts =
FSImageTestUtil.countEditLogOpTypes(elis);
assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_ADD).held);
assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_SET_GENSTAMP).held);
assertEquals(1L, (long)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
// Check that length header was picked up.
assertEquals(FAKE_LOG_DATA.length, elis.length());
elis.close();
} finally {
server.stop();
}
}
@SuppressWarnings("serial")
public static class FakeLogServlet extends HttpServlet {
@Override
public void doGet(HttpServletRequest request,
HttpServletResponse response
) throws ServletException, IOException {
response.setHeader("Content-Length",
String.valueOf(FAKE_LOG_DATA.length));
OutputStream out = response.getOutputStream();
out.write(FAKE_LOG_DATA);
out.close();
}
}
}